提交 73087449 作者: guofeng

vbak\vbrk增量更新业务

上级 a5252897
......@@ -18,6 +18,8 @@ import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Likp;
import com.huazheng.project.hana.model.Lips;
import com.huazheng.project.hana.model.Pa0002;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbrk;
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2;
......@@ -271,5 +273,67 @@ public class CheckDeleteServiceImpl {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Pa0002:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectVbakCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Vbak:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Vbak:rowNum");
Vbak build = Vbak.builder().rowNum(rowNum).build();
List<Vbak> list = gpMapper.selectVbakCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:Vbak:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Vbak source = sapMapper.selectVbakById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteVbak(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Vbak:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectVbakcheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Vbak:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectVbrkCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Vbrk:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Vbrk:rowNum");
Vbrk build = Vbrk.builder().rowNum(rowNum).build();
List<Vbrk> list = gpMapper.selectVbrkCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Vbrk:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Vbrk source = sapMapper.selectVbrkById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteVbrk(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Vbrk:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectVbrkcheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Vbrk:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
}
......@@ -19,6 +19,8 @@ import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Likp;
import com.huazheng.project.hana.model.Lips;
import com.huazheng.project.hana.model.Pa0002;
import com.huazheng.project.hana.model.Vbak;
import com.huazheng.project.hana.model.Vbrk;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.ThreadUtil;
......@@ -445,5 +447,105 @@ public class CheckUpdateServiceImpl {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Pa0002:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectVbakCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Vbak:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Vbak:rowids"));
Vbak build = Vbak.builder().rowids(rowids).build();
List<Vbak> slist = sapMapper.selectVbakCheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkUpdate:Vbak:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Vbak target = gpMapper.selectVbak(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
source.setAudat1(SomeUtils.caDate(source.getAudat())); // 日期00000000格式转换,已处理异常
// ===============================
while (true) {
try {
gpMapper.updateVbak(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Vbak:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectVbakcheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Vbak:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectVbrkCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Vbrk:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Vbrk:rowids"));
Vbrk build = Vbrk.builder().rowids(rowids).build();
List<Vbrk> slist = sapMapper.selectVbrkCheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Vbrk:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Vbrk target = gpMapper.selectVbrk(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getFkdat() != null) {
String erdat2 = new StringBuffer(source.getFkdat()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setFkdat1(date);
}
// ===============================
while (true) {
try {
gpMapper.updateVbrk(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Vbrk:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectVbrkcheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Vbrk:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
}
......@@ -126,7 +126,7 @@ druid.greenplum.password: gpadmin
druid.greenplum.validationQuery: select '1'
druid.greenplum.initialSize: 5
druid.greenplum.minIdle: 10
druid.greenplum.maxActive: 50
druid.greenplum.maxActive: 100
druid.greenplum.maxWait: 60000
druid.greenplum.timeBetweenEvictionRunsMillis: 60000
druid.greenplum.minEvictableIdleTimeMillis: 300000
......
......@@ -332,32 +332,8 @@
<select id="selectVbakCheckByUpdate" parameterType="Vbak" resultType="Vbak">
select top 20 a."$rowid$" as rowids,
a.vbeln, a.mandt, a.vbtyp, a.audat, a.waerk, a.kalsm, a.ctlpc, a.kunnr, a.bukrs_vf, a.bstnk,
a.bname, a.telf1, a.netwr, a.vkbur, a.knumv, a.vkorg, a.vtweg, a.kkber, a.auart, a.aedat, b.pernr,
a.erdat, a.erzet, a.spart
from ${hana_user}.vbak a
left join (
select q.vbeln,r.pernr from ${hana_user}.vbap q
left join (
select vbeln,posnr,pernr from ${hana_user}.vbpa where mandt = #{mandt} and pernr != '00000000' group by vbeln,posnr,pernr
) r on q.vbeln = r.vbeln and q.posnr = r.posnr
where q.mandt = #{mandt} and r.pernr is not null
group by q.vbeln,r.pernr
) b on a.vbeln = b.vbeln
where "$rowid$" &gt; #{rowids} and a.aedat != '00000000' and a.mandt = '800'
order by "$rowid$"
</select>
<select id="selectVbrkCheckByUpdate" parameterType="com.huazheng.project.hana.model.Vbrk" resultType="com.huazheng.project.hana.model.Vbrk">
select top 20 "$rowid$" as rowids,
vbeln,fkart,waerk,erdat,erzet,zterm,kunrg,bstnk_vf,
zbillty,zbz01,zbz02,zdc_flg,bukrs,ktgrd,fkdat,inco1,
inco2,netwr,mwsbk,aedat,mandt
from ${hana_user}.Vbrk
where "$rowid$" &gt; #{rowids} and aedat != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectZsd06CheckByUpdate" parameterType="Zsd06" resultType="Zsd06">
select top 20 "$rowid$" as rowids,
vbeln, posnr, mandt, datum, uname, crt_uname, crt_datum,
......@@ -430,7 +406,33 @@
select top 20 "$rowid$" as rowids,
mandt,pernr,subty,objps,sprps,endda,begda,seqnr,nachn,vorna,aedtm
from ${hana_user}.Pa0002
where "$rowid$" &gt; #{rowids} and aedtm != '00000000' ${hana_mandt}
where "$rowid$" &gt; #{rowids} and aedtm != '00000000' and aedtm = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectVbakCheckByUpdate" parameterType="Vbak" resultType="Vbak">
select top 20 a."$rowid$" as rowids,
a.vbeln, a.mandt, a.vbtyp, a.audat, a.waerk, a.kalsm, a.ctlpc, a.kunnr, a.bukrs_vf, a.bstnk,
a.bname, a.telf1, a.netwr, a.vkbur, a.knumv, a.vkorg, a.vtweg, a.kkber, a.auart, a.aedat, b.pernr,
a.erdat, a.erzet, a.spart
from ${hana_user}.vbak a
left join (
select q.vbeln,r.pernr from ${hana_user}.vbap q
left join (
select vbeln,posnr,pernr from ${hana_user}.vbpa where mandt = #{mandt} and pernr != '00000000' group by vbeln,posnr,pernr
) r on q.vbeln = r.vbeln and q.posnr = r.posnr
where q.mandt = #{mandt} and r.pernr is not null
group by q.vbeln,r.pernr
) b on a.vbeln = b.vbeln
where "$rowid$" &gt; #{rowids} and a.aedat != '00000000' and a.aedat = CURRENT_DATE and a.mandt = '800'
order by "$rowid$"
</select>
<select id="selectVbrkCheckByUpdate" parameterType="Vbrk" resultType="Vbrk">
select top 20 "$rowid$" as rowids,
vbeln,fkart,waerk,erdat,erzet,zterm,kunrg,bstnk_vf,
zbillty,zbz01,zbz02,zdc_flg,bukrs,ktgrd,fkdat,inco1,
inco2,netwr,mwsbk,aedat,mandt
from ${hana_user}.Vbrk
where "$rowid$" &gt; #{rowids} and aedat != '00000000' and aedat = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
......
......@@ -684,16 +684,6 @@
</bean>
<!-- ========================== deleteUpdateJobServiceImpl busy ========================== -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="checkBusy" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- ========================== 独立的任务 ========================== -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
......@@ -705,35 +695,6 @@
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectPa0002CheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectVbakCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectVbrkCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
......@@ -753,7 +714,6 @@
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- 新增流程 -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
......@@ -820,6 +780,24 @@
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectVbakCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectVbrkCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- 删除流程 -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
......@@ -894,7 +872,24 @@
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectVbakCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectVbrkCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
</list>
</constructor-arg>
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论