Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
H
huazheng-project-flink
概览
概览
详情
活动
周期分析
版本库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
huazheng
huazheng-project-flink
Commits
2f8abd87
提交
2f8abd87
authored
11月 19, 2020
作者:
think
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
添加表bkpf
上级
6d605e64
隐藏空白字符变更
内嵌
并排
正在显示
12 个修改的文件
包含
304 行增加
和
0 行删除
+304
-0
HZDataStream.java
src/main/java/com/huazheng/project/HZDataStream.java
+5
-0
GPMapper.java
.../java/com/huazheng/project/greenplum/mapper/GPMapper.java
+10
-0
DeleteUpdateJobServiceImpl.java
...ct/greenplum/service/impl/DeleteUpdateJobServiceImpl.java
+46
-0
GPServiceImpl.java
...uazheng/project/greenplum/service/impl/GPServiceImpl.java
+27
-0
JobServiceImpl.java
...azheng/project/greenplum/service/impl/JobServiceImpl.java
+28
-0
BkpfSource.java
...om/huazheng/project/greenplum/source/hana/BkpfSource.java
+72
-0
SapMapper.java
...main/java/com/huazheng/project/hana/mapper/SapMapper.java
+3
-0
Bkpf.java
src/main/java/com/huazheng/project/hana/model/Bkpf.java
+53
-0
华正项目-数据库表设计20201114.xlsx
src/main/resources/devtools/doc/华正项目-数据库表设计20201114.xlsx
+0
-0
bkpf.sql
src/main/resources/devtools/table/hana/bkpf.sql
+20
-0
GPMapper_greenplum.xml
src/main/resources/mapper/greenplum/GPMapper_greenplum.xml
+21
-0
SapMapper_hana.xml
src/main/resources/mapper/hana/SapMapper_hana.xml
+19
-0
没有找到文件。
src/main/java/com/huazheng/project/HZDataStream.java
浏览文件 @
2f8abd87
...
@@ -23,6 +23,7 @@ import com.huazheng.project.greenplum.source.hana.AfvcSource;
...
@@ -23,6 +23,7 @@ import com.huazheng.project.greenplum.source.hana.AfvcSource;
import
com.huazheng.project.greenplum.source.hana.AufkSource
;
import
com.huazheng.project.greenplum.source.hana.AufkSource
;
import
com.huazheng.project.greenplum.source.hana.AufmSource
;
import
com.huazheng.project.greenplum.source.hana.AufmSource
;
import
com.huazheng.project.greenplum.source.hana.AuspSource
;
import
com.huazheng.project.greenplum.source.hana.AuspSource
;
import
com.huazheng.project.greenplum.source.hana.BkpfSource
;
import
com.huazheng.project.greenplum.source.hana.BsadSource
;
import
com.huazheng.project.greenplum.source.hana.BsadSource
;
import
com.huazheng.project.greenplum.source.hana.BsidSource
;
import
com.huazheng.project.greenplum.source.hana.BsidSource
;
import
com.huazheng.project.greenplum.source.hana.Kna1Source
;
import
com.huazheng.project.greenplum.source.hana.Kna1Source
;
...
@@ -77,6 +78,7 @@ import com.huazheng.project.hana.model.Afvc;
...
@@ -77,6 +78,7 @@ import com.huazheng.project.hana.model.Afvc;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Bkpf
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsid
;
import
com.huazheng.project.hana.model.Bsid
;
import
com.huazheng.project.hana.model.Kna1
;
import
com.huazheng.project.hana.model.Kna1
;
...
@@ -206,6 +208,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
...
@@ -206,6 +208,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
private
static
T023tSource
t023tSource
;
private
static
T023tSource
t023tSource
;
private
static
Kna1Source
kna1Source
;
private
static
Kna1Source
kna1Source
;
private
static
KnvvSource
knvvSource
;
private
static
KnvvSource
knvvSource
;
private
static
BkpfSource
bkpfSource
;
private
static
AfvcSource
afvcSource
;
private
static
AfvcSource
afvcSource
;
private
static
KnvpSource
knvpSource
;
private
static
KnvpSource
knvpSource
;
private
static
AuspSource
auspSource
;
private
static
AuspSource
auspSource
;
...
@@ -276,6 +279,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
...
@@ -276,6 +279,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
kna1Source
=
(
Kna1Source
)
context
.
getBean
(
"kna1Source"
);
kna1Source
=
(
Kna1Source
)
context
.
getBean
(
"kna1Source"
);
knvvSource
=
(
KnvvSource
)
context
.
getBean
(
"knvvSource"
);
knvvSource
=
(
KnvvSource
)
context
.
getBean
(
"knvvSource"
);
afvcSource
=
(
AfvcSource
)
context
.
getBean
(
"afvcSource"
);
afvcSource
=
(
AfvcSource
)
context
.
getBean
(
"afvcSource"
);
bkpfSource
=
(
BkpfSource
)
context
.
getBean
(
"bkpfSource"
);
knvpSource
=
(
KnvpSource
)
context
.
getBean
(
"knvpSource"
);
knvpSource
=
(
KnvpSource
)
context
.
getBean
(
"knvpSource"
);
auspSource
=
(
AuspSource
)
context
.
getBean
(
"auspSource"
);
auspSource
=
(
AuspSource
)
context
.
getBean
(
"auspSource"
);
afruSource
=
(
AfruSource
)
context
.
getBean
(
"afruSource"
);
afruSource
=
(
AfruSource
)
context
.
getBean
(
"afruSource"
);
...
@@ -934,6 +938,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
...
@@ -934,6 +938,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
env
.
addSource
(
zpoeditSource
).
setParallelism
(
1
).
name
(
"输入Zpoedit队列"
).
flatMap
(
greenPlumFlatMapFunction
).
returns
(
Zpoedit
.
class
).
setParallelism
(
1
).
name
(
"拉取zpoedit数据"
).
addSink
(
greenPlumRichSinkFunction
).
setParallelism
(
1
).
name
(
"输出Zpoedit数据"
);
env
.
addSource
(
zpoeditSource
).
setParallelism
(
1
).
name
(
"输入Zpoedit队列"
).
flatMap
(
greenPlumFlatMapFunction
).
returns
(
Zpoedit
.
class
).
setParallelism
(
1
).
name
(
"拉取zpoedit数据"
).
addSink
(
greenPlumRichSinkFunction
).
setParallelism
(
1
).
name
(
"输出Zpoedit数据"
);
env
.
addSource
(
auspSource
).
setParallelism
(
1
).
name
(
"输入Ausp队列"
).
flatMap
(
greenPlumFlatMapFunction
).
returns
(
Ausp
.
class
).
setParallelism
(
1
).
name
(
"拉取Ausp数据"
).
addSink
(
greenPlumRichSinkFunction
).
setParallelism
(
1
).
name
(
"输出Ausp数据"
);
env
.
addSource
(
auspSource
).
setParallelism
(
1
).
name
(
"输入Ausp队列"
).
flatMap
(
greenPlumFlatMapFunction
).
returns
(
Ausp
.
class
).
setParallelism
(
1
).
name
(
"拉取Ausp数据"
).
addSink
(
greenPlumRichSinkFunction
).
setParallelism
(
1
).
name
(
"输出Ausp数据"
);
env
.
addSource
(
knvpSource
).
setParallelism
(
1
).
name
(
"输入Knvp队列"
).
flatMap
(
greenPlumFlatMapFunction
).
returns
(
Knvp
.
class
).
setParallelism
(
1
).
name
(
"拉取Knvp数据"
).
addSink
(
greenPlumRichSinkFunction
).
setParallelism
(
1
).
name
(
"输出Knvp数据"
);
env
.
addSource
(
knvpSource
).
setParallelism
(
1
).
name
(
"输入Knvp队列"
).
flatMap
(
greenPlumFlatMapFunction
).
returns
(
Knvp
.
class
).
setParallelism
(
1
).
name
(
"拉取Knvp数据"
).
addSink
(
greenPlumRichSinkFunction
).
setParallelism
(
1
).
name
(
"输出Knvp数据"
);
env
.
addSource
(
bkpfSource
).
setParallelism
(
1
).
name
(
"输入Bkpf队列"
).
flatMap
(
greenPlumFlatMapFunction
).
returns
(
Bkpf
.
class
).
setParallelism
(
1
).
name
(
"拉取Bkpf数据"
).
addSink
(
greenPlumRichSinkFunction
).
setParallelism
(
1
).
name
(
"输出Bkpf数据"
);
env
.
execute
(
"华正数据迁移任务"
);
env
.
execute
(
"华正数据迁移任务"
);
}
}
...
...
src/main/java/com/huazheng/project/greenplum/mapper/GPMapper.java
浏览文件 @
2f8abd87
...
@@ -15,6 +15,7 @@ import com.huazheng.project.hana.model.Afvc;
...
@@ -15,6 +15,7 @@ import com.huazheng.project.hana.model.Afvc;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Bkpf
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsid
;
import
com.huazheng.project.hana.model.Bsid
;
import
com.huazheng.project.hana.model.CAVThisMonth
;
import
com.huazheng.project.hana.model.CAVThisMonth
;
...
@@ -224,6 +225,15 @@ public interface GPMapper {
...
@@ -224,6 +225,15 @@ public interface GPMapper {
public
void
updateLikp
(
Likp
element
);
public
void
updateLikp
(
Likp
element
);
public
List
<
Likp
>
selectLikpCheck
(
Likp
build
);
public
List
<
Likp
>
selectLikpCheck
(
Likp
build
);
@Cacheable
(
key
=
"#root.method.name+':'+#p0.mandt+','+#p0.bukrs+','+#p0.belnr+','+#p0.gjahr"
,
unless
=
"#result == null"
)
public
Bkpf
selectBkpf
(
Bkpf
bkpf
);
// 查询替代删除
public
void
insertBkpf
(
Bkpf
element
);
@CacheEvict
(
key
=
"'selectBkpf'+':'+#p0.mandt+','+#p0.bukrs+','+#p0.belnr+','+#p0.gjahr"
)
public
void
deleteBkpf
(
Bkpf
item
);
@CacheEvict
(
key
=
"'selectBkpf'+':'+#p0.mandt+','+#p0.bukrs+','+#p0.belnr+','+#p0.gjahr"
)
public
void
updateBkpf
(
Bkpf
element
);
public
List
<
Bkpf
>
selectBkpfCheck
(
Bkpf
build
);
@Cacheable
(
key
=
"#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt"
,
unless
=
"#result == null"
)
@Cacheable
(
key
=
"#root.method.name+':'+#p0.vbeln+','+#p0.posnr+','+#p0.mandt"
,
unless
=
"#result == null"
)
public
Lips
selectLips
(
Lips
lips
);
// 查询替代删除
public
Lips
selectLips
(
Lips
lips
);
// 查询替代删除
public
void
insertLips
(
Lips
element
);
public
void
insertLips
(
Lips
element
);
...
...
src/main/java/com/huazheng/project/greenplum/service/impl/DeleteUpdateJobServiceImpl.java
浏览文件 @
2f8abd87
...
@@ -23,6 +23,7 @@ import com.huazheng.project.hana.model.Afvc;
...
@@ -23,6 +23,7 @@ import com.huazheng.project.hana.model.Afvc;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Bkpf
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsid
;
import
com.huazheng.project.hana.model.Bsid
;
//import com.huazheng.project.hana.model.Bsid2Bsad;
//import com.huazheng.project.hana.model.Bsid2Bsad;
...
@@ -964,6 +965,7 @@ public class DeleteUpdateJobServiceImpl {
...
@@ -964,6 +965,7 @@ public class DeleteUpdateJobServiceImpl {
selectZsdfhzlCheck
();
// 40
selectZsdfhzlCheck
();
// 40
selectTvkbtCheck
();
//
selectTvkbtCheck
();
//
selectAfvcCheck
();
selectAfvcCheck
();
selectBkpfCheck
();
selectMkpfCheck
();
selectMkpfCheck
();
selectMsegCheck
();
selectMsegCheck
();
selectAfruCheck
();
selectAfruCheck
();
...
@@ -1394,6 +1396,50 @@ public class DeleteUpdateJobServiceImpl {
...
@@ -1394,6 +1396,50 @@ public class DeleteUpdateJobServiceImpl {
redis1Template
.
opsForValue
().
set
(
"huazheng:checkError:Afvc:rowNum"
,
getErrorInfoFromException
(
e
));
redis1Template
.
opsForValue
().
set
(
"huazheng:checkError:Afvc:rowNum"
,
getErrorInfoFromException
(
e
));
}
}
}
}
private
void
selectBkpfCheck
()
{
try
{
ValueOperations
<
String
,
String
>
opsForValue
=
redis1Template
.
opsForValue
();
opsForValue
.
setIfAbsent
(
"huazheng:check:Bkpf:rowNum"
,
"0"
);
String
rowNum
=
opsForValue
.
get
(
"huazheng:check:Bkpf:rowNum"
);
Bkpf
build
=
Bkpf
.
builder
().
rowNum
(
rowNum
).
build
();
List
<
Bkpf
>
list
=
gpMapper
.
selectBkpfCheck
(
build
);
// 从数仓中查询一组数据
if
(
list
.
size
()
==
0
)
{
redis1Template
.
opsForValue
().
set
(
"huazheng:check:Bkpf:rowNum"
,
"0"
);
// 计数器复位
ThreadUtil
.
sleep
(
1000
);
// 没有数据了,休眠一下
}
list
.
forEach
(
target
->
{
// 遍历要检查的数据
Bkpf
source
=
sapMapper
.
selectBkpfById
(
target
);
// 根据主键查询源库中的数据
String
operator
=
"none"
;
if
(
source
==
null
)
{
// 如果源库中没有数据
gpMapper
.
deleteBkpf
(
target
);
// 删除数仓中的数据
operator
=
"delete"
;
}
else
{
// 源库中有数据
String
shash
=
SecureUtil
.
md5
(
JSONUtil
.
toJsonStr
(
source
));
// 源库中数据的hash结果
String
thash
=
target
.
getHashResult
();
// 数仓中数据的hash结果
if
(!
shash
.
equals
(
thash
))
{
// 如果hash结果不一致
source
.
setHashResult
(
shash
);
while
(
true
)
{
try
{
gpMapper
.
updateBkpf
(
source
);
// 更新数据到数仓中
break
;
}
catch
(
RuntimeException
e
)
{
log
.
error
(
e
.
getMessage
());
ThreadUtil
.
safeSleep
(
500
);
}
}
ThreadUtil
.
safeSleep
(
500
);
}
}
redis1Template
.
opsForValue
().
set
(
"huazheng:check:Bkpf:rowNum"
,
target
.
getRowNum
());
if
(!
operator
.
equals
(
"none"
))
{
log
.
info
(
String
.
format
(
"selectBkpfCheck --> rowNum:%s, operator:%s"
,
target
.
getRowNum
(),
operator
));
}
});
}
catch
(
Exception
e
)
{
redis1Template
.
opsForValue
().
set
(
"huazheng:checkError:Bkpf:rowNum"
,
getErrorInfoFromException
(
e
));
}
}
private
void
selectKnvpCheck
()
{
private
void
selectKnvpCheck
()
{
try
{
try
{
ValueOperations
<
String
,
String
>
opsForValue
=
redis1Template
.
opsForValue
();
ValueOperations
<
String
,
String
>
opsForValue
=
redis1Template
.
opsForValue
();
...
...
src/main/java/com/huazheng/project/greenplum/service/impl/GPServiceImpl.java
浏览文件 @
2f8abd87
...
@@ -19,6 +19,7 @@ import com.huazheng.project.hana.model.Afvc;
...
@@ -19,6 +19,7 @@ import com.huazheng.project.hana.model.Afvc;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Bkpf
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsid
;
import
com.huazheng.project.hana.model.Bsid
;
import
com.huazheng.project.hana.model.Kna1
;
import
com.huazheng.project.hana.model.Kna1
;
...
@@ -2168,4 +2169,30 @@ public class GPServiceImpl {
...
@@ -2168,4 +2169,30 @@ public class GPServiceImpl {
}
}
}
}
public
void
processBkpf
(
Bkpf
data
,
Collector
<
Bkpf
>
out
)
{
try
{
Bkpf
exist
=
gpMapper
.
selectBkpf
(
data
);
if
(
exist
!=
null
)
{
data
.
setExist
(
true
);
// 已经在库
}
out
.
collect
(
data
);
}
catch
(
Exception
e
)
{
redis1Template
.
opsForHash
().
put
(
"huazheng:Bkpf:error"
,
"processBkpf"
,
getErrorInfoFromException
(
e
));
log
.
error
(
e
.
getMessage
());
}
}
public
void
sinkBkpf
(
Bkpf
element
)
{
try
{
if
(
element
.
isExist
()
==
false
)
{
log
.
debug
(
"GPServiceImpl.sinkBkpf()"
);
gpMapper
.
insertBkpf
(
element
);
}
}
catch
(
RuntimeException
e
)
{
redis1Template
.
opsForHash
().
put
(
"huazheng:Bkpf:error"
,
"sinkBkpf"
,
getErrorInfoFromException
(
e
));
log
.
error
(
e
.
getMessage
());
}
catch
(
Exception
e
)
{
redis1Template
.
opsForHash
().
put
(
"huazheng:Bkpf:error"
,
"sinkBkpf"
,
getErrorInfoFromException
(
e
));
log
.
error
(
e
.
getMessage
());
}
}
}
}
src/main/java/com/huazheng/project/greenplum/service/impl/JobServiceImpl.java
浏览文件 @
2f8abd87
...
@@ -21,6 +21,7 @@ import com.huazheng.project.hana.model.Afvc;
...
@@ -21,6 +21,7 @@ import com.huazheng.project.hana.model.Afvc;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Bkpf
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsid
;
import
com.huazheng.project.hana.model.Bsid
;
import
com.huazheng.project.hana.model.Kna1
;
import
com.huazheng.project.hana.model.Kna1
;
...
@@ -129,6 +130,7 @@ public class JobServiceImpl {
...
@@ -129,6 +130,7 @@ public class JobServiceImpl {
selectAfkoNew
();
selectAfkoNew
();
selectAfpoNew
();
selectAfpoNew
();
selectAfvcNew
();
selectAfvcNew
();
selectBkpfNew
();
selectAuspNew
();
selectAuspNew
();
selectAfruNew
();
selectAfruNew
();
selectTspatNew
();
selectTspatNew
();
...
@@ -577,6 +579,32 @@ public class JobServiceImpl {
...
@@ -577,6 +579,32 @@ public class JobServiceImpl {
}
}
}
}
private
void
selectBkpfNew
()
{
DefaultRedisScript
<
String
>
script
=
new
DefaultRedisScript
<
String
>();
script
.
setResultType
(
String
.
class
);
script
.
setScriptSource
(
new
ResourceScriptSource
(
new
ClassPathResource
(
"luascript/vbap4send.lua"
)));
List
<
String
>
keys
=
Arrays
.
asList
(
"huazheng:Bkpf:sendcount"
,
"huazheng:Bkpf:rowids"
,
"huazheng:list:Bkpf"
);
ValueOperations
<
String
,
String
>
opsForValue
=
redis1Template
.
opsForValue
();
opsForValue
.
setIfAbsent
(
"huazheng:Bkpf:sendcount"
,
"0"
);
// 不存在则创建,存在则么有操作
opsForValue
.
setIfAbsent
(
"huazheng:Bkpf:receivecount"
,
"0"
);
// 不存在则创建,存在则么有操作
opsForValue
.
setIfAbsent
(
"huazheng:Bkpf:rowids"
,
"0"
);
// 不存在则创建,存在则么有操作
Long
sendcount
=
Long
.
valueOf
(
opsForValue
.
get
(
"huazheng:Bkpf:sendcount"
));
Long
receivecount
=
Long
.
valueOf
(
opsForValue
.
get
(
"huazheng:Bkpf:receivecount"
));
if
(
sendcount
-
receivecount
<=
20
)
{
// 如果发送数和消费数的差小于5则往队列中写数据
String
rowids
=
opsForValue
.
get
(
"huazheng:Bkpf:rowids"
);
// 标记id
Bkpf
bkpf
=
Bkpf
.
builder
().
rowids
(
Long
.
valueOf
(
rowids
)).
build
();
List
<
Bkpf
>
list
=
sapMapper
.
selectBkpfNew
(
bkpf
);
if
(!
list
.
isEmpty
())
{
list
.
forEach
(
item
->
{
JSONObject
json
=
JSONUtil
.
parseObj
(
item
,
false
);
String
execute
=
redis1Template
.
execute
(
script
,
keys
,
item
.
getRowids
().
toString
(),
json
.
toString
());
log
.
info
(
"标记时间回写 --> "
+
execute
);
});
}
}
}
private
void
selectKnvpNew
()
{
private
void
selectKnvpNew
()
{
DefaultRedisScript
<
String
>
script
=
new
DefaultRedisScript
<
String
>();
DefaultRedisScript
<
String
>
script
=
new
DefaultRedisScript
<
String
>();
script
.
setResultType
(
String
.
class
);
script
.
setResultType
(
String
.
class
);
...
...
src/main/java/com/huazheng/project/greenplum/source/hana/BkpfSource.java
0 → 100644
浏览文件 @
2f8abd87
package
com
.
huazheng
.
project
.
greenplum
.
source
.
hana
;
import
java.io.PrintWriter
;
import
java.io.StringWriter
;
import
java.util.Arrays
;
import
java.util.List
;
import
org.apache.flink.streaming.api.functions.source.SourceFunction
;
import
org.springframework.core.io.ClassPathResource
;
import
org.springframework.data.redis.core.script.DefaultRedisScript
;
import
org.springframework.scripting.support.ResourceScriptSource
;
import
org.springframework.stereotype.Service
;
import
com.huazheng.project.HZDataStream
;
import
com.huazheng.project.hana.model.Bkpf
;
import
cn.hutool.core.thread.ThreadUtil
;
import
cn.hutool.json.JSONUtil
;
import
lombok.extern.log4j.Log4j2
;
@Log4j2
@Service
public
class
BkpfSource
implements
SourceFunction
<
Bkpf
>
{
private
static
final
long
serialVersionUID
=
1L
;
public
String
getErrorInfoFromException
(
Exception
e
)
{
try
{
StringWriter
sw
=
new
StringWriter
();
PrintWriter
pw
=
new
PrintWriter
(
sw
);
e
.
printStackTrace
(
pw
);
return
"\r\n"
+
sw
.
toString
()
+
"\r\n"
;
}
catch
(
Exception
e2
)
{
return
"bad getErrorInfoFromException"
;
}
}
@Override
public
void
run
(
SourceContext
<
Bkpf
>
ctx
)
throws
Exception
{
DefaultRedisScript
<
String
>
script
=
new
DefaultRedisScript
<
String
>();
script
.
setResultType
(
String
.
class
);
script
.
setScriptSource
(
new
ResourceScriptSource
(
new
ClassPathResource
(
"luascript/vbap.lua"
)));
List
<
String
>
keys
=
Arrays
.
asList
(
"huazheng:Bkpf:sendcount"
,
"huazheng:Bkpf:id"
,
"huazheng:list:Bkpf"
,
"huazheng:Bkpf:receivecount"
);
while
(
true
)
{
try
{
String
value
=
HZDataStream
.
redis1Template
.
execute
(
script
,
keys
,
""
);
String
[]
values
=
value
.
toString
().
split
(
"=========="
);
String
checkString
=
values
[
0
];
String
[]
split
=
checkString
.
split
(
", "
);
boolean
check
=
split
[
0
].
split
(
":"
)[
1
].
equals
(
split
[
1
].
split
(
":"
)[
1
]);
if
(
values
.
length
>
1
)
{
// 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log
.
info
(
checkString
+
" "
+
check
);
Bkpf
data
=
JSONUtil
.
toBean
(
values
[
1
],
Bkpf
.
class
);
ctx
.
collect
(
data
);
}
else
{
// 没有数据字符串
ThreadUtil
.
sleep
(
1000
);
// 没有数据了,休眠一下
}
}
catch
(
Exception
e
)
{
HZDataStream
.
redis1Template
.
opsForHash
().
put
(
"huazheng:Bkpf:error"
,
"receivecount_elseerror"
,
getErrorInfoFromException
(
e
));
}
}
}
@Override
public
void
cancel
()
{
}
}
src/main/java/com/huazheng/project/hana/mapper/SapMapper.java
浏览文件 @
2f8abd87
...
@@ -9,6 +9,7 @@ import com.huazheng.project.hana.model.Afvc;
...
@@ -9,6 +9,7 @@ import com.huazheng.project.hana.model.Afvc;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufk
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Aufm
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Ausp
;
import
com.huazheng.project.hana.model.Bkpf
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsad
;
import
com.huazheng.project.hana.model.Bsid
;
import
com.huazheng.project.hana.model.Bsid
;
import
com.huazheng.project.hana.model.Kna1
;
import
com.huazheng.project.hana.model.Kna1
;
...
@@ -83,6 +84,7 @@ public interface SapMapper {
...
@@ -83,6 +84,7 @@ public interface SapMapper {
public
List
<
Zpoedit
>
selectZpoeditNew
(
Zpoedit
zpoedit
);
public
List
<
Zpoedit
>
selectZpoeditNew
(
Zpoedit
zpoedit
);
public
List
<
Ausp
>
selectAuspNew
(
Ausp
ausp
);
public
List
<
Ausp
>
selectAuspNew
(
Ausp
ausp
);
public
List
<
Knvp
>
selectKnvpNew
(
Knvp
knvp
);
public
List
<
Knvp
>
selectKnvpNew
(
Knvp
knvp
);
public
List
<
Bkpf
>
selectBkpfNew
(
Bkpf
bkpf
);
public
List
<
Knkk
>
selectKnkkCheckByUpdate
(
Knkk
knkk
);
public
List
<
Knkk
>
selectKnkkCheckByUpdate
(
Knkk
knkk
);
public
List
<
Likp
>
selectLikpCheckByUpdate
(
Likp
likp
);
public
List
<
Likp
>
selectLikpCheckByUpdate
(
Likp
likp
);
...
@@ -93,6 +95,7 @@ public interface SapMapper {
...
@@ -93,6 +95,7 @@ public interface SapMapper {
public
List
<
Zsd06
>
selectZsd06CheckByUpdate
(
Zsd06
zsd06
);
public
List
<
Zsd06
>
selectZsd06CheckByUpdate
(
Zsd06
zsd06
);
public
List
<
Zsdfhzl
>
selectZsdfhzlCheckByUpdate
(
Zsdfhzl
zsdfhzl
);
public
List
<
Zsdfhzl
>
selectZsdfhzlCheckByUpdate
(
Zsdfhzl
zsdfhzl
);
public
Bkpf
selectBkpfById
(
Bkpf
target
);
public
Knvp
selectKnvpById
(
Knvp
target
);
public
Knvp
selectKnvpById
(
Knvp
target
);
public
Ausp
selectAuspById
(
Ausp
target
);
public
Ausp
selectAuspById
(
Ausp
target
);
public
Zpoedit
selectZpoeditById
(
Zpoedit
target
);
public
Zpoedit
selectZpoeditById
(
Zpoedit
target
);
...
...
src/main/java/com/huazheng/project/hana/model/Bkpf.java
0 → 100644
浏览文件 @
2f8abd87
package
com
.
huazheng
.
project
.
hana
.
model
;
import
java.io.Serializable
;
import
java.util.Date
;
import
org.springframework.format.annotation.DateTimeFormat
;
import
com.alibaba.fastjson.annotation.JSONField
;
import
lombok.AllArgsConstructor
;
import
lombok.Builder
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
import
lombok.experimental.Accessors
;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors
(
chain
=
true
)
@Builder
public
class
Bkpf
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1L
;
/* === 组合键 === */
private
String
mandt
;
// 集团
private
String
bukrs
;
// 公司代码
private
String
belnr
;
// 会计凭证编号
private
String
gjahr
;
// 会计年度
/* === 组合键 === */
private
String
blart
;
// 凭证类型
private
String
bldat
;
// 凭证中的凭证日期
private
String
budat
;
// 凭证中的过帐日期
private
String
monat
;
// 会计期间
@DateTimeFormat
(
pattern
=
"yyyy-MM-dd"
)
// 页面写入数据库时格式化
@JSONField
(
format
=
"yyyy-MM-dd"
)
// 数据库导出页面时json格式化
private
String
cpudt
;
// 会计凭证输入日期
private
String
cputm
;
// 输入时间
private
String
aedat
;
// 上次根据事务修改凭证的日期
private
String
upddt
;
// 上次凭证更新日期
@DateTimeFormat
(
pattern
=
"yyyy-MM-dd HH:mm:ss"
)
// 页面写入数据库时格式化
@JSONField
(
format
=
"yyyy-MM-dd HH:mm:ss"
)
// 数据库导出页面时json格式化
private
Date
cpudt_cputm
;
// 时间
private
Long
rowids
;
// sap那边的rowid
private
boolean
exist
;
// 用于标记,不是字段
private
String
hashResult
;
// 数据hash标记
private
String
rowNum
;
// 用于标记,不是字段
}
src/main/resources/devtools/doc/华正项目-数据库表设计20201114.xlsx
浏览文件 @
2f8abd87
No preview for this file type
src/main/resources/devtools/table/hana/bkpf.sql
0 → 100644
浏览文件 @
2f8abd87
drop
table
bkpf
;
CREATE
TABLE
bkpf
(
mandt
text
,
bukrs
text
,
belnr
text
,
gjahr
text
,
blart
text
,
bldat
text
,
budat
text
,
monat
text
,
cpudt
date
,
cputm
text
,
aedat
text
,
upddt
text
,
cpudt_cputm
timestamp
,
hashResult
text
,
rowNum
serial
,
PRIMARY
KEY
(
mandt
,
bukrs
,
belnr
,
gjahr
)
)
Distributed
by
(
mandt
,
bukrs
,
belnr
,
gjahr
);
src/main/resources/mapper/greenplum/GPMapper_greenplum.xml
浏览文件 @
2f8abd87
...
@@ -804,6 +804,27 @@
...
@@ -804,6 +804,27 @@
select * from Likp where rownum
>
#{rowNum} order by rownum limit 20
select * from Likp where rownum
>
#{rowNum} order by rownum limit 20
</select>
</select>
<select
id=
"selectBkpf"
parameterType=
"com.huazheng.project.hana.model.Bkpf"
resultType=
"com.huazheng.project.hana.model.Bkpf"
>
select * from Bkpf where mandt = #{mandt} and bukrs = #{bukrs} and belnr = #{belnr} and gjahr = #{gjahr}
</select>
<insert
id=
"insertBkpf"
parameterType=
"com.huazheng.project.hana.model.Bkpf"
>
insert into Bkpf (mandt, bukrs, belnr, gjahr , blart, bldat, budat, monat, cpudt,cputm, aedat, upddt, cpudt_cputm, hashResult)
values(#{mandt}, #{bukrs}, #{belnr}, #{gjahr}, #{blart}, #{bldat}, #{budat}, #{monat}, #{cpudt},#{cputm}, #{aedat}, #{upddt},#{cpudt_cputm},#{hashResult})
</insert>
<delete
id=
"deleteBkpf"
parameterType=
"com.huazheng.project.hana.model.Bkpf"
>
delete from Bkpf where mandt = #{mandt} and bukrs = #{bukrs} and belnr = #{belnr} and gjahr = #{gjahr}
</delete>
<update
id=
"updateBkpf"
parameterType=
"com.huazheng.project.hana.model.Bkpf"
>
update Bkpf set
mandt = #{mandt}, bukrs = #{bukrs}, belnr = #{belnr}, gjahr = #{gjahr}, blart = #{blart},
bldat = #{bldat}, budat = #{budat}, monat = #{monat}, cpudt = #{cpudt},
cputm = #{cputm}, aedat = #{aedat}, upddt = #{upddt}, cpudt_cputm = #{cpudt_cputm}, hashResult = #{hashResult}
where mandt = #{mandt} and bukrs = #{bukrs} and belnr = #{belnr} and gjahr = #{gjahr}
</update>
<select
id=
"selectBkpfCheck"
parameterType=
"com.huazheng.project.hana.model.Bkpf"
resultType=
"com.huazheng.project.hana.model.Bkpf"
>
select * from Bkpf where rownum
>
#{rowNum} order by rownum limit 20
</select>
<select
id=
"selectLips"
parameterType=
"com.huazheng.project.hana.model.Lips"
resultType=
"com.huazheng.project.hana.model.Lips"
>
<select
id=
"selectLips"
parameterType=
"com.huazheng.project.hana.model.Lips"
resultType=
"com.huazheng.project.hana.model.Lips"
>
select * from lips where vbeln = #{vbeln} and posnr = #{posnr} and mandt = #{mandt}
select * from lips where vbeln = #{vbeln} and posnr = #{posnr} and mandt = #{mandt}
</select>
</select>
...
...
src/main/resources/mapper/hana/SapMapper_hana.xml
浏览文件 @
2f8abd87
...
@@ -179,6 +179,16 @@
...
@@ -179,6 +179,16 @@
where "$rowid$"
>
#{rowids} ${hana_mandt}
where "$rowid$"
>
#{rowids} ${hana_mandt}
order by "$rowid$"
order by "$rowid$"
</select>
</select>
<select
id=
"selectBkpfNew"
parameterType=
"Bkpf"
resultType=
"Bkpf"
>
select top 20 "$rowid$" as rowids,
mandt, bukrs, belnr, gjahr, blart, bldat, budat, monat, cpudt, cputm, aedat, upddt,
(to_date(cpudt)||' '||to_time(cputm)) as wadat_ist2lfuhr
from ${hana_user}.Bkpf
where "$rowid$"
>
#{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select
id=
"selectKnvpNew"
parameterType=
"Knvp"
resultType=
"Knvp"
>
<select
id=
"selectKnvpNew"
parameterType=
"Knvp"
resultType=
"Knvp"
>
select top 20 "$rowid$" as rowids,
select top 20 "$rowid$" as rowids,
mandt, kunnr, vkorg, vtweg, spart, parvw, parza, kunn2, pernr, knref, pernr as pernr1
mandt, kunnr, vkorg, vtweg, spart, parvw, parza, kunn2, pernr, knref, pernr as pernr1
...
@@ -485,6 +495,15 @@
...
@@ -485,6 +495,15 @@
from ${hana_user}.likp
from ${hana_user}.likp
where vbeln = #{vbeln} and mandt = #{mandt}
where vbeln = #{vbeln} and mandt = #{mandt}
</select>
</select>
<select
id=
"selectBkpfById"
parameterType=
"Bkpf"
resultType=
"Bkpf"
>
select
mandt, bukrs, belnr, gjahr, blart, bldat, budat,monat,cpudt, cputm, aedat,upddt,
(to_date(cpudt)||' '||to_time(cputm)) as wadat_ist2lfuhr
from ${hana_user}.Bkpf
where mandt = #{mandt} and bukrs = #{bukrs} and belnr = #{belnr} and gjahr = #{gjahr}
</select>
<select
id=
"selectLipsById"
parameterType=
"Lips"
resultType=
"Lips"
>
<select
id=
"selectLipsById"
parameterType=
"Lips"
resultType=
"Lips"
>
select
select
vbeln, posnr, vgbel, vgpos, mandt, matnr, matkl, arktx, werks, lgort,
vbeln, posnr, vgbel, vgpos, mandt, matnr, matkl, arktx, werks, lgort,
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论