提交 50a26ab1 作者: think

1.增加表SalesInquiryProcess2.增加表SalesInquiryProcessMX

上级 545f6efb
......@@ -58,6 +58,8 @@ import com.huazheng.project.greenplum.source.hana.Zsd06Source;
import com.huazheng.project.greenplum.source.hana.ZsdfhzlSource;
import com.huazheng.project.greenplum.source.mssql.SalesContractProcessSource;
import com.huazheng.project.greenplum.source.mssql.SalesForecastProcessSource;
import com.huazheng.project.greenplum.source.mssql.SalesInquiryProcessMXSource;
import com.huazheng.project.greenplum.source.mssql.SalesInquiryProcessSource;
import com.huazheng.project.greenplum.source.mssql.SampleApplicationProcessMingXiSource;
import com.huazheng.project.greenplum.source.mssql.SampleApplicationProcessSource;
import com.huazheng.project.greenplum.source.mssql.SampleClosingProcessMingXiSource;
......@@ -115,6 +117,8 @@ import com.huazheng.project.mssql.model.SalesContractProcess;
import com.huazheng.project.mssql.model.SalesContractProcessMX;
import com.huazheng.project.mssql.model.SalesForecastProcess;
import com.huazheng.project.mssql.model.SalesForecastProcessMX;
import com.huazheng.project.mssql.model.SalesInquiryProcess;
import com.huazheng.project.mssql.model.SalesInquiryProcessMX;
import com.huazheng.project.mssql.model.SampleApplicationProcess;
import com.huazheng.project.mssql.model.SampleApplicationProcessMingXi;
import com.huazheng.project.mssql.model.SampleClosingProcess;
......@@ -235,6 +239,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
private static SampleApplicationProcessMingXiSource sampleApplicationProcessMingXiSource;
private static SampleClosingProcessSource sampleClosingProcessSource;
private static SampleClosingProcessMingXiSource sampleClosingProcessMingXiSource;
private static SalesInquiryProcessSource salesInquiryProcessSource;
private static SalesInquiryProcessMXSource salesInquiryProcessMXSource;
private static TransformHistoryNodeSource transformHistoryNodeSource;
private static TransformNewNodeSource transformNewNodeSource;
......@@ -305,6 +311,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
sampleApplicationProcessMingXiSource = (SampleApplicationProcessMingXiSource) context.getBean("sampleApplicationProcessMingXiSource");
sampleClosingProcessSource = (SampleClosingProcessSource) context.getBean("sampleClosingProcessSource");
sampleClosingProcessMingXiSource = (SampleClosingProcessMingXiSource) context.getBean("sampleClosingProcessMingXiSource");
salesInquiryProcessSource = (SalesInquiryProcessSource) context.getBean("salesInquiryProcessSource");
salesInquiryProcessMXSource = (SalesInquiryProcessMXSource) context.getBean("salesInquiryProcessMXSource");
transformHistoryNodeSource = (TransformHistoryNodeSource) context.getBean("transformHistoryNodeSource");
transformNewNodeSource = (TransformNewNodeSource) context.getBean("transformNewNodeSource");
......@@ -385,6 +393,19 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
}
}).setParallelism(1).name("拉取SampleApplicationProcess数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出SampleApplicationProcess数据");
// ================= SalesInquiryProcessSource 队列 =================
DataStream<String> salesInquiryProcessSourceRedis = env.addSource(salesInquiryProcessSource).setParallelism(1).name("输入SalesInquiryProcessSource队列");
// 数据处理
salesInquiryProcessSourceRedis.flatMap(new FlatMapFunction<String, SalesInquiryProcess>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<SalesInquiryProcess> out) throws Exception {
gpserviceImpl.processSalesInquiryProcess(value, out);
}
}).setParallelism(1).name("拉取SalesInquiryProcess数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出SalesInquiryProcess数据");
// ================= SampleApplicationProcessMingXiSource 队列 =================
DataStream<String> sampleApplicationProcessMingXiRedis = env.addSource(sampleApplicationProcessMingXiSource).setParallelism(1).name("输入SampleApplicationProcessMingXi队列");
......@@ -398,6 +419,18 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
}
}).setParallelism(1).name("拉取SampleApplicationProcessMingXi数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出SampleApplicationProcessMingXi数据");
// ================= SalesInquiryProcessMXSource 队列 =================
DataStream<String> salesInquiryProcessMXSourceRedis = env.addSource(salesInquiryProcessMXSource).setParallelism(1).name("输入SalesInquiryProcessMXSource队列");
// 数据处理
salesInquiryProcessMXSourceRedis.flatMap(new FlatMapFunction<String, SalesInquiryProcessMX>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<SalesInquiryProcessMX> out) throws Exception {
gpserviceImpl.processSalesInquiryProcessMX(value, out);
}
}).setParallelism(1).name("拉取SalesInquiryProcessMX数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出SalesInquiryProcessMX数据");
// ================= SampleClosingProcessSource 队列 =================
DataStream<String> sampleClosingProcessRedis = env.addSource(sampleClosingProcessSource).setParallelism(1).name("输入SampleClosingProcess队列");
......
......@@ -54,6 +54,8 @@ import com.huazheng.project.mssql.model.SalesContractProcess;
import com.huazheng.project.mssql.model.SalesContractProcessMX;
import com.huazheng.project.mssql.model.SalesForecastProcess;
import com.huazheng.project.mssql.model.SalesForecastProcessMX;
import com.huazheng.project.mssql.model.SalesInquiryProcess;
import com.huazheng.project.mssql.model.SalesInquiryProcessMX;
import com.huazheng.project.mssql.model.SampleApplicationProcess;
import com.huazheng.project.mssql.model.SampleApplicationProcessMingXi;
import com.huazheng.project.mssql.model.SampleClosingProcess;
......@@ -632,6 +634,25 @@ public interface GPMapper {
public List<SampleApplicationProcessMingXi> selectSampleApplicationProcessMingXiCheck(SampleApplicationProcessMingXi build);
public Long selectSampleApplicationProcessMingXiMaxRowNum();
@Cacheable(key = "#root.method.name+':'+#p0.id", unless="#result == null")
public SalesInquiryProcess selectSalesInquiryProcess(SalesInquiryProcess sfp); // 查询替代删除
public void insertSalesInquiryProcess(SalesInquiryProcess element);
@CacheEvict(key = "'selectSalesInquiryProcess'+':'+#p0.id")
public void updateSalesInquiryProcess(SalesInquiryProcess element); // 配合redis中的reset标签进行更新
@CacheEvict(key = "'selectSalesInquiryProcess'+':'+#p0.id")
public void deleteSalesInquiryProcess(SalesInquiryProcess item);
public List<SalesInquiryProcess> selectSalesInquiryProcessCheck(SalesInquiryProcess build);
@Cacheable(key = "#root.method.name+':'+#p0.id", unless="#result == null")
public SalesInquiryProcessMX selectSalesInquiryProcessMX(SalesInquiryProcessMX sfp); // 查询替代删除
public void insertSalesInquiryProcessMX(SalesInquiryProcessMX element);
@CacheEvict(key = "'selectSalesInquiryProcessMX'+':'+#p0.id")
public void updateSalesInquiryProcessMX(SalesInquiryProcessMX element); //配合redis中的reset标签进行更新
@CacheEvict(key = "'selectSalesInquiryProcessMX'+':'+#p0.id")
public void deleteSalesInquiryProcessMX(SalesInquiryProcessMX item);
public List<SalesInquiryProcessMX> selectSalesInquiryProcessMXCheck(SalesInquiryProcessMX build);
@Cacheable(key = "#root.method.name+':'+#p0.id", unless="#result == null")
public SampleClosingProcess selectSampleClosingProcess(SampleClosingProcess sfp); // 查询替代删除
......
......@@ -41,6 +41,8 @@ import com.huazheng.project.mssql.model.SalesContractProcess;
import com.huazheng.project.mssql.model.SalesContractProcessMX;
import com.huazheng.project.mssql.model.SalesForecastProcess;
import com.huazheng.project.mssql.model.SalesForecastProcessMX;
import com.huazheng.project.mssql.model.SalesInquiryProcess;
import com.huazheng.project.mssql.model.SalesInquiryProcessMX;
import com.huazheng.project.mssql.model.SampleApplicationProcess;
import com.huazheng.project.mssql.model.SampleApplicationProcessMingXi;
import com.huazheng.project.mssql.model.SampleClosingProcess;
......@@ -894,6 +896,36 @@ public class CheckDeleteServiceImpl {
redis1Template.opsForValue().set("huazheng:checkDeleteError:SampleApplicationProcess:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectSalesInquiryProcessCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:SalesInquiryProcess:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:SalesInquiryProcess:rowNum");
SalesInquiryProcess build = SalesInquiryProcess.builder().rowNum(rowNum).build();
List<SalesInquiryProcess> list = gpMapper.selectSalesInquiryProcessCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:SalesInquiryProcess:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
SalesInquiryProcess source = crmMapper.selectSalesInquiryProcessById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteSalesInquiryProcess(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:SalesInquiryProcess:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectSalesInquiryProcesscheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:SalesInquiryProcess:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectSampleClosingProcessCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -981,6 +1013,36 @@ public class CheckDeleteServiceImpl {
redis1Template.opsForValue().set("huazheng:checkDeleteError:SampleApplicationProcessMingXi:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectSalesInquiryProcessMXCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:SalesInquiryProcessMX:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:SalesInquiryProcessMX:rowNum");
SalesInquiryProcessMX build = SalesInquiryProcessMX.builder().rowNum(rowNum).build();
List<SalesInquiryProcessMX> list = gpMapper.selectSalesInquiryProcessMXCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:SalesInquiryProcessMX:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
SalesInquiryProcessMX source = crmMapper.selectSalesInquiryProcessMXById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteSalesInquiryProcessMX(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:SalesInquiryProcessMX:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectSalesInquiryProcessMXcheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:SalesInquiryProcessMX:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectSampleClosingProcessMingXiCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......
......@@ -49,6 +49,8 @@ import com.huazheng.project.mssql.model.SalesContractProcess;
import com.huazheng.project.mssql.model.SalesContractProcessMX;
import com.huazheng.project.mssql.model.SalesForecastProcess;
import com.huazheng.project.mssql.model.SalesForecastProcessMX;
import com.huazheng.project.mssql.model.SalesInquiryProcess;
import com.huazheng.project.mssql.model.SalesInquiryProcessMX;
import com.huazheng.project.mssql.model.SampleApplicationProcess;
import com.huazheng.project.mssql.model.SampleApplicationProcessMingXi;
import com.huazheng.project.mssql.model.SampleClosingProcess;
......@@ -1107,7 +1109,28 @@ public class CheckUpdateServiceImpl {
}
}
}
// 所属selectSalesInquiryProcessCheckUpdate的级联
private void cascadeSalesInquiryProcessMXCheckByUpdate(SalesInquiryProcessMX source, SalesInquiryProcessMX target) {
if (target != null) { // 目标库有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateSalesInquiryProcessMX(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
public void selectHandoverCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -1938,6 +1961,61 @@ public class CheckUpdateServiceImpl {
redis1Template.opsForValue().set("huazheng:checkUpdateError:SampleApplicationProcess:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectSalesInquiryProcessCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:SalesInquiryProcess:rowids", "0");
String rowids = opsForValue.get("huazheng:checkUpdate:SalesInquiryProcess:rowids");
SalesInquiryProcess build = SalesInquiryProcess.builder().id(rowids).build();
List<SalesInquiryProcess> slist = crmMapper.selectSalesInquiryProcessCheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkUpdate:SalesInquiryProcess:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
SalesInquiryProcess target = gpMapper.selectSalesInquiryProcess(source); // 根据主键查询源库中的数据
String operator = "none";
String srowids = source.getId();
if (target != null) {
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateSalesInquiryProcess(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
// 级联更新业务
List<SalesInquiryProcessMX> sSalesInquiryProcessMXList = crmMapper.cascadeSalesInquiryProcessMXBySalesInquiryProcess(source); // 级联查询源库
for (SalesInquiryProcessMX sSalesInquiryProcessMX : sSalesInquiryProcessMXList) {
SalesInquiryProcessMX tSalesInquiryProcessMX = gpMapper.selectSalesInquiryProcessMX(sSalesInquiryProcessMX); // 查询目标库中
cascadeSalesInquiryProcessMXCheckByUpdate(sSalesInquiryProcessMX, tSalesInquiryProcessMX); // 级联更新
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:SalesInquiryProcess:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectSalesInquiryProcesscheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:SalesInquiryProcess:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectSampleClosingProcessCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......
......@@ -58,6 +58,8 @@ import com.huazheng.project.mssql.model.SalesContractProcess;
import com.huazheng.project.mssql.model.SalesContractProcessMX;
import com.huazheng.project.mssql.model.SalesForecastProcess;
import com.huazheng.project.mssql.model.SalesForecastProcessMX;
import com.huazheng.project.mssql.model.SalesInquiryProcess;
import com.huazheng.project.mssql.model.SalesInquiryProcessMX;
import com.huazheng.project.mssql.model.SampleApplicationProcess;
import com.huazheng.project.mssql.model.SampleApplicationProcessMingXi;
import com.huazheng.project.mssql.model.SampleClosingProcess;
......@@ -359,6 +361,61 @@ public class GPServiceImpl {
log.error(e.getMessage());
}
}
public void processSalesInquiryProcess(String value, Collector<SalesInquiryProcess> arg2) {
try {
SalesInquiryProcess data = JSONUtil.toBean(value, SalesInquiryProcess.class);
SalesInquiryProcess exist = gpMapper.selectSalesInquiryProcess(data);
if (exist != null) {
data.setExist(true); // 已经在库
}
arg2.collect(data);
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:SalesInquiryProcess:error", "processSalesInquiryProcess", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void sinkSalesInquiryProcess(SalesInquiryProcess element) {
try {
if (element.isExist() == false) {
log.debug("GPServiceImpl.sinkSalesInquiryProcess()");
gpMapper.insertSalesInquiryProcess(element);
}
} catch (RuntimeException e) {
redis1Template.opsForHash().put("huazheng:SalesInquiryProcess:error", "sinkSalesInquiryProcess", getErrorInfoFromException(e));
log.error(e.getMessage());
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:SalesInquiryProcess:error", "sinkSalesInquiryProcess", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void processSalesInquiryProcessMX(String value, Collector<SalesInquiryProcessMX> arg2) {
try {
SalesInquiryProcessMX data = JSONUtil.toBean(value, SalesInquiryProcessMX.class);
SalesInquiryProcessMX exist = gpMapper.selectSalesInquiryProcessMX(data);
if (exist != null) {
data.setExist(true); // 已经在库
}
arg2.collect(data);
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:SalesInquiryProcessMX:error", "processSalesInquiryProcessMX", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void sinkSalesInquiryProcessMX(SalesInquiryProcessMX element) {
try {
if (element.isExist() == false) {
log.debug("GPServiceImpl.sinkSalesInquiryProcessMX()");
gpMapper.insertSalesInquiryProcessMX(element);
}
} catch (RuntimeException e) {
redis1Template.opsForHash().put("huazheng:SalesInquiryProcessMX:error", "sinkSalesInquiryProcessMX", getErrorInfoFromException(e));
log.error(e.getMessage());
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:SalesInquiryProcessMX:error", "sinkSalesInquiryProcessMX", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void processSampleApplicationProcess(String value, Collector<SampleApplicationProcess> arg2) {
try {
......@@ -394,7 +451,6 @@ public class GPServiceImpl {
log.error(e.getMessage());
}
}
public void processSampleApplicationProcessMingXi(String value, Collector<SampleApplicationProcessMingXi> arg2) {
try {
SampleApplicationProcessMingXi data = JSONUtil.toBean(value, SampleApplicationProcessMingXi.class);
......@@ -408,7 +464,6 @@ public class GPServiceImpl {
log.error(e.getMessage());
}
}
public void sinkSampleApplicationProcessMingXi(SampleApplicationProcessMingXi element) {
try {
if (element.isExist() == false) {
......
......@@ -59,6 +59,8 @@ import com.huazheng.project.hana.model.Zsdfhzl;
import com.huazheng.project.mssql.mapper.CrmMapper;
import com.huazheng.project.mssql.model.SalesContractProcess;
import com.huazheng.project.mssql.model.SalesForecastProcess;
import com.huazheng.project.mssql.model.SalesInquiryProcess;
import com.huazheng.project.mssql.model.SalesInquiryProcessMX;
import com.huazheng.project.mssql.model.SampleApplicationProcess;
import com.huazheng.project.mssql.model.SampleApplicationProcessMingXi;
import com.huazheng.project.mssql.model.SampleClosingProcess;
......@@ -1254,6 +1256,58 @@ public class JobServiceImpl {
}
}
}
public void selectSalesInquiryProcessNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap4send.lua")));
List<String> keys = Arrays.asList("huazheng:SalesInquiryProcess:sendcount", "huazheng:SalesInquiryProcess:id", "huazheng:list:SalesInquiryProcess");
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:SalesInquiryProcess:sendcount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:SalesInquiryProcess:receivecount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:SalesInquiryProcess:id", "0"); // 不存在则创建,存在则么有操作
Long sendcount = Long.valueOf(opsForValue.get("huazheng:SalesInquiryProcess:sendcount"));
Long receivecount = Long.valueOf(opsForValue.get("huazheng:SalesInquiryProcess:receivecount"));
if (sendcount - receivecount <= 20) { // 如果发送数和消费数的差小于5则往队列中写数据
String id = opsForValue.get("huazheng:SalesInquiryProcess:id"); // 标记id
SalesInquiryProcess scp = SalesInquiryProcess.builder().id(id).build();
List<SalesInquiryProcess> list = crmMapper.selectSalesInquiryProcessNew(scp);
if (!list.isEmpty()) {
list.forEach(item -> {
JSONObject json = JSONUtil.parseObj(item, false);
String execute = redis1Template.execute(script, keys, item.getId(), json.toString());
log.info("标记时间回写 --> " + execute);
});
}
}
}
public void selectSalesInquiryProcessMXNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap4send.lua")));
List<String> keys = Arrays.asList("huazheng:SalesInquiryProcessMX:sendcount", "huazheng:SalesInquiryProcessMX:id", "huazheng:list:SalesInquiryProcessMX");
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:SalesInquiryProcessMX:sendcount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:SalesInquiryProcessMX:receivecount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:SalesInquiryProcessMX:id", "0"); // 不存在则创建,存在则么有操作
Long sendcount = Long.valueOf(opsForValue.get("huazheng:SalesInquiryProcessMX:sendcount"));
Long receivecount = Long.valueOf(opsForValue.get("huazheng:SalesInquiryProcessMX:receivecount"));
if (sendcount - receivecount <= 20) { // 如果发送数和消费数的差小于5则往队列中写数据
String id = opsForValue.get("huazheng:SalesInquiryProcessMX:id"); // 标记id
SalesInquiryProcessMX scp = SalesInquiryProcessMX.builder().id(id).build();
List<SalesInquiryProcessMX> list = crmMapper.selectSalesInquiryProcessMXNew(scp);
if (!list.isEmpty()) {
list.forEach(item -> {
JSONObject json = JSONUtil.parseObj(item, false);
String execute = redis1Template.execute(script, keys, item.getId(), json.toString());
log.info("标记时间回写 --> " + execute);
});
}
}
}
public void selectSampleApplicationProcessNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......
package com.huazheng.project.greenplum.source.mssql;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Service
public class SalesInquiryProcessMXSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:SalesInquiryProcessMX:sendcount", "huazheng:SalesInquiryProcessMX:id", "huazheng:list:SalesInquiryProcessMX", "huazheng:SalesInquiryProcessMX:receivecount");
while (true) {
try {
String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("==========");
String checkString = values[0];
String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check);
ctx.collect(values[1]);
} else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
} catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:SalesInquiryProcessMX:error", "receivecount_elseerror", getErrorInfoFromException(e));
}
}
}
@Override
public void cancel() {
}
}
package com.huazheng.project.greenplum.source.mssql;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Service
public class SalesInquiryProcessSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:SalesInquiryProcess:sendcount", "huazheng:SalesInquiryProcess:id", "huazheng:list:SalesInquiryProcess", "huazheng:SalesInquiryProcess:receivecount");
while (true) {
try {
String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("==========");
String checkString = values[0];
String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check);
ctx.collect(values[1]);
} else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
} catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:SalesInquiryProcess:error", "receivecount_elseerror", getErrorInfoFromException(e));
}
}
}
@Override
public void cancel() {
}
}
......@@ -6,6 +6,8 @@ import com.huazheng.project.mssql.model.SalesContractProcess;
import com.huazheng.project.mssql.model.SalesContractProcessMX;
import com.huazheng.project.mssql.model.SalesForecastProcess;
import com.huazheng.project.mssql.model.SalesForecastProcessMX;
import com.huazheng.project.mssql.model.SalesInquiryProcess;
import com.huazheng.project.mssql.model.SalesInquiryProcessMX;
import com.huazheng.project.mssql.model.SampleApplicationProcess;
import com.huazheng.project.mssql.model.SampleApplicationProcessMingXi;
import com.huazheng.project.mssql.model.SampleClosingProcess;
......@@ -17,6 +19,10 @@ public interface CrmMapper {
public List<SalesForecastProcess> selectSalesForecastProcessNew(SalesForecastProcess sfp);
public List<SampleApplicationProcess> selectSampleApplicationProcessNew(SampleApplicationProcess scp);
public List<SampleApplicationProcessMingXi> selectSampleApplicationProcessMingXiNew(SampleApplicationProcessMingXi scp);
public List<SalesInquiryProcess> selectSalesInquiryProcessNew(SalesInquiryProcess sip);
public List<SalesInquiryProcessMX> selectSalesInquiryProcessMXNew(SalesInquiryProcessMX sip);
public List<SampleClosingProcess> selectSampleClosingProcessNew(SampleClosingProcess scp);
public List<SampleClosingProcessMingXi> selectSampleClosingProcessMingXiNew(SampleClosingProcessMingXi scp);
......@@ -28,12 +34,14 @@ public interface CrmMapper {
public List<SalesContractProcess> selectSalesContractProcessCheckByUpdate(SalesContractProcess scp);
public List<SalesForecastProcess> selectSalesForecastProcessCheckByUpdate(SalesForecastProcess sfp);
public List<SampleApplicationProcess> selectSampleApplicationProcessCheckByUpdate(SampleApplicationProcess scp);
public List<SalesInquiryProcess> selectSalesInquiryProcessCheckByUpdate(SalesInquiryProcess sip);
public List<SampleClosingProcess> selectSampleClosingProcessCheckByUpdate(SampleClosingProcess scp);
// 按天更新的级联
public List<SalesContractProcessMX> cascadeSalesContractProcessMXBySalesContractProcess(SalesContractProcess source);
public List<SalesForecastProcessMX> cascadeSalesForecastProcessMXBySalesForecastProcess(SalesForecastProcess source);
public List<SampleApplicationProcessMingXi> cascadeSampleApplicationProcessMingXiBySampleApplicationProcess(SampleApplicationProcess source);
public List<SalesInquiryProcessMX> cascadeSalesInquiryProcessMXBySalesInquiryProcess(SalesInquiryProcess source);
public List<SampleClosingProcessMingXi> cascadeSampleClosingProcessMingXiBySampleClosingProcess(SampleClosingProcess source);
......@@ -48,7 +56,7 @@ public interface CrmMapper {
public SampleApplicationProcessMingXi selectSampleApplicationProcessMingXiById(SampleApplicationProcessMingXi target);
public SampleClosingProcess selectSampleClosingProcessById(SampleClosingProcess target);
public SampleClosingProcessMingXi selectSampleClosingProcessMingXiById(SampleClosingProcessMingXi target);
public SalesInquiryProcess selectSalesInquiryProcessById(SalesInquiryProcess target);
public SalesInquiryProcessMX selectSalesInquiryProcessMXById(SalesInquiryProcessMX target);
}
package com.huazheng.project.mssql.model;
import java.io.Serializable;
import java.util.Date;
import org.springframework.format.annotation.DateTimeFormat;
import com.alibaba.fastjson.annotation.JSONField;
import com.huazheng.project.mssql.model.SalesForecastProcess.SalesForecastProcessBuilder;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
/**
* 销售询报价流程
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Builder
public class SalesInquiryProcess implements Serializable {
private static final long serialVersionUID = 1L;
private String id; // 主键
private String sys_processname; // 流程名称
private Integer sys_incident; // 实例号
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss") // 页面写入数据库时格式化
@JSONField(format="yyyy-MM-dd HH:mm:ss") // 数据库导出页面时json格式化
private String sys_starttime; // 发起时间
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss") // 页面写入数据库时格式化
@JSONField(format="yyyy-MM-dd HH:mm:ss") // 数据库导出页面时json格式化
private String sys_updatetime; // 办结时间
private Integer sys_incstatus; // 状态
private String kehubm; // 客户编码
private String bibiev; // 货币类型
private boolean exist; // 用于标记,不是字段
private String hashResult; // 数据hash标记
private String rowNum; // 用于标记,不是字段
}
package com.huazheng.project.mssql.model;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
/**
* 销售询报价流程明细
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Builder
public class SalesInquiryProcessMX implements Serializable {
private static final long serialVersionUID = 1L;
private String id ; // 主键
private Integer sys_incident; // 实例号
private String wuliaoh; // 物料号
private String chicun; // 尺寸
private String chengben; // 成本
private String baojiamll; // 报价毛利率
private String biaozhunml; // 标准毛利率
private boolean exist; // 用于标记,不是字段
private String hashResult; // 数据hash标记
private String rowNum; // 用于标记,不是字段
}
drop table SalesInquiryProcess;
CREATE TABLE SalesInquiryProcess (
id text,
sys_processname text,
sys_incident integer,
sys_starttime timestamp,
sys_updatetime timestamp,
sys_incstatus integer,
kehubm text,
bibiev text,
hashResult text,
rowNum serial,
PRIMARY KEY (id)
)
Distributed by (id);
drop table SalesInquiryProcessMX;
CREATE TABLE SalesInquiryProcessMX (
id text,
sys_incident integer,
wuliaoh text,
chicun text,
chengben text,
baojiamll text,
biaozhunml text,
hashResult text,
rowNum serial,
PRIMARY KEY (id)
)
Distributed by (id);
......@@ -349,6 +349,48 @@
select max(rowNum) from SampleApplicationProcessMingXi
</select>
<select id="selectSalesInquiryProcess" parameterType="com.huazheng.project.mssql.model.SalesInquiryProcess" resultType="com.huazheng.project.mssql.model.SalesInquiryProcess">
select * from SalesInquiryProcess where id = #{id}
</select>
<insert id="insertSalesInquiryProcess" parameterType="com.huazheng.project.mssql.model.SalesInquiryProcess">
insert into SalesInquiryProcess (id, sys_processname, sys_incident, sys_starttime, sys_updatetime, sys_incstatus, kehubm,
bibiev, hashResult)
values(#{id}, #{sys_processname}, #{sys_incident}, #{sys_starttime}, #{sys_updatetime}, #{sys_incstatus}, #{kehubm},
#{bibiev},#{hashResult})
</insert>
<delete id="deleteSalesInquiryProcess" parameterType="com.huazheng.project.mssql.model.SalesInquiryProcess">
delete from SalesInquiryProcess where id = #{id}
</delete>
<update id="updateSalesInquiryProcess" parameterType="com.huazheng.project.mssql.model.SalesInquiryProcess">
update SalesInquiryProcess set
sys_processname = #{sys_processname}, sys_incident = #{sys_incident}, sys_starttime = #{sys_starttime}, sys_updatetime = #{sys_updatetime}, sys_incstatus = #{sys_incstatus},
kehubm = #{kehubm}, bibiev = #{bibiev}, hashResult = #{hashResult}
where id = #{id}
</update>
<select id="selectSalesInquiryProcessCheck" parameterType="com.huazheng.project.mssql.model.SalesInquiryProcess" resultType="com.huazheng.project.mssql.model.SalesInquiryProcess">
select * from SalesInquiryProcess where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectSalesInquiryProcessMX" parameterType="com.huazheng.project.mssql.model.SalesInquiryProcessMX" resultType="com.huazheng.project.mssql.model.SalesInquiryProcessMX">
select * from SalesInquiryProcessMX where id = #{id}
</select>
<insert id="insertSalesInquiryProcessMX" parameterType="com.huazheng.project.mssql.model.SalesInquiryProcessMX">
insert into SalesInquiryProcessMX (id,sys_incident,wuliaoh,chicun,chengben,baojiamll,biaozhunml,hashResult)
values(#{id},#{sys_incident},#{wuliaoh},#{chicun},#{chengben},#{baojiamll},#{biaozhunml},#{hashResult})
</insert>
<delete id="deleteSalesInquiryProcessMX" parameterType="com.huazheng.project.mssql.model.SalesInquiryProcessMX">
delete from SalesInquiryProcessMX where id = #{id}
</delete>
<update id="updateSalesInquiryProcessMX" parameterType="com.huazheng.project.mssql.model.SalesInquiryProcessMX">
update SalesInquiryProcessMX set
sys_incident=#{sys_incident},wuliaoh=#{wuliaoh},chicun=#{chicun},chengben=#{chengben},baojiamll=#{baojiamll},
biaozhunml=#{biaozhunml},hashResult=#{hashResult}
where id = #{id}
</update>
<select id="selectSalesInquiryProcessMXCheck" parameterType="com.huazheng.project.mssql.model.SalesInquiryProcessMX" resultType="com.huazheng.project.mssql.model.SalesInquiryProcessMX">
select * from SalesInquiryProcessMX where rownum &gt; #{rowNum} order by rownum limit 20
</select>
......
......@@ -32,6 +32,12 @@
from "Biz_样品申请流程"
where id &gt; #{id}
</select>
<select id="selectSalesInquiryProcessNew" parameterType="SalesInquiryProcess" resultType="SalesInquiryProcess">
select top 20
id,sys_processname,sys_incident,sys_starttime,sys_updatetime,sys_incstatus,kehubm,bibiev
from "Biz_销售询报价流程"
where id &gt; #{id}
</select>
<select id="selectSampleClosingProcessNew" parameterType="SampleClosingProcess" resultType="SampleClosingProcess">
select top 20
id,sYS_INCIDENT,yangPinSQSLH,sYS_INCSTATUS,sys_updatetime,sys_starttime, CASE
......@@ -48,6 +54,12 @@
from "Biz_样品申请流程_MingXi"
where id &gt; #{id}
</select>
<select id="selectSalesInquiryProcessMXNew" parameterType="SalesInquiryProcessMX" resultType="SalesInquiryProcessMX">
select top 20
id,sys_incident,wuliaoh,chicun,chengben,baojiamll, biaozhunml
from "Biz_销售询报价流程_MX"
where id &gt; #{id}
</select>
<select id="selectSampleClosingProcessMingXiNew" parameterType="SampleClosingProcessMingXi" resultType="SampleClosingProcessMingXi">
select top 20
id,sYS_INCIDENT,shiFouTGv,shiFouTGt
......@@ -101,6 +113,13 @@
where id &gt; #{id}
and sys_updatetime is not null and CONVERT(varchar(10),sys_updatetime,120) = CONVERT(varchar(10),GETDATE(),120)
</select>
<select id="selectSalesInquiryProcessCheckByUpdate" parameterType="SalesInquiryProcess" resultType="SalesInquiryProcess">
select top 20
id,sys_processname,sys_incident,sys_starttime,sys_updatetime,sys_incstatus,kehubm,bibiev
from "Biz_销售询报价流程"
where id &gt; #{id}
and sys_updatetime is not null and CONVERT(varchar(10),sys_updatetime,120) = CONVERT(varchar(10),GETDATE(),120)
</select>
<select id="selectSampleClosingProcessCheckByUpdate" parameterType="SampleClosingProcess" resultType="SampleClosingProcess">
select top 20
id,sYS_INCIDENT,yangPinSQSLH,sYS_INCSTATUS,sys_updatetime,sys_starttime, CASE
......@@ -131,6 +150,12 @@
from "Biz_样品申请流程_MingXi"
where SYS_INCIDENT = #{sYS_INCIDENT}
</select>
<select id="cascadeSalesInquiryProcessMXBySalesInquiryProcess" parameterType="SalesInquiryProcess" resultType="SalesInquiryProcessMX">
select top 20
id,sys_incident,wuliaoh,chicun,chengben,baojiamll, biaozhunml
from "Biz_销售询报价流程_MX"
where sys_incident = #{sys_incident}
</select>
<select id="cascadeSampleClosingProcessMingXiBySampleClosingProcess" parameterType="SampleClosingProcess" resultType="SampleClosingProcessMingXi">
select top 20
id,sYS_INCIDENT,shiFouTGv,shiFouTGt
......@@ -164,6 +189,11 @@
from "Biz_样品申请流程"
where id = #{id}
</select>
<select id="selectSalesInquiryProcessById" parameterType="SalesInquiryProcess" resultType="SalesInquiryProcess">
select id,sys_processname,sys_incident,sys_starttime,sys_updatetime,sys_incstatus,kehubm,bibiev
from "Biz_销售询报价流程"
where id = #{id}
</select>
<select id="selectSampleClosingProcessById" parameterType="SampleClosingProcess" resultType="SampleClosingProcess">
select id,sYS_INCIDENT,yangPinSQSLH,sYS_INCSTATUS,sys_updatetime,sys_starttime, CASE
WHEN yangPinSQSLH = '' THEN null
......@@ -178,6 +208,11 @@
from "Biz_样品申请流程_MingXi"
where id = #{id}
</select>
<select id="selectSalesInquiryProcessMXById" parameterType="SalesInquiryProcessMX" resultType="SalesInquiryProcessMX">
select id,sys_incident,wuliaoh,chicun,chengben,baojiamll, biaozhunml
from "Biz_销售询报价流程_MX"
where id = #{id}
</select>
<select id="selectSampleClosingProcessMingXiById" parameterType="SampleClosingProcessMingXi" resultType="SampleClosingProcessMingXi">
select id,sYS_INCIDENT,shiFouTGv,shiFouTGt
from "Biz_样品结案流程_MingXi"
......
......@@ -690,6 +690,24 @@
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="jobServiceImpl" />
<property name="targetMethod" value="selectSalesInquiryProcessNew" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="jobServiceImpl" />
<property name="targetMethod" value="selectSalesInquiryProcessMXNew" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="jobServiceImpl" />
<property name="targetMethod" value="selectSampleApplicationProcessNew" />
</bean>
</property>
......@@ -955,6 +973,15 @@
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectSalesInquiryProcessCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectSampleClosingProcessCheckUpdate" />
</bean>
</property>
......@@ -1249,6 +1276,15 @@
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectSalesInquiryProcessCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectSampleClosingProcessCheckByDelete" />
</bean>
</property>
......@@ -1323,6 +1359,15 @@
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectSalesInquiryProcessMXCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectSampleClosingProcessMingXiCheckByDelete" />
</bean>
</property>
......@@ -1386,7 +1431,7 @@
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- 以上68个任务 -->
<!-- 以上73个任务 -->
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论