Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
H
huazheng-project-flink
概览
概览
详情
活动
周期分析
版本库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
huazheng
huazheng-project-flink
Commits
26ca619a
提交
26ca619a
authored
1月 26, 2021
作者:
guofeng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'master' of
http://47.103.50.109:8091/huazheng/huazheng-project-flink.git
上级
31768357
49005a87
隐藏空白字符变更
内嵌
并排
正在显示
15 个修改的文件
包含
348 行增加
和
4 行删除
+348
-4
HZDataStream.java
src/main/java/com/huazheng/project/HZDataStream.java
+15
-0
GPMapper.java
.../java/com/huazheng/project/greenplum/mapper/GPMapper.java
+10
-0
CheckDeleteServiceImpl.java
...roject/greenplum/service/impl/CheckDeleteServiceImpl.java
+31
-0
CheckUpdateServiceImpl.java
...roject/greenplum/service/impl/CheckUpdateServiceImpl.java
+38
-0
GPServiceImpl.java
...uazheng/project/greenplum/service/impl/GPServiceImpl.java
+38
-0
JobServiceImpl.java
...azheng/project/greenplum/service/impl/JobServiceImpl.java
+27
-0
VbkdSource.java
...om/huazheng/project/greenplum/source/hana/VbkdSource.java
+71
-0
SapMapper.java
...main/java/com/huazheng/project/hana/mapper/SapMapper.java
+4
-0
Vbkd.java
src/main/java/com/huazheng/project/hana/model/Vbkd.java
+38
-0
Zsd06.java
src/main/java/com/huazheng/project/hana/model/Zsd06.java
+2
-2
华正项目-数据库表设计20210118.xlsx
src/main/resources/devtools/doc/华正项目-数据库表设计20210118.xlsx
+0
-0
t_vbkd.sql
src/main/resources/devtools/table/hana/t_vbkd.sql
+14
-0
GPMapper_greenplum.xml
src/main/resources/mapper/greenplum/GPMapper_greenplum.xml
+20
-1
SapMapper_hana.xml
src/main/resources/mapper/hana/SapMapper_hana.xml
+21
-0
spring-init.xml
src/main/resources/spring-init.xml
+19
-1
没有找到文件。
src/main/java/com/huazheng/project/HZDataStream.java
浏览文件 @
26ca619a
...
@@ -48,6 +48,7 @@ import com.huazheng.project.greenplum.source.hana.TvkbtSource;
...
@@ -48,6 +48,7 @@ import com.huazheng.project.greenplum.source.hana.TvkbtSource;
import
com.huazheng.project.greenplum.source.hana.VbakSource
;
import
com.huazheng.project.greenplum.source.hana.VbakSource
;
import
com.huazheng.project.greenplum.source.hana.VbapSource
;
import
com.huazheng.project.greenplum.source.hana.VbapSource
;
import
com.huazheng.project.greenplum.source.hana.VbepSource
;
import
com.huazheng.project.greenplum.source.hana.VbepSource
;
import
com.huazheng.project.greenplum.source.hana.VbkdSource
;
import
com.huazheng.project.greenplum.source.hana.VbpaSource
;
import
com.huazheng.project.greenplum.source.hana.VbpaSource
;
import
com.huazheng.project.greenplum.source.hana.VbrkSource
;
import
com.huazheng.project.greenplum.source.hana.VbrkSource
;
import
com.huazheng.project.greenplum.source.hana.VbrpSource
;
import
com.huazheng.project.greenplum.source.hana.VbrpSource
;
...
@@ -106,6 +107,7 @@ import com.huazheng.project.hana.model.Tvkbt;
...
@@ -106,6 +107,7 @@ import com.huazheng.project.hana.model.Tvkbt;
import
com.huazheng.project.hana.model.Vbak
;
import
com.huazheng.project.hana.model.Vbak
;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbkd
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrp
;
import
com.huazheng.project.hana.model.Vbrp
;
...
@@ -206,6 +208,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
...
@@ -206,6 +208,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
private
static
S067Source
s067Source
;
private
static
S067Source
s067Source
;
private
static
VbukSource
vbukSource
;
private
static
VbukSource
vbukSource
;
private
static
VbupSource
vbupSource
;
private
static
VbupSource
vbupSource
;
private
static
VbkdSource
vbkdSource
;
private
static
LikpSource
likpSource
;
private
static
LikpSource
likpSource
;
private
static
KonvSource
konvSource
;
private
static
KonvSource
konvSource
;
private
static
LipsSource
lipsSource
;
private
static
LipsSource
lipsSource
;
...
@@ -279,6 +282,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
...
@@ -279,6 +282,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
s067Source
=
(
S067Source
)
context
.
getBean
(
"s067Source"
);
s067Source
=
(
S067Source
)
context
.
getBean
(
"s067Source"
);
vbukSource
=
(
VbukSource
)
context
.
getBean
(
"vbukSource"
);
vbukSource
=
(
VbukSource
)
context
.
getBean
(
"vbukSource"
);
vbupSource
=
(
VbupSource
)
context
.
getBean
(
"vbupSource"
);
vbupSource
=
(
VbupSource
)
context
.
getBean
(
"vbupSource"
);
vbkdSource
=
(
VbkdSource
)
context
.
getBean
(
"vbkdSource"
);
likpSource
=
(
LikpSource
)
context
.
getBean
(
"likpSource"
);
likpSource
=
(
LikpSource
)
context
.
getBean
(
"likpSource"
);
konvSource
=
(
KonvSource
)
context
.
getBean
(
"konvSource"
);
konvSource
=
(
KonvSource
)
context
.
getBean
(
"konvSource"
);
lipsSource
=
(
LipsSource
)
context
.
getBean
(
"lipsSource"
);
lipsSource
=
(
LipsSource
)
context
.
getBean
(
"lipsSource"
);
...
@@ -930,6 +934,17 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
...
@@ -930,6 +934,17 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
}
}
}).
setParallelism
(
1
).
name
(
"拉取Vbup数据"
).
addSink
(
greenPlumRichSinkFunction
).
setParallelism
(
1
).
name
(
"输出Vbup数据"
);
}).
setParallelism
(
1
).
name
(
"拉取Vbup数据"
).
addSink
(
greenPlumRichSinkFunction
).
setParallelism
(
1
).
name
(
"输出Vbup数据"
);
// ================= VbkdRedis 队列 =================
// 销售凭证 : 项目状态
DataStream
<
String
>
vbkdRedis
=
env
.
addSource
(
vbkdSource
).
setParallelism
(
1
).
name
(
"输入Vbkd队列"
);
vbkdRedis
.
flatMap
(
new
FlatMapFunction
<
String
,
Vbkd
>()
{
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
flatMap
(
String
value
,
Collector
<
Vbkd
>
out
)
throws
Exception
{
gpserviceImpl
.
processVbkd
(
value
,
out
);
}
}).
setParallelism
(
1
).
name
(
"拉取Vbkd数据"
).
addSink
(
greenPlumRichSinkFunction
).
setParallelism
(
1
).
name
(
"输出Vbkd数据"
);
// =========================================== mysql ===========================================
// =========================================== mysql ===========================================
// Handover
// Handover
DataStream
<
String
>
handoverRedis
=
env
.
addSource
(
handoverSource
).
setParallelism
(
1
).
name
(
"输入Handover队列"
);
DataStream
<
String
>
handoverRedis
=
env
.
addSource
(
handoverSource
).
setParallelism
(
1
).
name
(
"输入Handover队列"
);
...
...
src/main/java/com/huazheng/project/greenplum/mapper/GPMapper.java
浏览文件 @
26ca619a
...
@@ -42,6 +42,7 @@ import com.huazheng.project.hana.model.Vbak;
...
@@ -42,6 +42,7 @@ import com.huazheng.project.hana.model.Vbak;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.VbapAdv
;
import
com.huazheng.project.hana.model.VbapAdv
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbkd
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrp
;
import
com.huazheng.project.hana.model.Vbrp
;
...
@@ -387,6 +388,15 @@ public interface GPMapper {
...
@@ -387,6 +388,15 @@ public interface GPMapper {
public
List
<
Vbup
>
selectVbupCheck
(
Vbup
build
);
public
List
<
Vbup
>
selectVbupCheck
(
Vbup
build
);
@Cacheable
(
key
=
"#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt"
,
unless
=
"#result == null"
)
@Cacheable
(
key
=
"#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt"
,
unless
=
"#result == null"
)
public
Vbkd
selectVbkd
(
Vbkd
vbkd
);
// 查询替代删除
public
void
insertVbkd
(
Vbkd
element
);
@CacheEvict
(
key
=
"'selectVbkd'+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt"
)
public
void
updateVbkd
(
Vbkd
vbkd
);
@CacheEvict
(
key
=
"'selectVbkd'+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt"
)
public
void
deleteVbkd
(
Vbkd
vbkd
);
public
List
<
Vbkd
>
selectVbkdCheck
(
Vbkd
build
);
@Cacheable
(
key
=
"#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt"
,
unless
=
"#result == null"
)
public
Zmdpc
selectZmdpc
(
Zmdpc
zmdpc
);
// 查询替代删除
public
Zmdpc
selectZmdpc
(
Zmdpc
zmdpc
);
// 查询替代删除
public
void
insertZmdpc
(
Zmdpc
element
);
public
void
insertZmdpc
(
Zmdpc
element
);
@CacheEvict
(
key
=
"'selectZmdpc'+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt"
)
@CacheEvict
(
key
=
"'selectZmdpc'+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt"
)
...
...
src/main/java/com/huazheng/project/greenplum/service/impl/CheckDeleteServiceImpl.java
浏览文件 @
26ca619a
...
@@ -28,6 +28,7 @@ import com.huazheng.project.hana.model.Pa0002;
...
@@ -28,6 +28,7 @@ import com.huazheng.project.hana.model.Pa0002;
import
com.huazheng.project.hana.model.Vbak
;
import
com.huazheng.project.hana.model.Vbak
;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbkd
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrp
;
import
com.huazheng.project.hana.model.Vbrp
;
...
@@ -573,6 +574,36 @@ public class CheckDeleteServiceImpl {
...
@@ -573,6 +574,36 @@ public class CheckDeleteServiceImpl {
}
}
}
}
public
void
selectVbkdCheckByDelete
()
{
try
{
ValueOperations
<
String
,
String
>
opsForValue
=
redis1Template
.
opsForValue
();
opsForValue
.
setIfAbsent
(
"huazheng:checkDelete:Vbkd:rowNum"
,
"0"
);
String
rowNum
=
opsForValue
.
get
(
"huazheng:checkDelete:Vbkd:rowNum"
);
Vbkd
build
=
Vbkd
.
builder
().
rowNum
(
rowNum
).
build
();
List
<
Vbkd
>
list
=
gpMapper
.
selectVbkdCheck
(
build
);
// 从数仓中查询一组数据
if
(
list
.
size
()
==
0
)
{
redis1Template
.
opsForValue
().
set
(
"huazheng:checkDelete:Vbkd:rowNum"
,
"0"
);
// 计数器复位
ThreadUtil
.
sleep
(
1000
);
// 没有数据了,休眠一下
}
list
.
forEach
(
target
->
{
// 遍历要检查的数据
Vbkd
source
=
sapMapper
.
selectVbkdById
(
target
);
// 根据主键查询源库中的数据
String
operator
=
"none"
;
if
(
source
==
null
)
{
// 如果源库中没有数据
gpMapper
.
deleteVbkd
(
target
);
// 删除数仓中的数据
operator
=
"delete"
;
}
redis1Template
.
opsForValue
().
set
(
"huazheng:checkDelete:Vbkd:rowNum"
,
target
.
getRowNum
());
if
(!
operator
.
equals
(
"none"
))
{
log
.
info
(
String
.
format
(
"selectVbkdcheckDelete --> rowNum:%s, operator:%s"
,
target
.
getRowNum
(),
operator
));
}
});
}
catch
(
Exception
e
)
{
redis1Template
.
opsForValue
().
set
(
"huazheng:checkDeleteError:Vbkd:rowNum"
,
SomeUtils
.
getErrorInfoFromException
(
e
));
}
}
public
void
selectZpoeditCheckByDelete
()
{
public
void
selectZpoeditCheckByDelete
()
{
try
{
try
{
ValueOperations
<
String
,
String
>
opsForValue
=
redis1Template
.
opsForValue
();
ValueOperations
<
String
,
String
>
opsForValue
=
redis1Template
.
opsForValue
();
...
...
src/main/java/com/huazheng/project/greenplum/service/impl/CheckUpdateServiceImpl.java
浏览文件 @
26ca619a
...
@@ -36,6 +36,7 @@ import com.huazheng.project.hana.model.Vbak;
...
@@ -36,6 +36,7 @@ import com.huazheng.project.hana.model.Vbak;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.VbapAdv
;
import
com.huazheng.project.hana.model.VbapAdv
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbkd
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrp
;
import
com.huazheng.project.hana.model.Vbrp
;
...
@@ -805,6 +806,29 @@ public class CheckUpdateServiceImpl {
...
@@ -805,6 +806,29 @@ public class CheckUpdateServiceImpl {
}
}
}
}
// 所属selectVbapCheckUpdate的级联
private
void
cascadeVbkdCheckByUpdate
(
Vbkd
source
,
Vbkd
target
)
{
if
(
target
!=
null
)
{
// 目标库有数据
String
shash
=
SecureUtil
.
md5
(
JSONUtil
.
toJsonStr
(
source
));
// 源库中数据的hash结果
String
thash
=
target
.
getHashResult
();
// 数仓中数据的hash结果
if
(!
shash
.
equals
(
thash
))
{
// 如果hash结果不一致
source
.
setHashResult
(
shash
);
// ===============================
// ===============================
while
(
true
)
{
try
{
gpMapper
.
updateVbkd
(
source
);
// 更新数据到数仓中
break
;
}
catch
(
RuntimeException
e
)
{
log
.
error
(
e
.
getMessage
());
ThreadUtil
.
safeSleep
(
500
);
}
}
ThreadUtil
.
safeSleep
(
500
);
}
}
}
// 所属selectMaraCheckUpdate的级联
// 所属selectMaraCheckUpdate的级联
private
void
cascadeMaktCheckByUpdate
(
Makt
source
,
Makt
target
)
{
private
void
cascadeMaktCheckByUpdate
(
Makt
source
,
Makt
target
)
{
if
(
target
!=
null
)
{
// 目标库有数据
if
(
target
!=
null
)
{
// 目标库有数据
...
@@ -1410,6 +1434,12 @@ public class CheckUpdateServiceImpl {
...
@@ -1410,6 +1434,12 @@ public class CheckUpdateServiceImpl {
SysSAPreturnNo
tSysSAPreturnNo
=
gpMapper
.
selectSysSAPreturnNo
(
sSysSAPreturnNo
);
// 查询目标库中SysSAPreturnNo表
SysSAPreturnNo
tSysSAPreturnNo
=
gpMapper
.
selectSysSAPreturnNo
(
sSysSAPreturnNo
);
// 查询目标库中SysSAPreturnNo表
cascadeSysSAPreturnNoCheckByUpdate
(
sSysSAPreturnNo
,
tSysSAPreturnNo
);
// 级联更新afko表
cascadeSysSAPreturnNoCheckByUpdate
(
sSysSAPreturnNo
,
tSysSAPreturnNo
);
// 级联更新afko表
}
}
// 级联更新业务
List
<
Vbkd
>
sVbkdList
=
sapMapper
.
cascadeVbkdByVbap
(
source
);
// 级联查询源库Vbkd表
for
(
Vbkd
sVbkd
:
sVbkdList
)
{
Vbkd
tVbkd
=
gpMapper
.
selectVbkd
(
sVbkd
);
// 查询目标库中Vbkd表
cascadeVbkdCheckByUpdate
(
sVbkd
,
tVbkd
);
// 级联更新Vbkd表
}
ThreadUtil
.
safeSleep
(
500
);
ThreadUtil
.
safeSleep
(
500
);
try
{
try
{
...
@@ -1812,6 +1842,14 @@ public class CheckUpdateServiceImpl {
...
@@ -1812,6 +1842,14 @@ public class CheckUpdateServiceImpl {
// ===============================
// ===============================
while
(
true
)
{
while
(
true
)
{
try
{
try
{
String
erdat
=
source
.
getErdat
();
String
erzet
=
source
.
getErzet
();
if
(
"00000000"
.
equals
(
erdat
))
{
source
.
setErdat2erzet
(
null
);
}
else
{
Date
date
=
DateUtil
.
parse
(
erdat
+
erzet
,
"yyyyMMddHHmmss"
);
source
.
setErdat2erzet
(
date
);
}
gpMapper
.
updateVbak
(
source
);
// 更新数据到数仓中
gpMapper
.
updateVbak
(
source
);
// 更新数据到数仓中
operator
=
"update"
;
operator
=
"update"
;
break
;
break
;
...
...
src/main/java/com/huazheng/project/greenplum/service/impl/GPServiceImpl.java
浏览文件 @
26ca619a
...
@@ -45,6 +45,7 @@ import com.huazheng.project.hana.model.Vbak;
...
@@ -45,6 +45,7 @@ import com.huazheng.project.hana.model.Vbak;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.VbapAdv
;
import
com.huazheng.project.hana.model.VbapAdv
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbkd
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrp
;
import
com.huazheng.project.hana.model.Vbrp
;
...
@@ -1168,6 +1169,35 @@ public class GPServiceImpl {
...
@@ -1168,6 +1169,35 @@ public class GPServiceImpl {
log
.
error
(
e
.
getMessage
());
log
.
error
(
e
.
getMessage
());
}
}
}
}
public
void
processVbkd
(
String
value
,
Collector
<
Vbkd
>
out
)
{
try
{
Vbkd
data
=
JSONUtil
.
toBean
(
value
,
Vbkd
.
class
);
Vbkd
exist
=
gpMapper
.
selectVbkd
(
data
);
if
(
exist
!=
null
)
{
data
.
setExist
(
true
);
// 已经在库
}
out
.
collect
(
data
);
}
catch
(
Exception
e
)
{
redis1Template
.
opsForHash
().
put
(
"huazheng:Vbkd:error"
,
"processVbkd"
,
getErrorInfoFromException
(
e
));
log
.
error
(
e
.
getMessage
());
}
}
public
void
sinkVbkd
(
Vbkd
element
)
{
try
{
if
(
element
.
isExist
()
==
false
)
{
log
.
debug
(
"GPServiceImpl.sinkVbkd()"
);
gpMapper
.
insertVbkd
(
element
);
}
}
catch
(
RuntimeException
e
)
{
redis1Template
.
opsForHash
().
put
(
"huazheng:Vbkd:error"
,
"sinkVbkd"
,
getErrorInfoFromException
(
e
));
log
.
error
(
e
.
getMessage
());
}
catch
(
Exception
e
)
{
redis1Template
.
opsForHash
().
put
(
"huazheng:Vbkd:error"
,
"sinkVbkd"
,
getErrorInfoFromException
(
e
));
log
.
error
(
e
.
getMessage
());
}
}
public
void
processHandover
(
String
value
,
Collector
<
Handover
>
out
)
{
public
void
processHandover
(
String
value
,
Collector
<
Handover
>
out
)
{
try
{
try
{
...
@@ -1855,6 +1885,14 @@ public class GPServiceImpl {
...
@@ -1855,6 +1885,14 @@ public class GPServiceImpl {
if
(
exist
!=
null
)
{
if
(
exist
!=
null
)
{
data
.
setExist
(
true
);
// 已经在库
data
.
setExist
(
true
);
// 已经在库
}
else
{
}
else
{
String
erdat
=
data
.
getErdat
();
String
erzet
=
data
.
getErzet
();
if
(
"00000000"
.
equals
(
erdat
))
{
data
.
setErdat2erzet
(
null
);
}
else
{
Date
date
=
DateUtil
.
parse
(
erdat
+
erzet
,
"yyyyMMddHHmmss"
);
data
.
setErdat2erzet
(
date
);
}
try
{
try
{
data
.
setAudat1
(
caDate
(
data
.
getAudat
()));
// 日期00000000格式转换,已处理异常
data
.
setAudat1
(
caDate
(
data
.
getAudat
()));
// 日期00000000格式转换,已处理异常
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
...
...
src/main/java/com/huazheng/project/greenplum/service/impl/JobServiceImpl.java
浏览文件 @
26ca619a
...
@@ -48,6 +48,7 @@ import com.huazheng.project.hana.model.Tvkbt;
...
@@ -48,6 +48,7 @@ import com.huazheng.project.hana.model.Tvkbt;
import
com.huazheng.project.hana.model.Vbak
;
import
com.huazheng.project.hana.model.Vbak
;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbkd
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrp
;
import
com.huazheng.project.hana.model.Vbrp
;
...
@@ -1032,6 +1033,32 @@ public class JobServiceImpl {
...
@@ -1032,6 +1033,32 @@ public class JobServiceImpl {
}
}
}
}
}
}
public
void
selectVbkdNew
()
{
DefaultRedisScript
<
String
>
script
=
new
DefaultRedisScript
<
String
>();
script
.
setResultType
(
String
.
class
);
script
.
setScriptSource
(
new
ResourceScriptSource
(
new
ClassPathResource
(
"luascript/vbap4send.lua"
)));
List
<
String
>
keys
=
Arrays
.
asList
(
"huazheng:Vbkd:sendcount"
,
"huazheng:Vbkd:rowids"
,
"huazheng:list:Vbkd"
);
ValueOperations
<
String
,
String
>
opsForValue
=
redis1Template
.
opsForValue
();
opsForValue
.
setIfAbsent
(
"huazheng:Vbkd:sendcount"
,
"0"
);
// 不存在则创建,存在则么有操作
opsForValue
.
setIfAbsent
(
"huazheng:Vbkd:receivecount"
,
"0"
);
// 不存在则创建,存在则么有操作
opsForValue
.
setIfAbsent
(
"huazheng:Vbkd:rowids"
,
"0"
);
// 不存在则创建,存在则么有操作
Long
sendcount
=
Long
.
valueOf
(
opsForValue
.
get
(
"huazheng:Vbkd:sendcount"
));
Long
receivecount
=
Long
.
valueOf
(
opsForValue
.
get
(
"huazheng:Vbkd:receivecount"
));
if
(
sendcount
-
receivecount
<=
20
)
{
// 如果发送数和消费数的差小于5则往队列中写数据
String
rowids
=
opsForValue
.
get
(
"huazheng:Vbkd:rowids"
);
// 标记id
Vbkd
vbkd
=
Vbkd
.
builder
().
rowids
(
Long
.
valueOf
(
rowids
)).
build
();
List
<
Vbkd
>
list
=
sapMapper
.
selectVbkdNew
(
vbkd
);
if
(!
list
.
isEmpty
())
{
list
.
forEach
(
item
->
{
JSONObject
json
=
JSONUtil
.
parseObj
(
item
,
false
);
String
execute
=
redis1Template
.
execute
(
script
,
keys
,
item
.
getRowids
().
toString
(),
json
.
toString
());
log
.
info
(
"标记时间回写 --> "
+
execute
);
});
}
}
}
public
void
selectVbapNew
()
{
public
void
selectVbapNew
()
{
DefaultRedisScript
<
String
>
script
=
new
DefaultRedisScript
<
String
>();
DefaultRedisScript
<
String
>
script
=
new
DefaultRedisScript
<
String
>();
script
.
setResultType
(
String
.
class
);
script
.
setResultType
(
String
.
class
);
...
...
src/main/java/com/huazheng/project/greenplum/source/hana/VbkdSource.java
0 → 100644
浏览文件 @
26ca619a
package
com
.
huazheng
.
project
.
greenplum
.
source
.
hana
;
import
java.io.PrintWriter
;
import
java.io.StringWriter
;
import
java.util.Arrays
;
import
java.util.List
;
import
org.apache.flink.streaming.api.functions.source.SourceFunction
;
import
org.springframework.core.io.ClassPathResource
;
import
org.springframework.data.redis.core.script.DefaultRedisScript
;
import
org.springframework.scripting.support.ResourceScriptSource
;
import
org.springframework.stereotype.Service
;
import
com.huazheng.project.HZDataStream
;
import
cn.hutool.core.thread.ThreadUtil
;
import
lombok.extern.log4j.Log4j2
;
@Log4j2
@Service
public
class
VbkdSource
implements
SourceFunction
<
String
>
{
private
static
final
long
serialVersionUID
=
1L
;
public
String
getErrorInfoFromException
(
Exception
e
)
{
try
{
StringWriter
sw
=
new
StringWriter
();
PrintWriter
pw
=
new
PrintWriter
(
sw
);
e
.
printStackTrace
(
pw
);
return
"\r\n"
+
sw
.
toString
()
+
"\r\n"
;
}
catch
(
Exception
e2
)
{
return
"bad getErrorInfoFromException"
;
}
}
@Override
public
void
run
(
SourceContext
<
String
>
ctx
)
throws
Exception
{
DefaultRedisScript
<
String
>
script
=
new
DefaultRedisScript
<
String
>();
script
.
setResultType
(
String
.
class
);
script
.
setScriptSource
(
new
ResourceScriptSource
(
new
ClassPathResource
(
"luascript/vbap.lua"
)));
List
<
String
>
keys
=
Arrays
.
asList
(
"huazheng:Vbkd:sendcount"
,
"huazheng:Vbkd:id"
,
"huazheng:list:Vbkd"
,
"huazheng:Vbkd:receivecount"
);
while
(
true
)
{
try
{
String
value
=
HZDataStream
.
redis1Template
.
execute
(
script
,
keys
,
""
);
String
[]
values
=
value
.
toString
().
split
(
"=========="
);
String
checkString
=
values
[
0
];
String
[]
split
=
checkString
.
split
(
", "
);
boolean
check
=
split
[
0
].
split
(
":"
)[
1
].
equals
(
split
[
1
].
split
(
":"
)[
1
]);
if
(
values
.
length
>
1
)
{
// 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log
.
info
(
checkString
+
" "
+
check
);
ctx
.
collect
(
values
[
1
]);
}
else
{
// 没有数据字符串
ThreadUtil
.
sleep
(
1000
);
// 没有数据了,休眠一下
}
}
catch
(
Exception
e
)
{
HZDataStream
.
redis1Template
.
opsForHash
().
put
(
"huazheng:Vbkd:error"
,
"receivecount_elseerror"
,
getErrorInfoFromException
(
e
));
}
}
}
@Override
public
void
cancel
()
{
}
}
src/main/java/com/huazheng/project/hana/mapper/SapMapper.java
浏览文件 @
26ca619a
...
@@ -35,6 +35,7 @@ import com.huazheng.project.hana.model.Tvkbt;
...
@@ -35,6 +35,7 @@ import com.huazheng.project.hana.model.Tvkbt;
import
com.huazheng.project.hana.model.Vbak
;
import
com.huazheng.project.hana.model.Vbak
;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.Vbap
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbep
;
import
com.huazheng.project.hana.model.Vbkd
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbpa
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrk
;
import
com.huazheng.project.hana.model.Vbrp
;
import
com.huazheng.project.hana.model.Vbrp
;
...
@@ -60,6 +61,7 @@ public interface SapMapper {
...
@@ -60,6 +61,7 @@ public interface SapMapper {
public
List
<
S067
>
selectS067New
(
S067
s067
);
public
List
<
S067
>
selectS067New
(
S067
s067
);
public
List
<
Vbuk
>
selectVbukNew
(
Vbuk
vbuk
);
public
List
<
Vbuk
>
selectVbukNew
(
Vbuk
vbuk
);
public
List
<
Vbup
>
selectVbupNew
(
Vbup
vbup
);
public
List
<
Vbup
>
selectVbupNew
(
Vbup
vbup
);
public
List
<
Vbkd
>
selectVbkdNew
(
Vbkd
vbkd
);
public
List
<
Likp
>
selectLikpNew
(
Likp
likp
);
public
List
<
Likp
>
selectLikpNew
(
Likp
likp
);
public
List
<
Konv
>
selectKonvNew
(
Konv
konv
);
public
List
<
Konv
>
selectKonvNew
(
Konv
konv
);
public
List
<
Lips
>
selectLipsNew
(
Lips
lips
);
public
List
<
Lips
>
selectLipsNew
(
Lips
lips
);
...
@@ -115,6 +117,7 @@ public interface SapMapper {
...
@@ -115,6 +117,7 @@ public interface SapMapper {
public
List
<
Makt
>
cascadeMaktByMara
(
Mara
mara
);
public
List
<
Makt
>
cascadeMaktByMara
(
Mara
mara
);
public
List
<
Vbuk
>
cascadeVbukByLips
(
Lips
lips
);
public
List
<
Vbuk
>
cascadeVbukByLips
(
Lips
lips
);
public
List
<
Vbup
>
cascadeVbupByLips
(
Lips
lips
);
public
List
<
Vbup
>
cascadeVbupByLips
(
Lips
lips
);
public
List
<
Vbkd
>
cascadeVbkdByVbap
(
Vbap
vbap
);
public
List
<
Zpoedit
>
cascadeZpoeditByAufk
(
Aufk
aufk
);
public
List
<
Zpoedit
>
cascadeZpoeditByAufk
(
Aufk
aufk
);
public
List
<
Bsad
>
cascadeBsadByBkpf
(
Bkpf
bkpf
);
public
List
<
Bsad
>
cascadeBsadByBkpf
(
Bkpf
bkpf
);
public
List
<
Bsid
>
cascadeBsidByBkpf
(
Bkpf
bkpf
);
public
List
<
Bsid
>
cascadeBsidByBkpf
(
Bkpf
bkpf
);
...
@@ -153,6 +156,7 @@ public interface SapMapper {
...
@@ -153,6 +156,7 @@ public interface SapMapper {
public
Vbrp
selectVbrpById
(
Vbrp
target
);
public
Vbrp
selectVbrpById
(
Vbrp
target
);
public
Vbuk
selectVbukById
(
Vbuk
target
);
public
Vbuk
selectVbukById
(
Vbuk
target
);
public
Vbup
selectVbupById
(
Vbup
target
);
public
Vbup
selectVbupById
(
Vbup
target
);
public
Vbkd
selectVbkdById
(
Vbkd
target
);
public
Zmdpc
selectZmdpcById
(
Zmdpc
target
);
public
Zmdpc
selectZmdpcById
(
Zmdpc
target
);
public
Zsd06
selectZsd06ById
(
Zsd06
target
);
public
Zsd06
selectZsd06ById
(
Zsd06
target
);
public
Zsdfhzl
selectZsdfhzlById
(
Zsdfhzl
target
);
public
Zsdfhzl
selectZsdfhzlById
(
Zsdfhzl
target
);
...
...
src/main/java/com/huazheng/project/hana/model/Vbkd.java
0 → 100644
浏览文件 @
26ca619a
package
com
.
huazheng
.
project
.
hana
.
model
;
import
java.io.Serializable
;
import
lombok.AllArgsConstructor
;
import
lombok.Builder
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
import
lombok.experimental.Accessors
;
/**
* 销售凭证 : 业务数据
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors
(
chain
=
true
)
@Builder
public
class
Vbkd
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1L
;
private
String
mandt
;
// 集团
private
String
vbeln
;
// 销售和分销凭证号
private
String
posnr
;
// 销售和分销凭证的项目号号
private
String
bstkd_e
;
// 运达方的采购订单编号
private
String
bstdk_e
;
// 运达方的PO日期
private
String
bsark_e
;
// 运达方采购订单类型
private
Long
rowids
;
// sap那边的rowid
private
boolean
exist
;
// 用于标记,不是字段
private
String
hashResult
;
// 数据hash标记
private
String
rowNum
;
// 用于标记,不是字段
}
src/main/java/com/huazheng/project/hana/model/Zsd06.java
浏览文件 @
26ca619a
...
@@ -41,8 +41,8 @@ public class Zsd06 implements Serializable {
...
@@ -41,8 +41,8 @@ public class Zsd06 implements Serializable {
private
String
cha_time
;
// 上次修改时间
private
String
cha_time
;
// 上次修改时间
private
Date
datum1
;
// 计划承诺交期日
private
Date
datum1
;
// 计划承诺交期日
private
Date
crt_datum1
;
// 创建日期
private
Date
crt_datum1
;
// 创建日期
+ 创建时间
private
Date
cha_datum1
;
// 最后修改日期
private
Date
cha_datum1
;
// 最后修改日期
+ 上次修改时间
private
Vbap
vbap
;
private
Vbap
vbap
;
...
...
src/main/resources/devtools/doc/华正项目-数据库表设计2021011
1
.xlsx
→
src/main/resources/devtools/doc/华正项目-数据库表设计2021011
8
.xlsx
浏览文件 @
26ca619a
No preview for this file type
src/main/resources/devtools/table/hana/t_vbkd.sql
0 → 100644
浏览文件 @
26ca619a
drop
table
vbkd
;
CREATE
TABLE
vbkd
(
mandt
text
,
vbeln
text
,
posnr
text
,
bstkd_e
text
,
bstdk_e
text
,
bsark_e
text
,
PRIMARY
KEY
(
mandt
,
vbeln
,
posnr
)
)
Distributed
by
(
mandt
,
vbeln
,
posnr
);
alter
table
vbkd
add
column
hashResult
text
;
alter
table
vbkd
add
column
rowNum
serial
;
src/main/resources/mapper/greenplum/GPMapper_greenplum.xml
浏览文件 @
26ca619a
...
@@ -1129,7 +1129,7 @@
...
@@ -1129,7 +1129,7 @@
erdat2erzet, erdat, erzet, spart, hashResult)
erdat2erzet, erdat, erzet, spart, hashResult)
values(#{vbeln}, #{mandt}, #{vbtyp}, #{audat}, #{waerk}, #{kalsm}, #{ctlpc}, #{kunnr}, #{bukrs_vf}, #{bstnk},
values(#{vbeln}, #{mandt}, #{vbtyp}, #{audat}, #{waerk}, #{kalsm}, #{ctlpc}, #{kunnr}, #{bukrs_vf}, #{bstnk},
#{bname}, #{telf1}, #{netwr}, #{vkbur}, #{audat1}, #{knumv}, #{vkorg}, #{vtweg}, #{kkber}, #{auart}, #{aedat}, #{pernr},
#{bname}, #{telf1}, #{netwr}, #{vkbur}, #{audat1}, #{knumv}, #{vkorg}, #{vtweg}, #{kkber}, #{auart}, #{aedat}, #{pernr},
#{erdat2erzet}, #{erdat}, #{erzet}, #{
erze
t}, #{hashResult})
#{erdat2erzet}, #{erdat}, #{erzet}, #{
spar
t}, #{hashResult})
</insert>
</insert>
<delete
id=
"deleteVbak"
parameterType=
"com.huazheng.project.hana.model.Vbak"
>
<delete
id=
"deleteVbak"
parameterType=
"com.huazheng.project.hana.model.Vbak"
>
delete from vbak where vbeln = #{vbeln} and mandt = #{mandt}
delete from vbak where vbeln = #{vbeln} and mandt = #{mandt}
...
@@ -1286,6 +1286,25 @@
...
@@ -1286,6 +1286,25 @@
select * from Vbup where rownum
>
#{rowNum} order by rownum limit 20
select * from Vbup where rownum
>
#{rowNum} order by rownum limit 20
</select>
</select>
<select
id=
"selectVbkd"
parameterType=
"com.huazheng.project.hana.model.Vbkd"
resultType=
"com.huazheng.project.hana.model.Vbkd"
>
select * from Vbkd where vbeln = #{vbeln} and posnr = #{posnr} and mandt = #{mandt}
</select>
<insert
id=
"insertVbkd"
parameterType=
"com.huazheng.project.hana.model.Vbkd"
>
insert into Vbkd (mandt, vbeln, posnr, bstkd_e, bstdk_e, bsark_e, hashResult)
values(#{mandt}, #{vbeln}, #{posnr}, #{bstkd_e}, #{bstdk_e}, #{bsark_e}, #{hashResult})
</insert>
<delete
id=
"deleteVbkd"
parameterType=
"com.huazheng.project.hana.model.Vbkd"
>
delete from Vbkd where vbeln = #{vbeln} and posnr = #{posnr} and mandt = #{mandt}
</delete>
<update
id=
"updateVbkd"
parameterType=
"com.huazheng.project.hana.model.Vbkd"
>
update Vbkd set
mandt = #{mandt}, vbeln = #{vbeln}, posnr = #{posnr}, bstkd_e = #{bstkd_e}, bstdk_e = #{bstdk_e}, bsark_e = #{bsark_e}, hashResult = #{hashResult}
where vbeln = #{vbeln} and posnr = #{posnr} and mandt = #{mandt}
</update>
<select
id=
"selectVbkdCheck"
parameterType=
"com.huazheng.project.hana.model.Vbkd"
resultType=
"com.huazheng.project.hana.model.Vbkd"
>
select * from Vbkd where rownum
>
#{rowNum} order by rownum limit 20
</select>
<select
id=
"selectZmdpc"
parameterType=
"com.huazheng.project.hana.model.Zmdpc"
resultType=
"com.huazheng.project.hana.model.Zmdpc"
>
<select
id=
"selectZmdpc"
parameterType=
"com.huazheng.project.hana.model.Zmdpc"
resultType=
"com.huazheng.project.hana.model.Zmdpc"
>
select * from Zmdpc
select * from Zmdpc
where vbeln = #{vbeln} and posnr = #{posnr} and mandt = #{mandt}
where vbeln = #{vbeln} and posnr = #{posnr} and mandt = #{mandt}
...
...
src/main/resources/mapper/hana/SapMapper_hana.xml
浏览文件 @
26ca619a
...
@@ -114,6 +114,14 @@
...
@@ -114,6 +114,14 @@
order by "$rowid$"
order by "$rowid$"
</select>
</select>
<select
id=
"selectVbkdNew"
parameterType=
"Vbkd"
resultType=
"Vbkd"
>
select top 20 "$rowid$" as rowids,
mandt, vbeln, posnr, bstkd_e, bstdk_e, bsark_e
from ${hana_user}.Vbkd
where "$rowid$"
>
#{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"selectLikpNew"
parameterType=
"Likp"
resultType=
"Likp"
>
<select
id=
"selectLikpNew"
parameterType=
"Likp"
resultType=
"Likp"
>
select top 20 "$rowid$" as rowids,
select top 20 "$rowid$" as rowids,
vbeln, mandt, erdat, wadat_ist, ctlpc, kunnr, aedat,lfuhr
vbeln, mandt, erdat, wadat_ist, ctlpc, kunnr, aedat,lfuhr
...
@@ -498,6 +506,13 @@
...
@@ -498,6 +506,13 @@
where vbeln=#{vbeln} and posnr=#{posnr} ${hana_mandt}
where vbeln=#{vbeln} and posnr=#{posnr} ${hana_mandt}
</select>
</select>
<select
id=
"cascadeVbkdByVbap"
parameterType=
"Vbap"
resultType=
"Vbkd"
>
select
mandt, vbeln, posnr, bstkd_e, bstdk_e, bsark_e
from ${hana_user}.vbkd
where vbeln=#{vbeln} and posnr=#{posnr} ${hana_mandt}
</select>
<select
id=
"cascadeZpoeditByAufk"
parameterType=
"Aufk"
resultType=
"Zpoedit"
>
<select
id=
"cascadeZpoeditByAufk"
parameterType=
"Aufk"
resultType=
"Zpoedit"
>
select
select
mandt, pwerk, aufnr, reason
mandt, pwerk, aufnr, reason
...
@@ -723,6 +738,12 @@
...
@@ -723,6 +738,12 @@
where vbeln = #{vbeln} and posnr = #{posnr} and mandt = '800'
where vbeln = #{vbeln} and posnr = #{posnr} and mandt = '800'
</select>
</select>
<select
id=
"selectVbkdById"
parameterType=
"Vbkd"
resultType=
"Vbkd"
>
select mandt, vbeln, posnr, bstkd_e, bstdk_e, bsark_e
from ${hana_user}.Vbkd
where vbeln = #{vbeln} and posnr = #{posnr} and mandt = '800'
</select>
<select
id=
"selectZmdpcById"
parameterType=
"Zmdpc"
resultType=
"Zmdpc"
>
<select
id=
"selectZmdpcById"
parameterType=
"Zmdpc"
resultType=
"Zmdpc"
>
select vbeln,posnr,f_plant,plant,x_plant,mandt
select vbeln,posnr,f_plant,plant,x_plant,mandt
from ${hana_user}.Zmdpc
from ${hana_user}.Zmdpc
...
...
src/main/resources/spring-init.xml
浏览文件 @
26ca619a
...
@@ -604,6 +604,15 @@
...
@@ -604,6 +604,15 @@
<property
name=
"jobDetail"
>
<property
name=
"jobDetail"
>
<bean
parent=
"methodJobDetail"
>
<bean
parent=
"methodJobDetail"
>
<property
name=
"targetObject"
ref=
"jobServiceImpl"
/>
<property
name=
"targetObject"
ref=
"jobServiceImpl"
/>
<property
name=
"targetMethod"
value=
"selectVbkdNew"
/>
</bean>
</property>
<property
name=
"cronExpression"
value=
"* * * * * ?"
/>
</bean>
<bean
class=
"org.springframework.scheduling.quartz.CronTriggerFactoryBean"
>
<property
name=
"jobDetail"
>
<bean
parent=
"methodJobDetail"
>
<property
name=
"targetObject"
ref=
"jobServiceImpl"
/>
<property
name=
"targetMethod"
value=
"selectVbpaNew"
/>
<property
name=
"targetMethod"
value=
"selectVbpaNew"
/>
</bean>
</bean>
</property>
</property>
...
@@ -1236,6 +1245,15 @@
...
@@ -1236,6 +1245,15 @@
</property>
</property>
<property
name=
"cronExpression"
value=
"* * * * * ?"
/>
<property
name=
"cronExpression"
value=
"* * * * * ?"
/>
</bean>
</bean>
<bean
class=
"org.springframework.scheduling.quartz.CronTriggerFactoryBean"
>
<property
name=
"jobDetail"
>
<bean
parent=
"methodJobDetail"
>
<property
name=
"targetObject"
ref=
"checkDeleteServiceImpl"
/>
<property
name=
"targetMethod"
value=
"selectVbkdCheckByDelete"
/>
</bean>
</property>
<property
name=
"cronExpression"
value=
"* * * * * ?"
/>
</bean>
<!-- <bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<!-- <bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<property name="jobDetail">
...
@@ -1451,7 +1469,7 @@
...
@@ -1451,7 +1469,7 @@
<property
name=
"cronExpression"
value=
"* * * * * ?"
/>
<property
name=
"cronExpression"
value=
"* * * * * ?"
/>
</bean>
</bean>
<!-- 以上7
5
个任务 -->
<!-- 以上7
7
个任务 -->
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论