Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
H
huazheng-project-flink
概览
概览
详情
活动
周期分析
版本库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
huazheng
huazheng-project-flink
Commits
2c8d9d43
提交
2c8d9d43
authored
11月 14, 2020
作者:
guofeng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
按时间更新流程knkk
上级
28e6096c
隐藏空白字符变更
内嵌
并排
正在显示
6 个修改的文件
包含
54 行增加
和
56 行删除
+54
-56
GPMapper.java
.../java/com/huazheng/project/greenplum/mapper/GPMapper.java
+0
-1
DeleteUpdateJobServiceImpl.java
...ct/greenplum/service/impl/DeleteUpdateJobServiceImpl.java
+37
-34
SapMapper.java
...main/java/com/huazheng/project/hana/mapper/SapMapper.java
+2
-0
hana2.sql
src/main/resources/devtools/dev/hana2.sql
+6
-0
GPMapper_greenplum.xml
src/main/resources/mapper/greenplum/GPMapper_greenplum.xml
+0
-3
SapMapper_hana.xml
src/main/resources/mapper/hana/SapMapper_hana.xml
+9
-18
没有找到文件。
src/main/java/com/huazheng/project/greenplum/mapper/GPMapper.java
浏览文件 @
2c8d9d43
...
...
@@ -187,7 +187,6 @@ public interface GPMapper {
@CacheEvict
(
key
=
"'selectKnkk'+':'+#p0.mandt+','+#p0.kunnr+','+#p0.kkber"
)
public
void
updateKnkk
(
Knkk
element
);
public
List
<
Knkk
>
selectKnkkCheck
(
Knkk
build
);
public
List
<
Knkk
>
selectKnkkCheckByUpdate
(
Knkk
build
);
@Cacheable
(
key
=
"#root.method.name+':'+#p0.mandt+','+#p0.kunnr+','+#p0.vkorg+','+#p0.vtweg+','+#p0.spart"
,
unless
=
"#result == null"
)
public
Knvv
selectKnvv
(
Knvv
knvv
);
...
...
src/main/java/com/huazheng/project/greenplum/service/impl/DeleteUpdateJobServiceImpl.java
浏览文件 @
2c8d9d43
...
...
@@ -1226,17 +1226,10 @@ public class DeleteUpdateJobServiceImpl {
List
<
Knkk
>
list
=
gpMapper
.
selectKnkkCheck
(
build
);
// 从数仓中查询一组数据
if
(
list
.
size
()
==
0
)
{
// rowid已完成,则转为由更新时间字段开始同步
// 按更新时间查询,查询大于今天的数据,此时就不需要重置计数器了
String
aedat
=
DateUtil
.
format
(
DateUtil
.
yesterday
(),
"yyyyMMdd"
);
// 昨天的日期
build
=
Knkk
.
builder
().
rowNum
(
rowNum
).
aedat
(
aedat
).
build
();
// 查询出列表20个,下面的代码运行后会复位一个阶段的rowNum,如此往复list.size()就不会为0了
list
=
gpMapper
.
selectKnkkCheckByUpdate
(
build
);
// 是否为list.size() == 0的时候的更新时间查询的列表
opsForValue
.
set
(
"huazheng:check:Knkk:isUpdateList"
,
"true"
);
}
else
{
redis1Template
.
delete
(
"huazheng:check:Knkk:isUpdateList"
);
redis1Template
.
opsForValue
().
set
(
"huazheng:check:Knkk:rowNum"
,
"0"
);
// 计数器复位
ThreadUtil
.
sleep
(
1000
);
// 没有数据了,休眠一下
}
list
.
forEach
(
target
->
{
// 遍历要检查的数据
Knkk
source
=
sapMapper
.
selectKnkkById
(
target
);
// 根据主键查询源库中的数据
String
operator
=
"none"
;
...
...
@@ -1260,50 +1253,58 @@ public class DeleteUpdateJobServiceImpl {
}
}
String
mode
=
null
;
String
isUpdateList
=
opsForValue
.
get
(
"huazheng:check:Knkk:isUpdateList"
);
if
(
isUpdateList
==
null
)
{
// 非更新模式下的列表才更新计数器
redis1Template
.
opsForValue
().
set
(
"huazheng:check:Knkk:rowNum"
,
target
.
getRowNum
());
mode
=
"计数器模式"
;
}
else
{
mode
=
"更新时间模式"
;
}
redis1Template
.
opsForValue
().
set
(
"huazheng:check:Knkk:rowNum"
,
target
.
getRowNum
());
if
(!
operator
.
equals
(
"none"
))
{
log
.
info
(
String
.
format
(
"selectKnkkCheck --> rowNum:%s, operator:%s
, %s"
,
target
.
getRowNum
(),
operator
,
mode
));
log
.
info
(
String
.
format
(
"selectKnkkCheck --> rowNum:%s, operator:%s
"
,
target
.
getRowNum
(),
operator
));
}
});
}
catch
(
Exception
e
)
{
redis1Template
.
opsForValue
().
set
(
"huazheng:checkError:Knkk:rowNum"
,
getErrorInfoFromException
(
e
));
}
}
private
void
selectKnkkCheck
Dele
te
()
{
private
void
selectKnkkCheck
Upda
te
()
{
try
{
ValueOperations
<
String
,
String
>
opsForValue
=
redis1Template
.
opsForValue
();
opsForValue
.
setIfAbsent
(
"huazheng:check
Delete:Knkk:rowNum
"
,
"0"
);
String
rowNum
=
opsForValue
.
get
(
"huazheng:checkDelete:Knkk:rowNum"
);
Knkk
build
=
Knkk
.
builder
().
row
Num
(
rowNum
).
build
();
opsForValue
.
setIfAbsent
(
"huazheng:check
Update:Knkk:rowids
"
,
"0"
);
Long
rowids
=
Long
.
parseLong
(
opsForValue
.
get
(
"huazheng:checkUpdate:Knkk:rowids"
)
);
Knkk
build
=
Knkk
.
builder
().
row
ids
(
rowids
).
build
();
List
<
Knkk
>
list
=
gpMapper
.
selectKnkkCheck
(
build
);
// 从数仓中查询一组数据
if
(
list
.
size
()
==
0
)
{
// rowid已完成,则转为由更新时间字段开始同步
redis1Template
.
opsForValue
().
set
(
"huazheng:check
Delete:Knkk:rowNum
"
,
"0"
);
// 计数器复位
List
<
Knkk
>
slist
=
sapMapper
.
selectKnkkCheckByUpdate
(
build
);
// 从源库中按更新时间查询
if
(
s
list
.
size
()
==
0
)
{
// rowid已完成,则转为由更新时间字段开始同步
redis1Template
.
opsForValue
().
set
(
"huazheng:check
Update:Knkk:rowids
"
,
"0"
);
// 计数器复位
ThreadUtil
.
sleep
(
1000
);
// 没有数据了,休眠一下
}
list
.
forEach
(
target
->
{
// 遍历要检查的
数据
Knkk
source
=
sapMapper
.
selectKnkkById
(
target
);
// 根据主键查询源库中的数据
slist
.
forEach
(
source
->
{
// 遍历源库中的数据,去目标库中查询
数据
Knkk
target
=
gpMapper
.
selectKnkk
(
source
);
String
operator
=
"none"
;
if
(
source
==
null
)
{
// 如果源库中没有数据
gpMapper
.
deleteKnkk
(
target
);
// 删除数仓中的数据
operator
=
"delete"
;
}
else
{
// 源库中有数据
String
shash
=
SecureUtil
.
md5
(
JSONUtil
.
toJsonStr
(
source
));
// 源库中数据的hash结果
String
thash
=
target
.
getHashResult
();
// 数仓中数据的hash结果
if
(!
shash
.
equals
(
thash
))
{
// 如果hash结果不一致
source
.
setHashResult
(
shash
);
while
(
true
)
{
try
{
gpMapper
.
updateKnkk
(
source
);
// 更新数据到数仓中
break
;
}
catch
(
RuntimeException
e
)
{
log
.
error
(
e
.
getMessage
());
ThreadUtil
.
safeSleep
(
500
);
}
}
ThreadUtil
.
safeSleep
(
500
);
}
}
redis1Template
.
opsForValue
().
set
(
"huazheng:checkDelete:Knkk:rowNum"
,
target
.
getRowNum
());
redis1Template
.
opsForValue
().
set
(
"huazheng:checkUpdate:Knkk:rowids"
,
target
.
getRowNum
());
if
(!
operator
.
equals
(
"none"
))
{
log
.
info
(
String
.
format
(
"selectKnkkCheck
Delete --> rowNum
:%s, operator:%s"
,
target
.
getRowNum
(),
operator
));
log
.
info
(
String
.
format
(
"selectKnkkCheck
Update --> rowids
:%s, operator:%s"
,
target
.
getRowNum
(),
operator
));
}
});
}
catch
(
Exception
e
)
{
redis1Template
.
opsForValue
().
set
(
"huazheng:check
Error:Knkk:rowNum
"
,
getErrorInfoFromException
(
e
));
redis1Template
.
opsForValue
().
set
(
"huazheng:check
UpdateError:Knkk:rowids
"
,
getErrorInfoFromException
(
e
));
}
}
private
void
selectKnvvCheck
()
{
...
...
@@ -3281,7 +3282,9 @@ public class DeleteUpdateJobServiceImpl {
// 有更新时间的表的删除计数器需要拆分出来,避免数据不能删除
// 把这些方法的删除部分从原方法种剥离出来
public
void
checkJob6
()
{
selectKnkkCheckDelete
();
// selectKnkkCheckDelete();
selectKnkkCheckUpdate
();
selectLikpCheckDelete
();
selectLipsCheckDelete
();
selectPa0002CheckDelete
();
...
...
src/main/java/com/huazheng/project/hana/mapper/SapMapper.java
浏览文件 @
2c8d9d43
...
...
@@ -84,6 +84,8 @@ public interface SapMapper {
public
List
<
Ausp
>
selectAuspNew
(
Ausp
ausp
);
public
List
<
Knvp
>
selectKnvpNew
(
Knvp
knvp
);
public
List
<
Knkk
>
selectKnkkCheckByUpdate
(
Knkk
knkk
);
public
Knvp
selectKnvpById
(
Knvp
target
);
public
Ausp
selectAuspById
(
Ausp
target
);
public
Zpoedit
selectZpoeditById
(
Zpoedit
target
);
...
...
src/main/resources/devtools/dev/hana2.sql
浏览文件 @
2c8d9d43
select
top
20
"$rowid$"
as
rowids
,
mandt
,
kunnr
,
kkber
,
klimk
,
skfor
,
ssobl
,
aedat
from
sapabap1
.
Knkk
where
aedat
!=
'00000000'
and
mandt
=
'800'
order
by
"$rowid$"
;
\ No newline at end of file
src/main/resources/mapper/greenplum/GPMapper_greenplum.xml
浏览文件 @
2c8d9d43
...
...
@@ -589,9 +589,6 @@
<select
id=
"selectKnkkCheck"
parameterType=
"com.huazheng.project.hana.model.Knkk"
resultType=
"com.huazheng.project.hana.model.Knkk"
>
select * from Knkk where rownum
>
#{rowNum} order by rownum limit 20
</select>
<select
id=
"selectKnkkCheckByUpdate"
parameterType=
"com.huazheng.project.hana.model.Knkk"
resultType=
"com.huazheng.project.hana.model.Knkk"
>
select * from Knkk where aedat
>
= #{aedat} order by rownum
</select>
<select
id=
"selectKnvv"
parameterType=
"com.huazheng.project.hana.model.Knvv"
resultType=
"com.huazheng.project.hana.model.Knvv"
>
select * from Knvv where mandt = #{mandt} and kunnr = #{kunnr} and vkorg = #{vkorg} and vtweg = #{vtweg} and spart = #{spart}
...
...
src/main/resources/mapper/hana/SapMapper_hana.xml
浏览文件 @
2c8d9d43
...
...
@@ -179,7 +179,6 @@
where "$rowid$"
>
#{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"selectKnvpNew"
parameterType=
"Knvp"
resultType=
"Knvp"
>
select top 20 "$rowid$" as rowids,
mandt, kunnr, vkorg, vtweg, spart, parvw, parza, kunn2, pernr, knref
...
...
@@ -187,7 +186,6 @@
where "$rowid$"
>
#{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"selectAuspNew"
parameterType=
"Ausp"
resultType=
"Ausp"
>
select top 20 "$rowid$" as rowids,
mandt, objek, atinn, atzhl, mafid, klart, adzhl, atwrt
...
...
@@ -195,7 +193,6 @@
where "$rowid$"
>
#{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"selectAfruNew"
parameterType=
"Afru"
resultType=
"Afru"
>
select top 20 "$rowid$" as rowids,
mandt, rueck, rmzhl, ersda, wablnr, aufnr, stokz, stzhl
...
...
@@ -203,7 +200,6 @@
where "$rowid$"
>
#{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"selectTspatNew"
parameterType=
"Tspat"
resultType=
"Tspat"
>
select top 20 "$rowid$" as rowids,
mandt, spras, spart, vtext
...
...
@@ -211,7 +207,6 @@
where "$rowid$"
>
#{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"selectZpoeditNew"
parameterType=
"Zpoedit"
resultType=
"Zpoedit"
>
select top 20 "$rowid$" as rowids,
mandt, pwerk, aufnr, reason
...
...
@@ -219,8 +214,6 @@
where "$rowid$"
>
#{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"selectMkpfNew"
parameterType=
"Mkpf"
resultType=
"Mkpf"
>
select top 20 "$rowid$" as rowids,
mandt, mblnr, mjahr, bldat, cputm
...
...
@@ -228,7 +221,6 @@
where "$rowid$"
>
#{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"selectMsegNew"
parameterType=
"Mseg"
resultType=
"Mseg"
>
select top 20 "$rowid$" as rowids,
mandt, mblnr, mjahr, zeile, bwart, charg, menge, budat_mkpf, cputm_mkpf
...
...
@@ -236,7 +228,6 @@
where "$rowid$"
>
#{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"selectKnkkNew"
parameterType=
"Knkk"
resultType=
"Knkk"
>
select top 20 "$rowid$" as rowids,
mandt,kunnr,kkber,klimk,skfor,ssobl,aedat
...
...
@@ -322,6 +313,15 @@
</select>
<select
id=
"selectKnkkCheckByUpdate"
parameterType=
"Knkk"
resultType=
"Knkk"
>
select top 20
mandt,kunnr,kkber,klimk,skfor,ssobl,aedat
from ${hana_user}.Knkk
where "$rowid$"
>
#{rowids} and aedat != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"selectAfkoById"
parameterType=
"Afko"
resultType=
"Afko"
>
select
aufnr, mandt, gltrp, gamng, plnbez, gmein, plnnr, plnal,
...
...
@@ -367,50 +367,41 @@
from ${hana_user}.Afvc
where mandt = #{mandt} and aufpl = #{aufpl} and aplzl = #{aplzl}
</select>
<select
id=
"selectKnvpById"
parameterType=
"Knvp"
resultType=
"Knvp"
>
select mandt, kunnr, vkorg, vtweg, spart, parvw, parza, kunn2, pernr, knref
from ${hana_user}.Knvp
where mandt = #{mandt} and kunnr = #{kunnr} and vkorg = #{vkorg} and vtweg = #{vtweg} and spart = #{spart} and parvw = #{parvw} and parza = #{parza}
</select>
<select
id=
"selectAuspById"
parameterType=
"Ausp"
resultType=
"Ausp"
>
select mandt, objek, atinn, atzhl, mafid, klart, adzhl, atwrt
from ${hana_user}.Ausp
where mandt = #{mandt} and objek = #{objek} and atinn = #{atinn} and atzhl = #{atzhl} and mafid = #{mafid} and klart = #{klart} and adzhl = #{adzhl}
</select>
<select
id=
"selectAfruById"
parameterType=
"Afru"
resultType=
"Afru"
>
select mandt, rueck, rmzhl, ersda, wablnr, aufnr, stokz, stzhl
from ${hana_user}.Afru
where mandt = #{mandt} and rueck = #{rueck} and rmzhl = #{rmzhl}
</select>
<select
id=
"selectTspatById"
parameterType=
"Tspat"
resultType=
"Tspat"
>
select mandt, spras, spart, vtext
from ${hana_user}.Tspat
where mandt = #{mandt} and spras = #{spras} and spart = #{spart}
</select>
<select
id=
"selectZpoeditById"
parameterType=
"Zpoedit"
resultType=
"Zpoedit"
>
select mandt, pwerk, aufnr, reason
from ${hana_user}.Zpo_edit
where mandt = #{mandt} and pwerk = #{pwerk} and aufnr = #{aufnr}
</select>
<select
id=
"selectMkpfById"
parameterType=
"Mkpf"
resultType=
"Mkpf"
>
select mandt, mblnr, mjahr, bldat, cputm
from ${hana_user}.Mkpf
where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr}
</select>
<select
id=
"selectMsegById"
parameterType=
"Mseg"
resultType=
"Mseg"
>
select mandt, mblnr, mjahr, zeile, bwart, charg, menge, budat_mkpf, cputm_mkpf
from ${hana_user}.Mseg
where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr} and zeile = #{zeile}
</select>
<select
id=
"selectKonvById"
parameterType=
"Konv"
resultType=
"Konv"
>
select mandt,knumv,kposn,stunr,zaehk,kntyp,kbetr
from ${hana_user}.Konv
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论