提交 d01dd345 作者: guofeng

修改了增量更新的流程逻辑

上级 2c8d9d43
...@@ -223,7 +223,6 @@ public interface GPMapper { ...@@ -223,7 +223,6 @@ public interface GPMapper {
@CacheEvict(key = "'selectLikp'+':'+#p0.vbeln+','+#p0.mandt") @CacheEvict(key = "'selectLikp'+':'+#p0.vbeln+','+#p0.mandt")
public void updateLikp(Likp element); public void updateLikp(Likp element);
public List<Likp> selectLikpCheck(Likp build); public List<Likp> selectLikpCheck(Likp build);
public List<Likp> selectLikpCheckByUpdate(Likp build);
@Cacheable(key = "#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt", unless="#result == null") @Cacheable(key = "#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt", unless="#result == null")
public Lips selectLips(Lips lips); // 查询替代删除 public Lips selectLips(Lips lips); // 查询替代删除
...@@ -233,7 +232,6 @@ public interface GPMapper { ...@@ -233,7 +232,6 @@ public interface GPMapper {
@CacheEvict(key = "'selectLips'+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt") @CacheEvict(key = "'selectLips'+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt")
public void updateLips(Lips item); public void updateLips(Lips item);
public List<Lips> selectLipsCheck(Lips build); public List<Lips> selectLipsCheck(Lips build);
public List<Lips> selectLipsCheckByUpdate(Lips build);
@Cacheable(key = "#root.method.name+':'+#p0.matnr+','+#p0.mandt+','+#p0.spras", unless="#result == null") @Cacheable(key = "#root.method.name+':'+#p0.matnr+','+#p0.mandt+','+#p0.spras", unless="#result == null")
public Makt selectMakt(Makt makt); // 查询替代删除 public Makt selectMakt(Makt makt); // 查询替代删除
...@@ -270,7 +268,6 @@ public interface GPMapper { ...@@ -270,7 +268,6 @@ public interface GPMapper {
@CacheEvict(key = "'selectPa0002'+':'+#p0.mandt+','+#p0.pernr+','+#p0.subty+','+#p0.objps+','+#p0.sprps+','+#p0.endda+','+#p0.begda+','+#p0.seqnr") @CacheEvict(key = "'selectPa0002'+':'+#p0.mandt+','+#p0.pernr+','+#p0.subty+','+#p0.objps+','+#p0.sprps+','+#p0.endda+','+#p0.begda+','+#p0.seqnr")
public void deletePa0002(Pa0002 pa0002); public void deletePa0002(Pa0002 pa0002);
public List<Pa0002> selectPa0002Check(Pa0002 build); public List<Pa0002> selectPa0002Check(Pa0002 build);
public List<Pa0002> selectPa0002CheckByUpdate(Pa0002 build);
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.ssour+','+#p0.vrsio+','+#p0.kkber+','+#p0.knkli+','+#p0.spmon+','+#p0.sptags+','+#p0.spwocs+','+#p0.spbups", unless="#result == null") @Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.ssour+','+#p0.vrsio+','+#p0.kkber+','+#p0.knkli+','+#p0.spmon+','+#p0.sptags+','+#p0.spwocs+','+#p0.spbups", unless="#result == null")
public S066 selectS066(S066 so66); public S066 selectS066(S066 so66);
...@@ -316,7 +313,6 @@ public interface GPMapper { ...@@ -316,7 +313,6 @@ public interface GPMapper {
@CacheEvict(key = "'selectVbak'+':'+#p0.vbeln+','+#p0.mandt") @CacheEvict(key = "'selectVbak'+':'+#p0.vbeln+','+#p0.mandt")
public void deleteVbak(Vbak item); public void deleteVbak(Vbak item);
public List<Vbak> selectVbakCheck(Vbak build); public List<Vbak> selectVbakCheck(Vbak build);
public List<Vbak> selectVbakCheckByUpdate(Vbak build);
@Cacheable(key = "#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.etenr+','+#p0.mandt", unless="#result == null") @Cacheable(key = "#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.etenr+','+#p0.mandt", unless="#result == null")
public Vbep selectVbep(Vbep vbep); // 查询替代删除 public Vbep selectVbep(Vbep vbep); // 查询替代删除
...@@ -344,7 +340,6 @@ public interface GPMapper { ...@@ -344,7 +340,6 @@ public interface GPMapper {
@CacheEvict(key = "'selectVbrk'+':'+#p0.vbeln+','+#p0.mandt") @CacheEvict(key = "'selectVbrk'+':'+#p0.vbeln+','+#p0.mandt")
public void deleteVbrk(Vbrk item); public void deleteVbrk(Vbrk item);
public List<Vbrk> selectVbrkCheck(Vbrk build); public List<Vbrk> selectVbrkCheck(Vbrk build);
public List<Vbrk> selectVbrkCheckByUpdate(Vbrk build);
@Cacheable(key = "#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt", unless="#result == null") @Cacheable(key = "#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt", unless="#result == null")
public Vbrp selectVbrp(Vbrp item); // 查询替代删除 public Vbrp selectVbrp(Vbrp item); // 查询替代删除
...@@ -381,8 +376,6 @@ public interface GPMapper { ...@@ -381,8 +376,6 @@ public interface GPMapper {
@CacheEvict(key = "'selectZsd06'+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt") @CacheEvict(key = "'selectZsd06'+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt")
public void deleteZsd06(Zsd06 item); public void deleteZsd06(Zsd06 item);
public List<Zsd06> selectZsd06Check(Zsd06 build); public List<Zsd06> selectZsd06Check(Zsd06 build);
public List<Zsd06> selectZsd06CheckByUpdate(Zsd06 build);
@Cacheable(key = "#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt", unless="#result == null") @Cacheable(key = "#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt", unless="#result == null")
public Zsdfhzl selectZsdfhzl(Zsdfhzl zsdfhzl); // 查询替代删除 public Zsdfhzl selectZsdfhzl(Zsdfhzl zsdfhzl); // 查询替代删除
...@@ -392,7 +385,6 @@ public interface GPMapper { ...@@ -392,7 +385,6 @@ public interface GPMapper {
@CacheEvict(key = "'selectZsdfhzl'+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt") @CacheEvict(key = "'selectZsdfhzl'+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt")
public void deleteZsdfhzl(Zsdfhzl item); public void deleteZsdfhzl(Zsdfhzl item);
public List<Zsdfhzl> selectZsdfhzlCheck(Zsdfhzl build); public List<Zsdfhzl> selectZsdfhzlCheck(Zsdfhzl build);
public List<Zsdfhzl> selectZsdfhzlCheckByUpdate(Zsdfhzl build);
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.bukrs+','+#p0.kunnr+','+#p0.umsks+','+#p0.umskz+','+#p0.augdt+','+#p0.augbl+','+#p0.zuonr+','+#p0.gjahr+','+#p0.belnr+','+#p0.buzei", unless="#result == null") @Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.bukrs+','+#p0.kunnr+','+#p0.umsks+','+#p0.umskz+','+#p0.augdt+','+#p0.augbl+','+#p0.zuonr+','+#p0.gjahr+','+#p0.belnr+','+#p0.buzei", unless="#result == null")
public Bsad selectBsad(Bsad bsad); public Bsad selectBsad(Bsad bsad);
......
...@@ -936,14 +936,15 @@ public class DeleteUpdateJobServiceImpl { ...@@ -936,14 +936,15 @@ public class DeleteUpdateJobServiceImpl {
} }
public void checkJob4() { public void checkJob4() {
// selectAufmCheck(); // 17 注释掉
// selectKonvCheck(); // 21 注释掉
selectAfkoCheck(); // 14 selectAfkoCheck(); // 14
selectAfpoCheck(); // 15 selectAfpoCheck(); // 15
selectAufkCheck(); // 16 selectAufkCheck(); // 16
// selectAufmCheck(); // 17
selectKna1Check(); // 18 selectKna1Check(); // 18
selectKnkkCheck(); // 19 selectKnkkCheck(); // 19
selectKnvvCheck(); // 20 selectKnvvCheck(); // 20
// selectKonvCheck(); // 21
selectLipsCheck(); // 23 selectLipsCheck(); // 23
selectMaktCheck(); // 24 selectMaktCheck(); // 24
selectMaraCheck(); // 25 selectMaraCheck(); // 25
...@@ -1216,7 +1217,7 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1216,7 +1217,7 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Kna1:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Kna1:rowNum", getErrorInfoFromException(e));
} }
} }
// 计数器复位删除 // 计数器复位删除1
private void selectKnkkCheck() { private void selectKnkkCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -1262,7 +1263,7 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1262,7 +1263,7 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Knkk:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Knkk:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectKnkkCheckUpdate() { public void selectKnkkCheckUpdate() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Knkk:rowids", "0"); opsForValue.setIfAbsent("huazheng:checkUpdate:Knkk:rowids", "0");
...@@ -1278,17 +1279,19 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1278,17 +1279,19 @@ public class DeleteUpdateJobServiceImpl {
slist.forEach(source -> { // 遍历源库中的数据,去目标库中查询数据 slist.forEach(source -> { // 遍历源库中的数据,去目标库中查询数据
Knkk target = gpMapper.selectKnkk(source); Knkk target = gpMapper.selectKnkk(source);
String operator = "none"; String operator = "none";
if (source == null) { // 如果源库中没有数据 Long srowids = source.getRowids();
gpMapper.deleteKnkk(target); // 删除数仓中的数据 if (target != null) {
operator = "delete"; source.setRowids(null);
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果 String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果 String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致 if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash); source.setHashResult(shash);
// ===============================
// ===============================
while (true) { while (true) {
try { try {
gpMapper.updateKnkk(source); // 更新数据到数仓中 gpMapper.updateKnkk(source); // 更新数据到数仓中
operator = "update";
break; break;
} catch (RuntimeException e) { } catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500); log.error(e.getMessage());ThreadUtil.safeSleep(500);
...@@ -1297,10 +1300,10 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1297,10 +1300,10 @@ public class DeleteUpdateJobServiceImpl {
ThreadUtil.safeSleep(500); ThreadUtil.safeSleep(500);
} }
} }
redis1Template.opsForValue().set("huazheng:checkUpdate:Knkk:rowids", target.getRowNum()); redis1Template.opsForValue().set("huazheng:checkUpdate:Knkk:rowids", srowids.toString());
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectKnkkCheckUpdate --> rowids:%s, operator:%s", target.getRowNum(), operator)); log.info(String.format("selectKnkkCheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
...@@ -1391,7 +1394,6 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1391,7 +1394,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Afvc:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Afvc:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectKnvpCheck() { private void selectKnvpCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -1434,7 +1436,6 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1434,7 +1436,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Knvp:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Knvp:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectMkpfCheck() { private void selectMkpfCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -1477,7 +1478,6 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1477,7 +1478,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Mkpf:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Mkpf:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectMsegCheck() { private void selectMsegCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -1520,7 +1520,6 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1520,7 +1520,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Mseg:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Mseg:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectAfruCheck() { private void selectAfruCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -1563,7 +1562,6 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1563,7 +1562,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Afru:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Afru:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectAuspCheck() { private void selectAuspCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -1606,7 +1604,6 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1606,7 +1604,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Ausp:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Ausp:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectTspatCheck() { private void selectTspatCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -1649,7 +1646,6 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1649,7 +1646,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Tspat:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Tspat:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectZpoeditCheck() { private void selectZpoeditCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -1736,7 +1732,7 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1736,7 +1732,7 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Konv:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Konv:rowNum", getErrorInfoFromException(e));
} }
} }
// 计数器复位删除 // 计数器复位删除2
private void selectLikpCheck() { private void selectLikpCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -1746,17 +1742,10 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1746,17 +1742,10 @@ public class DeleteUpdateJobServiceImpl {
List<Likp> list = gpMapper.selectLikpCheck(build); // 从数仓中查询一组数据 List<Likp> list = gpMapper.selectLikpCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
// 按更新时间查询,查询大于今天的数据,此时就不需要重置计数器了 redis1Template.opsForValue().set("huazheng:check:Likp:rowNum", "0"); // 计数器复位
String aedat = DateUtil.format(DateUtil.yesterday(), "yyyyMMdd"); // 昨天的日期 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
build = Likp.builder().rowNum(rowNum).aedat(aedat).build();
// 查询出列表20个,下面的代码运行后会复位一个阶段的rowNum,如此往复list.size()就不会为0了
list = gpMapper.selectLikpCheckByUpdate(build);
// 是否为list.size() == 0的时候的更新时间查询的列表
opsForValue.set("huazheng:check:Likp:isUpdateList", "true");
} else {
redis1Template.delete("huazheng:check:Likp:isUpdateList");
} }
list.forEach(target -> { // 遍历要检查的数据 list.forEach(target -> { // 遍历要检查的数据
Likp source = sapMapper.selectLikpById(target); // 根据主键查询源库中的数据 Likp source = sapMapper.selectLikpById(target); // 根据主键查询源库中的数据
String operator = "none"; String operator = "none";
...@@ -1784,53 +1773,65 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1784,53 +1773,65 @@ public class DeleteUpdateJobServiceImpl {
} }
} }
String mode = null; redis1Template.opsForValue().set("huazheng:check:Likp:rowNum", target.getRowNum());
String isUpdateList = opsForValue.get("huazheng:check:Likp:isUpdateList");
if (isUpdateList == null) { // 非更新模式下的列表才更新计数器
redis1Template.opsForValue().set("huazheng:check:Likp:rowNum", target.getRowNum());
mode = "计数器模式";
} else {
mode = "更新时间模式";
}
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectLikpCheck --> rowNum:%s, operator:%s, %s", target.getRowNum(), operator, mode)); log.info(String.format("selectLikpCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Likp:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Likp:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectLikpCheckDelete() { public void selectLikpCheckUpdate() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Likp:rowNum", "0"); opsForValue.setIfAbsent("huazheng:checkUpdate:Likp:rowids", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Likp:rowNum"); Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Likp:rowids"));
Likp build = Likp.builder().rowNum(rowNum).build(); Likp build = Likp.builder().rowids(rowids).build();
List<Likp> list = gpMapper.selectLikpCheck(build); // 从数仓中查询一组数据 List<Likp> slist = sapMapper.selectLikpCheckByUpdate(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Likp:rowNum", "0"); // 计数器复位 redis1Template.opsForValue().set("huazheng:checkUpdate:Likp:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
list.forEach(target -> { // 遍历要检查的数据 slist.forEach(source -> { // 遍历要检查的数据
Likp source = sapMapper.selectLikpById(target); // 根据主键查询源库中的数据 Likp target = gpMapper.selectLikp(source); // 根据主键查询源库中的数据
String operator = "none"; String operator = "none";
if (source == null) { // 如果源库中没有数据 Long srowids = source.getRowids();
gpMapper.deleteLikp(target); // 删除数仓中的数据 if (target != null) {
operator = "delete"; source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
source.setErdat1(caDate(source.getErdat())); // 日期00000000格式转换,已处理异常
source.setWadat_ist1(caDate(source.getWadat_ist())); // 日期00000000格式转换,已处理异常
// ===============================
while (true) {
try {
gpMapper.updateLikp(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
} }
redis1Template.opsForValue().set("huazheng:checkDelete:Likp:rowNum", target.getRowNum());
redis1Template.opsForValue().set("huazheng:checkUpdate:Likp:rowids", srowids.toString());
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectLikpCheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator)); log.info(String.format("selectLikpCheckUpdate --> rowNum:%s, operator:%s", srowids.toString(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Likp:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkUpdateError:Likp:rowids", getErrorInfoFromException(e));
} }
} }
// 计数器复位删除 // 计数器复位删除3
private void selectLipsCheck() { private void selectLipsCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -1840,15 +1841,8 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1840,15 +1841,8 @@ public class DeleteUpdateJobServiceImpl {
List<Lips> list = gpMapper.selectLipsCheck(build); // 从数仓中查询一组数据 List<Lips> list = gpMapper.selectLipsCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
// 按更新时间查询,查询大于今天的数据,此时就不需要重置计数器了 redis1Template.opsForValue().set("huazheng:check:Lips:rowNum", "0"); // 计数器复位
String aedat = DateUtil.format(DateUtil.yesterday(), "yyyyMMdd"); // 昨天的日期 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
build = Lips.builder().rowNum(rowNum).aedat(aedat).build();
// 查询出列表20个,下面的代码运行后会复位一个阶段的rowNum,如此往复list.size()就不会为0了
list = gpMapper.selectLipsCheckByUpdate(build);
// 是否为list.size() == 0的时候的更新时间查询的列表
opsForValue.set("huazheng:check:Lips:isUpdateList", "true");
} else {
redis1Template.delete("huazheng:check:Lips:isUpdateList");
} }
list.forEach(target -> { // 遍历要检查的数据 list.forEach(target -> { // 遍历要检查的数据
...@@ -1884,48 +1878,68 @@ public class DeleteUpdateJobServiceImpl { ...@@ -1884,48 +1878,68 @@ public class DeleteUpdateJobServiceImpl {
} }
} }
String mode = null; redis1Template.opsForValue().set("huazheng:check:Lips:rowNum", target.getRowNum());
String isUpdateList = opsForValue.get("huazheng:check:Lips:isUpdateList");
if (isUpdateList == null) { // 非更新模式下的列表才更新计数器
redis1Template.opsForValue().set("huazheng:check:Lips:rowNum", target.getRowNum());
mode = "计数器模式";
} else {
mode = "更新时间模式";
}
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectLipsCheck --> rowNum:%s, operator:%s, %s", target.getRowNum(), operator, mode)); log.info(String.format("selectLipsCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Lips:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Lips:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectLipsCheckDelete() { public void selectLipsCheckUpdate() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Lips:rowNum", "0"); opsForValue.setIfAbsent("huazheng:checkUpdate:Lips:rowids", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Lips:rowNum"); Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Lips:rowids"));
Lips build = Lips.builder().rowNum(rowNum).build(); Lips build = Lips.builder().rowids(rowids).build();
List<Lips> list = gpMapper.selectLipsCheck(build); // 从数仓中查询一组数据 List<Lips> slist = sapMapper.selectLipsCheckByUpdate(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Lips:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
list.forEach(target -> { // 遍历要检查的数据 slist.forEach(source -> { // 遍历要检查的数据
Lips source = sapMapper.selectLipsById(target); // 根据主键查询源库中的数据 Lips target = gpMapper.selectLips(source); // 根据主键查询源库中的数据
String operator = "none"; String operator = "none";
if (source == null) { // 如果源库中没有数据 Long srowids = source.getRowids();
gpMapper.deleteLips(target); // 删除数仓中的数据 if (target != null) {
operator = "delete"; source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getErdat() != null && source.getErzet() != null) {
String erdat2 = new StringBuffer(source.getErdat()).insert(4, "-").insert(7, "-").toString();
String erzet2 = new StringBuffer(source.getErzet()).insert(2, ":").insert(5, ":").toString();
String dateStr = erdat2 + " " + erzet2;
Date date = DateUtil.parse(dateStr);
source.setErdat1(date);
source.setErdat2(date);
}
// ===============================
while (true) {
try {
gpMapper.updateLips(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
} }
redis1Template.opsForValue().set("huazheng:checkDelete:Lips:rowNum", target.getRowNum());
redis1Template.opsForValue().set("huazheng:checkUpdate:Lips:rowids", srowids.toString());
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectLipsCheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator)); log.info(String.format("selectLipscheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Lips:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkUpdateError:Lips:rowids", getErrorInfoFromException(e));
} }
} }
private void selectMaktCheck() { private void selectMaktCheck() {
...@@ -2091,15 +2105,8 @@ public class DeleteUpdateJobServiceImpl { ...@@ -2091,15 +2105,8 @@ public class DeleteUpdateJobServiceImpl {
List<Pa0002> list = gpMapper.selectPa0002Check(build); // 从数仓中查询一组数据 List<Pa0002> list = gpMapper.selectPa0002Check(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
// 按更新时间查询,查询大于今天的数据,此时就不需要重置计数器了 redis1Template.opsForValue().set("huazheng:check:Pa0002:rowNum", "0"); // 计数器复位
String aedtm = DateUtil.format(DateUtil.yesterday(), "yyyyMMdd"); // 昨天的日期 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
build = Pa0002.builder().rowNum(rowNum).aedtm(aedtm).build();
// 查询出列表20个,下面的代码运行后会复位一个阶段的rowNum,如此往复list.size()就不会为0了
list = gpMapper.selectPa0002CheckByUpdate(build);
// 是否为list.size() == 0的时候的更新时间查询的列表
opsForValue.set("huazheng:check:Pa0002:isUpdateList", "true");
} else {
redis1Template.delete("huazheng:check:Pa0002:isUpdateList");
} }
list.forEach(target -> { // 遍历要检查的数据 list.forEach(target -> { // 遍历要检查的数据
...@@ -2132,50 +2139,67 @@ public class DeleteUpdateJobServiceImpl { ...@@ -2132,50 +2139,67 @@ public class DeleteUpdateJobServiceImpl {
ThreadUtil.safeSleep(500); ThreadUtil.safeSleep(500);
} }
} }
String mode = null;
String isUpdateList = opsForValue.get("huazheng:check:Vbrk:isUpdateList");
if (isUpdateList == null) { // 非更新模式下的列表才更新计数器
redis1Template.opsForValue().set("huazheng:check:Pa0002:rowNum", target.getRowNum());
mode = "计数器模式";
} else {
mode = "更新时间模式";
}
redis1Template.opsForValue().set("huazheng:check:Pa0002:rowNum", target.getRowNum());
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectPa0002Check --> rowNum:%s, operator:%s, %s", target.getRowNum(), operator, mode)); log.info(String.format("selectPa0002Check --> rowNum:%s, operator:%s", target.getRowNum(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Pa0002:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Pa0002:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectPa0002CheckDelete() { public void selectPa0002CheckUpdate() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Pa0002:rowNum", "0"); opsForValue.setIfAbsent("huazheng:checkUpdate:Pa0002:rowids", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Pa0002:rowNum"); Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Pa0002:rowids"));
Pa0002 build = Pa0002.builder().rowNum(rowNum).build(); Pa0002 build = Pa0002.builder().rowids(rowids).build();
List<Pa0002> list = gpMapper.selectPa0002Check(build); // 从数仓中查询一组数据 List<Pa0002> slist = sapMapper.selectPa0002CheckByUpdate(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Pa0002:rowNum", "0"); // 计数器复位 redis1Template.opsForValue().set("huazheng:checkUpdate:Pa0002:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
list.forEach(target -> { // 遍历要检查的数据 slist.forEach(source -> { // 遍历要检查的数据
Pa0002 source = sapMapper.selectPa0002ById(target); // 根据主键查询源库中的数据 Pa0002 target = gpMapper.selectPa0002(source); // 根据主键查询源库中的数据
String operator = "none"; String operator = "none";
if (source == null) { // 如果源库中没有数据 Long srowids = source.getRowids();
gpMapper.deletePa0002(target); // 删除数仓中的数据 if (target != null) {
operator = "delete"; source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getEndda() != null) {
String erdat2 = new StringBuffer(source.getEndda()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setEndda1(date);
}
source.setYeWuYMC(source.getNachn() + source.getVorna());
// ===============================
while (true) {
try {
gpMapper.updatePa0002(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
} }
redis1Template.opsForValue().set("huazheng:checkDelete:Pa0002:rowNum", target.getRowNum());
redis1Template.opsForValue().set("huazheng:checkUpdate:Pa0002:rowids", srowids.toString());
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectPa0002CheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator)); log.info(String.format("selectPa0002checkUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Pa0002:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkUpdateError:Pa0002:rowids", getErrorInfoFromException(e));
} }
} }
private void selectS066Check() { private void selectS066Check() {
...@@ -2394,15 +2418,8 @@ public class DeleteUpdateJobServiceImpl { ...@@ -2394,15 +2418,8 @@ public class DeleteUpdateJobServiceImpl {
List<Vbak> list = gpMapper.selectVbakCheck(build); // 从数仓中查询一组数据 List<Vbak> list = gpMapper.selectVbakCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { if (list.size() == 0) {
// 按更新时间查询,查询大于今天的数据,此时就不需要重置计数器了 redis1Template.opsForValue().set("huazheng:check:Vbak:rowNum", "0"); // 计数器复位
String aedat = DateUtil.format(DateUtil.yesterday(), "yyyyMMdd"); // 昨天的日期 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
build = Vbak.builder().rowNum(rowNum).aedat(aedat).build();
// 查询出列表20个,下面的代码运行后会复位一个阶段的rowNum,如此往复list.size()就不会为0了
list = gpMapper.selectVbakCheckByUpdate(build);
// 是否为list.size() == 0的时候的更新时间查询的列表
opsForValue.set("huazheng:check:Vbak:isUpdateList", "true");
} else {
redis1Template.delete("huazheng:check:Vbak:isUpdateList");
} }
list.forEach(target -> { // 遍历要检查的数据 list.forEach(target -> { // 遍历要检查的数据
...@@ -2431,49 +2448,61 @@ public class DeleteUpdateJobServiceImpl { ...@@ -2431,49 +2448,61 @@ public class DeleteUpdateJobServiceImpl {
} }
} }
String mode = null; redis1Template.opsForValue().set("huazheng:check:Vbak:rowNum", target.getRowNum());
String isUpdateList = opsForValue.get("huazheng:check:Vbak:isUpdateList");
if (isUpdateList == null) { // 非更新模式下的列表才更新计数器
redis1Template.opsForValue().set("huazheng:check:Vbak:rowNum", target.getRowNum());
mode = "计数器模式";
} else {
mode = "更新时间模式";
}
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectVbakCheck --> rowNum:%s, operator:%s, %s", target.getRowNum(), operator, mode)); log.info(String.format("selectVbakCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Vbak:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Vbak:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectVbakCheckDelete() { public void selectVbakCheckUpdate() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Vbak:rowNum", "0"); opsForValue.setIfAbsent("huazheng:checkUpdate:Vbak:rowids", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Vbak:rowNum"); Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Vbak:rowids"));
Vbak build = Vbak.builder().rowNum(rowNum).build(); Vbak build = Vbak.builder().rowids(rowids).build();
List<Vbak> list = gpMapper.selectVbakCheck(build); // 从数仓中查询一组数据 List<Vbak> slist = sapMapper.selectVbakCheckByUpdate(build); // 从数仓中查询一组数据
if (list.size() == 0) { if (slist.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:Vbak:rowNum", "0"); // 计数器复位 redis1Template.opsForValue().set("huazheng:checkUpdate:Vbak:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
list.forEach(target -> { // 遍历要检查的数据
Vbak source = sapMapper.selectVbakById(target); // 根据主键查询源库中的数据 slist.forEach(source -> { // 遍历要检查的数据
Vbak target = gpMapper.selectVbak(source); // 根据主键查询源库中的数据
String operator = "none"; String operator = "none";
if (source == null) { // 如果源库中没有数据 Long srowids = source.getRowids();
gpMapper.deleteVbak(target); // 删除数仓中的数据 if (target != null) {
operator = "delete"; source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
source.setAudat1(caDate(source.getAudat())); // 日期00000000格式转换,已处理异常
// ===============================
while (true) {
try {
gpMapper.updateVbak(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
} }
redis1Template.opsForValue().set("huazheng:checkDelete:Vbak:rowNum", target.getRowNum());
redis1Template.opsForValue().set("huazheng:checkUpdate:Vbak:rowids", srowids.toString());
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectVbakCheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator)); log.info(String.format("selectVbakcheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Vbak:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkUpdateError:Vbak:rowids", getErrorInfoFromException(e));
} }
} }
private void selectVbepCheck() { private void selectVbepCheck() {
...@@ -2579,15 +2608,8 @@ public class DeleteUpdateJobServiceImpl { ...@@ -2579,15 +2608,8 @@ public class DeleteUpdateJobServiceImpl {
List<Vbrk> list = gpMapper.selectVbrkCheck(build); // 从数仓中查询一组数据 List<Vbrk> list = gpMapper.selectVbrkCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
// 按更新时间查询,查询大于今天的数据,此时就不需要重置计数器了 redis1Template.opsForValue().set("huazheng:check:Vbrk:rowNum", "0"); // 计数器复位
String aedat = DateUtil.format(DateUtil.yesterday(), "yyyyMMdd"); // 昨天的日期 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
build = Vbrk.builder().rowNum(rowNum).aedat(aedat).build();
// 查询出列表20个,下面的代码运行后会复位一个阶段的rowNum,如此往复list.size()就不会为0了
list = gpMapper.selectVbrkCheckByUpdate(build);
// 是否为list.size() == 0的时候的更新时间查询的列表
opsForValue.set("huazheng:check:Vbrk:isUpdateList", "true");
} else {
redis1Template.delete("huazheng:check:Vbrk:isUpdateList");
} }
list.forEach(target -> { // 遍历要检查的数据 list.forEach(target -> { // 遍历要检查的数据
...@@ -2620,50 +2642,65 @@ public class DeleteUpdateJobServiceImpl { ...@@ -2620,50 +2642,65 @@ public class DeleteUpdateJobServiceImpl {
} }
} }
String mode = null; redis1Template.opsForValue().set("huazheng:check:Vbrk:rowNum", target.getRowNum());
String isUpdateList = opsForValue.get("huazheng:check:Vbrk:isUpdateList");
if (isUpdateList == null) { // 非更新模式下的列表才更新计数器
redis1Template.opsForValue().set("huazheng:check:Vbrk:rowNum", target.getRowNum());
mode = "计数器模式";
} else {
mode = "更新时间模式";
}
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectVbrkCheck --> rowNum:%s, operator:%s, %s", target.getRowNum(), operator, mode)); log.info(String.format("selectVbrkCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Vbrk:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Vbrk:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectVbrkCheckDelete() { public void selectVbrkCheckUpdate() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Vbrk:rowNum", "0"); opsForValue.setIfAbsent("huazheng:checkUpdate:Vbrk:rowids", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Vbrk:rowNum"); Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Vbrk:rowids"));
Vbrk build = Vbrk.builder().rowNum(rowNum).build(); Vbrk build = Vbrk.builder().rowids(rowids).build();
List<Vbrk> list = gpMapper.selectVbrkCheck(build); // 从数仓中查询一组数据 List<Vbrk> slist = sapMapper.selectVbrkCheckByUpdate(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Vbrk:rowNum", "0"); // 计数器复位 redis1Template.opsForValue().set("huazheng:checkUpdate:Vbrk:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
list.forEach(target -> { // 遍历要检查的数据 slist.forEach(source -> { // 遍历要检查的数据
Vbrk source = sapMapper.selectVbrkById(target); // 根据主键查询源库中的数据 Vbrk target = gpMapper.selectVbrk(source); // 根据主键查询源库中的数据
String operator = "none"; String operator = "none";
if (source == null) { // 如果源库中没有数据 Long srowids = source.getRowids();
gpMapper.deleteVbrk(target); // 删除数仓中的数据 if (target != null) {
operator = "delete"; source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getFkdat() != null) {
String erdat2 = new StringBuffer(source.getFkdat()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setFkdat1(date);
}
// ===============================
while (true) {
try {
gpMapper.updateVbrk(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
} }
redis1Template.opsForValue().set("huazheng:checkDelete:Vbrk:rowNum", target.getRowNum());
redis1Template.opsForValue().set("huazheng:checkUpdate:Vbrk:rowids", srowids.toString());
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectVbrkCheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator)); log.info(String.format("selectVbrkcheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Vbrk:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkUpdateError:Vbrk:rowids", getErrorInfoFromException(e));
} }
} }
private void selectVbrpCheck() { private void selectVbrpCheck() {
...@@ -2808,15 +2845,8 @@ public class DeleteUpdateJobServiceImpl { ...@@ -2808,15 +2845,8 @@ public class DeleteUpdateJobServiceImpl {
List<Zsd06> list = gpMapper.selectZsd06Check(build); // 从数仓中查询一组数据 List<Zsd06> list = gpMapper.selectZsd06Check(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
// 按更新时间查询,查询大于今天的数据,此时就不需要重置计数器了 redis1Template.opsForValue().set("huazheng:check:Zsd06:rowNum", "0"); // 计数器复位
String cha_datum = DateUtil.format(DateUtil.yesterday(), "yyyyMMdd"); // 昨天的日期 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
build = Zsd06.builder().rowNum(rowNum).cha_datum(cha_datum).build();
// 查询出列表20个,下面的代码运行后会复位一个阶段的rowNum,如此往复list.size()就不会为0了
list = gpMapper.selectZsd06CheckByUpdate(build);
// 是否为list.size() == 0的时候的更新时间查询的列表
opsForValue.set("huazheng:check:Zsd06:isUpdateList", "true");
} else {
redis1Template.delete("huazheng:check:Zsd06:isUpdateList");
} }
list.forEach(target -> { // 遍历要检查的数据 list.forEach(target -> { // 遍历要检查的数据
...@@ -2850,50 +2880,67 @@ public class DeleteUpdateJobServiceImpl { ...@@ -2850,50 +2880,67 @@ public class DeleteUpdateJobServiceImpl {
ThreadUtil.safeSleep(500); ThreadUtil.safeSleep(500);
} }
} }
String mode = null;
String isUpdateList = opsForValue.get("huazheng:check:Zsd06:isUpdateList"); redis1Template.opsForValue().set("huazheng:check:Zsd06:rowNum", target.getRowNum());
if (isUpdateList == null) { // 非更新模式下的列表才更新计数器
redis1Template.opsForValue().set("huazheng:check:Zsd06:rowNum", target.getRowNum());
mode = "计数器模式";
} else {
mode = "更新时间模式";
}
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectZsd06Check --> rowNum:%s, operator:%s, %s", target.getRowNum(), operator, mode)); log.info(String.format("selectZsd06Check --> rowNum:%s, operator:%s", target.getRowNum(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Zsd06:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Zsd06:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectZsd06CheckDelete() { public void selectZsd06CheckUpdate() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Zsd06:rowNum", "0"); opsForValue.setIfAbsent("huazheng:checkUpdate:Zsd06:rowids", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Zsd06:rowNum"); Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Zsd06:rowids"));
Zsd06 build = Zsd06.builder().rowNum(rowNum).build(); Zsd06 build = Zsd06.builder().rowids(rowids).build();
List<Zsd06> list = gpMapper.selectZsd06Check(build); // 从数仓中查询一组数据 List<Zsd06> slist = sapMapper.selectZsd06CheckByUpdate(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Zsd06:rowNum", "0"); // 计数器复位 redis1Template.opsForValue().set("huazheng:checkUpdate:Zsd06:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
list.forEach(target -> { // 遍历要检查的数据 slist.forEach(source -> { // 遍历要检查的数据
Zsd06 source = sapMapper.selectZsd06ById(target); // 根据主键查询源库中的数据 Zsd06 target = gpMapper.selectZsd06(source); // 根据主键查询源库中的数据
String operator = "none"; String operator = "none";
if (source == null) { // 如果源库中没有数据 Long srowids = source.getRowids();
gpMapper.deleteZsd06(target); // 删除数仓中的数据 if (target != null) {
operator = "delete"; source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getDatum() != null) {
String erdat2 = new StringBuffer(source.getDatum()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setDatum1(date);
}
source.setCrt_datum1(caDate(source.getCrt_datum()));
source.setCha_datum1(caDate(source.getCha_datum()));
// ===============================
while (true) {
try {
gpMapper.updateZsd06(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
} }
redis1Template.opsForValue().set("huazheng:checkDelete:Zsd06:rowNum", target.getRowNum());
redis1Template.opsForValue().set("huazheng:checkUpdate:Zsd06:rowids", srowids.toString());
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectZsd06CheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator)); log.info(String.format("selectZsd06checkUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Zsd06:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkUpdateError:Zsd06:rowids", getErrorInfoFromException(e));
} }
} }
// 计数器复位删除 // 计数器复位删除
...@@ -2906,15 +2953,8 @@ public class DeleteUpdateJobServiceImpl { ...@@ -2906,15 +2953,8 @@ public class DeleteUpdateJobServiceImpl {
List<Zsdfhzl> list = gpMapper.selectZsdfhzlCheck(build); // 从数仓中查询一组数据 List<Zsdfhzl> list = gpMapper.selectZsdfhzlCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
// 按更新时间查询,查询大于今天的数据,此时就不需要重置计数器了 redis1Template.opsForValue().set("huazheng:check:Zsdfhzl:rowNum", "0"); // 计数器复位
String cha_datum = DateUtil.format(DateUtil.yesterday(), "yyyyMMdd"); // 昨天的日期 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
build = Zsdfhzl.builder().rowNum(rowNum).cha_datum(cha_datum).build();
// 查询出列表20个,下面的代码运行后会复位一个阶段的rowNum,如此往复list.size()就不会为0了
list = gpMapper.selectZsdfhzlCheckByUpdate(build);
// 是否为list.size() == 0的时候的更新时间查询的列表
opsForValue.set("huazheng:check:Zsdfhzl:isUpdateList", "true");
} else {
redis1Template.delete("huazheng:check:Zsdfhzl:isUpdateList");
} }
list.forEach(target -> { // 遍历要检查的数据 list.forEach(target -> { // 遍历要检查的数据
...@@ -2953,53 +2993,72 @@ public class DeleteUpdateJobServiceImpl { ...@@ -2953,53 +2993,72 @@ public class DeleteUpdateJobServiceImpl {
ThreadUtil.safeSleep(500); ThreadUtil.safeSleep(500);
} }
} }
String mode = null; redis1Template.opsForValue().set("huazheng:check:Zsdfhzl:rowNum", target.getRowNum());
String isUpdateList = opsForValue.get("huazheng:check:Zsdfhzl:isUpdateList");
if (isUpdateList == null) { // 非更新模式下的列表才更新计数器
redis1Template.opsForValue().set("huazheng:check:Zsdfhzl:rowNum", target.getRowNum());
mode = "计数器模式";
} else {
mode = "更新时间模式";
}
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectZsdfhzlCheck --> rowNum:%s, operator:%s, %s", target.getRowNum(), operator, mode)); log.info(String.format("selectZsdfhzlCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Zsdfhzl:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Zsdfhzl:rowNum", getErrorInfoFromException(e));
} }
} }
private void selectZsdfhzlCheckDelete() { public void selectZsdfhzlCheckUpdate() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Zsdfhzl:rowNum", "0"); opsForValue.setIfAbsent("huazheng:checkUpdate:Zsdfhzl:rowids", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Zsdfhzl:rowNum"); Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Zsdfhzl:rowids"));
Zsdfhzl build = Zsdfhzl.builder().rowNum(rowNum).build(); Zsdfhzl build = Zsdfhzl.builder().rowids(rowids).build();
List<Zsdfhzl> list = gpMapper.selectZsdfhzlCheck(build); // 从数仓中查询一组数据 List<Zsdfhzl> list = sapMapper.selectZsdfhzlCheckByUpdate(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步 if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Zsdfhzl:rowNum", "0"); // 计数器复位 redis1Template.opsForValue().set("huazheng:checkUpdate:Zsdfhzl:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
list.forEach(target -> { // 遍历要检查的数据 list.forEach(source -> { // 遍历要检查的数据
Zsdfhzl source = sapMapper.selectZsdfhzlById(target); // 根据主键查询源库中的数据 Zsdfhzl target = gpMapper.selectZsdfhzl(source); // 根据主键查询源库中的数据
String operator = "none"; String operator = "none";
if (source == null) { // 如果源库中没有数据 Long srowids = source.getRowids();
gpMapper.deleteZsdfhzl(target); // 删除数仓中的数据 if (target != null) {
operator = "delete"; source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getCdate() != null && source.getCtime() != null) {
String erdat2 = new StringBuffer(source.getCdate()).insert(4, "-").insert(7, "-").toString();
String erzet2 = new StringBuffer(source.getCtime()).insert(2, ":").insert(5, ":").toString();
String dateStr = erdat2 + " " + erzet2;
Date date = DateUtil.parse(dateStr);
source.setCdate1(date);
}
if (source.getFh_date() != null && source.getFh_date().indexOf("--") == -1) {
String erdat2 = new StringBuffer(source.getFh_date()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setFh_date1(date);
}
// ===============================
while (true) {
try {
gpMapper.updateZsdfhzl(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
} }
redis1Template.opsForValue().set("huazheng:checkDelete:Zsdfhzl:rowNum", target.getRowNum()); redis1Template.opsForValue().set("huazheng:checkUpdate:Zsdfhzl:rowids", srowids.toString());
if (!operator.equals("none")) { if (!operator.equals("none")) {
log.info(String.format("selectZsdfhzlCheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator)); log.info(String.format("selectZsdfhzlcheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
} }
}); });
} catch (Exception e) { } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Zsdfhzl:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkUpdateError:Zsdfhzl:rowids", getErrorInfoFromException(e));
} }
} }
private void selectTvkbtCheck() { private void selectTvkbtCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -3278,22 +3337,6 @@ public class DeleteUpdateJobServiceImpl { ...@@ -3278,22 +3337,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Vbap:rowNum", getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkError:Vbap:rowNum", getErrorInfoFromException(e));
} }
} }
// 有更新时间的表的删除计数器需要拆分出来,避免数据不能删除
// 把这些方法的删除部分从原方法种剥离出来
public void checkJob6() {
// selectKnkkCheckDelete();
selectKnkkCheckUpdate();
selectLikpCheckDelete();
selectLipsCheckDelete();
selectPa0002CheckDelete();
selectVbakCheckDelete();
selectVbrkCheckDelete();
selectZsd06CheckDelete();
selectZsdfhzlCheckDelete();
selectVbapCheckDelete();
}
private void selectVbapCheckDelete() { private void selectVbapCheckDelete() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
...@@ -3325,6 +3368,21 @@ public class DeleteUpdateJobServiceImpl { ...@@ -3325,6 +3368,21 @@ public class DeleteUpdateJobServiceImpl {
} }
} }
// 有更新时间的表的删除计数器需要拆分出来,避免数据不能删除
// 把这些方法的删除部分从原方法种剥离出来
public void checkJob6() {
// selectKnkkCheckUpdate();
// selectLikpCheckUpdate();
// selectLipsCheckUpdate();
// selectPa0002CheckUpdate();
// selectVbakCheckUpdate();
// selectVbrkCheckUpdate();
// selectZsd06CheckUpdate();
// selectZsdfhzlCheckUpdate();
selectVbapCheckDelete();
}
public void checkJob7() { public void checkJob7() {
selectBsadCheck(); // 41 Bsid2Bsad 43 selectBsadCheck(); // 41 Bsid2Bsad 43
selectBsidCheck(); // 42 Bsid2Bsad 43 selectBsidCheck(); // 42 Bsid2Bsad 43
......
...@@ -85,6 +85,13 @@ public interface SapMapper { ...@@ -85,6 +85,13 @@ public interface SapMapper {
public List<Knvp> selectKnvpNew(Knvp knvp); public List<Knvp> selectKnvpNew(Knvp knvp);
public List<Knkk> selectKnkkCheckByUpdate(Knkk knkk); public List<Knkk> selectKnkkCheckByUpdate(Knkk knkk);
public List<Likp> selectLikpCheckByUpdate(Likp likp);
public List<Lips> selectLipsCheckByUpdate(Lips lips);
public List<Pa0002> selectPa0002CheckByUpdate(Pa0002 pa0002);
public List<Vbak> selectVbakCheckByUpdate(Vbak vbak);
public List<Vbrk> selectVbrkCheckByUpdate(Vbrk vbrk);
public List<Zsd06> selectZsd06CheckByUpdate(Zsd06 zsd06);
public List<Zsdfhzl> selectZsdfhzlCheckByUpdate(Zsdfhzl zsdfhzl);
public Knvp selectKnvpById(Knvp target); public Knvp selectKnvpById(Knvp target);
public Ausp selectAuspById(Ausp target); public Ausp selectAuspById(Ausp target);
......
...@@ -18,7 +18,7 @@ druid.hana.password: Hzxc888888 ...@@ -18,7 +18,7 @@ druid.hana.password: Hzxc888888
druid.hana.validationQuery: select 'hello world' from dummy druid.hana.validationQuery: select 'hello world' from dummy
druid.hana.initialSize: 5 druid.hana.initialSize: 5
druid.hana.minIdle: 10 druid.hana.minIdle: 10
druid.hana.maxActive: 50 druid.hana.maxActive: 100
druid.hana.maxWait: 60000 druid.hana.maxWait: 60000
druid.hana.timeBetweenEvictionRunsMillis: 60000 druid.hana.timeBetweenEvictionRunsMillis: 60000
druid.hana.minEvictableIdleTimeMillis: 300000 druid.hana.minEvictableIdleTimeMillis: 300000
......
...@@ -2,4 +2,6 @@ select top 20 "$rowid$" as rowids, ...@@ -2,4 +2,6 @@ select top 20 "$rowid$" as rowids,
mandt,kunnr,kkber,klimk,skfor,ssobl,aedat mandt,kunnr,kkber,klimk,skfor,ssobl,aedat
from sapabap1.Knkk from sapabap1.Knkk
where aedat != '00000000' and mandt = '800' where aedat != '00000000' and mandt = '800'
order by "$rowid$"; order by "$rowid$";
\ No newline at end of file
select "$rowid$" as rowids,lfimg,aedat from sapabap1.lips where vgbel = '0010073086' and vgpos = '000100' limit 10;
20201115问题整理
20201115问题整理
1、全量、增量和删除任务的问题
问题:
1、全量更新表:数据量增多周期越来越慢。aufm、konv已停止执行更新删除任务,因为数据量增长速度过快,循环周期过长。
2、增量更新表:数据量增多周期越来越慢。增量更新首先是一个时间范围去取源表数据0-20rowid,然后取21-40rowid,取到最后的rowid,然后又循环到0-20的rowid。成为一个周期
随着数据量的增多,rowid的总量会越来越多,所以也会越来越慢。
3、由于新的表不断的加入,每个表都需要新增更新删除任务,相当于每个表需要占用3个数据库连接,1个新增数据、1个更新任务、1个删除任务。
目前有64个表,需要数仓占用250个连接,同时相对的sap那边也需要占用250个连接,这种查询压力是很大的,如果减小连接数,那么查询回排队,速度也会降低。
目前的设计是,一些任务被控制再一个连接内,所以速度进一步降低,如果展开一个表一个任务一个连接,那么数据库有超载查询的风险。
目前的思路:
1、全量更新和增量更新都,都取一部分的数据更新,比如取最新N条数据进行更新。周期缩短,最新的数据能得到最快的更新。抛弃历史数据的更新
会导致的问题:
1、N条以外的数据,删除不干净
2、N条以外的数据,更新不到(如果N=500此时有550条数据需要更新,那么50条数据没有更新到,因为并不知道多少数据需要更新)
全量更新和增量更新逻辑上处理是相同的,不同的是,增量更新加了时间条件,过滤了大部分数据,缩短了更新周期。
...@@ -803,9 +803,6 @@ ...@@ -803,9 +803,6 @@
<select id="selectLikpCheck" parameterType="com.huazheng.project.hana.model.Likp" resultType="com.huazheng.project.hana.model.Likp"> <select id="selectLikpCheck" parameterType="com.huazheng.project.hana.model.Likp" resultType="com.huazheng.project.hana.model.Likp">
select * from Likp where rownum &gt; #{rowNum} order by rownum limit 20 select * from Likp where rownum &gt; #{rowNum} order by rownum limit 20
</select> </select>
<select id="selectLikpCheckByUpdate" parameterType="com.huazheng.project.hana.model.Likp" resultType="com.huazheng.project.hana.model.Likp">
select * from Likp where aedat &gt;= #{aedat} order by rownum
</select>
<select id="selectLips" parameterType="com.huazheng.project.hana.model.Lips" resultType="com.huazheng.project.hana.model.Lips"> <select id="selectLips" parameterType="com.huazheng.project.hana.model.Lips" resultType="com.huazheng.project.hana.model.Lips">
select * from lips where vbeln = #{vbeln} and posnr = #{posnr} and mandt = #{mandt} select * from lips where vbeln = #{vbeln} and posnr = #{posnr} and mandt = #{mandt}
...@@ -829,9 +826,6 @@ ...@@ -829,9 +826,6 @@
<select id="selectLipsCheck" parameterType="com.huazheng.project.hana.model.Lips" resultType="com.huazheng.project.hana.model.Lips"> <select id="selectLipsCheck" parameterType="com.huazheng.project.hana.model.Lips" resultType="com.huazheng.project.hana.model.Lips">
select * from Lips where rownum &gt; #{rowNum} order by rownum limit 20 select * from Lips where rownum &gt; #{rowNum} order by rownum limit 20
</select> </select>
<select id="selectLipsCheckByUpdate" parameterType="com.huazheng.project.hana.model.Lips" resultType="com.huazheng.project.hana.model.Lips">
select * from Lips where aedat &gt;= #{aedat} order by rownum
</select>
<select id="selectMakt" parameterType="com.huazheng.project.hana.model.Makt" resultType="com.huazheng.project.hana.model.Makt"> <select id="selectMakt" parameterType="com.huazheng.project.hana.model.Makt" resultType="com.huazheng.project.hana.model.Makt">
select * from makt where matnr = #{matnr} and mandt = #{mandt} and spras = #{spras} select * from makt where matnr = #{matnr} and mandt = #{mandt} and spras = #{spras}
...@@ -925,9 +919,6 @@ ...@@ -925,9 +919,6 @@
<select id="selectPa0002Check" parameterType="com.huazheng.project.hana.model.Pa0002" resultType="com.huazheng.project.hana.model.Pa0002"> <select id="selectPa0002Check" parameterType="com.huazheng.project.hana.model.Pa0002" resultType="com.huazheng.project.hana.model.Pa0002">
select * from Pa0002 where rownum &gt; #{rowNum} order by rownum limit 20 select * from Pa0002 where rownum &gt; #{rowNum} order by rownum limit 20
</select> </select>
<select id="selectPa0002CheckByUpdate" parameterType="com.huazheng.project.hana.model.Pa0002" resultType="com.huazheng.project.hana.model.Pa0002">
select * from Pa0002 where aedtm &gt;= #{aedtm} order by rownum
</select>
<select id="selectS066" parameterType="com.huazheng.project.hana.model.S066" resultType="com.huazheng.project.hana.model.S066"> <select id="selectS066" parameterType="com.huazheng.project.hana.model.S066" resultType="com.huazheng.project.hana.model.S066">
select * from S066 select * from S066
...@@ -1051,9 +1042,6 @@ ...@@ -1051,9 +1042,6 @@
<select id="selectVbakCheck" parameterType="com.huazheng.project.hana.model.Vbak" resultType="com.huazheng.project.hana.model.Vbak"> <select id="selectVbakCheck" parameterType="com.huazheng.project.hana.model.Vbak" resultType="com.huazheng.project.hana.model.Vbak">
select * from Vbak where rownum &gt; #{rowNum} order by rownum limit 20 select * from Vbak where rownum &gt; #{rowNum} order by rownum limit 20
</select> </select>
<select id="selectVbakCheckByUpdate" parameterType="com.huazheng.project.hana.model.Vbak" resultType="com.huazheng.project.hana.model.Vbak">
select * from Vbak where aedat &gt;= #{aedat} order by rownum
</select>
<select id="selectVbep" parameterType="com.huazheng.project.hana.model.Vbep" resultType="com.huazheng.project.hana.model.Vbep"> <select id="selectVbep" parameterType="com.huazheng.project.hana.model.Vbep" resultType="com.huazheng.project.hana.model.Vbep">
select * from vbep select * from vbep
...@@ -1123,9 +1111,6 @@ ...@@ -1123,9 +1111,6 @@
<select id="selectVbrkCheck" parameterType="com.huazheng.project.hana.model.Vbrk" resultType="com.huazheng.project.hana.model.Vbrk"> <select id="selectVbrkCheck" parameterType="com.huazheng.project.hana.model.Vbrk" resultType="com.huazheng.project.hana.model.Vbrk">
select * from Vbrk where rownum &gt; #{rowNum} order by rownum limit 20 select * from Vbrk where rownum &gt; #{rowNum} order by rownum limit 20
</select> </select>
<select id="selectVbrkCheckByUpdate" parameterType="com.huazheng.project.hana.model.Vbrk" resultType="com.huazheng.project.hana.model.Vbrk">
select * from Vbrk where aedat &gt;= #{aedat} order by rownum
</select>
<select id="selectVbrp" parameterType="com.huazheng.project.hana.model.Vbrp" resultType="com.huazheng.project.hana.model.Vbrp"> <select id="selectVbrp" parameterType="com.huazheng.project.hana.model.Vbrp" resultType="com.huazheng.project.hana.model.Vbrp">
select * from Vbrp select * from Vbrp
...@@ -1218,9 +1203,6 @@ ...@@ -1218,9 +1203,6 @@
<select id="selectZsd06Check" parameterType="com.huazheng.project.hana.model.Zsd06" resultType="com.huazheng.project.hana.model.Zsd06"> <select id="selectZsd06Check" parameterType="com.huazheng.project.hana.model.Zsd06" resultType="com.huazheng.project.hana.model.Zsd06">
select * from Zsd06 where rownum &gt; #{rowNum} order by rownum limit 20 select * from Zsd06 where rownum &gt; #{rowNum} order by rownum limit 20
</select> </select>
<select id="selectZsd06CheckByUpdate" parameterType="com.huazheng.project.hana.model.Zsd06" resultType="com.huazheng.project.hana.model.Zsd06">
select * from Zsd06 where cha_datum &gt;= #{cha_datum} order by rownum
</select>
<select id="selectZsdfhzl" parameterType="com.huazheng.project.hana.model.Zsdfhzl" resultType="com.huazheng.project.hana.model.Zsdfhzl"> <select id="selectZsdfhzl" parameterType="com.huazheng.project.hana.model.Zsdfhzl" resultType="com.huazheng.project.hana.model.Zsdfhzl">
select * from Zsdfhzl select * from Zsdfhzl
...@@ -1246,9 +1228,6 @@ ...@@ -1246,9 +1228,6 @@
<select id="selectZsdfhzlCheck" parameterType="com.huazheng.project.hana.model.Zsdfhzl" resultType="com.huazheng.project.hana.model.Zsdfhzl"> <select id="selectZsdfhzlCheck" parameterType="com.huazheng.project.hana.model.Zsdfhzl" resultType="com.huazheng.project.hana.model.Zsdfhzl">
select * from Zsdfhzl where rownum &gt; #{rowNum} order by rownum limit 20 select * from Zsdfhzl where rownum &gt; #{rowNum} order by rownum limit 20
</select> </select>
<select id="selectZsdfhzlCheckByUpdate" parameterType="com.huazheng.project.hana.model.Zsdfhzl" resultType="com.huazheng.project.hana.model.Zsdfhzl">
select * from Zsdfhzl where cha_datum &gt;= #{cha_datum} order by rownum
</select>
<select id="selectBsad" parameterType="com.huazheng.project.hana.model.Bsad" resultType="com.huazheng.project.hana.model.Bsad"> <select id="selectBsad" parameterType="com.huazheng.project.hana.model.Bsad" resultType="com.huazheng.project.hana.model.Bsad">
select * from Bsad select * from Bsad
......
...@@ -313,13 +313,84 @@ ...@@ -313,13 +313,84 @@
</select> </select>
<select id="selectKnkkCheckByUpdate" parameterType="Knkk" resultType="Knkk"> <select id="selectKnkkCheckByUpdate" parameterType="Knkk" resultType="Knkk">
select top 20 select top 20 "$rowid$" as rowids,
mandt,kunnr,kkber,klimk,skfor,ssobl,aedat mandt,kunnr,kkber,klimk,skfor,ssobl,aedat
from ${hana_user}.Knkk from ${hana_user}.Knkk
where "$rowid$" &gt; #{rowids} and aedat != '00000000' ${hana_mandt} where "$rowid$" &gt; #{rowids} and aedat != '00000000' ${hana_mandt}
order by "$rowid$" order by "$rowid$"
</select> </select>
<select id="selectLikpCheckByUpdate" parameterType="Likp" resultType="Likp">
select top 20 "$rowid$" as rowids,
vbeln, mandt, erdat, wadat_ist, ctlpc, kunnr, aedat,lfuhr,
case wadat_ist when '00000000' then null else (to_date(wadat_ist)||' '||to_time(lfuhr)) end as wadat_ist2lfuhr
from ${hana_user}.likp
where "$rowid$" &gt;#{rowids} and aedat != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectLipsCheckByUpdate" parameterType="Lips" resultType="Lips">
select top 20 "$rowid$" as rowids,
vbeln, posnr, vgbel, vgpos, mandt, matnr, matkl, arktx, werks, lgort,
charg, lfimg, uebto, untto, erdat, erzet, aedat
from ${hana_user}.lips
where "$rowid$" &gt; #{rowids} and aedat != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectPa0002CheckByUpdate" parameterType="Pa0002" resultType="Pa0002">
select top 20 "$rowid$" as rowids,
mandt,pernr,subty,objps,sprps,endda,begda,seqnr,nachn,vorna,aedtm
from ${hana_user}.Pa0002
where "$rowid$" &gt; #{rowids} and aedtm != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectVbakCheckByUpdate" parameterType="Vbak" resultType="Vbak">
select top 20 a."$rowid$" as rowids,
a.vbeln, a.mandt, a.vbtyp, a.audat, a.waerk, a.kalsm, a.ctlpc, a.kunnr, a.bukrs_vf, a.bstnk,
a.bname, a.telf1, a.netwr, a.vkbur, a.knumv, a.vkorg, a.vtweg, a.kkber, a.auart, a.aedat, b.pernr,
case a.erdat when '00000000' then null else (to_date(a.erdat)||' '||to_time(a.erzet)) end as erdat2erzet,
a.erdat, a.erzet, a.spart
from ${hana_user}.vbak a
left join (
select q.vbeln,r.pernr from ${hana_user}.vbap q
left join (
select vbeln,posnr,pernr from ${hana_user}.vbpa where mandt = #{mandt} and pernr != '00000000' group by vbeln,posnr,pernr
) r on q.vbeln = r.vbeln and q.posnr = r.posnr
where q.mandt = #{mandt} and r.pernr is not null
group by q.vbeln,r.pernr
) b on a.vbeln = b.vbeln
where "$rowid$" &gt; #{rowids} and a.aedat != '00000000' and a.mandt = '800'
order by "$rowid$"
</select>
<select id="selectVbrkCheckByUpdate" parameterType="com.huazheng.project.hana.model.Vbrk" resultType="com.huazheng.project.hana.model.Vbrk">
select top 20 "$rowid$" as rowids,
vbeln,fkart,waerk,erdat,erzet,zterm,kunrg,bstnk_vf,
zbillty,zbz01,zbz02,zdc_flg,bukrs,ktgrd,fkdat,inco1,
inco2,netwr,mwsbk,aedat,mandt
from ${hana_user}.Vbrk
where "$rowid$" &gt; #{rowids} and aedat != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectZsd06CheckByUpdate" parameterType="Zsd06" resultType="Zsd06">
select top 20 "$rowid$" as rowids,
vbeln, posnr, mandt, datum, uname, crt_uname, crt_datum,
crt_time, cha_name, cha_datum, cha_time
from ${hana_user}.Zsd_06
where "$rowid$" &gt; #{rowids} and cha_datum != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectZsdfhzlCheckByUpdate" parameterType="com.huazheng.project.hana.model.Zsdfhzl" resultType="com.huazheng.project.hana.model.Zsdfhzl">
select top 20 "$rowid$" as rowids,
vbeln, posnr, mandt, vgbel, vgpos, uname, matnr, fmenge, cdate, ctime, fh_date, werks, cha_datum
from ${hana_user}.zsd_fhzl
where "$rowid$" &gt; #{rowids} and cha_datum != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectAfkoById" parameterType="Afko" resultType="Afko"> <select id="selectAfkoById" parameterType="Afko" resultType="Afko">
......
...@@ -11,7 +11,7 @@ org.quartz.scheduler.wrapJobExecutionInUserTransaction=false ...@@ -11,7 +11,7 @@ org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
# ThreadPool # ThreadPool
#============================================================================ #============================================================================
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=20 org.quartz.threadPool.threadCount=30
org.quartz.threadPool.threadPriority=5 org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
......
...@@ -695,6 +695,79 @@ ...@@ -695,6 +695,79 @@
<property name="cronExpression" value="* * * * * ?" /> <property name="cronExpression" value="* * * * * ?" />
</bean> </bean>
<!-- ========================== 独立的任务 ========================== -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectKnkkCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectLikpCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectLipsCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectPa0002CheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectVbakCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectVbrkCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectZsd06CheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectZsdfhzlCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
</list> </list>
</constructor-arg> </constructor-arg>
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论