Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
H
huazheng-project-flink
概览
概览
详情
活动
周期分析
版本库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
huazheng
huazheng-project-flink
Commits
94507111
提交
94507111
authored
11月 23, 2020
作者:
guofeng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
vbrp增量
上级
8f961425
隐藏空白字符变更
内嵌
并排
正在显示
6 个修改的文件
包含
123 行增加
和
30 行删除
+123
-30
CheckDeleteServiceImpl.java
...roject/greenplum/service/impl/CheckDeleteServiceImpl.java
+32
-0
CheckUpdateServiceImpl.java
...roject/greenplum/service/impl/CheckUpdateServiceImpl.java
+39
-6
SapMapper.java
...main/java/com/huazheng/project/hana/mapper/SapMapper.java
+10
-3
hana.sql
src/main/resources/devtools/dev/hana.sql
+1
-1
SapMapper_hana.xml
src/main/resources/mapper/hana/SapMapper_hana.xml
+31
-20
spring-init.xml
src/main/resources/spring-init.xml
+10
-0
没有找到文件。
src/main/java/com/huazheng/project/greenplum/service/impl/CheckDeleteServiceImpl.java
浏览文件 @
94507111
...
...
@@ -23,6 +23,7 @@ import com.huazheng.project.hana.model.Mara;
import
com.huazheng.project.hana.model.Pa0002
;
import
com.huazheng.project.hana.model.Vbak
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrp
;
import
com.huazheng.project.hana.model.Zsd06
;
import
com.huazheng.project.hana.model.Zsdfhzl
;
...
...
@@ -504,4 +505,35 @@ public class CheckDeleteServiceImpl {
}
}
public
void
selectVbrpCheckByDelete
()
{
try
{
ValueOperations
<
String
,
String
>
opsForValue
=
redis1Template
.
opsForValue
();
opsForValue
.
setIfAbsent
(
"huazheng:checkDelete:Vbrp:rowNum"
,
"0"
);
String
rowNum
=
opsForValue
.
get
(
"huazheng:checkDelete:Vbrp:rowNum"
);
Vbrp
build
=
Vbrp
.
builder
().
rowNum
(
rowNum
).
build
();
List
<
Vbrp
>
list
=
gpMapper
.
selectVbrpCheck
(
build
);
// 从数仓中查询一组数据
if
(
list
.
size
()
==
0
)
{
redis1Template
.
opsForValue
().
set
(
"huazheng:checkDelete:Vbrp:rowNum"
,
"0"
);
// 计数器复位
ThreadUtil
.
sleep
(
1000
);
// 没有数据了,休眠一下
}
list
.
forEach
(
target
->
{
// 遍历要检查的数据
Vbrp
source
=
sapMapper
.
selectVbrpById
(
target
);
// 根据主键查询源库中的数据
String
operator
=
"none"
;
if
(
source
==
null
)
{
// 如果源库中没有数据
gpMapper
.
deleteVbrp
(
target
);
// 删除数仓中的数据
operator
=
"delete"
;
}
redis1Template
.
opsForValue
().
set
(
"huazheng:checkDelete:Vbrp:rowNum"
,
target
.
getRowNum
());
if
(!
operator
.
equals
(
"none"
))
{
log
.
info
(
String
.
format
(
"selectVbrpcheckDelete --> rowNum:%s, operator:%s"
,
target
.
getRowNum
(),
operator
));
}
});
}
catch
(
Exception
e
)
{
redis1Template
.
opsForValue
().
set
(
"huazheng:checkDeleteError:Vbrp:rowNum"
,
SomeUtils
.
getErrorInfoFromException
(
e
));
}
}
}
src/main/java/com/huazheng/project/greenplum/service/impl/CheckUpdateServiceImpl.java
浏览文件 @
94507111
...
...
@@ -24,6 +24,7 @@ import com.huazheng.project.hana.model.Mara;
import
com.huazheng.project.hana.model.Pa0002
;
import
com.huazheng.project.hana.model.Vbak
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrp
;
import
com.huazheng.project.hana.model.Zsd06
;
import
com.huazheng.project.hana.model.Zsdfhzl
;
...
...
@@ -195,8 +196,8 @@ public class CheckUpdateServiceImpl {
ThreadUtil
.
sleep
(
1000
);
// 没有数据了,休眠一下
}
slist
.
forEach
(
target
->
{
// 遍历要检查的数据,去目标库中查询数据
Kna1
source
=
gpMapper
.
selectKna1
(
target
);
slist
.
forEach
(
source
->
{
// 遍历要检查的数据,去目标库中查询数据
Kna1
target
=
gpMapper
.
selectKna1
(
source
);
String
operator
=
"none"
;
Long
srowids
=
source
.
getRowids
();
if
(
target
!=
null
)
{
...
...
@@ -227,9 +228,11 @@ public class CheckUpdateServiceImpl {
}
// 级联更新业务
Knvv
sKnvv
=
sapMapper
.
cascadeKnvvByKna1
(
source
);
// 级联查询源库afko表
Knvv
tKnvv
=
gpMapper
.
selectKnvv
(
sKnvv
);
// 查询目标库中afko表
cascadeKnvvCheckByUpdate
(
sKnvv
,
tKnvv
);
// 级联更新afko表
List
<
Knvv
>
sKnvvList
=
sapMapper
.
cascadeKnvvByKna1
(
source
);
// 级联查询源库afko表
for
(
Knvv
sKnvv
:
sKnvvList
)
{
Knvv
tKnvv
=
gpMapper
.
selectKnvv
(
sKnvv
);
// 查询目标库中afko表
cascadeKnvvCheckByUpdate
(
sKnvv
,
tKnvv
);
// 级联更新afko表
}
ThreadUtil
.
safeSleep
(
500
);
}
...
...
@@ -501,6 +504,14 @@ public class CheckUpdateServiceImpl {
ThreadUtil
.
safeSleep
(
500
);
}
}
// 级联更新业务
List
<
Vbrp
>
sVbrpList
=
sapMapper
.
cascadeVbrpByVbrk
(
source
);
// 级联查询源库afko表
for
(
Vbrp
sVbrp
:
sVbrpList
)
{
Vbrp
tVbrp
=
gpMapper
.
selectVbrp
(
sVbrp
);
// 查询目标库中afko表
cascadeVbrpCheckByUpdate
(
sVbrp
,
tVbrp
);
// 级联更新afko表
}
ThreadUtil
.
safeSleep
(
500
);
}
}
...
...
@@ -792,5 +803,27 @@ public class CheckUpdateServiceImpl {
}
}
}
// 所属selectVbrkCheckUpdate的级联
private
void
cascadeVbrpCheckByUpdate
(
Vbrp
source
,
Vbrp
target
)
{
if
(
target
!=
null
)
{
String
shash
=
SecureUtil
.
md5
(
JSONUtil
.
toJsonStr
(
source
));
// 源库中数据的hash结果
String
thash
=
target
.
getHashResult
();
// 数仓中数据的hash结果
if
(!
shash
.
equals
(
thash
))
{
// 如果hash结果不一致
source
.
setHashResult
(
shash
);
// ===============================
// ===============================
while
(
true
)
{
try
{
gpMapper
.
updateVbrp
(
source
);
// 更新数据到数仓中
break
;
}
catch
(
RuntimeException
e
)
{
log
.
error
(
e
.
getMessage
());
ThreadUtil
.
safeSleep
(
500
);
}
}
ThreadUtil
.
safeSleep
(
500
);
}
}
}
}
src/main/java/com/huazheng/project/hana/mapper/SapMapper.java
浏览文件 @
94507111
...
...
@@ -91,11 +91,8 @@ public interface SapMapper {
public
List
<
Aufk
>
selectAufkCheckByUpdate
(
Aufk
aufk
);
public
Afko
cascadeAfkoByAufk
(
Aufk
aufk
);
public
Afpo
cascadeAfpoByAufk
(
Aufk
aufk
);
public
List
<
Bkpf
>
selectBkpfCheckByUpdate
(
Bkpf
bkpf
);
public
List
<
Kna1
>
selectKna1CheckUpdate
(
Kna1
kna1
);
public
Knvv
cascadeKnvvByKna1
(
Kna1
kna1
);
public
List
<
Likp
>
selectLikpCheckByUpdate
(
Likp
likp
);
public
List
<
Lips
>
selectLipsCheckByUpdate
(
Lips
lips
);
public
List
<
Pa0002
>
selectPa0002CheckByUpdate
(
Pa0002
pa0002
);
...
...
@@ -104,8 +101,17 @@ public interface SapMapper {
public
List
<
Zsd06
>
selectZsd06CheckByUpdate
(
Zsd06
zsd06
);
public
List
<
Zsdfhzl
>
selectZsdfhzlCheckByUpdate
(
Zsdfhzl
zsdfhzl
);
public
List
<
Mara
>
selectMaraCheckByUpdate
(
Mara
mara
);
public
Afko
cascadeAfkoByAufk
(
Aufk
aufk
);
public
Afpo
cascadeAfpoByAufk
(
Aufk
aufk
);
public
List
<
Knvv
>
cascadeKnvvByKna1
(
Kna1
kna1
);
public
List
<
Vbrp
>
cascadeVbrpByVbrk
(
Vbrk
source
);
public
List
<
Ausp
>
cascadeAuspByMara
(
Mara
mara
);
// ......
public
Bkpf
selectBkpfById
(
Bkpf
target
);
public
Knvp
selectKnvpById
(
Knvp
target
);
...
...
@@ -143,5 +149,6 @@ public interface SapMapper {
public
Bsid
selectBsidById
(
Bsid
target
);
public
Vbap
selectVbapById
(
Vbap
target
);
public
Tvkbt
selectTvkbtById
(
Tvkbt
target
);
}
src/main/resources/devtools/dev/hana.sql
浏览文件 @
94507111
...
...
@@ -54,4 +54,4 @@ where "$rowid$" > '10' AND MANDT = '800'
order
by
"$rowid$"
select
*
from
sapabap1
.
kna1
where
mandt
=
'800'
limit
10
src/main/resources/mapper/hana/SapMapper_hana.xml
浏览文件 @
94507111
...
...
@@ -339,20 +339,7 @@
where "$rowid$"
>
#{rowids} and aedat != '00000000' and aedat = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"cascadeAfkoByAufk"
parameterType=
"Aufk"
resultType=
"Afko"
>
select
aufnr, mandt, gltrp, gamng, plnbez, gmein, plnnr, plnal,
stlnr, cy_seqnr, gstrp, aufpl, aplzt
from ${hana_user}.afko
where aufnr = #{aufnr} ${hana_mandt}
</select>
<select
id=
"cascadeAfpoByAufk"
parameterType=
"Aufk"
resultType=
"Afpo"
>
select
kdauf, kdpos, aufnr, mandt, posnr, psmng, wemng, meins, matnr,
uebto, untto, pwerk, verid, dwerk, dauat, krsnr, sernr, plnum
from ${hana_user}.afpo
where aufnr = #{aufnr} ${hana_mandt}
</select>
<select
id=
"selectBkpfCheckByUpdate"
parameterType=
"Bkpf"
resultType=
"Bkpf"
>
select top 20 "$rowid$" as rowids,
mandt, bukrs, belnr, gjahr, blart, bldat, budat, monat, cpudt, cputm,
...
...
@@ -368,12 +355,6 @@
where "$rowid$"
>
#{rowids} and updat != '00000000' and updat = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"cascadeKnvvByKna1"
parameterType=
"Kna1"
resultType=
"Knvv"
>
select
mandt, kunnr, vkorg, vtweg, spart, waers, klabc
from ${hana_user}.Knvv
where kunnr = #{kunnr} ${hana_mandt}
</select>
<select
id=
"selectLikpCheckByUpdate"
parameterType=
"Likp"
resultType=
"Likp"
>
select top 20 "$rowid$" as rowids,
vbeln, mandt, erdat, wadat_ist, ctlpc, kunnr, aedat,lfuhr
...
...
@@ -445,12 +426,42 @@
where "$rowid$"
>
#{rowids} and laeda != '00000000' and laeda = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"cascadeKnvvByKna1"
parameterType=
"Kna1"
resultType=
"Knvv"
>
select
mandt, kunnr, vkorg, vtweg, spart, waers, klabc
from ${hana_user}.Knvv
where kunnr = #{kunnr} ${hana_mandt}
</select>
<select
id=
"cascadeAfkoByAufk"
parameterType=
"Aufk"
resultType=
"Afko"
>
select
aufnr, mandt, gltrp, gamng, plnbez, gmein, plnnr, plnal,
stlnr, cy_seqnr, gstrp, aufpl, aplzt
from ${hana_user}.afko
where aufnr = #{aufnr} ${hana_mandt}
</select>
<select
id=
"cascadeAfpoByAufk"
parameterType=
"Aufk"
resultType=
"Afpo"
>
select
kdauf, kdpos, aufnr, mandt, posnr, psmng, wemng, meins, matnr,
uebto, untto, pwerk, verid, dwerk, dauat, krsnr, sernr, plnum
from ${hana_user}.afpo
where aufnr = #{aufnr} ${hana_mandt}
</select>
<select
id=
"cascadeAuspByMara"
parameterType=
"Mara"
resultType=
"Ausp"
>
select
mandt, objek, atinn, atzhl, mafid, klart, adzhl, atwrt
from ${hana_user}.Ausp
where objek = #{matnr} ${hana_mandt}
</select>
<select
id=
"cascadeVbrpByVbrk"
parameterType=
"Vbrk"
resultType=
"Vbrp"
>
select
vbeln,posnr,fkimg,meins,vgbel,vgpos,aubel,aupos,matnr,
arktx,matkl,spart,zarktx_js,zfkimg_js,zvrkme_js, mandt,netwr,mwsbp
from ${hana_user}.Vbrp
where vbeln = #{vbeln} ${hana_mandt}
</select>
...
...
src/main/resources/spring-init.xml
浏览文件 @
94507111
...
...
@@ -942,6 +942,16 @@
</property>
<property
name=
"cronExpression"
value=
"* * * * * ?"
/>
</bean>
<bean
class=
"org.springframework.scheduling.quartz.CronTriggerFactoryBean"
>
<property
name=
"jobDetail"
>
<bean
parent=
"methodJobDetail"
>
<property
name=
"targetObject"
ref=
"checkDeleteServiceImpl"
/>
<property
name=
"targetMethod"
value=
"selectVbrpCheckByDelete"
/>
</bean>
</property>
<property
name=
"cronExpression"
value=
"* * * * * ?"
/>
</bean>
</list>
</constructor-arg>
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论