提交 9dc1ce1a 作者: guofeng

Merge branch 'master' of

http://47.103.50.109:8091/huazheng/huazheng-project-flink.git

Conflicts:
	src/main/resources/devtools/doc/华正项目-数据库表设计20201107.xlsx
...@@ -22,10 +22,12 @@ import com.huazheng.project.greenplum.source.hana.AfruSource; ...@@ -22,10 +22,12 @@ import com.huazheng.project.greenplum.source.hana.AfruSource;
import com.huazheng.project.greenplum.source.hana.AfvcSource; import com.huazheng.project.greenplum.source.hana.AfvcSource;
import com.huazheng.project.greenplum.source.hana.AufkSource; import com.huazheng.project.greenplum.source.hana.AufkSource;
import com.huazheng.project.greenplum.source.hana.AufmSource; import com.huazheng.project.greenplum.source.hana.AufmSource;
import com.huazheng.project.greenplum.source.hana.AuspSource;
import com.huazheng.project.greenplum.source.hana.BsadSource; import com.huazheng.project.greenplum.source.hana.BsadSource;
import com.huazheng.project.greenplum.source.hana.BsidSource; import com.huazheng.project.greenplum.source.hana.BsidSource;
import com.huazheng.project.greenplum.source.hana.Kna1Source; import com.huazheng.project.greenplum.source.hana.Kna1Source;
import com.huazheng.project.greenplum.source.hana.KnkkSource; import com.huazheng.project.greenplum.source.hana.KnkkSource;
import com.huazheng.project.greenplum.source.hana.KnvpSource;
import com.huazheng.project.greenplum.source.hana.KnvvSource; import com.huazheng.project.greenplum.source.hana.KnvvSource;
import com.huazheng.project.greenplum.source.hana.KonvSource; import com.huazheng.project.greenplum.source.hana.KonvSource;
import com.huazheng.project.greenplum.source.hana.LikpSource; import com.huazheng.project.greenplum.source.hana.LikpSource;
...@@ -74,10 +76,12 @@ import com.huazheng.project.hana.model.Afru; ...@@ -74,10 +76,12 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc; import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk; import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm; import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad; import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid; import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1; import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk; import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv; import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv; import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp; import com.huazheng.project.hana.model.Likp;
...@@ -203,6 +207,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del ...@@ -203,6 +207,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
private static Kna1Source kna1Source; private static Kna1Source kna1Source;
private static KnvvSource knvvSource; private static KnvvSource knvvSource;
private static AfvcSource afvcSource; private static AfvcSource afvcSource;
private static KnvpSource knvpSource;
private static AuspSource auspSource;
private static TspatSource tspatSource; private static TspatSource tspatSource;
private static AfruSource afruSource; private static AfruSource afruSource;
private static ZpoeditSource zpoeditSource; private static ZpoeditSource zpoeditSource;
...@@ -270,6 +276,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del ...@@ -270,6 +276,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
kna1Source = (Kna1Source) context.getBean("kna1Source"); kna1Source = (Kna1Source) context.getBean("kna1Source");
knvvSource = (KnvvSource) context.getBean("knvvSource"); knvvSource = (KnvvSource) context.getBean("knvvSource");
afvcSource = (AfvcSource) context.getBean("afvcSource"); afvcSource = (AfvcSource) context.getBean("afvcSource");
knvpSource = (KnvpSource) context.getBean("knvpSource");
auspSource = (AuspSource) context.getBean("auspSource");
afruSource = (AfruSource) context.getBean("afruSource"); afruSource = (AfruSource) context.getBean("afruSource");
tspatSource = (TspatSource) context.getBean("tspatSource"); tspatSource = (TspatSource) context.getBean("tspatSource");
zpoeditSource = (ZpoeditSource) context.getBean("zpoeditSource"); zpoeditSource = (ZpoeditSource) context.getBean("zpoeditSource");
...@@ -924,6 +932,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del ...@@ -924,6 +932,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
env.addSource(afruSource).setParallelism(1).name("输入Afru队列").flatMap(greenPlumFlatMapFunction).returns(Afru.class).setParallelism(1).name("拉取Afru数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Afru数据"); env.addSource(afruSource).setParallelism(1).name("输入Afru队列").flatMap(greenPlumFlatMapFunction).returns(Afru.class).setParallelism(1).name("拉取Afru数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Afru数据");
env.addSource(tspatSource).setParallelism(1).name("输入Tspat队列").flatMap(greenPlumFlatMapFunction).returns(Tspat.class).setParallelism(1).name("拉取Tspat数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Tspat数据"); env.addSource(tspatSource).setParallelism(1).name("输入Tspat队列").flatMap(greenPlumFlatMapFunction).returns(Tspat.class).setParallelism(1).name("拉取Tspat数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Tspat数据");
env.addSource(zpoeditSource).setParallelism(1).name("输入Zpoedit队列").flatMap(greenPlumFlatMapFunction).returns(Zpoedit.class).setParallelism(1).name("拉取zpoedit数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Zpoedit数据"); env.addSource(zpoeditSource).setParallelism(1).name("输入Zpoedit队列").flatMap(greenPlumFlatMapFunction).returns(Zpoedit.class).setParallelism(1).name("拉取zpoedit数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Zpoedit数据");
env.addSource(auspSource).setParallelism(1).name("输入Ausp队列").flatMap(greenPlumFlatMapFunction).returns(Ausp.class).setParallelism(1).name("拉取Ausp数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Ausp数据");
env.addSource(knvpSource).setParallelism(1).name("输入Knvp队列").flatMap(greenPlumFlatMapFunction).returns(Knvp.class).setParallelism(1).name("拉取Knvp数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Knvp数据");
env.execute("华正数据迁移任务"); env.execute("华正数据迁移任务");
} }
......
...@@ -14,11 +14,13 @@ import com.huazheng.project.hana.model.Afru; ...@@ -14,11 +14,13 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc; import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk; import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm; import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad; import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid; import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.CAVThisMonth; import com.huazheng.project.hana.model.CAVThisMonth;
import com.huazheng.project.hana.model.Kna1; import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk; import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv; import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv; import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp; import com.huazheng.project.hana.model.Likp;
...@@ -69,6 +71,24 @@ import com.huazheng.project.mysql.model.TransformNewNode; ...@@ -69,6 +71,24 @@ import com.huazheng.project.mysql.model.TransformNewNode;
@CacheConfig(cacheNames = "huazheng") @CacheConfig(cacheNames = "huazheng")
public interface GPMapper { public interface GPMapper {
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.kunnr+','+#p0.vkorg+','+#p0.vtweg+','+#p0.spart+','+#p0.parvw+','+#p0.parza", unless="#result == null")
public Knvp selectKnvp(Knvp knvp); // 查询替代删除
public void insertKnvp(Knvp element);
@CacheEvict(key = "'selectKnvp'+':'+#p0.mandt+','+#p0.kunnr+','+#p0.vkorg+','+#p0.vtweg+','+#p0.spart+','+#p0.parvw+','+#p0.parza")
public void deleteKnvp(Knvp item);
@CacheEvict(key = "'selectKnvp'+':'+#p0.mandt+','+#p0.kunnr+','+#p0.vkorg+','+#p0.vtweg+','+#p0.spart+','+#p0.parvw+','+#p0.parza")
public void updateKnvp(Knvp element);
public List<Knvp> selectKnvpCheck(Knvp build);
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.objek+','+#p0.atinn+','+#p0.atzhl+','+#p0.mafid+','+#p0.klart+','+#p0.adzhl", unless="#result == null")
public Ausp selectAusp(Ausp ausp); // 查询替代删除
public void insertAusp(Ausp element);
@CacheEvict(key = "'selectAusp'+':'+#p0.mandt+','+#p0.objek+','+#p0.atinn+','+#p0.atzhl+','+#p0.mafid+','+#p0.klart+','+#p0.adzhl")
public void deleteAusp(Ausp item);
@CacheEvict(key = "'selectAusp'+':'+#p0.mandt+','+#p0.objek+','+#p0.atinn+','+#p0.atzhl+','+#p0.mafid+','+#p0.klart+','+#p0.adzhl")
public void updateAusp(Ausp element);
public List<Ausp> selectAuspCheck(Ausp build);
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.pwerk+','+#p0.aufnr", unless="#result == null") @Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.pwerk+','+#p0.aufnr", unless="#result == null")
public Zpoedit selectZpoedit(Zpoedit zpoedit); // 查询替代删除 public Zpoedit selectZpoedit(Zpoedit zpoedit); // 查询替代删除
public void insertZpoedit(Zpoedit element); public void insertZpoedit(Zpoedit element);
......
...@@ -22,11 +22,13 @@ import com.huazheng.project.hana.model.Afru; ...@@ -22,11 +22,13 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc; import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk; import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm; import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad; import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid; import com.huazheng.project.hana.model.Bsid;
//import com.huazheng.project.hana.model.Bsid2Bsad; //import com.huazheng.project.hana.model.Bsid2Bsad;
import com.huazheng.project.hana.model.Kna1; import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk; import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv; import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv; import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp; import com.huazheng.project.hana.model.Likp;
...@@ -966,6 +968,8 @@ public class DeleteUpdateJobServiceImpl { ...@@ -966,6 +968,8 @@ public class DeleteUpdateJobServiceImpl {
selectAfruCheck(); selectAfruCheck();
selectTspatCheck(); selectTspatCheck();
selectZpoeditCheck(); selectZpoeditCheck();
selectAuspCheck();
selectKnvpCheck();
} }
private void selectAfkoCheck() { private void selectAfkoCheck() {
try { try {
...@@ -1387,6 +1391,49 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1387,6 +1391,49 @@ public class DeleteUpdateJobServiceImpl {
} }
} }
private void selectKnvpCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Knvp:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Knvp:rowNum");
Knvp build = Knvp.builder().rowNum(rowNum).build();
List<Knvp> list = gpMapper.selectKnvpCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Knvp:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Knvp source = sapMapper.selectKnvpById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteKnvp(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
while (true) {
try {
gpMapper.updateKnvp(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Knvp:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectKnvpCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Knvp:rowNum", getErrorInfoFromException(e));
}
}
private void selectMkpfCheck() { private void selectMkpfCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -1516,6 +1563,49 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1516,6 +1563,49 @@ public class DeleteUpdateJobServiceImpl {
} }
} }
private void selectAuspCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Ausp:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Ausp:rowNum");
Ausp build = Ausp.builder().rowNum(rowNum).build();
List<Ausp> list = gpMapper.selectAuspCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Ausp:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Ausp source = sapMapper.selectAuspById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteAusp(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
while (true) {
try {
gpMapper.updateAusp(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Ausp:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectAuspCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Ausp:rowNum", getErrorInfoFromException(e));
}
}
private void selectTspatCheck() { private void selectTspatCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......
...@@ -18,10 +18,12 @@ import com.huazheng.project.hana.model.Afru; ...@@ -18,10 +18,12 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc; import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk; import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm; import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad; import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid; import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1; import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk; import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv; import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv; import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp; import com.huazheng.project.hana.model.Likp;
...@@ -2112,4 +2114,58 @@ public class GPServiceImpl { ...@@ -2112,4 +2114,58 @@ public class GPServiceImpl {
} }
} }
public void processAusp(Ausp data, Collector<Ausp> out) {
try {
Ausp exist = gpMapper.selectAusp(data);
if (exist != null) {
data.setExist(true); // 已经在库
}
out.collect(data);
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Ausp:error", "processAusp", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void sinkAusp(Ausp element) {
try {
if (element.isExist() == false) {
log.debug("GPServiceImpl.sinkAusp()");
gpMapper.insertAusp(element);
}
} catch (RuntimeException e) {
redis1Template.opsForHash().put("huazheng:Ausp:error", "sinkAusp", getErrorInfoFromException(e));
log.error(e.getMessage());
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Ausp:error", "sinkAusp", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void processKnvp(Knvp data, Collector<Knvp> out) {
try {
Knvp exist = gpMapper.selectKnvp(data);
if (exist != null) {
data.setExist(true); // 已经在库
}
out.collect(data);
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Knvp:error", "processKnvp", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void sinkKnvp(Knvp element) {
try {
if (element.isExist() == false) {
log.debug("GPServiceImpl.sinkKnvp()");
gpMapper.insertKnvp(element);
}
} catch (RuntimeException e) {
redis1Template.opsForHash().put("huazheng:Knvp:error", "sinkKnvp", getErrorInfoFromException(e));
log.error(e.getMessage());
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Knvp:error", "sinkKnvp", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
} }
...@@ -20,10 +20,12 @@ import com.huazheng.project.hana.model.Afru; ...@@ -20,10 +20,12 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc; import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk; import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm; import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad; import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid; import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1; import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk; import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv; import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv; import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp; import com.huazheng.project.hana.model.Likp;
...@@ -127,6 +129,7 @@ public class JobServiceImpl { ...@@ -127,6 +129,7 @@ public class JobServiceImpl {
selectAfkoNew(); selectAfkoNew();
selectAfpoNew(); selectAfpoNew();
selectAfvcNew(); selectAfvcNew();
selectAuspNew();
selectAfruNew(); selectAfruNew();
selectTspatNew(); selectTspatNew();
selectZpoeditNew(); selectZpoeditNew();
...@@ -146,6 +149,7 @@ public class JobServiceImpl { ...@@ -146,6 +149,7 @@ public class JobServiceImpl {
selectT001wNew(); selectT001wNew();
selectMskaNew(); selectMskaNew();
selectZsdfhzlNew(); selectZsdfhzlNew();
selectKnvpNew();
} }
private void selectVbapNew() { private void selectVbapNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
...@@ -573,6 +577,32 @@ public class JobServiceImpl { ...@@ -573,6 +577,32 @@ public class JobServiceImpl {
} }
} }
private void selectKnvpNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap4send.lua")));
List<String> keys = Arrays.asList("huazheng:Knvp:sendcount", "huazheng:Knvp:rowids", "huazheng:list:Knvp");
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:Knvp:sendcount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Knvp:receivecount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Knvp:rowids", "0"); // 不存在则创建,存在则么有操作
Long sendcount = Long.valueOf(opsForValue.get("huazheng:Knvp:sendcount"));
Long receivecount = Long.valueOf(opsForValue.get("huazheng:Knvp:receivecount"));
if (sendcount - receivecount <= 20) { // 如果发送数和消费数的差小于5则往队列中写数据
String rowids = opsForValue.get("huazheng:Knvp:rowids"); // 标记id
Knvp knvp = Knvp.builder().rowids(Long.valueOf(rowids)).build();
List<Knvp> list = sapMapper.selectKnvpNew(knvp);
if (!list.isEmpty()) {
list.forEach(item -> {
JSONObject json = JSONUtil.parseObj(item, false);
String execute = redis1Template.execute(script, keys, item.getRowids().toString(), json.toString());
log.info("标记时间回写 --> " + execute);
});
}
}
}
private void selectAfruNew() { private void selectAfruNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
...@@ -703,6 +733,32 @@ public class JobServiceImpl { ...@@ -703,6 +733,32 @@ public class JobServiceImpl {
} }
} }
private void selectAuspNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap4send.lua")));
List<String> keys = Arrays.asList("huazheng:Ausp:sendcount", "huazheng:Ausp:rowids", "huazheng:list:Ausp");
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:Ausp:sendcount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Ausp:receivecount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Ausp:rowids", "0"); // 不存在则创建,存在则么有操作
Long sendcount = Long.valueOf(opsForValue.get("huazheng:Ausp:sendcount"));
Long receivecount = Long.valueOf(opsForValue.get("huazheng:Ausp:receivecount"));
if (sendcount - receivecount <= 20) { // 如果发送数和消费数的差小于5则往队列中写数据
String rowids = opsForValue.get("huazheng:Ausp:rowids"); // 标记id
Ausp ausp = Ausp.builder().rowids(Long.valueOf(rowids)).build();
List<Ausp> list = sapMapper.selectAuspNew(ausp);
if (!list.isEmpty()) {
list.forEach(item -> {
JSONObject json = JSONUtil.parseObj(item, false);
String execute = redis1Template.execute(script, keys, item.getRowids().toString(), json.toString());
log.info("标记时间回写 --> " + execute);
});
}
}
}
private void selectAfpoNew() { private void selectAfpoNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
......
package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream;
import com.huazheng.project.hana.model.Ausp;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Service
public class AuspSource implements SourceFunction<Ausp> {
private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
@Override
public void run(SourceContext<Ausp> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Ausp:sendcount", "huazheng:Ausp:id", "huazheng:list:Ausp", "huazheng:Ausp:receivecount");
while (true) {
try {
String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("==========");
String checkString = values[0];
String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check);
Ausp data = JSONUtil.toBean(values[1], Ausp.class);
ctx.collect(data);
} else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
} catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Ausp:error", "receivecount_elseerror", getErrorInfoFromException(e));
}
}
}
@Override
public void cancel() {
}
}
package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream;
import com.huazheng.project.hana.model.Knvp;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Service
public class KnvpSource implements SourceFunction<Knvp> {
private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
@Override
public void run(SourceContext<Knvp> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Knvp:sendcount", "huazheng:Knvp:id", "huazheng:list:Knvp", "huazheng:Knvp:receivecount");
while (true) {
try {
String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("==========");
String checkString = values[0];
String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check);
Knvp data = JSONUtil.toBean(values[1], Knvp.class);
ctx.collect(data);
} else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
} catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Knvp:error", "receivecount_elseerror", getErrorInfoFromException(e));
}
}
}
@Override
public void cancel() {
}
}
...@@ -8,10 +8,12 @@ import com.huazheng.project.hana.model.Afru; ...@@ -8,10 +8,12 @@ import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc; import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk; import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm; import com.huazheng.project.hana.model.Aufm;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad; import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid; import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1; import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk; import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv; import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv; import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp; import com.huazheng.project.hana.model.Likp;
...@@ -79,7 +81,11 @@ public interface SapMapper { ...@@ -79,7 +81,11 @@ public interface SapMapper {
public List<Afru> selectAfruNew(Afru afru); public List<Afru> selectAfruNew(Afru afru);
public List<Tspat> selectTspatNew(Tspat tspat); public List<Tspat> selectTspatNew(Tspat tspat);
public List<Zpoedit> selectZpoeditNew(Zpoedit zpoedit); public List<Zpoedit> selectZpoeditNew(Zpoedit zpoedit);
public List<Ausp> selectAuspNew(Ausp ausp);
public List<Knvp> selectKnvpNew(Knvp knvp);
public Knvp selectKnvpById(Knvp target);
public Ausp selectAuspById(Ausp target);
public Zpoedit selectZpoeditById(Zpoedit target); public Zpoedit selectZpoeditById(Zpoedit target);
public Tspat selectTspatById(Tspat target); public Tspat selectTspatById(Tspat target);
public Afru selectAfruById(Afru target); public Afru selectAfruById(Afru target);
......
package com.huazheng.project.hana.model;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Builder
public class Ausp implements Serializable {
private static final long serialVersionUID = 1L;
/* === 组合键 === */
private String mandt; // 集团
private String objek; // 对象码
private String atinn; // 内部特征
private String atzhl; // 特征值计数器
private String mafid; // 标识:对象/类
private String klart; // 类别种类
private String adzhl; // 内部计数器
/* === 组合键 === */
private String atwrt; // 特征值计数器
private Long rowids; // sap那边的rowid
private boolean exist; // 用于标记,不是字段
private String hashResult; // 数据hash标记
private String rowNum; // 用于标记,不是字段
}
package com.huazheng.project.hana.model;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Builder
public class Knvp implements Serializable {
private static final long serialVersionUID = 1L;
/* === 组合键 === */
private String mandt; // 集团
private String kunnr; // 客户编号
private String vkorg; // 销售组织
private String vtweg; // 分销渠道
private String spart; // 产品组
private String parvw; // 合作伙伴功能
private String parza; // 合作伙伴计数器
/* === 组合键 === */
private String kunn2; // 业务伙伴的客户号
private Integer pernr; // 人员编号
private String knref; // 业务合作伙伴的客户描述
private Long rowids; // sap那边的rowid
private boolean exist; // 用于标记,不是字段
private String hashResult; // 数据hash标记
private String rowNum; // 用于标记,不是字段
}
...@@ -57,6 +57,8 @@ public class Vbak implements Serializable { ...@@ -57,6 +57,8 @@ public class Vbak implements Serializable {
private String aedat; // 更新日期 private String aedat; // 更新日期
private String spart; // 产品组
private String erdat; // 输入日期 private String erdat; // 输入日期
private String erzet; // 输入日期 private String erzet; // 输入日期
......
...@@ -33,6 +33,7 @@ public class Vbep implements Serializable { ...@@ -33,6 +33,7 @@ public class Vbep implements Serializable {
private String mandt; // 集团 private String mandt; // 集团
private String etenr; // 交货计划行号 private String etenr; // 交货计划行号
private String aufnr; // 生产订单号
private Vbap vbap; private Vbap vbap;
private VbapAdv vbapAdv; private VbapAdv vbapAdv;
......
update vbap set hashresult = null;
update vbap set hashresult = null;
redis-cli -n 1 --raw keys "huazheng:check:Vbap:rowNum" | xargs redis-cli -n 1 del
drop table ausp;
CREATE TABLE ausp (
mandt text,
objek text,
atinn text,
atzhl text,
mafid text,
klart text,
adzhl text,
atwrt text,
hashResult text,
rowNum serial,
PRIMARY KEY (mandt,objek,atinn,atzhl,mafid,klart,adzhl)
)
Distributed by (mandt,objek,atinn,atzhl,mafid,klart,adzhl);
drop table knvp;
CREATE TABLE knvp (
mandt text,
kunnr text,
vkorg text,
vtweg text,
spart text,
parvw text,
parza text,
kunn2 text,
pernr integer,
knref text,
hashResult text,
rowNum serial,
PRIMARY KEY (mandt,kunnr,vkorg,vtweg,spart,parvw,parza)
)
Distributed by (mandt,kunnr,vkorg,vtweg,spart,parvw,parza);
...@@ -34,3 +34,4 @@ alter table vbak add column erdat2erzet timestamp; ...@@ -34,3 +34,4 @@ alter table vbak add column erdat2erzet timestamp;
alter table vbak add column erdat text; alter table vbak add column erdat text;
alter table vbak add column erzet text; alter table vbak add column erzet text;
alter table vbak add column spart text;
...@@ -298,3 +298,7 @@ alter table vbapadv drop column vbpa_posnr; ...@@ -298,3 +298,7 @@ alter table vbapadv drop column vbpa_posnr;
alter table vbapadv drop column vbpa_parvw; alter table vbapadv drop column vbpa_parvw;
alter table vbapadv drop column vbpa_pernr; alter table vbapadv drop column vbpa_pernr;
alter table vbapadv add column pernr text; alter table vbapadv add column pernr text;
alter table vbapadv add column aufk_auart text;
alter table vbapadv add column aufk_werks text;
...@@ -13,5 +13,6 @@ PRIMARY KEY (mandt, vbeln, posnr, etenr) ...@@ -13,5 +13,6 @@ PRIMARY KEY (mandt, vbeln, posnr, etenr)
) )
Distributed by (mandt, vbeln, posnr, etenr); Distributed by (mandt, vbeln, posnr, etenr);
alter table vbep add column aufnr text;
alter table vbep add column hashResult text; alter table vbep add column hashResult text;
alter table vbep add column rowNum serial; alter table vbep add column rowNum serial;
...@@ -31,3 +31,4 @@ alter table vbrk add column hashResult text; ...@@ -31,3 +31,4 @@ alter table vbrk add column hashResult text;
alter table vbrk add column rowNum serial; alter table vbrk add column rowNum serial;
alter table vbrk add column aedat text; alter table vbrk add column aedat text;
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论