提交 a8a221a3 作者: think

添加表 afru tspat zpoedit

上级 2e159274
......@@ -18,6 +18,7 @@ import com.huazheng.project.greenplum.richsink.GreenPlumRichSinkFunction;
import com.huazheng.project.greenplum.service.impl.GPServiceImpl;
import com.huazheng.project.greenplum.source.hana.AfkoSource;
import com.huazheng.project.greenplum.source.hana.AfpoSource;
import com.huazheng.project.greenplum.source.hana.AfruSource;
import com.huazheng.project.greenplum.source.hana.AfvcSource;
import com.huazheng.project.greenplum.source.hana.AufkSource;
import com.huazheng.project.greenplum.source.hana.AufmSource;
......@@ -39,6 +40,7 @@ import com.huazheng.project.greenplum.source.hana.S066Source;
import com.huazheng.project.greenplum.source.hana.S067Source;
import com.huazheng.project.greenplum.source.hana.T001wSource;
import com.huazheng.project.greenplum.source.hana.T023tSource;
import com.huazheng.project.greenplum.source.hana.TspatSource;
import com.huazheng.project.greenplum.source.hana.TvkbtSource;
import com.huazheng.project.greenplum.source.hana.VbakSource;
import com.huazheng.project.greenplum.source.hana.VbapSource;
......@@ -48,6 +50,7 @@ import com.huazheng.project.greenplum.source.hana.VbrkSource;
import com.huazheng.project.greenplum.source.hana.VbrpSource;
import com.huazheng.project.greenplum.source.hana.VbukSource;
import com.huazheng.project.greenplum.source.hana.ZmdpcSource;
import com.huazheng.project.greenplum.source.hana.ZpoeditSource;
import com.huazheng.project.greenplum.source.hana.Zsd06Source;
import com.huazheng.project.greenplum.source.hana.ZsdfhzlSource;
import com.huazheng.project.greenplum.source.mssql.SalesContractProcessSource;
......@@ -67,6 +70,7 @@ import com.huazheng.project.greenplum.source.mysql.TransformHistoryNodeSource;
import com.huazheng.project.greenplum.source.mysql.TransformNewNodeSource;
import com.huazheng.project.hana.model.Afko;
import com.huazheng.project.hana.model.Afpo;
import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
......@@ -88,6 +92,7 @@ import com.huazheng.project.hana.model.S066;
import com.huazheng.project.hana.model.S067;
import com.huazheng.project.hana.model.T001w;
import com.huazheng.project.hana.model.T023t;
import com.huazheng.project.hana.model.Tspat;
import com.huazheng.project.hana.model.Tvkbt;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbap;
......@@ -97,6 +102,7 @@ import com.huazheng.project.hana.model.Vbrk;
import com.huazheng.project.hana.model.Vbrp;
import com.huazheng.project.hana.model.Vbuk;
import com.huazheng.project.hana.model.Zmdpc;
import com.huazheng.project.hana.model.Zpoedit;
import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.hana.model.Zsdfhzl;
import com.huazheng.project.mssql.model.SalesContractProcess;
......@@ -197,6 +203,9 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
private static Kna1Source kna1Source;
private static KnvvSource knvvSource;
private static AfvcSource afvcSource;
private static TspatSource tspatSource;
private static AfruSource afruSource;
private static ZpoeditSource zpoeditSource;
private static MkpfSource mkpfSource;
private static KnkkSource knkkSource;
private static MaraSource maraSource;
......@@ -261,6 +270,9 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
kna1Source = (Kna1Source) context.getBean("kna1Source");
knvvSource = (KnvvSource) context.getBean("knvvSource");
afvcSource = (AfvcSource) context.getBean("afvcSource");
afruSource = (AfruSource) context.getBean("afruSource");
tspatSource = (TspatSource) context.getBean("tspatSource");
zpoeditSource = (ZpoeditSource) context.getBean("zpoeditSource");
mkpfSource = (MkpfSource) context.getBean("mkpfSource");
knkkSource = (KnkkSource) context.getBean("knkkSource");
maraSource = (MaraSource) context.getBean("maraSource");
......@@ -909,6 +921,9 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
env.addSource(afvcSource).setParallelism(1).name("输入Afvc队列").flatMap(greenPlumFlatMapFunction).returns(Afvc.class).setParallelism(1).name("拉取Afvc数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Afvc数据");
env.addSource(mkpfSource).setParallelism(1).name("输入Mkpf队列").flatMap(greenPlumFlatMapFunction).returns(Mkpf.class).setParallelism(1).name("拉取Mkpf数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Mkpf数据");
env.addSource(msegSource).setParallelism(1).name("输入Mseg队列").flatMap(greenPlumFlatMapFunction).returns(Mseg.class).setParallelism(1).name("拉取Mseg数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Mseg数据");
env.addSource(afruSource).setParallelism(1).name("输入Afru队列").flatMap(greenPlumFlatMapFunction).returns(Afru.class).setParallelism(1).name("拉取Afru数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Afru数据");
env.addSource(tspatSource).setParallelism(1).name("输入Tspat队列").flatMap(greenPlumFlatMapFunction).returns(Tspat.class).setParallelism(1).name("拉取Tspat数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Tspat数据");
env.addSource(zpoeditSource).setParallelism(1).name("输入Zpoedit队列").flatMap(greenPlumFlatMapFunction).returns(Zpoedit.class).setParallelism(1).name("拉取zpoedit数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Zpoedit数据");
env.execute("华正数据迁移任务");
}
......
......@@ -10,6 +10,7 @@ import org.springframework.cache.annotation.Cacheable;
import com.huazheng.project.hana.model.Afko;
import com.huazheng.project.hana.model.Afpo;
import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
......@@ -32,6 +33,7 @@ import com.huazheng.project.hana.model.S066;
import com.huazheng.project.hana.model.S067;
import com.huazheng.project.hana.model.T001w;
import com.huazheng.project.hana.model.T023t;
import com.huazheng.project.hana.model.Tspat;
import com.huazheng.project.hana.model.Tvkbt;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbap;
......@@ -42,6 +44,7 @@ import com.huazheng.project.hana.model.Vbrk;
import com.huazheng.project.hana.model.Vbrp;
import com.huazheng.project.hana.model.Vbuk;
import com.huazheng.project.hana.model.Zmdpc;
import com.huazheng.project.hana.model.Zpoedit;
import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.hana.model.Zsdfhzl;
import com.huazheng.project.mssql.model.SalesContractProcess;
......@@ -66,6 +69,33 @@ import com.huazheng.project.mysql.model.TransformNewNode;
@CacheConfig(cacheNames = "huazheng")
public interface GPMapper {
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.pwerk+','+#p0.aufnr", unless="#result == null")
public Zpoedit selectZpoedit(Zpoedit zpoedit); // 查询替代删除
public void insertZpoedit(Zpoedit element);
@CacheEvict(key = "'selectZpoedit'+':'+#p0.mandt+','+#p0.pwerk+','+#p0.aufnr")
public void deleteZpoedit(Zpoedit item);
@CacheEvict(key = "'selectZpoedit'+':'+#p0.mandt+','+#p0.pwerk+','+#p0.aufnr")
public void updateZpoedit(Zpoedit element);
public List<Zpoedit> selectZpoeditCheck(Zpoedit build);
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.spras+','+#p0.spart", unless="#result == null")
public Tspat selectTspat(Tspat tspat); // 查询替代删除
public void insertTspat(Tspat element);
@CacheEvict(key = "'selectTspat'+':'+#p0.mandt+','+#p0.spras+','+#p0.spart")
public void deleteTspat(Tspat item);
@CacheEvict(key = "'selectTspat'+':'+#p0.mandt+','+#p0.spras+','+#p0.spart")
public void updateTspat(Tspat element);
public List<Tspat> selectTspatCheck(Tspat build);
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.rueck+','+#p0.rmzhl", unless="#result == null")
public Afru selectAfru(Afru afru); // 查询替代删除
public void insertAfru(Afru element);
@CacheEvict(key = "'selectAfru'+':'+#p0.mandt+','+#p0.rueck+','+#p0.rmzhl")
public void deleteAfru(Afru item);
@CacheEvict(key = "'selectAfru'+':'+#p0.mandt+','+#p0.rueck+','+#p0.rmzhl")
public void updateAfru(Afru element);
public List<Afru> selectAfruCheck(Afru build);
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.mblnr+','+#p0.mjahr+','+#p0.zeile", unless="#result == null")
public Mseg selectMseg(Mseg mseg); // 查询替代删除
public void insertMseg(Mseg element);
......
......@@ -18,6 +18,7 @@ import com.huazheng.project.greenplum.mapper.GPMapper;
import com.huazheng.project.hana.mapper.SapMapper;
import com.huazheng.project.hana.model.Afko;
import com.huazheng.project.hana.model.Afpo;
import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
......@@ -40,6 +41,7 @@ import com.huazheng.project.hana.model.S066;
import com.huazheng.project.hana.model.S067;
import com.huazheng.project.hana.model.T001w;
import com.huazheng.project.hana.model.T023t;
import com.huazheng.project.hana.model.Tspat;
import com.huazheng.project.hana.model.Tvkbt;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbap;
......@@ -50,6 +52,7 @@ import com.huazheng.project.hana.model.Vbrk;
import com.huazheng.project.hana.model.Vbrp;
import com.huazheng.project.hana.model.Vbuk;
import com.huazheng.project.hana.model.Zmdpc;
import com.huazheng.project.hana.model.Zpoedit;
import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.hana.model.Zsdfhzl;
import com.huazheng.project.mssql.mapper.CrmMapper;
......@@ -960,6 +963,9 @@ public class DeleteUpdateJobServiceImpl {
selectAfvcCheck();
selectMkpfCheck();
selectMsegCheck();
selectAfruCheck();
selectTspatCheck();
selectZpoeditCheck();
}
private void selectAfkoCheck() {
try {
......@@ -1467,6 +1473,135 @@ public class DeleteUpdateJobServiceImpl {
}
}
private void selectAfruCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Afru:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Afru:rowNum");
Afru build = Afru.builder().rowNum(rowNum).build();
List<Afru> list = gpMapper.selectAfruCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Afru:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Afru source = sapMapper.selectAfruById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteAfru(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
while (true) {
try {
gpMapper.updateAfru(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Afru:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectAfruCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Afru:rowNum", getErrorInfoFromException(e));
}
}
private void selectTspatCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Tspat:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Tspat:rowNum");
Tspat build = Tspat.builder().rowNum(rowNum).build();
List<Tspat> list = gpMapper.selectTspatCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Tspat:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Tspat source = sapMapper.selectTspatById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteTspat(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
while (true) {
try {
gpMapper.updateTspat(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Tspat:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectTspatCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Tspat:rowNum", getErrorInfoFromException(e));
}
}
private void selectZpoeditCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Zpoedit:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Zpoedit:rowNum");
Zpoedit build = Zpoedit.builder().rowNum(rowNum).build();
List<Zpoedit> list = gpMapper.selectZpoeditCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Zpoedit:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Zpoedit source = sapMapper.selectZpoeditById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteZpoedit(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
while (true) {
try {
gpMapper.updateZpoedit(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Zpoedit:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectZpoeditCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Zpoedit:rowNum", getErrorInfoFromException(e));
}
}
// 数据量过大不再执行任务
private void selectKonvCheck() {
try {
......
......@@ -14,6 +14,7 @@ import org.springframework.stereotype.Service;
import com.huazheng.project.greenplum.mapper.GPMapper;
import com.huazheng.project.hana.model.Afko;
import com.huazheng.project.hana.model.Afpo;
import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
......@@ -35,6 +36,7 @@ import com.huazheng.project.hana.model.S066;
import com.huazheng.project.hana.model.S067;
import com.huazheng.project.hana.model.T001w;
import com.huazheng.project.hana.model.T023t;
import com.huazheng.project.hana.model.Tspat;
import com.huazheng.project.hana.model.Tvkbt;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbap;
......@@ -45,6 +47,7 @@ import com.huazheng.project.hana.model.Vbrk;
import com.huazheng.project.hana.model.Vbrp;
import com.huazheng.project.hana.model.Vbuk;
import com.huazheng.project.hana.model.Zmdpc;
import com.huazheng.project.hana.model.Zpoedit;
import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.hana.model.Zsdfhzl;
import com.huazheng.project.mssql.mapper.CrmMapper;
......@@ -2028,4 +2031,85 @@ public class GPServiceImpl {
}
}
public void processAfru(Afru data, Collector<Afru> out) {
try {
Afru exist = gpMapper.selectAfru(data);
if (exist != null) {
data.setExist(true); // 已经在库
}
out.collect(data);
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Afru:error", "processAfru", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void sinkAfru(Afru element) {
try {
if (element.isExist() == false) {
log.debug("GPServiceImpl.sinkAfru()");
gpMapper.insertAfru(element);
}
} catch (RuntimeException e) {
redis1Template.opsForHash().put("huazheng:Afru:error", "sinkAfru", getErrorInfoFromException(e));
log.error(e.getMessage());
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Afru:error", "sinkAfru", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void processTspat(Tspat data, Collector<Tspat> out) {
try {
Tspat exist = gpMapper.selectTspat(data);
if (exist != null) {
data.setExist(true); // 已经在库
}
out.collect(data);
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Tspat:error", "processTspat", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void sinkTspat(Tspat element) {
try {
if (element.isExist() == false) {
log.debug("GPServiceImpl.sinkTspat()");
gpMapper.insertTspat(element);
}
} catch (RuntimeException e) {
redis1Template.opsForHash().put("huazheng:Tspat:error", "sinkTspat", getErrorInfoFromException(e));
log.error(e.getMessage());
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Tspat:error", "sinkTspat", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void processZpoedit(Zpoedit data, Collector<Zpoedit> out) {
try {
Zpoedit exist = gpMapper.selectZpoedit(data);
if (exist != null) {
data.setExist(true); // 已经在库
}
out.collect(data);
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Zpoedit:error", "processZpoedit", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void sinkZpoedit(Zpoedit element) {
try {
if (element.isExist() == false) {
log.debug("GPServiceImpl.sinkZpoedit()");
gpMapper.insertZpoedit(element);
}
} catch (RuntimeException e) {
redis1Template.opsForHash().put("huazheng:Zpoedit:error", "sinkZpoedit", getErrorInfoFromException(e));
log.error(e.getMessage());
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Zpoedit:error", "sinkZpoedit", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
}
......@@ -16,6 +16,7 @@ import org.springframework.stereotype.Service;
import com.huazheng.project.hana.mapper.SapMapper;
import com.huazheng.project.hana.model.Afko;
import com.huazheng.project.hana.model.Afpo;
import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
......@@ -37,6 +38,7 @@ import com.huazheng.project.hana.model.S066;
import com.huazheng.project.hana.model.S067;
import com.huazheng.project.hana.model.T001w;
import com.huazheng.project.hana.model.T023t;
import com.huazheng.project.hana.model.Tspat;
import com.huazheng.project.hana.model.Tvkbt;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbap;
......@@ -46,6 +48,7 @@ import com.huazheng.project.hana.model.Vbrk;
import com.huazheng.project.hana.model.Vbrp;
import com.huazheng.project.hana.model.Vbuk;
import com.huazheng.project.hana.model.Zmdpc;
import com.huazheng.project.hana.model.Zpoedit;
import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.hana.model.Zsdfhzl;
import com.huazheng.project.mssql.mapper.CrmMapper;
......@@ -124,6 +127,9 @@ public class JobServiceImpl {
selectAfkoNew();
selectAfpoNew();
selectAfvcNew();
selectAfruNew();
selectTspatNew();
selectZpoeditNew();
selectMkpfNew();
selectMsegNew();
selectVbepNew();
......@@ -567,6 +573,84 @@ public class JobServiceImpl {
}
}
private void selectAfruNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap4send.lua")));
List<String> keys = Arrays.asList("huazheng:Afru:sendcount", "huazheng:Afru:rowids", "huazheng:list:Afru");
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:Afru:sendcount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Afru:receivecount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Afru:rowids", "0"); // 不存在则创建,存在则么有操作
Long sendcount = Long.valueOf(opsForValue.get("huazheng:Afru:sendcount"));
Long receivecount = Long.valueOf(opsForValue.get("huazheng:Afru:receivecount"));
if (sendcount - receivecount <= 20) { // 如果发送数和消费数的差小于5则往队列中写数据
String rowids = opsForValue.get("huazheng:Afru:rowids"); // 标记id
Afru afru = Afru.builder().rowids(Long.valueOf(rowids)).build();
List<Afru> list = sapMapper.selectAfruNew(afru);
if (!list.isEmpty()) {
list.forEach(item -> {
JSONObject json = JSONUtil.parseObj(item, false);
String execute = redis1Template.execute(script, keys, item.getRowids().toString(), json.toString());
log.info("标记时间回写 --> " + execute);
});
}
}
}
private void selectTspatNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap4send.lua")));
List<String> keys = Arrays.asList("huazheng:Tspat:sendcount", "huazheng:Tspat:rowids", "huazheng:list:Tspat");
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:Tspat:sendcount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Tspat:receivecount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Tspat:rowids", "0"); // 不存在则创建,存在则么有操作
Long sendcount = Long.valueOf(opsForValue.get("huazheng:Tspat:sendcount"));
Long receivecount = Long.valueOf(opsForValue.get("huazheng:Tspat:receivecount"));
if (sendcount - receivecount <= 20) { // 如果发送数和消费数的差小于5则往队列中写数据
String rowids = opsForValue.get("huazheng:Tspat:rowids"); // 标记id
Tspat tspat = Tspat.builder().rowids(Long.valueOf(rowids)).build();
List<Tspat> list = sapMapper.selectTspatNew(tspat);
if (!list.isEmpty()) {
list.forEach(item -> {
JSONObject json = JSONUtil.parseObj(item, false);
String execute = redis1Template.execute(script, keys, item.getRowids().toString(), json.toString());
log.info("标记时间回写 --> " + execute);
});
}
}
}
private void selectZpoeditNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap4send.lua")));
List<String> keys = Arrays.asList("huazheng:Zpoedit:sendcount", "huazheng:Zpoedit:rowids", "huazheng:list:Zpoedit");
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:Zpoedit:sendcount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Zpoedit:receivecount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Zpoedit:rowids", "0"); // 不存在则创建,存在则么有操作
Long sendcount = Long.valueOf(opsForValue.get("huazheng:Zpoedit:sendcount"));
Long receivecount = Long.valueOf(opsForValue.get("huazheng:Zpoedit:receivecount"));
if (sendcount - receivecount <= 20) { // 如果发送数和消费数的差小于5则往队列中写数据
String rowids = opsForValue.get("huazheng:Zpoedit:rowids"); // 标记id
Zpoedit zpoedit = Zpoedit.builder().rowids(Long.valueOf(rowids)).build();
List<Zpoedit> list = sapMapper.selectZpoeditNew(zpoedit);
if (!list.isEmpty()) {
list.forEach(item -> {
JSONObject json = JSONUtil.parseObj(item, false);
String execute = redis1Template.execute(script, keys, item.getRowids().toString(), json.toString());
log.info("标记时间回写 --> " + execute);
});
}
}
}
private void selectMkpfNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......
package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream;
import com.huazheng.project.hana.model.Afru;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Service
public class AfruSource implements SourceFunction<Afru> {
private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
@Override
public void run(SourceContext<Afru> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Afru:sendcount", "huazheng:Afru:id", "huazheng:list:Afru", "huazheng:Afru:receivecount");
while (true) {
try {
String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("==========");
String checkString = values[0];
String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check);
Afru data = JSONUtil.toBean(values[1], Afru.class);
ctx.collect(data);
} else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
} catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Afru:error", "receivecount_elseerror", getErrorInfoFromException(e));
}
}
}
@Override
public void cancel() {
}
}
package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream;
import com.huazheng.project.hana.model.Tspat;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Service
public class TspatSource implements SourceFunction<Tspat> {
private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
@Override
public void run(SourceContext<Tspat> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Tspat:sendcount", "huazheng:Tspat:id", "huazheng:list:Tspat", "huazheng:Tspat:receivecount");
while (true) {
try {
String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("==========");
String checkString = values[0];
String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check);
Tspat data = JSONUtil.toBean(values[1], Tspat.class);
ctx.collect(data);
} else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
} catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Tspat:error", "receivecount_elseerror", getErrorInfoFromException(e));
}
}
}
@Override
public void cancel() {
}
}
package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream;
import com.huazheng.project.hana.model.Zpoedit;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Service
public class ZpoeditSource implements SourceFunction<Zpoedit> {
private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
@Override
public void run(SourceContext<Zpoedit> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Zpoedit:sendcount", "huazheng:Zpoedit:id", "huazheng:list:Zpoedit", "huazheng:Zpoedit:receivecount");
while (true) {
try {
String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("==========");
String checkString = values[0];
String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check);
Zpoedit data = JSONUtil.toBean(values[1], Zpoedit.class);
ctx.collect(data);
} else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
} catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Zpoedit:error", "receivecount_elseerror", getErrorInfoFromException(e));
}
}
}
@Override
public void cancel() {
}
}
......@@ -4,6 +4,7 @@ import java.util.List;
import com.huazheng.project.hana.model.Afko;
import com.huazheng.project.hana.model.Afpo;
import com.huazheng.project.hana.model.Afru;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Aufm;
......@@ -25,6 +26,7 @@ import com.huazheng.project.hana.model.S066;
import com.huazheng.project.hana.model.S067;
import com.huazheng.project.hana.model.T001w;
import com.huazheng.project.hana.model.T023t;
import com.huazheng.project.hana.model.Tspat;
import com.huazheng.project.hana.model.Tvkbt;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbap;
......@@ -34,6 +36,7 @@ import com.huazheng.project.hana.model.Vbrk;
import com.huazheng.project.hana.model.Vbrp;
import com.huazheng.project.hana.model.Vbuk;
import com.huazheng.project.hana.model.Zmdpc;
import com.huazheng.project.hana.model.Zpoedit;
import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.hana.model.Zsdfhzl;
......@@ -73,7 +76,13 @@ public interface SapMapper {
public List<Zsdfhzl> selectZsdfhzlNew(Zsdfhzl zsdfhzl);
public List<Mkpf> selectMkpfNew(Mkpf mkpf);
public List<Mseg> selectMsegNew(Mseg mseg);
public List<Afru> selectAfruNew(Afru afru);
public List<Tspat> selectTspatNew(Tspat tspat);
public List<Zpoedit> selectZpoeditNew(Zpoedit zpoedit);
public Zpoedit selectZpoeditById(Zpoedit target);
public Tspat selectTspatById(Tspat target);
public Afru selectAfruById(Afru target);
public Mseg selectMsegById(Mseg target);
public Mkpf selectMkpfById(Mkpf target);
public Afko selectAfkoById(Afko target);
......
package com.huazheng.project.hana.model;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Builder
public class Afru implements Serializable {
private static final long serialVersionUID = 1L;
/* === 组合键 === */
private String mandt; // 集团
private String rueck; // 操作完成的确认编号
private String rmzhl; // 确认计数器
/* === 组合键 === */
private String ersda; // 确认输入日期
private String wablnr; // 控制码
private String aufnr; // 物料凭证编号
private String stokz; // 标识:凭证已被冲销
private String stzhl; // 被取消确认的确认计数器
private Long rowids; // sap那边的rowid
private boolean exist; // 用于标记,不是字段
private String hashResult; // 数据hash标记
private String rowNum; // 用于标记,不是字段
}
package com.huazheng.project.hana.model;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Builder
public class Tspat implements Serializable {
private static final long serialVersionUID = 1L;
/* === 组合键 === */
private String mandt; // 集团
private String spras; // 语言代码
private String spart; // 产品组
/* === 组合键 === */
private String vtext; // 名称
private Long rowids; // sap那边的rowid
private boolean exist; // 用于标记,不是字段
private String hashResult; // 数据hash标记
private String rowNum; // 用于标记,不是字段
}
package com.huazheng.project.hana.model;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Builder
public class Zpoedit implements Serializable {
private static final long serialVersionUID = 1L;
/* === 组合键 === */
private String mandt; // 集团
private String pwerk; // 订单的计划工厂
private String aufnr; // 订单号
/* === 组合键 === */
private String reason; // 修改原因
private Long rowids; // sap那边的rowid
private boolean exist; // 用于标记,不是字段
private String hashResult; // 数据hash标记
private String rowNum; // 用于标记,不是字段
}
drop table afru;
CREATE TABLE afru (
mandt text,
rueck text,
rmzhl text,
ersda text,
wablnr text,
aufnr text,
stokz text,
stzhl text,
hashResult text,
rowNum serial,
PRIMARY KEY (mandt,rueck,rmzhl)
)
Distributed by (mandt,rueck,rmzhl);
drop table tspat;
CREATE TABLE tspat (
mandt text,
spras text,
spart text,
vtext text,
hashResult text,
rowNum serial,
PRIMARY KEY (mandt,spras,spart)
)
Distributed by (mandt,spras,spart);
drop table zpoedit;
CREATE TABLE zpoedit (
mandt text,
pwerk text,
aufnr text,
reason text,
hashResult text,
rowNum serial,
PRIMARY KEY (mandt,pwerk,aufnr)
)
Distributed by (mandt,pwerk,aufnr);
......@@ -631,6 +631,65 @@
select * from Afvc where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectAfru" parameterType="com.huazheng.project.hana.model.Afru" resultType="com.huazheng.project.hana.model.Afru">
select * from Afru where mandt = #{mandt} and rueck = #{rueck} and rmzhl = #{rmzhl}
</select>
<insert id="insertAfru" parameterType="com.huazheng.project.hana.model.Afru">
insert into Afru (mandt, rueck, rmzhl, ersda, wablnr, aufnr, stokz, stzhl, hashResult)
values(#{mandt},#{rueck},#{rmzhl},#{ersda},#{wablnr},#{aufnr},#{stokz},#{stzhl},#{hashResult})
</insert>
<delete id="deleteAfru" parameterType="com.huazheng.project.hana.model.Afru">
delete from Afru where mandt = #{mandt} and rueck = #{rueck} and rmzhl = #{rmzhl}
</delete>
<update id="updateAfru" parameterType="com.huazheng.project.hana.model.Afru">
update Afru set
mandt = #{mandt}, rueck = #{rueck}, rmzhl = #{rmzhl}, ersda = #{ersda}, wablnr = #{wablnr}, aufnr = #{aufnr}, stokz = #{stokz}, stzhl = #{stzhl}, hashResult = #{hashResult}
where mandt = #{mandt} and rueck = #{rueck} and rmzhl = #{rmzhl}
</update>
<select id="selectAfruCheck" parameterType="com.huazheng.project.hana.model.Afru" resultType="com.huazheng.project.hana.model.Afru">
select * from Afru where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectTspat" parameterType="com.huazheng.project.hana.model.Tspat" resultType="com.huazheng.project.hana.model.Tspat">
select * from Tspat where mandt = #{mandt} and spras = #{spras} and spart = #{spart}
</select>
<insert id="insertTspat" parameterType="com.huazheng.project.hana.model.Tspat">
insert into Tspat (mandt, spras, spart, vtext, hashResult)
values(#{mandt},#{spras},#{spart},#{vtext},#{hashResult})
</insert>
<delete id="deleteTspat" parameterType="com.huazheng.project.hana.model.Tspat">
delete from Tspat where mandt = #{mandt} and spras = #{spras} and spart = #{spart}
</delete>
<update id="updateTspat" parameterType="com.huazheng.project.hana.model.Tspat">
update Tspat set
mandt = #{mandt}, spras = #{spras}, spart = #{spart}, vtext = #{vtext}, hashResult = #{hashResult}
where mandt = #{mandt} and spras = #{spras} and spart = #{spart}
</update>
<select id="selectTspatCheck" parameterType="com.huazheng.project.hana.model.Tspat" resultType="com.huazheng.project.hana.model.Tspat">
select * from Tspat where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectZpoedit" parameterType="com.huazheng.project.hana.model.Zpoedit" resultType="com.huazheng.project.hana.model.Zpoedit">
select * from Zpoedit where mandt = #{mandt} and pwerk = #{pwerk} and aufnr = #{aufnr}
</select>
<insert id="insertZpoedit" parameterType="com.huazheng.project.hana.model.Zpoedit">
insert into Zpoedit (mandt, pwerk, aufnr, reason, hashResult)
values(#{mandt},#{pwerk},#{aufnr},#{reason},#{hashResult})
</insert>
<delete id="deleteZpoedit" parameterType="com.huazheng.project.hana.model.Zpoedit">
delete from Zpoedit where mandt = #{mandt} and pwerk = #{pwerk} and aufnr = #{aufnr}
</delete>
<update id="updateZpoedit" parameterType="com.huazheng.project.hana.model.Zpoedit">
update Zpoedit set
mandt = #{mandt}, pwerk = #{pwerk}, aufnr = #{aufnr}, reason = #{reason}, hashResult = #{hashResult}
where mandt = #{mandt} and pwerk = #{pwerk} and aufnr = #{aufnr}
</update>
<select id="selectZpoeditCheck" parameterType="com.huazheng.project.hana.model.Zpoedit" resultType="com.huazheng.project.hana.model.Zpoedit">
select * from Zpoedit where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectMseg" parameterType="com.huazheng.project.hana.model.Mseg" resultType="com.huazheng.project.hana.model.Mseg">
select * from Mseg where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr} and zeile = #{zeile}
</select>
......
......@@ -200,6 +200,31 @@
order by "$rowid$"
</select>
<select id="selectAfruNew" parameterType="Afru" resultType="Afru">
select top 20 "$rowid$" as rowids,
mandt, rueck, rmzhl, ersda, wablnr, aufnr, stokz, stzhl
from ${hana_user}.Afru
where "$rowid$" &gt; #{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectTspatNew" parameterType="Tspat" resultType="Tspat">
select top 20 "$rowid$" as rowids,
mandt, spras, spart, vtext
from ${hana_user}.Tspat
where "$rowid$" &gt; #{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectZpoeditNew" parameterType="Zpoedit" resultType="Zpoedit">
select top 20 "$rowid$" as rowids,
mandt, pwerk, aufnr, reason
from ${hana_user}.Zpo_edit
where "$rowid$" &gt; #{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectMkpfNew" parameterType="Mkpf" resultType="Mkpf">
select top 20 "$rowid$" as rowids,
mandt, mblnr, mjahr, bldat, cputm
......@@ -347,6 +372,24 @@
where mandt = #{mandt} and aufpl = #{aufpl} and aplzl = #{aplzl}
</select>
<select id="selectAfruById" parameterType="Afru" resultType="Afru">
select mandt, rueck, rmzhl, ersda, wablnr, aufnr, stokz, stzhl
from ${hana_user}.Afru
where mandt = #{mandt} and rueck = #{rueck} and rmzhl = #{rmzhl}
</select>
<select id="selectTspatById" parameterType="Tspat" resultType="Tspat">
select mandt, spras, spart, vtext
from ${hana_user}.Tspat
where mandt = #{mandt} and spras = #{spras} and spart = #{spart}
</select>
<select id="selectZpoeditById" parameterType="Zpoedit" resultType="Zpoedit">
select mandt, pwerk, aufnr, reason
from ${hana_user}.Zpo_edit
where mandt = #{mandt} and pwerk = #{pwerk} and aufnr = #{aufnr}
</select>
<select id="selectMkpfById" parameterType="Mkpf" resultType="Mkpf">
select mandt, mblnr, mjahr, bldat, cputm
from ${hana_user}.Mkpf
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论