提交 b251d724 作者: guofeng

ZSD_06、ZSD_FHZL增量更新

上级 73087449
......@@ -20,6 +20,8 @@ import com.huazheng.project.hana.model.Lips;
import com.huazheng.project.hana.model.Pa0002;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbrk;
import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.hana.model.Zsdfhzl;
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2;
......@@ -333,7 +335,64 @@ public class CheckDeleteServiceImpl {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Vbrk:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectZsd06CheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Zsd06:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Zsd06:rowNum");
Zsd06 build = Zsd06.builder().rowNum(rowNum).build();
List<Zsd06> list = gpMapper.selectZsd06Check(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Zsd06:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Zsd06 source = sapMapper.selectZsd06ById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteZsd06(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Zsd06:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectZsd06checkDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Zsd06:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectZsdfhzlCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Zsdfhzl:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Zsdfhzl:rowNum");
Zsdfhzl build = Zsdfhzl.builder().rowNum(rowNum).build();
List<Zsdfhzl> list = gpMapper.selectZsdfhzlCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Zsdfhzl:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Zsdfhzl source = sapMapper.selectZsdfhzlById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteZsdfhzl(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Zsdfhzl:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectZsdfhzlcheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Zsdfhzl:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
}
......@@ -21,6 +21,8 @@ import com.huazheng.project.hana.model.Lips;
import com.huazheng.project.hana.model.Pa0002;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbrk;
import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.hana.model.Zsdfhzl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.ThreadUtil;
......@@ -547,5 +549,117 @@ public class CheckUpdateServiceImpl {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Vbrk:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectZsd06CheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Zsd06:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Zsd06:rowids"));
Zsd06 build = Zsd06.builder().rowids(rowids).build();
List<Zsd06> slist = sapMapper.selectZsd06CheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Zsd06:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Zsd06 target = gpMapper.selectZsd06(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getDatum() != null) {
String erdat2 = new StringBuffer(source.getDatum()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setDatum1(date);
}
source.setCrt_datum1(SomeUtils.caDate(source.getCrt_datum()));
source.setCha_datum1(SomeUtils.caDate(source.getCha_datum()));
// ===============================
while (true) {
try {
gpMapper.updateZsd06(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Zsd06:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectZsd06checkUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Zsd06:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectZsdfhzlCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Zsdfhzl:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Zsdfhzl:rowids"));
Zsdfhzl build = Zsdfhzl.builder().rowids(rowids).build();
List<Zsdfhzl> list = sapMapper.selectZsdfhzlCheckByUpdate(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Zsdfhzl:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(source -> { // 遍历要检查的数据
Zsdfhzl target = gpMapper.selectZsdfhzl(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getCdate() != null && source.getCtime() != null) {
String erdat2 = new StringBuffer(source.getCdate()).insert(4, "-").insert(7, "-").toString();
String erzet2 = new StringBuffer(source.getCtime()).insert(2, ":").insert(5, ":").toString();
String dateStr = erdat2 + " " + erzet2;
Date date = DateUtil.parse(dateStr);
source.setCdate1(date);
}
if (source.getFh_date() != null && source.getFh_date().indexOf("--") == -1) {
String erdat2 = new StringBuffer(source.getFh_date()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setFh_date1(date);
}
// ===============================
while (true) {
try {
gpMapper.updateZsdfhzl(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Zsdfhzl:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectZsdfhzlcheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Zsdfhzl:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
}
......@@ -330,25 +330,6 @@
order by "$rowid$"
</select>
<select id="selectZsd06CheckByUpdate" parameterType="Zsd06" resultType="Zsd06">
select top 20 "$rowid$" as rowids,
vbeln, posnr, mandt, datum, uname, crt_uname, crt_datum,
crt_time, cha_name, cha_datum, cha_time
from ${hana_user}.Zsd_06
where "$rowid$" &gt; #{rowids} and cha_datum != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectZsdfhzlCheckByUpdate" parameterType="com.huazheng.project.hana.model.Zsdfhzl" resultType="com.huazheng.project.hana.model.Zsdfhzl">
select top 20 "$rowid$" as rowids,
vbeln, posnr, mandt, vgbel, vgpos, uname, matnr, fmenge, cdate, ctime, fh_date, werks, cha_datum
from ${hana_user}.zsd_fhzl
where "$rowid$" &gt; #{rowids} and cha_datum != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<!-- 第二次修改的,限制时间今天内的,以后更新逻辑按照这个模式来做 -->
<select id="selectAufkCheckByUpdate" parameterType="Aufk" resultType="Aufk">
select top 20 "$rowid$" as rowids,
......@@ -435,6 +416,23 @@
where "$rowid$" &gt; #{rowids} and aedat != '00000000' and aedat = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectZsd06CheckByUpdate" parameterType="Zsd06" resultType="Zsd06">
select top 20 "$rowid$" as rowids,
vbeln, posnr, mandt, datum, uname, crt_uname, crt_datum,
crt_time, cha_name, cha_datum, cha_time
from ${hana_user}.Zsd_06
where "$rowid$" &gt; #{rowids} and cha_datum != '00000000' and cha_datum = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectZsdfhzlCheckByUpdate" parameterType="com.huazheng.project.hana.model.Zsdfhzl" resultType="com.huazheng.project.hana.model.Zsdfhzl">
select top 20 "$rowid$" as rowids,
vbeln, posnr, mandt, vgbel, vgpos, uname, matnr, fmenge, cdate, ctime, fh_date, werks, cha_datum
from ${hana_user}.zsd_fhzl
where "$rowid$" &gt; #{rowids} and cha_datum != '00000000' and cha_datum = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectAfkoById" parameterType="Afko" resultType="Afko">
......
......@@ -11,7 +11,7 @@ org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
# ThreadPool
#============================================================================
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=30
org.quartz.threadPool.threadCount=50
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
......
......@@ -683,8 +683,6 @@
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- ========================== 独立的任务 ========================== -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
......@@ -695,24 +693,6 @@
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectZsd06CheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectZsdfhzlCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- 新增流程 -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
......@@ -798,6 +778,24 @@
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectZsd06CheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectZsdfhzlCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- 删除流程 -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
......@@ -890,6 +888,25 @@
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectZsd06CheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectZsdfhzlCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
</list>
</constructor-arg>
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论