提交 704db6b7 作者: think

添加表ausp

上级 a8a221a3
......@@ -22,6 +22,7 @@ import com.huazheng.project.greenplum.source.hana.AfruSource;
import com.huazheng.project.greenplum.source.hana.AfvcSource;
import com.huazheng.project.greenplum.source.hana.AufkSource;
import com.huazheng.project.greenplum.source.hana.AufmSource;
import com.huazheng.project.greenplum.source.hana.AuspSource;
import com.huazheng.project.greenplum.source.hana.BsadSource;
import com.huazheng.project.greenplum.source.hana.BsidSource;
import com.huazheng.project.greenplum.source.hana.Kna1Source;
......@@ -74,6 +75,7 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1;
......@@ -203,6 +205,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
private static Kna1Source kna1Source;
private static KnvvSource knvvSource;
private static AfvcSource afvcSource;
private static AuspSource auspSource;
private static TspatSource tspatSource;
private static AfruSource afruSource;
private static ZpoeditSource zpoeditSource;
......@@ -270,6 +273,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
kna1Source = (Kna1Source) context.getBean("kna1Source");
knvvSource = (KnvvSource) context.getBean("knvvSource");
afvcSource = (AfvcSource) context.getBean("afvcSource");
auspSource = (AuspSource) context.getBean("auspSource");
afruSource = (AfruSource) context.getBean("afruSource");
tspatSource = (TspatSource) context.getBean("tspatSource");
zpoeditSource = (ZpoeditSource) context.getBean("zpoeditSource");
......@@ -924,6 +928,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
env.addSource(afruSource).setParallelism(1).name("输入Afru队列").flatMap(greenPlumFlatMapFunction).returns(Afru.class).setParallelism(1).name("拉取Afru数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Afru数据");
env.addSource(tspatSource).setParallelism(1).name("输入Tspat队列").flatMap(greenPlumFlatMapFunction).returns(Tspat.class).setParallelism(1).name("拉取Tspat数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Tspat数据");
env.addSource(zpoeditSource).setParallelism(1).name("输入Zpoedit队列").flatMap(greenPlumFlatMapFunction).returns(Zpoedit.class).setParallelism(1).name("拉取zpoedit数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Zpoedit数据");
env.addSource(auspSource).setParallelism(1).name("输入Ausp队列").flatMap(greenPlumFlatMapFunction).returns(Ausp.class).setParallelism(1).name("拉取Ausp数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Ausp数据");
env.execute("华正数据迁移任务");
}
......
......@@ -14,6 +14,7 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.CAVThisMonth;
......@@ -69,6 +70,15 @@ import com.huazheng.project.mysql.model.TransformNewNode;
@CacheConfig(cacheNames = "huazheng")
public interface GPMapper {
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.objek+','+#p0.atinn+','+#p0.atzhl+','+#p0.mafid+','+#p0.klart+','+#p0.adzhl", unless="#result == null")
public Ausp selectAusp(Ausp ausp); // 查询替代删除
public void insertAusp(Ausp element);
@CacheEvict(key = "'selectAusp'+':'+#p0.mandt+','+#p0.objek+','+#p0.atinn+','+#p0.atzhl+','+#p0.mafid+','+#p0.klart+','+#p0.adzhl")
public void deleteAusp(Ausp item);
@CacheEvict(key = "'selectAusp'+':'+#p0.mandt+','+#p0.objek+','+#p0.atinn+','+#p0.atzhl+','+#p0.mafid+','+#p0.klart+','+#p0.adzhl")
public void updateAusp(Ausp element);
public List<Ausp> selectAuspCheck(Ausp build);
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.pwerk+','+#p0.aufnr", unless="#result == null")
public Zpoedit selectZpoedit(Zpoedit zpoedit); // 查询替代删除
public void insertZpoedit(Zpoedit element);
......
......@@ -22,6 +22,7 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid;
//import com.huazheng.project.hana.model.Bsid2Bsad;
......@@ -966,6 +967,7 @@ public class DeleteUpdateJobServiceImpl {
selectAfruCheck();
selectTspatCheck();
selectZpoeditCheck();
selectAuspCheck();
}
private void selectAfkoCheck() {
try {
......@@ -1516,6 +1518,49 @@ public class DeleteUpdateJobServiceImpl {
}
}
private void selectAuspCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Ausp:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Ausp:rowNum");
Ausp build = Ausp.builder().rowNum(rowNum).build();
List<Ausp> list = gpMapper.selectAuspCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Ausp:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Ausp source = sapMapper.selectAuspById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteAusp(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
while (true) {
try {
gpMapper.updateAusp(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Ausp:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectAuspCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Ausp:rowNum", getErrorInfoFromException(e));
}
}
private void selectTspatCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......
......@@ -18,6 +18,7 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1;
......@@ -2112,4 +2113,31 @@ public class GPServiceImpl {
}
}
public void processAusp(Ausp data, Collector<Ausp> out) {
try {
Ausp exist = gpMapper.selectAusp(data);
if (exist != null) {
data.setExist(true); // 已经在库
}
out.collect(data);
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Ausp:error", "processAusp", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void sinkAusp(Ausp element) {
try {
if (element.isExist() == false) {
log.debug("GPServiceImpl.sinkAusp()");
gpMapper.insertAusp(element);
}
} catch (RuntimeException e) {
redis1Template.opsForHash().put("huazheng:Ausp:error", "sinkAusp", getErrorInfoFromException(e));
log.error(e.getMessage());
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Ausp:error", "sinkAusp", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
}
......@@ -20,6 +20,7 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1;
......@@ -127,6 +128,7 @@ public class JobServiceImpl {
selectAfkoNew();
selectAfpoNew();
selectAfvcNew();
selectAuspNew();
selectAfruNew();
selectTspatNew();
selectZpoeditNew();
......@@ -703,6 +705,32 @@ public class JobServiceImpl {
}
}
private void selectAuspNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap4send.lua")));
List<String> keys = Arrays.asList("huazheng:Ausp:sendcount", "huazheng:Ausp:rowids", "huazheng:list:Ausp");
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:Ausp:sendcount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Ausp:receivecount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Ausp:rowids", "0"); // 不存在则创建,存在则么有操作
Long sendcount = Long.valueOf(opsForValue.get("huazheng:Ausp:sendcount"));
Long receivecount = Long.valueOf(opsForValue.get("huazheng:Ausp:receivecount"));
if (sendcount - receivecount <= 20) { // 如果发送数和消费数的差小于5则往队列中写数据
String rowids = opsForValue.get("huazheng:Ausp:rowids"); // 标记id
Ausp ausp = Ausp.builder().rowids(Long.valueOf(rowids)).build();
List<Ausp> list = sapMapper.selectAuspNew(ausp);
if (!list.isEmpty()) {
list.forEach(item -> {
JSONObject json = JSONUtil.parseObj(item, false);
String execute = redis1Template.execute(script, keys, item.getRowids().toString(), json.toString());
log.info("标记时间回写 --> " + execute);
});
}
}
}
private void selectAfpoNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......
package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream;
import com.huazheng.project.hana.model.Ausp;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Service
public class AuspSource implements SourceFunction<Ausp> {
private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
@Override
public void run(SourceContext<Ausp> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Ausp:sendcount", "huazheng:Ausp:id", "huazheng:list:Ausp", "huazheng:Ausp:receivecount");
while (true) {
try {
String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("==========");
String checkString = values[0];
String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check);
Ausp data = JSONUtil.toBean(values[1], Ausp.class);
ctx.collect(data);
} else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
} catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Ausp:error", "receivecount_elseerror", getErrorInfoFromException(e));
}
}
}
@Override
public void cancel() {
}
}
......@@ -8,6 +8,7 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1;
......@@ -79,7 +80,9 @@ public interface SapMapper {
public List<Afru> selectAfruNew(Afru afru);
public List<Tspat> selectTspatNew(Tspat tspat);
public List<Zpoedit> selectZpoeditNew(Zpoedit zpoedit);
public List<Ausp> selectAuspNew(Ausp ausp);
public Ausp selectAuspById(Ausp target);
public Zpoedit selectZpoeditById(Zpoedit target);
public Tspat selectTspatById(Tspat target);
public Afru selectAfruById(Afru target);
......
package com.huazheng.project.hana.model;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Builder
public class Ausp implements Serializable {
private static final long serialVersionUID = 1L;
/* === 组合键 === */
private String mandt; // 集团
private String objek; // 对象码
private String atinn; // 内部特征
private String atzhl; // 特征值计数器
private String mafid; // 标识:对象/类
private String klart; // 类别种类
private String adzhl; // 内部计数器
/* === 组合键 === */
private String atwrt; // 特征值计数器
private Long rowids; // sap那边的rowid
private boolean exist; // 用于标记,不是字段
private String hashResult; // 数据hash标记
private String rowNum; // 用于标记,不是字段
}
update vbap set hashresult = null;
update vbap set hashresult = null;
redis-cli -n 1 --raw keys "huazheng:check:Vbap:rowNum" | xargs redis-cli -n 1 del
drop table ausp;
CREATE TABLE ausp (
mandt text,
objek text,
atinn text,
atzhl text,
mafid text,
klart text,
adzhl text,
atwrt text,
hashResult text,
rowNum serial,
PRIMARY KEY (mandt,objek,atinn,atzhl,mafid,klart,adzhl)
)
Distributed by (mandt,objek,atinn,atzhl,mafid,klart,adzhl);
......@@ -298,3 +298,7 @@ alter table vbapadv drop column vbpa_posnr;
alter table vbapadv drop column vbpa_parvw;
alter table vbapadv drop column vbpa_pernr;
alter table vbapadv add column pernr text;
alter table vbapadv add column aufk_auart text;
alter table vbapadv add column aufk_werks text;
......@@ -631,6 +631,24 @@
select * from Afvc where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectAusp" parameterType="com.huazheng.project.hana.model.Ausp" resultType="com.huazheng.project.hana.model.Ausp">
select * from Ausp where mandt = #{mandt} and objek = #{objek} and atinn = #{atinn} and atzhl = #{atzhl} and mafid = #{mafid} and klart = #{klart} and adzhl = #{adzhl}
</select>
<insert id="insertAusp" parameterType="com.huazheng.project.hana.model.Ausp">
insert into Ausp (mandt, objek, atinn, atzhl, mafid, klart, adzhl, atwrt, hashResult)
values(#{mandt},#{objek},#{atinn},#{atzhl},#{mafid},#{klart},#{adzhl},#{atwrt},#{hashResult})
</insert>
<delete id="deleteAusp" parameterType="com.huazheng.project.hana.model.Ausp">
delete from Ausp where mandt = #{mandt} and objek = #{objek} and atinn = #{atinn} and atzhl = #{atzhl} and mafid = #{mafid} and klart = #{klart} and adzhl = #{adzhl}
</delete>
<update id="updateAusp" parameterType="com.huazheng.project.hana.model.Ausp">
update Ausp set
mandt = #{mandt}, objek = #{objek}, atinn = #{atinn}, atzhl = #{atzhl}, mafid = #{mafid}, klart = #{klart}, adzhl = #{adzhl}, atwrt = #{atwrt}, hashResult = #{hashResult}
where mandt = #{mandt} and objek = #{objek} and atinn = #{atinn} and atzhl = #{atzhl} and mafid = #{mafid} and klart = #{klart} and adzhl = #{adzhl}
</update>
<select id="selectAuspCheck" parameterType="com.huazheng.project.hana.model.Ausp" resultType="com.huazheng.project.hana.model.Ausp">
select * from Ausp where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectAfru" parameterType="com.huazheng.project.hana.model.Afru" resultType="com.huazheng.project.hana.model.Afru">
select * from Afru where mandt = #{mandt} and rueck = #{rueck} and rmzhl = #{rmzhl}
......@@ -1537,7 +1555,7 @@
update VbapAdv set
aufk_kdauf = #{kdauf}, aufk_kdpos = #{kdpos}, aufk_mandt = #{mandt}, aufk_aufnr = #{aufnr},
aufk_erdat = #{erdat}, aufk_erfzeit = #{erfzeit}, aufk_ernam = #{ernam}, aufk_aenam = #{aenam},
aufk_bukrs = #{bukrs}, aufk_erdat1 = #{erdat1}, aufk_erdat2 = #{erdat2},
aufk_bukrs = #{bukrs}, aufk_erdat1 = #{erdat1}, aufk_erdat2 = #{erdat2}, aufk_auart=#{auart}, aufk_werks=#{werks},
pcsj = #{erdat1}
where vbeln = #{vbap.vbeln} and posnr = #{vbap.posnr} and mandt = #{vbap.mandt} and erdat1 = #{vbap.erdat1}
</update>
......
......@@ -200,6 +200,14 @@
order by "$rowid$"
</select>
<select id="selectAuspNew" parameterType="Ausp" resultType="Ausp">
select top 20 "$rowid$" as rowids,
mandt, objek, atinn, atzhl, mafid, klart, adzhl, atwrt
from ${hana_user}.Ausp
where "$rowid$" &gt; #{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectAfruNew" parameterType="Afru" resultType="Afru">
select top 20 "$rowid$" as rowids,
mandt, rueck, rmzhl, ersda, wablnr, aufnr, stokz, stzhl
......@@ -372,6 +380,12 @@
where mandt = #{mandt} and aufpl = #{aufpl} and aplzl = #{aplzl}
</select>
<select id="selectAuspById" parameterType="Ausp" resultType="Ausp">
select mandt, objek, atinn, atzhl, mafid, klart, adzhl, atwrt
from ${hana_user}.Ausp
where mandt = #{mandt} and objek = #{objek} and atinn = #{atinn} and atzhl = #{atzhl} and mafid = #{mafid} and klart = #{klart} and adzhl = #{adzhl}
</select>
<select id="selectAfruById" parameterType="Afru" resultType="Afru">
select mandt, rueck, rmzhl, ersda, wablnr, aufnr, stokz, stzhl
from ${hana_user}.Afru
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论