提交 d1a3178d 作者: guofeng

vbap、VBEP、VBPA、ZMDPC、Sys_SAPreturnNo更新

上级 743692a9
......@@ -26,9 +26,13 @@ import com.huazheng.project.hana.model.Makt;
import com.huazheng.project.hana.model.Mara;
import com.huazheng.project.hana.model.Pa0002;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbap;
import com.huazheng.project.hana.model.Vbep;
import com.huazheng.project.hana.model.Vbpa;
import com.huazheng.project.hana.model.Vbrk;
import com.huazheng.project.hana.model.Vbrp;
import com.huazheng.project.hana.model.Vbuk;
import com.huazheng.project.hana.model.Zmdpc;
import com.huazheng.project.hana.model.Zpoedit;
import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.hana.model.Zsdfhzl;
......@@ -41,6 +45,8 @@ import com.huazheng.project.mssql.model.SampleApplicationProcess;
import com.huazheng.project.mssql.model.SampleApplicationProcessMingXi;
import com.huazheng.project.mssql.model.SampleClosingProcess;
import com.huazheng.project.mssql.model.SampleClosingProcessMingXi;
import com.huazheng.project.mssql2.mapper.HzcrmMapper;
import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mysql.mapper.TmsMapper;
import com.huazheng.project.mysql.model.Handover;
import com.huazheng.project.mysql.model.HandoverTask;
......@@ -66,6 +72,8 @@ public class CheckDeleteServiceImpl {
private CrmMapper crmMapper;
@Autowired
private TmsMapper tmsMapper;
@Autowired
private HzcrmMapper hzcrmMapper;
public void selectAufkCheckByDelete() {
try {
......@@ -733,6 +741,123 @@ public class CheckDeleteServiceImpl {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Bsid:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectVbapCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Vbap:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Vbap:rowNum");
Vbap build = Vbap.builder().rowNum(rowNum).build();
List<Vbap> list = gpMapper.selectVbapCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Vbap:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Vbap source = sapMapper.selectVbapById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteVbap(target); // 删除数仓中的数据
gpMapper.deleteVbapAdv(target);
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Vbap:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectVbapcheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Vbap:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectVbepCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Vbep:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Vbep:rowNum");
Vbep build = Vbep.builder().rowNum(rowNum).build();
List<Vbep> list = gpMapper.selectVbepCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:Vbep:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Vbep source = sapMapper.selectVbepById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteVbep(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Vbep:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectVbepcheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Vbep:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectVbpaCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Vbpa:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Vbpa:rowNum");
Vbpa build = Vbpa.builder().rowNum(rowNum).build();
List<Vbpa> list = gpMapper.selectVbpaCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:Vbpa:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Vbpa source = sapMapper.selectVbpaById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteVbpa(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Vbpa:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectVbpacheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Vbpa:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectZmdpcCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Zmdpc:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Zmdpc:rowNum");
Zmdpc build = Zmdpc.builder().rowNum(rowNum).build();
List<Zmdpc> list = gpMapper.selectZmdpcCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:Zmdpc:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Zmdpc source = sapMapper.selectZmdpcById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteZmdpc(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Zmdpc:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectZmdpccheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Zmdpc:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectSalesContractProcessCheckByDelete() {
try {
......@@ -1085,4 +1210,34 @@ public class CheckDeleteServiceImpl {
}
}
public void selectSysSAPreturnNoCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:SysSAPreturnNo:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:SysSAPreturnNo:rowNum");
SysSAPreturnNo build = SysSAPreturnNo.builder().rowNum(rowNum).build();
List<SysSAPreturnNo> list = gpMapper.selectSysSAPreturnNoCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:SysSAPreturnNo:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
SysSAPreturnNo source = hzcrmMapper.selectSysSAPreturnNoById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteSysSAPreturnNo(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:SysSAPreturnNo:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectSysSAPreturnNocheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:SysSAPreturnNo:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
}
package com.huazheng.project.greenplum.service.impl;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
......@@ -26,10 +30,16 @@ import com.huazheng.project.hana.model.Lips;
import com.huazheng.project.hana.model.Makt;
import com.huazheng.project.hana.model.Mara;
import com.huazheng.project.hana.model.Pa0002;
import com.huazheng.project.hana.model.T023t;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbap;
import com.huazheng.project.hana.model.VbapAdv;
import com.huazheng.project.hana.model.Vbep;
import com.huazheng.project.hana.model.Vbpa;
import com.huazheng.project.hana.model.Vbrk;
import com.huazheng.project.hana.model.Vbrp;
import com.huazheng.project.hana.model.Vbuk;
import com.huazheng.project.hana.model.Zmdpc;
import com.huazheng.project.hana.model.Zpoedit;
import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.hana.model.Zsdfhzl;
......@@ -42,6 +52,8 @@ import com.huazheng.project.mssql.model.SampleApplicationProcess;
import com.huazheng.project.mssql.model.SampleApplicationProcessMingXi;
import com.huazheng.project.mssql.model.SampleClosingProcess;
import com.huazheng.project.mssql.model.SampleClosingProcessMingXi;
import com.huazheng.project.mssql2.mapper.HzcrmMapper;
import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mysql.mapper.TmsMapper;
import com.huazheng.project.mysql.model.Handover;
import com.huazheng.project.mysql.model.HandoverTask;
......@@ -72,6 +84,10 @@ public class CheckUpdateServiceImpl {
private CrmMapper crmMapper;
@Autowired
private TmsMapper tmsMapper;
@Autowired
private HzcrmMapper hzcrmMapper;
private ExecutorService service = Executors.newFixedThreadPool(15);
public void selectAufkCheckByUpdate() {
try {
......@@ -1559,7 +1575,7 @@ public class CheckUpdateServiceImpl {
opsForValue.setIfAbsent("huazheng:checkUpdate:TransformHistoryNode:rowids", "0");
Integer rowids = Integer.parseInt(opsForValue.get("huazheng:checkUpdate:TransformHistoryNode:rowids"));
TransformHistoryNode build = TransformHistoryNode.builder().id(rowids).build();
// 新加了字段要调试
List<TransformHistoryNode> slist = tmsMapper.selectTransformHistoryNodeCheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkUpdate:TransformHistoryNode:rowids", "0"); // 计数器复位
......@@ -1638,4 +1654,307 @@ public class CheckUpdateServiceImpl {
}
}
public void selectVbapCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Vbap:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Vbap:rowids"));
Vbap build = Vbap.builder().rowids(rowids).build();
List<Vbap> slist = sapMapper.selectVbapCheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Vbap:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
if (source.getErdat() != null && source.getErzet() != null) {
String erdat2 = new StringBuffer(source.getErdat()).insert(4, "-").insert(7, "-").toString();
String erzet2 = new StringBuffer(source.getErzet()).insert(2, ":").insert(5, ":").toString();
String dateStr = erdat2 + " " + erzet2;
Date date = DateUtil.parse(dateStr);
source.setErdat1(date);
source.setErdat2(date);
}
Vbap target = gpMapper.selectVbap(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateVbap(source); // 更新数据到数仓中
gpMapper.updateVbapAdv(source);
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
// 级联更新业务
List<Vbep> sVbepList = sapMapper.cascadeVbepByVbap(source); // 级联查询源库Vbep表
for (Vbep sVbep : sVbepList) {
Vbep tVbep = gpMapper.selectVbep(sVbep); // 查询目标库中Vbep表
cascadeVbepCheckByUpdate(sVbep, tVbep); // 级联更新afko表
}
// 级联更新业务
List<Vbpa> sVbpaList = sapMapper.cascadeVbpaByVbap(source); // 级联查询源库Vbpa表
for (Vbpa sVbpa : sVbpaList) {
Vbpa tVbpa = gpMapper.selectVbpa(sVbpa); // 查询目标库中Vbpa表
cascadeVbpaCheckByUpdate(sVbpa, tVbpa); // 级联更新afko表
}
// 级联更新业务
List<Zmdpc> sZmdpcList = sapMapper.cascadeZmdpcByVbap(source); // 级联查询源库Zmdpc表
for (Zmdpc sZmdpc : sZmdpcList) {
Zmdpc tZmdpc = gpMapper.selectZmdpc(sZmdpc); // 查询目标库中Zmdpc表
cascadeZmdpcCheckByUpdate(sZmdpc, tZmdpc); // 级联更新afko表
}
// 级联更新业务
List<SysSAPreturnNo> sSysSAPreturnNoList = hzcrmMapper.cascadeSysSAPreturnNoByVbap(source); // 级联查询源库SysSAPreturnNo表
for (SysSAPreturnNo sSysSAPreturnNo : sSysSAPreturnNoList) {
SysSAPreturnNo tSysSAPreturnNo = gpMapper.selectSysSAPreturnNo(sSysSAPreturnNo); // 查询目标库中SysSAPreturnNo表
cascadeSysSAPreturnNoCheckByUpdate(sSysSAPreturnNo, tSysSAPreturnNo); // 级联更新afko表
}
ThreadUtil.safeSleep(500);
try {
Future<VbapAdv> vbapAdvSubmit = service.submit(() -> {
return gpMapper.selectVbapAdv(source);
});
Future<BigDecimal> ccslSubmit = service.submit(() -> {
return gpMapper.caLips4ccsl(source);
});
Future<BigDecimal> jhslSubmit = service.submit(() -> {
return gpMapper.caVbep4jhsl(source);
});
Future<BigDecimal> rkslSubmit = service.submit(() -> {
return gpMapper.caAfpo4rksl(source);
});
Future<BigDecimal> pcslSubmit = service.submit(() -> {
return gpMapper.caAfpo4pcsl(source);
});
Future<BigDecimal> kpslSubmit = service.submit(() -> {
return gpMapper.caVbrp4kpsl(source);
});
Future<Date> jhsjSubmit = service.submit(() -> {
return gpMapper.caVbep4jhsj(source);
});
Future<Date> kpsjSubmit = service.submit(() -> {
return gpMapper.caVbrk4kpsj(source);
});
Future<Date> rksjSubmit = service.submit(() -> {
return gpMapper.caAufm4rksj(source);
});
Future<Date> ccsjSubmit = service.submit(() -> {
return gpMapper.caHandover4ccsj(source);
});
Future<Date> wlsjSubmit = service.submit(() -> {
return gpMapper.caHandover4wlsj(source);
});
VbapAdv vbapAdv = vbapAdvSubmit.get();
vbapAdv.setCcsl(ccslSubmit.get());
vbapAdv.setJhsl(jhslSubmit.get());
vbapAdv.setRksl(rkslSubmit.get());
vbapAdv.setPcsl(pcslSubmit.get());
vbapAdv.setKpsl(kpslSubmit.get());
vbapAdv.setJhsj(jhsjSubmit.get());
vbapAdv.setKpsj(kpsjSubmit.get());
vbapAdv.setRksj(rksjSubmit.get());
vbapAdv.setCcsj(ccsjSubmit.get());
vbapAdv.setWlsj(wlsjSubmit.get());
vbapAdv.setHtsl(source.getKwmeng());
vbapAdv.setPssj(source.getErdat1());
vbapAdv.setPcsj(source.getErdat1());
gpMapper.updatevbapAdv4CaAll(vbapAdv);
Aufk aufk = gpMapper.selectVbapAdv4Aufk(source); // vbap-aufk
if (aufk != null) {
aufk.setVbap(source);
gpMapper.updateVbapAdv4Aufk(aufk); // vbap-aufk
}
SysSAPreturnNo sysSAPreturnNo = gpMapper.selectVbapAdv4SysSAPreturnNo(source); // vbap-SysSAPreturnNo
if (sysSAPreturnNo != null) {
sysSAPreturnNo.setVbap(source);
gpMapper.updateVbapAdv4SysSAPreturnNo(sysSAPreturnNo); // vbap-SysSAPreturnNo
}
SalesContractProcess salesContractProcess = gpMapper.selectVbapAdv4SalesContractProcess(source); // vbap-SysSAPreturnNo-SalesContractProcess
if (salesContractProcess != null) {
salesContractProcess.setVbap(source);
gpMapper.updateVbapAdv4SalesContractProcess(salesContractProcess); // vbap-SysSAPreturnNo-SalesContractProcess
}
Mara mara = gpMapper.selectVbapAdv4Mara(source); // vbap-Mara
if (mara != null) {
mara.setVbap(source);
gpMapper.updateVbapAdv4Mara(mara); // vbap-Mara
}
Makt makt = gpMapper.selectVbapAdv4Makt(source); // vbap-Makt
if (makt != null) {
makt.setVbap(source);
gpMapper.updateVbapAdv4Makt(makt); // vbap-Makt
}
Zsd06 zsd06 = gpMapper.selectVbapAdv4Zsd06(source); // vbap-Zsd06
if (zsd06 != null) {
zsd06.setVbap(source);
gpMapper.updateVbapAdv4Zsd06(zsd06); // vbap-Zsd06
}
Vbak vbak = gpMapper.selectVbapAdv4Vbak(source); // vbap-Vbak
if (vbak != null) {
vbak.setVbap(source);
gpMapper.updateVbapAdv4Vbak(vbak); // vbap-Vbak
}
Likp likp = gpMapper.selectVbapAdv4Likp(source); // vbap-lips-Likp
if (likp != null) {
likp.setVbap(source);
gpMapper.updateVbapAdv4Likp(likp); // vbap-lips-Likp
}
Handover handover = gpMapper.selectVbapAdv4Handover(source); // vbap-lips-Likp-Handover
if (handover != null) {
handover.setVbap(source);
gpMapper.updateVbapAdv4Handover(handover); // vbap-lips-Likp-Handover
}
Zmdpc zmdpc = gpMapper.selectVbapAdv4Zmdpc(source); // vbap-Zmdpc
if (zmdpc != null) {
zmdpc.setVbap(source);
gpMapper.updateVbapAdv4Zmdpc(zmdpc); // vbap-Zmdpc
}
T023t t023t = gpMapper.selectVbapAdv4T023t(source); // vbap-mara-T023t
if (t023t != null) {
t023t.setVbap(source);
gpMapper.updateVbapAdv4T023t(t023t); // vbap-mara-T023t
}
Kna1 kna1 = gpMapper.selectVbapAdv4Kna1(source); // vbap-vbak-Kna1
if (kna1 != null) {
kna1.setVbap(source);
gpMapper.updateVbapAdv4Kna1(kna1); // vbap-vbak-Kna1
}
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Vbap:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Vbap:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectVbapcheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Vbap:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
// 所属selectVbapCheckUpdate的级联
private void cascadeVbepCheckByUpdate(Vbep source, Vbep target) {
if (target != null) { // 目标库有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getEdatu() != null) {
String erdat2 = new StringBuffer(source.getEdatu()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setEdatu1(date);
}
// ===============================
while (true) {
try {
gpMapper.updateVbep(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
// 所属selectVbapCheckUpdate的级联
private void cascadeVbpaCheckByUpdate(Vbpa source, Vbpa target) {
if (target != null) { // 目标库有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateVbpa(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
// 所属selectVbapCheckUpdate的级联
private void cascadeZmdpcCheckByUpdate(Zmdpc source, Zmdpc target) {
if (target != null) { // 目标库有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateZmdpc(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
// 所属selectVbapCheckUpdate的级联
private void cascadeSysSAPreturnNoCheckByUpdate(SysSAPreturnNo source, SysSAPreturnNo target) {
if (target != null) { // 目标库有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateSysSAPreturnNo(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
}
......@@ -2,12 +2,8 @@ package com.huazheng.project.greenplum.service.impl;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
......@@ -17,13 +13,8 @@ import org.springframework.stereotype.Service;
import com.huazheng.project.greenplum.mapper.GPMapper;
import com.huazheng.project.hana.mapper.SapMapper;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Likp;
import com.huazheng.project.hana.model.Makt;
import com.huazheng.project.hana.model.Mara;
import com.huazheng.project.hana.model.Mska;
import com.huazheng.project.hana.model.S066;
import com.huazheng.project.hana.model.S067;
......@@ -31,16 +22,7 @@ import com.huazheng.project.hana.model.T001w;
import com.huazheng.project.hana.model.T023t;
import com.huazheng.project.hana.model.Tspat;
import com.huazheng.project.hana.model.Tvkbt;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbap;
import com.huazheng.project.hana.model.VbapAdv;
import com.huazheng.project.hana.model.Vbep;
import com.huazheng.project.hana.model.Vbpa;
import com.huazheng.project.hana.model.Zmdpc;
import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.mssql.model.SalesContractProcess;
import com.huazheng.project.mssql2.mapper.HzcrmMapper;
import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mssql2.model.TKeHu;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZ;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY;
......@@ -49,7 +31,6 @@ import com.huazheng.project.mssql3.mapper.Cinderellaw2Mapper;
import com.huazheng.project.mssql3.model.PersonComp;
import com.huazheng.project.mssql4.mapper.UltimusDBMapper;
import com.huazheng.project.mssql4.model.Tasks;
import com.huazheng.project.mysql.model.Handover;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.ThreadUtil;
......@@ -76,8 +57,6 @@ public class DeleteUpdateJobServiceImpl {
@Autowired
private Cinderellaw2Mapper cinderellaw2Mapper;
private ExecutorService service = Executors.newFixedThreadPool(15);
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
......@@ -89,56 +68,12 @@ public class DeleteUpdateJobServiceImpl {
}
}
public void checkJob2() {
selectSysSAPreturnNoCheck(); // 5
selectTKeHuCheck(); // 6
selectTKeHuXiaoShouZZCheck(); // 7
selectTKeHuXiaoShouZZYeWuYCheck(); // 8
selectTWuLiaoDBCheck(); // 9
}
private void selectSysSAPreturnNoCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:SysSAPreturnNo:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:SysSAPreturnNo:rowNum");
SysSAPreturnNo build = SysSAPreturnNo.builder().rowNum(rowNum).build();
List<SysSAPreturnNo> list = gpMapper.selectSysSAPreturnNoCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:SysSAPreturnNo:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
SysSAPreturnNo source = hzcrmMapper.selectSysSAPreturnNoById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteSysSAPreturnNo(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
while (true) {
try {
gpMapper.updateSysSAPreturnNo(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:SysSAPreturnNo:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectSysSAPreturnNoCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:SysSAPreturnNo:rowNum", getErrorInfoFromException(e));
}
}
private void selectTKeHuCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -315,9 +250,6 @@ public class DeleteUpdateJobServiceImpl {
selectS067Check(); // 29
selectT001wCheck(); // 30
selectT023tCheck(); // 31
selectVbepCheck(); // 33
selectVbpaCheck(); // 34
selectZmdpcCheck(); // 38
selectTvkbtCheck(); //
selectAfvcCheck();
selectTspatCheck();
......@@ -793,143 +725,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:T023t:rowNum", getErrorInfoFromException(e));
}
}
private void selectVbepCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Vbep:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Vbep:rowNum");
Vbep build = Vbep.builder().rowNum(rowNum).build();
List<Vbep> list = gpMapper.selectVbepCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Vbep:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Vbep source = sapMapper.selectVbepById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteVbep(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getEdatu() != null) {
String erdat2 = new StringBuffer(source.getEdatu()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setEdatu1(date);
}
// ===============================
while (true) {
try {
gpMapper.updateVbep(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Vbep:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectVbepCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Vbep:rowNum", getErrorInfoFromException(e));
}
}
private void selectVbpaCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Vbpa:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Vbpa:rowNum");
Vbpa build = Vbpa.builder().rowNum(rowNum).build();
List<Vbpa> list = gpMapper.selectVbpaCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Vbpa:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Vbpa source = sapMapper.selectVbpaById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteVbpa(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateVbpa(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Vbpa:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectVbpaCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Vbpa:rowNum", getErrorInfoFromException(e));
}
}
private void selectZmdpcCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Zmdpc:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Zmdpc:rowNum");
Zmdpc build = Zmdpc.builder().rowNum(rowNum).build();
List<Zmdpc> list = gpMapper.selectZmdpcCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Zmdpc:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Zmdpc source = sapMapper.selectZmdpcById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteZmdpc(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateZmdpc(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Zmdpc:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectZmdpcCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Zmdpc:rowNum", getErrorInfoFromException(e));
}
}
private void selectTvkbtCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -974,276 +769,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Tvkbt:rowNum", getErrorInfoFromException(e));
}
}
public void checkJob5() {
selectVbapCheck(); // 44
}
// 计数器复位删除
private void selectVbapCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Vbap:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Vbap:rowNum");
Vbap build = Vbap.builder().rowNum(rowNum).build();
List<Vbap> list = gpMapper.selectVbapCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:check:Vbap:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} else {
redis1Template.delete("huazheng:check:Vbap:isUpdateList");
}
list.forEach(target -> { // 遍历要检查的数据
Vbap source = sapMapper.selectVbapById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteVbap(target); // 删除数仓中的数据
gpMapper.deleteVbapAdv(target);
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getErdat() != null && source.getErzet() != null) {
String erdat2 = new StringBuffer(source.getErdat()).insert(4, "-").insert(7, "-").toString();
String erzet2 = new StringBuffer(source.getErzet()).insert(2, ":").insert(5, ":").toString();
String dateStr = erdat2 + " " + erzet2;
Date date = DateUtil.parse(dateStr);
source.setErdat1(date);
source.setErdat2(date);
}
// ===============================
while (true) {
try {
gpMapper.updateVbap(source); // 更新数据到数仓中
gpMapper.updateVbapAdv(source);
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
try {
Future<VbapAdv> vbapAdvSubmit = service.submit(() -> {
return gpMapper.selectVbapAdv(source);
});
Future<BigDecimal> ccslSubmit = service.submit(() -> {
return gpMapper.caLips4ccsl(source);
});
Future<BigDecimal> jhslSubmit = service.submit(() -> {
return gpMapper.caVbep4jhsl(source);
});
Future<BigDecimal> rkslSubmit = service.submit(() -> {
return gpMapper.caAfpo4rksl(source);
});
Future<BigDecimal> pcslSubmit = service.submit(() -> {
return gpMapper.caAfpo4pcsl(source);
});
Future<BigDecimal> kpslSubmit = service.submit(() -> {
return gpMapper.caVbrp4kpsl(source);
});
Future<Date> jhsjSubmit = service.submit(() -> {
return gpMapper.caVbep4jhsj(source);
});
Future<Date> kpsjSubmit = service.submit(() -> {
return gpMapper.caVbrk4kpsj(source);
});
Future<Date> rksjSubmit = service.submit(() -> {
return gpMapper.caAufm4rksj(source);
});
Future<Date> ccsjSubmit = service.submit(() -> {
return gpMapper.caHandover4ccsj(source);
});
Future<Date> wlsjSubmit = service.submit(() -> {
return gpMapper.caHandover4wlsj(source);
});
VbapAdv vbapAdv = vbapAdvSubmit.get();
vbapAdv.setCcsl(ccslSubmit.get());
vbapAdv.setJhsl(jhslSubmit.get());
vbapAdv.setRksl(rkslSubmit.get());
vbapAdv.setPcsl(pcslSubmit.get());
vbapAdv.setKpsl(kpslSubmit.get());
vbapAdv.setJhsj(jhsjSubmit.get());
vbapAdv.setKpsj(kpsjSubmit.get());
vbapAdv.setRksj(rksjSubmit.get());
vbapAdv.setCcsj(ccsjSubmit.get());
vbapAdv.setWlsj(wlsjSubmit.get());
vbapAdv.setHtsl(source.getKwmeng());
vbapAdv.setPssj(source.getErdat1());
vbapAdv.setPcsj(source.getErdat1());
gpMapper.updatevbapAdv4CaAll(vbapAdv);
// Future<String> s1 = service.submit(() -> {
Aufk aufk = gpMapper.selectVbapAdv4Aufk(source); // vbap-aufk
if (aufk != null) {
aufk.setVbap(source);
gpMapper.updateVbapAdv4Aufk(aufk); // vbap-aufk
}
// return "ok";
// });
// Future<String> s2 = service.submit(() -> {
SysSAPreturnNo sysSAPreturnNo = gpMapper.selectVbapAdv4SysSAPreturnNo(source); // vbap-SysSAPreturnNo
if (sysSAPreturnNo != null) {
sysSAPreturnNo.setVbap(source);
gpMapper.updateVbapAdv4SysSAPreturnNo(sysSAPreturnNo); // vbap-SysSAPreturnNo
}
// return "ok";
// });
// Future<String> s3 = service.submit(() -> {
SalesContractProcess salesContractProcess = gpMapper.selectVbapAdv4SalesContractProcess(source); // vbap-SysSAPreturnNo-SalesContractProcess
if (salesContractProcess != null) {
salesContractProcess.setVbap(source);
gpMapper.updateVbapAdv4SalesContractProcess(salesContractProcess); // vbap-SysSAPreturnNo-SalesContractProcess
}
// return "ok";
// });
// Future<String> s4 = service.submit(() -> {
Mara mara = gpMapper.selectVbapAdv4Mara(source); // vbap-Mara
if (mara != null) {
mara.setVbap(source);
gpMapper.updateVbapAdv4Mara(mara); // vbap-Mara
}
// return "ok";
// });
// Future<String> s5 = service.submit(() -> {
Makt makt = gpMapper.selectVbapAdv4Makt(source); // vbap-Makt
if (makt != null) {
makt.setVbap(source);
gpMapper.updateVbapAdv4Makt(makt); // vbap-Makt
}
// return "ok";
// });
// Future<String> s6 = service.submit(() -> {
Zsd06 zsd06 = gpMapper.selectVbapAdv4Zsd06(source); // vbap-Zsd06
if (zsd06 != null) {
zsd06.setVbap(source);
gpMapper.updateVbapAdv4Zsd06(zsd06); // vbap-Zsd06
}
// return "ok";
// });
// Future<String> s7 = service.submit(() -> {
Vbak vbak = gpMapper.selectVbapAdv4Vbak(source); // vbap-Vbak
if (vbak != null) {
vbak.setVbap(source);
gpMapper.updateVbapAdv4Vbak(vbak); // vbap-Vbak
}
// return "ok";
// });
// Future<String> s8 = service.submit(() -> {
Likp likp = gpMapper.selectVbapAdv4Likp(source); // vbap-lips-Likp
if (likp != null) {
likp.setVbap(source);
gpMapper.updateVbapAdv4Likp(likp); // vbap-lips-Likp
}
// return "ok";
// });
// Future<String> s9 = service.submit(() -> {
Handover handover = gpMapper.selectVbapAdv4Handover(source); // vbap-lips-Likp-Handover
if (handover != null) {
handover.setVbap(source);
gpMapper.updateVbapAdv4Handover(handover); // vbap-lips-Likp-Handover
}
// return "ok";
// });
// Future<String> s10 = service.submit(() -> {
Zmdpc zmdpc = gpMapper.selectVbapAdv4Zmdpc(source); // vbap-Zmdpc
if (zmdpc != null) {
zmdpc.setVbap(source);
gpMapper.updateVbapAdv4Zmdpc(zmdpc); // vbap-Zmdpc
}
// return "ok";
// });
// Future<String> s11 = service.submit(() -> {
T023t t023t = gpMapper.selectVbapAdv4T023t(source); // vbap-mara-T023t
if (t023t != null) {
t023t.setVbap(source);
gpMapper.updateVbapAdv4T023t(t023t); // vbap-mara-T023t
}
// return "ok";
// });
// Future<String> s12 = service.submit(() -> {
Kna1 kna1 = gpMapper.selectVbapAdv4Kna1(source); // vbap-vbak-Kna1
if (kna1 != null) {
kna1.setVbap(source);
gpMapper.updateVbapAdv4Kna1(kna1); // vbap-vbak-Kna1
}
// return "ok";
// });
// log.info("=========== vbap ===== ca" + source.getVbeln());
// s1.get();log.info("=========== vbap ===== cas1" + source.getVbeln());
// s2.get();log.info("=========== vbap ===== cas2" + source.getVbeln());
// s3.get();log.info("=========== vbap ===== cas3" + source.getVbeln());
// s4.get();log.info("=========== vbap ===== cas4" + source.getVbeln());
// s5.get();log.info("=========== vbap ===== cas5" + source.getVbeln());
// s6.get();log.info("=========== vbap ===== cas6" + source.getVbeln());
// s7.get();log.info("=========== vbap ===== cas7" + source.getVbeln());
// s8.get();log.info("=========== vbap ===== cas8" + source.getVbeln());
// s9.get();log.info("=========== vbap ===== cas9" + source.getVbeln());
// s10.get();log.info("=========== vbap ===== cas10" + source.getVbeln());
// s11.get();log.info("=========== vbap ===== cas11" + source.getVbeln());
// s12.get();log.info("=========== vbap ===== cas12" + source.getVbeln());
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Vbap:rowNum", getErrorInfoFromException(e));
}
}
}
String mode = null;
String isUpdateList = opsForValue.get("huazheng:check:Vbap:isUpdateList");
if (isUpdateList == null) { // 非更新模式下的列表才更新计数器
redis1Template.opsForValue().set("huazheng:check:Vbap:rowNum", target.getRowNum());
mode = "计数器模式";
} else {
mode = "更新时间模式";
}
if (!operator.equals("none")) {
log.info(String.format("selectVbapCheck --> rowNum:%s, operator:%s, %s", target.getRowNum(), operator, mode));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Vbap:rowNum", getErrorInfoFromException(e));
}
}
private void selectVbapCheckDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Vbap:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Vbap:rowNum");
Vbap build = Vbap.builder().rowNum(rowNum).build();
List<Vbap> list = gpMapper.selectVbapCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Vbap:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Vbap source = sapMapper.selectVbapById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteVbap(target); // 删除数仓中的数据
gpMapper.deleteVbapAdv(target);
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Vbap:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectVbapCheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Vbap:rowNum", getErrorInfoFromException(e));
}
}
// 有更新时间的表的删除计数器需要拆分出来,避免数据不能删除
// 把这些方法的删除部分从原方法种剥离出来
public void checkJob6() {
selectVbapCheckDelete();
}
public void checkJob8() {
selectTasksCheck();
......
......@@ -101,6 +101,7 @@ public interface SapMapper {
public List<Zsd06> selectZsd06CheckByUpdate(Zsd06 zsd06);
public List<Zsdfhzl> selectZsdfhzlCheckByUpdate(Zsdfhzl zsdfhzl);
public List<Mara> selectMaraCheckByUpdate(Mara mara);
public List<Vbap> selectVbapCheckByUpdate(Vbap vbap);
// 按天更新的级联
public Afko cascadeAfkoByAufk(Aufk aufk);
public Afpo cascadeAfpoByAufk(Aufk aufk);
......@@ -113,9 +114,9 @@ public interface SapMapper {
public List<Zpoedit> cascadeZpoeditByAufk(Aufk aufk);
public List<Bsad> cascadeBsadByBkpf(Bkpf bkpf);
public List<Bsid> cascadeBsidByBkpf(Bkpf bkpf);
public List<Vbep> cascadeVbepByVbap(Vbap vbap);
public List<Vbpa> cascadeVbpaByVbap(Vbap vbap);
public List<Zmdpc> cascadeZmdpcByVbap(Vbap vbap);
// ......
public Bkpf selectBkpfById(Bkpf target);
......@@ -154,7 +155,5 @@ public interface SapMapper {
public Bsid selectBsidById(Bsid target);
public Vbap selectVbapById(Vbap target);
public Tvkbt selectTvkbtById(Tvkbt target);
}
package com.huazheng.project.hana.model;
import java.io.Serializable;
import java.util.Date;
import org.springframework.format.annotation.DateTimeFormat;
......
package com.huazheng.project.hana.model;
import java.io.Serializable;
import java.util.Date;
import org.springframework.format.annotation.DateTimeFormat;
......
......@@ -2,6 +2,7 @@ package com.huazheng.project.mssql2.mapper;
import java.util.List;
import com.huazheng.project.hana.model.Vbap;
import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mssql2.model.TKeHu;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZ;
......@@ -24,4 +25,7 @@ public interface HzcrmMapper {
public TKeHuXiaoShouZZYeWuY selectTKeHuXiaoShouZZYeWuYById(TKeHuXiaoShouZZYeWuY target);
public TWuLiaoDB selectTWuLiaoDBById(TWuLiaoDB target);
// 按天更新的级联
public List<SysSAPreturnNo> cascadeSysSAPreturnNoByVbap(Vbap vbap);
}
......@@ -2,56 +2,19 @@ select count(1) from sapabap1.kna1 where updat = '00000000' union all
select count(1) from sapabap1.kna1 where updat != '00000000' union all
select count(1) from sapabap1.kna1 where updat is null;
select count(1) from sapabap1.Bkpf where aedat = '00000000' and mandt = '800' union all
select count(1) from sapabap1.Bkpf where aedat != '00000000' and mandt = '800';
select top 20 "$rowid$" as rowids,
mandt, bukrs, belnr, gjahr, blart, bldat, budat, monat, cpudt, cputm,
case aedat when '00000000' then null else to_date(aedat) end as aedat, upddt,
(to_date(cpudt)||' '||to_time(cputm)) as cpudt_cputm
from sapabap1.Bkpf
where "$rowid$" > '2702057' and mandt = '800'
order by "$rowid$";
select top 20 "$rowid$" as rowids,
mandt, bukrs, belnr, gjahr, blart, bldat, budat, monat, cpudt, cputm,
case aedat when '00000000' then null else to_date(aedat) end as aedat, upddt,
(to_date(cpudt)||' '||to_time(cputm)) as cpudt_cputm
from sapabap1.Bkpf
where "$rowid$" > '2702306' and mandt = '800'
order by "$rowid$";
select top 20 "$rowid$" as rowids,
mandt, bukrs, belnr, gjahr, blart, bldat, budat, monat, cpudt, cputm,
case aedat when '00000000' then null else to_date(aedat) end as aedat, upddt,
(to_date(cpudt)||' '||to_time(cputm)) as cpudt_cputm
from sapabap1.Bkpf
where "$rowid$" > '2702607' and mandt = '800'
order by "$rowid$";
select top 20 "$rowid$" as rowids,
mandt, bukrs, belnr, gjahr, blart, bldat, budat, monat, cpudt, cputm, upddt
,aedat
,(cpudt||cputm) as cpudt_cputm
from sapabap1.Bkpf
where "$rowid$" > '2702607' and mandt = '800'
order by "$rowid$";
select top 20 "$rowid$" as rowids,
vbeln, mandt, erdat, wadat_ist, ctlpc, kunnr, aedat,lfuhr,
case wadat_ist when '00000000' then null else (to_date(wadat_ist)||' '||to_time(lfuhr)) end as wadat_ist2lfuhr
from sapabap1.likp
where "$rowid$" > '10' AND MANDT = '800'
order by "$rowid$"
select * from sapabap1.kna1 where mandt = '800' limit 10
p.vbeln, p.posnr, p.matnr, p.mandt, p.matkl, p.spart, p.kwmeng, p.netwr, p.mwsbp, p.waerk, p.vrkme, p.abgru,
p.uebto, p.untto, p.werks, p.vstel, p.mvgr1, p.plavo, p.mvgr4, p.mvgr5, p.ulxh, p.zma_ftb_02, p.zma_ftb_03,
p.zma_ftb_45, p.zma_ftb_06, p.zma_ftb_07, p.zma_ftb_08, p.zma_jyb_02, p.zma_jyb_03, p.zma_jyb_45,
p.zma_jyb_06, p.zma_jyb_07, p.zma_ljb_03, p.zma_ljb_04, p.zma_ljb_06, p.zma_ljb_07, p.zma_ljb_08,
p.zma_ljb_11, p.zma_ljb_12, p.zma_ljb_13, p.zma_yhbg_02, p.zma_yhbg_04, p.zma_yhbg_05, p.zma_yhbg_06,
p.tbpz, p.ws, p.cu_maktx, p.cu_pcdst, p.ztype, p.barcode, p.gdno, p.lh, p.dgxx, p.zklx, p.erdat, p.erzet, p.netpr, p.aedat,q.pernr
from sapabap1.vbap p
left join (
select a.vbeln,a.posnr,a.pernr from sapabap1.vbpa a
left join sapabap1.vbak b on a.vbeln = b.vbeln
where a.mandt = '800' and a.pernr != '00000000'
group by a.vbeln,a.posnr,a.pernr
) q on p.vbeln = q.vbeln and p.posnr = q.posnr
where p."$rowid$" > 0 and p.mandt = '800'
order by p."$rowid$"
\ No newline at end of file
......@@ -9,4 +9,6 @@ PRIMARY KEY (id)
Distributed by (id);
alter table transformHistoryNode add column hashResult text;
alter table transformHistoryNode add column rowNum serial;
\ No newline at end of file
alter table transformHistoryNode add column rowNum serial;
alter table transformHistoryNode add column modified_time timestamp;
......@@ -137,8 +137,8 @@
where id = #{id}
</select>
<insert id="insertTransformHistoryNode" parameterType="com.huazheng.project.mysql.model.TransformHistoryNode">
insert into TransformHistoryNode (id, task_no, last_commit_time, second_node, hashResult)
values(#{id}, #{task_no}, #{last_commit_time}, #{second_node}, #{hashResult})
insert into TransformHistoryNode (id, task_no, last_commit_time, second_node, modified_time, hashResult)
values(#{id}, #{task_no}, #{last_commit_time}, #{second_node}, #{modified_time}, #{hashResult})
</insert>
<delete id="deleteTransformHistoryNode" parameterType="com.huazheng.project.mysql.model.TransformHistoryNode">
delete from TransformHistoryNode
......@@ -146,7 +146,8 @@
</delete>
<update id="updateTransformHistoryNode" parameterType="com.huazheng.project.mysql.model.TransformHistoryNode">
update TransformHistoryNode set
id = #{id}, task_no = #{task_no}, last_commit_time = #{last_commit_time}, second_node = #{second_node}, hashResult = #{hashResult}
id = #{id}, task_no = #{task_no}, last_commit_time = #{last_commit_time}, second_node = #{second_node},
modified_time = #{modified_time}, hashResult = #{hashResult}
where id = #{id}
</update>
<select id="selectTransformHistoryNodeCheck" parameterType="com.huazheng.project.mysql.model.TransformHistoryNode" resultType="com.huazheng.project.mysql.model.TransformHistoryNode">
......
......@@ -17,7 +17,7 @@
where a.mandt = #{mandt} and a.pernr != '00000000'
group by a.vbeln,a.posnr,a.pernr
) q on p.vbeln = q.vbeln and p.posnr = q.posnr
where p."$rowid$" > 10 and p.mandt = #{mandt}
where p."$rowid$" &gt; #{rowids} and p.mandt = #{mandt}
order by p."$rowid$"
</select>
<select id="selectBsidNew" parameterType="Bsid" resultType="Bsid">
......@@ -385,9 +385,9 @@
left join (
select q.vbeln,r.pernr from ${hana_user}.vbap q
left join (
select vbeln,posnr,pernr from ${hana_user}.vbpa where mandt = #{mandt} and pernr != '00000000' group by vbeln,posnr,pernr
select vbeln,posnr,pernr from ${hana_user}.vbpa where mandt = '800' and pernr != '00000000' group by vbeln,posnr,pernr
) r on q.vbeln = r.vbeln and q.posnr = r.posnr
where q.mandt = #{mandt} and r.pernr is not null
where q.mandt = '800' and r.pernr is not null
group by q.vbeln,r.pernr
) b on a.vbeln = b.vbeln
where "$rowid$" &gt; #{rowids} and a.aedat != '00000000' and a.aedat = CURRENT_DATE and a.mandt = '800'
......@@ -425,6 +425,27 @@
where "$rowid$" &gt; #{rowids} and laeda != '00000000' and laeda = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectVbapCheckByUpdate" parameterType="Vbap" resultType="Vbap">
select top 20 "$rowid$" as rowids,
p.vbeln, p.posnr, p.matnr, p.mandt, p.matkl, p.spart, p.kwmeng, p.netwr, p.mwsbp, p.waerk, p.vrkme, p.abgru,
p.uebto, p.untto, p.werks, p.vstel, p.mvgr1, p.plavo, p.mvgr4, p.mvgr5, p.ulxh, p.zma_ftb_02, p.zma_ftb_03,
p.zma_ftb_45, p.zma_ftb_06, p.zma_ftb_07, p.zma_ftb_08, p.zma_jyb_02, p.zma_jyb_03, p.zma_jyb_45,
p.zma_jyb_06, p.zma_jyb_07, p.zma_ljb_03, p.zma_ljb_04, p.zma_ljb_06, p.zma_ljb_07, p.zma_ljb_08,
p.zma_ljb_11, p.zma_ljb_12, p.zma_ljb_13, p.zma_yhbg_02, p.zma_yhbg_04, p.zma_yhbg_05, p.zma_yhbg_06,
p.tbpz, p.ws, p.cu_maktx, p.cu_pcdst, p.ztype, p.barcode, p.gdno, p.lh, p.dgxx, p.zklx, p.erdat, p.erzet, p.netpr, p.aedat,q.pernr
from ${hana_user}.vbap p
left join (
select a.vbeln,a.posnr,a.pernr from ${hana_user}.vbpa a
left join ${hana_user}.vbak b on a.vbeln = b.vbeln
where a.mandt = '800' and a.pernr != '00000000'
group by a.vbeln,a.posnr,a.pernr
) q on p.vbeln = q.vbeln and p.posnr = q.posnr
where p."$rowid$" &gt; #{rowids} and p.aedat != '00000000' and p.mandt = '800'
order by p."$rowid$"
</select>
<!-- and p.aedat = CURRENT_DATE -->
<select id="cascadeKnvvByKna1" parameterType="Kna1" resultType="Knvv">
select
......@@ -497,6 +518,26 @@
from ${hana_user}.bsid a
where a.bukrs = #{bukrs} and a.belnr = #{belnr} and a.gjahr = #{gjahr} and a.mandt = '800'
</select>
<select id="cascadeVbepByVbap" parameterType="Vbap" resultType="Vbep">
select
vbeln, posnr, edatu, ettyp, wmeng, bmeng, mandt, etenr, aufnr
from ${hana_user}.vbep
where vbeln=#{vbeln} and posnr=#{posnr} ${hana_mandt}
</select>
<select id="cascadeVbpaByVbap" parameterType="Vbap" resultType="Vbpa">
select
mandt, vbeln, posnr, parvw, pernr
from ${hana_user}.Vbpa
where vbeln=#{vbeln} and parvw = 'VE' ${hana_mandt}
</select>
<select id="cascadeZmdpcByVbap" parameterType="Vbap" resultType="Zmdpc">
select
vbeln,posnr,f_plant,plant,x_plant,mandt
from ${hana_user}.Zmdpc
where vbeln=#{vbeln} and posnr=#{posnr} ${hana_mandt}
</select>
<select id="selectAfkoById" parameterType="Afko" resultType="Afko">
......
......@@ -66,4 +66,16 @@
where a.id = #{id}
</select>
<select id="cascadeSysSAPreturnNoByVbap" parameterType="com.huazheng.project.hana.model.Vbap" resultType="SysSAPreturnNo">
select top 20 a.* from (
select id as rowids,
id,processName,shiLiH,sapNo,chuangJianSJ
from Sys_SAPreturnNo
) a where sapNo = #{vbeln}
</select>
</mapper>
......@@ -11,7 +11,7 @@ org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
# ThreadPool
#============================================================================
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=50
org.quartz.threadPool.threadCount=70
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
......
......@@ -632,24 +632,6 @@
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="checkJob5" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="checkJob6" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="checkJob8" />
</bean>
</property>
......@@ -853,6 +835,16 @@
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectVbapCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- 删除流程 -->
......@@ -1166,6 +1158,53 @@
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectVbapCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectVbepCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectVbpaCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectZmdpcCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectSysSAPreturnNoCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- 以上66个任务 -->
</list>
</constructor-arg>
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论