提交 e8636d71 作者: think

增加表knvp

上级 123574e6
......@@ -27,6 +27,7 @@ import com.huazheng.project.greenplum.source.hana.BsadSource;
import com.huazheng.project.greenplum.source.hana.BsidSource;
import com.huazheng.project.greenplum.source.hana.Kna1Source;
import com.huazheng.project.greenplum.source.hana.KnkkSource;
import com.huazheng.project.greenplum.source.hana.KnvpSource;
import com.huazheng.project.greenplum.source.hana.KnvvSource;
import com.huazheng.project.greenplum.source.hana.KonvSource;
import com.huazheng.project.greenplum.source.hana.LikpSource;
......@@ -80,6 +81,7 @@ import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp;
......@@ -205,6 +207,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
private static Kna1Source kna1Source;
private static KnvvSource knvvSource;
private static AfvcSource afvcSource;
private static KnvpSource knvpSource;
private static AuspSource auspSource;
private static TspatSource tspatSource;
private static AfruSource afruSource;
......@@ -273,6 +276,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
kna1Source = (Kna1Source) context.getBean("kna1Source");
knvvSource = (KnvvSource) context.getBean("knvvSource");
afvcSource = (AfvcSource) context.getBean("afvcSource");
knvpSource = (KnvpSource) context.getBean("knvpSource");
auspSource = (AuspSource) context.getBean("auspSource");
afruSource = (AfruSource) context.getBean("afruSource");
tspatSource = (TspatSource) context.getBean("tspatSource");
......@@ -929,6 +933,7 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
env.addSource(tspatSource).setParallelism(1).name("输入Tspat队列").flatMap(greenPlumFlatMapFunction).returns(Tspat.class).setParallelism(1).name("拉取Tspat数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Tspat数据");
env.addSource(zpoeditSource).setParallelism(1).name("输入Zpoedit队列").flatMap(greenPlumFlatMapFunction).returns(Zpoedit.class).setParallelism(1).name("拉取zpoedit数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Zpoedit数据");
env.addSource(auspSource).setParallelism(1).name("输入Ausp队列").flatMap(greenPlumFlatMapFunction).returns(Ausp.class).setParallelism(1).name("拉取Ausp数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Ausp数据");
env.addSource(knvpSource).setParallelism(1).name("输入Knvp队列").flatMap(greenPlumFlatMapFunction).returns(Knvp.class).setParallelism(1).name("拉取Knvp数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出Knvp数据");
env.execute("华正数据迁移任务");
}
......
......@@ -20,6 +20,7 @@ import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.CAVThisMonth;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp;
......@@ -70,6 +71,15 @@ import com.huazheng.project.mysql.model.TransformNewNode;
@CacheConfig(cacheNames = "huazheng")
public interface GPMapper {
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.kunnr+','+#p0.vkorg+','+#p0.vtweg+','+#p0.spart+','+#p0.parvw+','+#p0.parza", unless="#result == null")
public Knvp selectKnvp(Knvp knvp); // 查询替代删除
public void insertKnvp(Knvp element);
@CacheEvict(key = "'selectKnvp'+':'+#p0.mandt+','+#p0.kunnr+','+#p0.vkorg+','+#p0.vtweg+','+#p0.spart+','+#p0.parvw+','+#p0.parza")
public void deleteKnvp(Knvp item);
@CacheEvict(key = "'selectKnvp'+':'+#p0.mandt+','+#p0.kunnr+','+#p0.vkorg+','+#p0.vtweg+','+#p0.spart+','+#p0.parvw+','+#p0.parza")
public void updateKnvp(Knvp element);
public List<Knvp> selectKnvpCheck(Knvp build);
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.objek+','+#p0.atinn+','+#p0.atzhl+','+#p0.mafid+','+#p0.klart+','+#p0.adzhl", unless="#result == null")
public Ausp selectAusp(Ausp ausp); // 查询替代删除
public void insertAusp(Ausp element);
......
......@@ -28,6 +28,7 @@ import com.huazheng.project.hana.model.Bsid;
//import com.huazheng.project.hana.model.Bsid2Bsad;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp;
......@@ -968,6 +969,7 @@ public class DeleteUpdateJobServiceImpl {
selectTspatCheck();
selectZpoeditCheck();
selectAuspCheck();
selectKnvpCheck();
}
private void selectAfkoCheck() {
try {
......@@ -1389,6 +1391,49 @@ public class DeleteUpdateJobServiceImpl {
}
}
private void selectKnvpCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Knvp:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Knvp:rowNum");
Knvp build = Knvp.builder().rowNum(rowNum).build();
List<Knvp> list = gpMapper.selectKnvpCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Knvp:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Knvp source = sapMapper.selectKnvpById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteKnvp(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
while (true) {
try {
gpMapper.updateKnvp(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Knvp:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectKnvpCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Knvp:rowNum", getErrorInfoFromException(e));
}
}
private void selectMkpfCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......
......@@ -23,6 +23,7 @@ import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp;
......@@ -2140,4 +2141,31 @@ public class GPServiceImpl {
}
}
public void processKnvp(Knvp data, Collector<Knvp> out) {
try {
Knvp exist = gpMapper.selectKnvp(data);
if (exist != null) {
data.setExist(true); // 已经在库
}
out.collect(data);
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Knvp:error", "processKnvp", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void sinkKnvp(Knvp element) {
try {
if (element.isExist() == false) {
log.debug("GPServiceImpl.sinkKnvp()");
gpMapper.insertKnvp(element);
}
} catch (RuntimeException e) {
redis1Template.opsForHash().put("huazheng:Knvp:error", "sinkKnvp", getErrorInfoFromException(e));
log.error(e.getMessage());
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:Knvp:error", "sinkKnvp", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
}
......@@ -25,6 +25,7 @@ import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp;
......@@ -148,6 +149,7 @@ public class JobServiceImpl {
selectT001wNew();
selectMskaNew();
selectZsdfhzlNew();
selectKnvpNew();
}
private void selectVbapNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
......@@ -575,6 +577,32 @@ public class JobServiceImpl {
}
}
private void selectKnvpNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap4send.lua")));
List<String> keys = Arrays.asList("huazheng:Knvp:sendcount", "huazheng:Knvp:rowids", "huazheng:list:Knvp");
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:Knvp:sendcount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Knvp:receivecount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:Knvp:rowids", "0"); // 不存在则创建,存在则么有操作
Long sendcount = Long.valueOf(opsForValue.get("huazheng:Knvp:sendcount"));
Long receivecount = Long.valueOf(opsForValue.get("huazheng:Knvp:receivecount"));
if (sendcount - receivecount <= 20) { // 如果发送数和消费数的差小于5则往队列中写数据
String rowids = opsForValue.get("huazheng:Knvp:rowids"); // 标记id
Knvp knvp = Knvp.builder().rowids(Long.valueOf(rowids)).build();
List<Knvp> list = sapMapper.selectKnvpNew(knvp);
if (!list.isEmpty()) {
list.forEach(item -> {
JSONObject json = JSONUtil.parseObj(item, false);
String execute = redis1Template.execute(script, keys, item.getRowids().toString(), json.toString());
log.info("标记时间回写 --> " + execute);
});
}
}
}
private void selectAfruNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......
package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream;
import com.huazheng.project.hana.model.Knvp;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Service
public class KnvpSource implements SourceFunction<Knvp> {
private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
@Override
public void run(SourceContext<Knvp> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Knvp:sendcount", "huazheng:Knvp:id", "huazheng:list:Knvp", "huazheng:Knvp:receivecount");
while (true) {
try {
String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("==========");
String checkString = values[0];
String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check);
Knvp data = JSONUtil.toBean(values[1], Knvp.class);
ctx.collect(data);
} else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
} catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Knvp:error", "receivecount_elseerror", getErrorInfoFromException(e));
}
}
}
@Override
public void cancel() {
}
}
......@@ -13,6 +13,7 @@ import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp;
......@@ -81,7 +82,9 @@ public interface SapMapper {
public List<Tspat> selectTspatNew(Tspat tspat);
public List<Zpoedit> selectZpoeditNew(Zpoedit zpoedit);
public List<Ausp> selectAuspNew(Ausp ausp);
public List<Knvp> selectKnvpNew(Knvp knvp);
public Knvp selectKnvpById(Knvp target);
public Ausp selectAuspById(Ausp target);
public Zpoedit selectZpoeditById(Zpoedit target);
public Tspat selectTspatById(Tspat target);
......
package com.huazheng.project.hana.model;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Builder
public class Knvp implements Serializable {
private static final long serialVersionUID = 1L;
/* === 组合键 === */
private String mandt; // 集团
private String kunnr; // 客户编号
private String vkorg; // 销售组织
private String vtweg; // 分销渠道
private String spart; // 产品组
private String parvw; // 合作伙伴功能
private String parza; // 合作伙伴计数器
/* === 组合键 === */
private String kunn2; // 业务伙伴的客户号
private Integer pernr; // 人员编号
private String knref; // 业务合作伙伴的客户描述
private Long rowids; // sap那边的rowid
private boolean exist; // 用于标记,不是字段
private String hashResult; // 数据hash标记
private String rowNum; // 用于标记,不是字段
}
......@@ -57,6 +57,8 @@ public class Vbak implements Serializable {
private String aedat; // 更新日期
private String spart; // 更新日期
private String erdat; // 输入日期
private String erzet; // 输入日期
......
drop table knvp;
CREATE TABLE knvp (
mandt text,
kunnr text,
vkorg text,
vtweg text,
spart text,
parvw text,
parza text,
kunn2 text,
pernr integer,
knref text,
hashResult text,
rowNum serial,
PRIMARY KEY (mandt,kunnr,vkorg,vtweg,spart,parvw,parza)
)
Distributed by (mandt,kunnr,vkorg,vtweg,spart,parvw,parza);
......@@ -631,6 +631,25 @@
select * from Afvc where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectKnvp" parameterType="com.huazheng.project.hana.model.Knvp" resultType="com.huazheng.project.hana.model.Knvp">
select * from Knvp where mandt = #{mandt} and kunnr = #{kunnr} and vkorg = #{vkorg} and vtweg = #{vtweg} and spart = #{spart} and parvw = #{parvw} and parza = #{parza}
</select>
<insert id="insertKnvp" parameterType="com.huazheng.project.hana.model.Knvp">
insert into Knvp (mandt, kunnr, vkorg, vtweg, spart, parvw, parza, kunn2, pernr, knref, hashResult)
values(#{mandt},#{kunnr},#{vkorg},#{vtweg},#{spart},#{parvw},#{parza},#{kunn2},#{pernr},#{knref},#{hashResult})
</insert>
<delete id="deleteKnvp" parameterType="com.huazheng.project.hana.model.Knvp">
delete from Knvp where mandt = #{mandt} and kunnr = #{kunnr} and vkorg = #{vkorg} and vtweg = #{vtweg} and spart = #{spart} and parvw = #{parvw} and parza = #{parza}
</delete>
<update id="updateKnvp" parameterType="com.huazheng.project.hana.model.Knvp">
update Knvp set
mandt = #{mandt}, kunnr = #{kunnr}, vkorg = #{vkorg}, vtweg = #{vtweg}, spart = #{spart}, parvw = #{parvw}, parza = #{parza}, kunn2 = #{kunn2}, pernr = #{pernr}, knref = #{knref}, hashResult = #{hashResult}
where mandt = #{mandt} and kunnr = #{kunnr} and vkorg = #{vkorg} and vtweg = #{vtweg} and spart = #{spart} and parvw = #{parvw} and parza = #{parza}
</update>
<select id="selectKnvpCheck" parameterType="com.huazheng.project.hana.model.Knvp" resultType="com.huazheng.project.hana.model.Knvp">
select * from Knvp where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectAusp" parameterType="com.huazheng.project.hana.model.Ausp" resultType="com.huazheng.project.hana.model.Ausp">
select * from Ausp where mandt = #{mandt} and objek = #{objek} and atinn = #{atinn} and atzhl = #{atzhl} and mafid = #{mafid} and klart = #{klart} and adzhl = #{adzhl}
</select>
......
......@@ -200,6 +200,14 @@
order by "$rowid$"
</select>
<select id="selectKnvpNew" parameterType="Knvp" resultType="Knvp">
select top 20 "$rowid$" as rowids,
mandt, kunnr, vkorg, vtweg, spart, parvw, parza, kunn2, pernr, knref
from ${hana_user}.Knvp
where "$rowid$" &gt; #{rowids} ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectAuspNew" parameterType="Ausp" resultType="Ausp">
select top 20 "$rowid$" as rowids,
mandt, objek, atinn, atzhl, mafid, klart, adzhl, atwrt
......@@ -380,6 +388,12 @@
where mandt = #{mandt} and aufpl = #{aufpl} and aplzl = #{aplzl}
</select>
<select id="selectKnvpById" parameterType="Knvp" resultType="Knvp">
select mandt, kunnr, vkorg, vtweg, spart, parvw, parza, kunn2, pernr, knref
from ${hana_user}.Knvp
where mandt = #{mandt} and kunnr = #{kunnr} and vkorg = #{vkorg} and vtweg = #{vtweg} and spart = #{spart} and parvw = #{parvw} and parza = #{parza}
</select>
<select id="selectAuspById" parameterType="Ausp" resultType="Ausp">
select mandt, objek, atinn, atzhl, mafid, klart, adzhl, atwrt
from ${hana_user}.Ausp
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论