提交 8f961425 作者: guofeng

2

上级 50f630ee
......@@ -50,13 +50,13 @@ public class CheckUpdateServiceImpl {
opsForValue.setIfAbsent("huazheng:checkUpdate:Aufk:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Aufk:rowids"));
Aufk build = Aufk.builder().rowids(rowids).build();
List<Aufk> slist = sapMapper.selectAufkCheckByUpdate(build); // 从源库中按更新时间查询,只更新今天的数据
if (slist.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkUpdate:Aufk:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历源库中的数据,去目标库中查询数据
Aufk target = gpMapper.selectAufk(source);
String operator = "none";
......@@ -91,24 +91,25 @@ public class CheckUpdateServiceImpl {
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
// 级联更新业务
Afko sAfko = sapMapper.cascadeAfkoByAufk(source); // 级联查询源库afko表
Afko tAfko = gpMapper.selectAfko(sAfko); // 查询目标库中afko表
cascadeAfkoCheckByUpdate(sAfko, tAfko); // 级联更新afko表
// 级联更新业务
Afpo sAfpo = sapMapper.cascadeAfpoByAufk(source); // 级联查询源库afko表
Afpo tAfpo = gpMapper.selectAfpo(sAfpo); // 查询目标库中afko表
cascadeAfpoCheckByUpdate(sAfpo, tAfpo); // 级联更新afko表
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Aufk:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectAufkcheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
......@@ -118,71 +119,19 @@ public class CheckUpdateServiceImpl {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Aufk:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
// 所属selectAufkCheckByUpdate的级联
private void cascadeAfkoCheckByUpdate(Afko source, Afko target) {
if (target != null) { // 目标库有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getGltrp() != null) {
String erdat2 = new StringBuffer(source.getGltrp()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setGltrp1(date);
}
if (source.getGstrp() != null) {
String erdat2 = new StringBuffer(source.getGstrp()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setGstrp1(date);
}
// ===============================
while (true) {
try {
gpMapper.updateAfko(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
// 所属selectAufkCheckByUpdate的级联
private void cascadeAfpoCheckByUpdate(Afpo source, Afpo target) {
if (target != null) {
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateAfpo(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
public void selectBkpfCheckByUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Bkpf:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Bkpf:rowids"));
Bkpf build = Bkpf.builder().rowids(rowids).build();
List<Bkpf> slist = sapMapper.selectBkpfCheckByUpdate(build); // 从源库中按更新时间查询,只更新今天的数据
if (slist.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkUpdate:Bkpf:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历源库中的数据,去目标库中查询数据
Bkpf target = gpMapper.selectBkpf(source);
String operator = "none";
......@@ -207,7 +156,7 @@ public class CheckUpdateServiceImpl {
if ("00000000".equals(cpudt)) {
source.setCpudt_cputm(null);
} else {
Date date = DateUtil.parse(cpudt+cputm, "yyyyMMddHHmmss");
Date date = DateUtil.parse(cpudt + cputm, "yyyyMMddHHmmss");
source.setCpudt_cputm(date);
}
// ===============================
......@@ -217,7 +166,8 @@ public class CheckUpdateServiceImpl {
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
......@@ -238,13 +188,13 @@ public class CheckUpdateServiceImpl {
opsForValue.setIfAbsent("huazheng:checkUpdate:Kna1:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Kna1:rowids"));
Kna1 build = Kna1.builder().rowids(rowids).build();
List<Kna1> slist = sapMapper.selectKna1CheckUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkUpdate:Kna1:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(target -> { // 遍历要检查的数据,去目标库中查询数据
Kna1 source = gpMapper.selectKna1(target);
String operator = "none";
......@@ -271,19 +221,20 @@ public class CheckUpdateServiceImpl {
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
// 级联更新业务
Knvv sKnvv = sapMapper.cascadeKnvvByKna1(source); // 级联查询源库afko表
Knvv tKnvv = gpMapper.selectKnvv(sKnvv); // 查询目标库中afko表
cascadeKnvvCheckByUpdate(sKnvv, tKnvv); // 级联更新afko表
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Kna1:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectKna1checkUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
......@@ -293,28 +244,6 @@ public class CheckUpdateServiceImpl {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Kna1:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
// 所属selectKna1CheckByUpdate的级联
public void cascadeKnvvCheckByUpdate(Knvv source, Knvv target) {
if (target != null) { // 目标库有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateKnvv(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
public void selectLikpCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -344,7 +273,7 @@ public class CheckUpdateServiceImpl {
if ("00000000".equals(wadat_ist)) {
source.setWadat_ist2lfuhr(null);
} else {
Date date = DateUtil.parse(wadat_ist+lfuhr, "yyyyMMddHHmmss");
Date date = DateUtil.parse(wadat_ist + lfuhr, "yyyyMMddHHmmss");
source.setWadat_ist2lfuhr(date);
}
source.setErdat1(SomeUtils.caDate(source.getErdat())); // 日期00000000格式转换,已处理异常
......@@ -356,7 +285,8 @@ public class CheckUpdateServiceImpl {
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
......@@ -411,7 +341,8 @@ public class CheckUpdateServiceImpl {
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
......@@ -464,7 +395,8 @@ public class CheckUpdateServiceImpl {
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
......@@ -512,7 +444,8 @@ public class CheckUpdateServiceImpl {
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
......@@ -540,7 +473,7 @@ public class CheckUpdateServiceImpl {
redis1Template.opsForValue().set("huazheng:checkUpdate:Vbrk:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Vbrk target = gpMapper.selectVbrk(source); // 根据主键查询源库中的数据
String operator = "none";
......@@ -564,7 +497,8 @@ public class CheckUpdateServiceImpl {
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
......@@ -618,13 +552,14 @@ public class CheckUpdateServiceImpl {
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Zsd06:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectZsd06checkUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
......@@ -677,7 +612,8 @@ public class CheckUpdateServiceImpl {
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
......@@ -698,13 +634,13 @@ public class CheckUpdateServiceImpl {
opsForValue.setIfAbsent("huazheng:checkUpdate:Mara:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Mara:rowids"));
Mara build = Mara.builder().rowids(rowids).build();
List<Mara> slist = sapMapper.selectMaraCheckByUpdate(build); // 从源库中按更新时间查询,只更新今天的数据
if (slist.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkUpdate:Mara:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Mara target = gpMapper.selectMara(source); // 根据主键查询源库中的数据
String operator = "none";
......@@ -733,17 +669,18 @@ public class CheckUpdateServiceImpl {
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
// 级联更新业务
List<Ausp> sAuspList = sapMapper.cascadeAuspByMara(source); // 级联查询源库afko表
for (Ausp sAusp : sAuspList) {
Ausp tAusp = gpMapper.selectAusp(sAusp); // 查询目标库中afko表
cascadeAuspCheckByUpdate(sAusp, tAusp); // 级联更新afko表
}
ThreadUtil.safeSleep(500);
}
}
......@@ -756,7 +693,7 @@ public class CheckUpdateServiceImpl {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Mara:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
// 所属selectMaraCheckUpdate的级联
private void cascadeAuspCheckByUpdate(Ausp source, Ausp target) {
if (target != null) { // 目标库有数据
......@@ -771,12 +708,89 @@ public class CheckUpdateServiceImpl {
gpMapper.updateAusp(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
// 所属selectKna1CheckByUpdate的级联
public void cascadeKnvvCheckByUpdate(Knvv source, Knvv target) {
if (target != null) { // 目标库有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateKnvv(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
// 所属selectAufkCheckByUpdate的级联
private void cascadeAfkoCheckByUpdate(Afko source, Afko target) {
if (target != null) { // 目标库有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getGltrp() != null) {
String erdat2 = new StringBuffer(source.getGltrp()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setGltrp1(date);
}
if (source.getGstrp() != null) {
String erdat2 = new StringBuffer(source.getGstrp()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setGstrp1(date);
}
// ===============================
while (true) {
try {
gpMapper.updateAfko(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
// 所属selectAufkCheckByUpdate的级联
private void cascadeAfpoCheckByUpdate(Afpo source, Afpo target) {
if (target != null) {
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateAfpo(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
}
......@@ -89,22 +89,16 @@ public class DeleteUpdateJobServiceImpl {
@Autowired
private SapMapper sapMapper;
@Autowired
private CrmMapper crmMapper;
@Autowired
private GPMapper gpMapper;
@Autowired
private TmsMapper tmsMapper;
@Autowired
private HzcrmMapper hzcrmMapper;
@Autowired
private UltimusDBMapper ultimusDBMapper;
@Autowired
private Cinderellaw2Mapper cinderellaw2Mapper;
......
......@@ -806,7 +806,6 @@
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- 删除流程 -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论