提交 1b2bfb8a 作者: think

增加表TShuJuHZDDMX

上级 5094c1b3
...@@ -69,6 +69,7 @@ import com.huazheng.project.greenplum.source.mssql.SampleClosingProcessSource; ...@@ -69,6 +69,7 @@ import com.huazheng.project.greenplum.source.mssql.SampleClosingProcessSource;
import com.huazheng.project.greenplum.source.mssql2.SysSAPreturnNoSource; import com.huazheng.project.greenplum.source.mssql2.SysSAPreturnNoSource;
import com.huazheng.project.greenplum.source.mssql2.TKeHuSource; import com.huazheng.project.greenplum.source.mssql2.TKeHuSource;
import com.huazheng.project.greenplum.source.mssql2.TKeHuXiaoShouZZYeWuYSource; import com.huazheng.project.greenplum.source.mssql2.TKeHuXiaoShouZZYeWuYSource;
import com.huazheng.project.greenplum.source.mssql2.TShuJuHzddmxSource;
import com.huazheng.project.greenplum.source.mssql3.PersonCompSource; import com.huazheng.project.greenplum.source.mssql3.PersonCompSource;
import com.huazheng.project.greenplum.source.mssql4.TasksSource; import com.huazheng.project.greenplum.source.mssql4.TasksSource;
import com.huazheng.project.greenplum.source.mysql.HandoverSource; import com.huazheng.project.greenplum.source.mysql.HandoverSource;
...@@ -131,6 +132,7 @@ import com.huazheng.project.mssql2.model.SysSAPreturnNo; ...@@ -131,6 +132,7 @@ import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mssql2.model.TKeHu; import com.huazheng.project.mssql2.model.TKeHu;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZ; import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZ;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY; import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY;
import com.huazheng.project.mssql2.model.TShuJuHzddmx;
import com.huazheng.project.mssql2.model.TWuLiaoDB; import com.huazheng.project.mssql2.model.TWuLiaoDB;
import com.huazheng.project.mssql3.model.PersonComp; import com.huazheng.project.mssql3.model.PersonComp;
import com.huazheng.project.mssql4.model.Tasks; import com.huazheng.project.mssql4.model.Tasks;
...@@ -248,6 +250,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del ...@@ -248,6 +250,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
private static SalesInquiryProcessSource salesInquiryProcessSource; private static SalesInquiryProcessSource salesInquiryProcessSource;
private static SalesInquiryProcessMXSource salesInquiryProcessMXSource; private static SalesInquiryProcessMXSource salesInquiryProcessMXSource;
private static TShuJuHzddmxSource tShuJuHzddmxSource;
private static TransformHistoryNodeSource transformHistoryNodeSource; private static TransformHistoryNodeSource transformHistoryNodeSource;
private static TransformNewNodeSource transformNewNodeSource; private static TransformNewNodeSource transformNewNodeSource;
...@@ -320,6 +324,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del ...@@ -320,6 +324,8 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
sampleClosingProcessSource = (SampleClosingProcessSource) context.getBean("sampleClosingProcessSource"); sampleClosingProcessSource = (SampleClosingProcessSource) context.getBean("sampleClosingProcessSource");
sampleClosingProcessMingXiSource = (SampleClosingProcessMingXiSource) context.getBean("sampleClosingProcessMingXiSource"); sampleClosingProcessMingXiSource = (SampleClosingProcessMingXiSource) context.getBean("sampleClosingProcessMingXiSource");
salesInquiryProcessSource = (SalesInquiryProcessSource) context.getBean("salesInquiryProcessSource"); salesInquiryProcessSource = (SalesInquiryProcessSource) context.getBean("salesInquiryProcessSource");
tShuJuHzddmxSource = (TShuJuHzddmxSource) context.getBean("TShuJuHzddmxSource");
salesInquiryProcessMXSource = (SalesInquiryProcessMXSource) context.getBean("salesInquiryProcessMXSource"); salesInquiryProcessMXSource = (SalesInquiryProcessMXSource) context.getBean("salesInquiryProcessMXSource");
transformHistoryNodeSource = (TransformHistoryNodeSource) context.getBean("transformHistoryNodeSource"); transformHistoryNodeSource = (TransformHistoryNodeSource) context.getBean("transformHistoryNodeSource");
...@@ -401,6 +407,19 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del ...@@ -401,6 +407,19 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
} }
}).setParallelism(1).name("拉取SampleApplicationProcess数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出SampleApplicationProcess数据"); }).setParallelism(1).name("拉取SampleApplicationProcess数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出SampleApplicationProcess数据");
// ================= TShuJuHzddmxSource 队列 =================
DataStream<String> salesShuJuHzddmxSourceRedis = env.addSource(tShuJuHzddmxSource).setParallelism(1).name("输入tShuJuHzddmxSource队列");
// 数据处理
salesShuJuHzddmxSourceRedis.flatMap(new FlatMapFunction<String, TShuJuHzddmx>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<TShuJuHzddmx> out) throws Exception {
gpserviceImpl.processTShuJuHzddmx(value, out);
}
}).setParallelism(1).name("拉取TShuJuHzddmx数据").addSink(greenPlumRichSinkFunction).setParallelism(1).name("输出TShuJuHzddmx数据");
// ================= SalesInquiryProcessSource 队列 ================= // ================= SalesInquiryProcessSource 队列 =================
DataStream<String> salesInquiryProcessSourceRedis = env.addSource(salesInquiryProcessSource).setParallelism(1).name("输入SalesInquiryProcessSource队列"); DataStream<String> salesInquiryProcessSourceRedis = env.addSource(salesInquiryProcessSource).setParallelism(1).name("输入SalesInquiryProcessSource队列");
......
...@@ -66,6 +66,7 @@ import com.huazheng.project.mssql2.model.SysSAPreturnNo; ...@@ -66,6 +66,7 @@ import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mssql2.model.TKeHu; import com.huazheng.project.mssql2.model.TKeHu;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZ; import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZ;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY; import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY;
import com.huazheng.project.mssql2.model.TShuJuHzddmx;
import com.huazheng.project.mssql2.model.TWuLiaoDB; import com.huazheng.project.mssql2.model.TWuLiaoDB;
import com.huazheng.project.mssql3.model.PersonComp; import com.huazheng.project.mssql3.model.PersonComp;
import com.huazheng.project.mssql4.model.Tasks; import com.huazheng.project.mssql4.model.Tasks;
...@@ -696,6 +697,15 @@ public interface GPMapper { ...@@ -696,6 +697,15 @@ public interface GPMapper {
public List<SampleClosingProcessMingXi> selectSampleClosingProcessMingXiCheck(SampleClosingProcessMingXi build); public List<SampleClosingProcessMingXi> selectSampleClosingProcessMingXiCheck(SampleClosingProcessMingXi build);
public Long selectSampleClosingProcessMingXiMaxRowNum(); public Long selectSampleClosingProcessMingXiMaxRowNum();
@Cacheable(key = "#root.method.name+':'+#p0.id", unless="#result == null")
public TShuJuHzddmx selectTShuJuHzddmx(TShuJuHzddmx sfp); // 查询替代删除
public void insertTShuJuHzddmx(TShuJuHzddmx element);
@CacheEvict(key = "'selectTShuJuHzddmx'+':'+#p0.id")
public void updateTShuJuHzddmx(TShuJuHzddmx element); // 配合redis中的reset标签进行更新
@CacheEvict(key = "'selectTShuJuHzddmx'+':'+#p0.id")
public void deleteTShuJuHzddmx(TShuJuHzddmx item);
public List<TShuJuHzddmx> selectTShuJuHzddmxCheck(TShuJuHzddmx build);
......
...@@ -51,6 +51,7 @@ import com.huazheng.project.mssql.model.SampleClosingProcess; ...@@ -51,6 +51,7 @@ import com.huazheng.project.mssql.model.SampleClosingProcess;
import com.huazheng.project.mssql.model.SampleClosingProcessMingXi; import com.huazheng.project.mssql.model.SampleClosingProcessMingXi;
import com.huazheng.project.mssql2.mapper.HzcrmMapper; import com.huazheng.project.mssql2.mapper.HzcrmMapper;
import com.huazheng.project.mssql2.model.SysSAPreturnNo; import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mssql2.model.TShuJuHzddmx;
import com.huazheng.project.mssql4.mapper.UltimusDBMapper; import com.huazheng.project.mssql4.mapper.UltimusDBMapper;
import com.huazheng.project.mssql4.model.Tasks; import com.huazheng.project.mssql4.model.Tasks;
import com.huazheng.project.mysql.mapper.TmsMapper; import com.huazheng.project.mysql.mapper.TmsMapper;
...@@ -108,6 +109,7 @@ public class CheckDeleteServiceImpl { ...@@ -108,6 +109,7 @@ public class CheckDeleteServiceImpl {
selectTransformHistoryNodeCheckByDelete(); selectTransformHistoryNodeCheckByDelete();
selectTransformNewNodeCheckByDelete(); selectTransformNewNodeCheckByDelete();
selectSalesForecastProcessMXCheckByDelete(); selectSalesForecastProcessMXCheckByDelete();
} }
public void selectAufkCheckByDelete() { public void selectAufkCheckByDelete() {
...@@ -989,6 +991,37 @@ public class CheckDeleteServiceImpl { ...@@ -989,6 +991,37 @@ public class CheckDeleteServiceImpl {
redis1Template.opsForValue().set("huazheng:checkDeleteError:SalesInquiryProcess:rowNum", SomeUtils.getErrorInfoFromException(e)); redis1Template.opsForValue().set("huazheng:checkDeleteError:SalesInquiryProcess:rowNum", SomeUtils.getErrorInfoFromException(e));
} }
} }
public void selectTShuJuHzddmxCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:TShuJuHzddmx:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:TShuJuHzddmx:rowNum");
TShuJuHzddmx build = TShuJuHzddmx.builder().rowNum(rowNum).build();
List<TShuJuHzddmx> list = gpMapper.selectTShuJuHzddmxCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:TShuJuHzddmx:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
TShuJuHzddmx source = hzcrmMapper.selectTShuJuHzddmxById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteTShuJuHzddmx(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:TShuJuHzddmx:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectTShuJuHzddmxcheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:TShuJuHzddmx:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectSampleClosingProcessCheckByDelete() { public void selectSampleClosingProcessCheckByDelete() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......
...@@ -59,6 +59,7 @@ import com.huazheng.project.mssql.model.SampleClosingProcess; ...@@ -59,6 +59,7 @@ import com.huazheng.project.mssql.model.SampleClosingProcess;
import com.huazheng.project.mssql.model.SampleClosingProcessMingXi; import com.huazheng.project.mssql.model.SampleClosingProcessMingXi;
import com.huazheng.project.mssql2.mapper.HzcrmMapper; import com.huazheng.project.mssql2.mapper.HzcrmMapper;
import com.huazheng.project.mssql2.model.SysSAPreturnNo; import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mssql2.model.TShuJuHzddmx;
import com.huazheng.project.mssql4.mapper.UltimusDBMapper; import com.huazheng.project.mssql4.mapper.UltimusDBMapper;
import com.huazheng.project.mssql4.model.Tasks; import com.huazheng.project.mssql4.model.Tasks;
import com.huazheng.project.mysql.mapper.TmsMapper; import com.huazheng.project.mysql.mapper.TmsMapper;
...@@ -2102,6 +2103,53 @@ public class CheckUpdateServiceImpl { ...@@ -2102,6 +2103,53 @@ public class CheckUpdateServiceImpl {
} }
} }
public void selectTShuJuHzddmxCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:TShuJuHzddmx:rowids", "0");
String rowids = opsForValue.get("huazheng:checkUpdate:TShuJuHzddmx:rowids");
TShuJuHzddmx build = TShuJuHzddmx.builder().id(Integer.valueOf(rowids)).build();
List<TShuJuHzddmx> slist = hzcrmMapper.selectTShuJuHzddmxCheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkUpdate:TShuJuHzddmx:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
TShuJuHzddmx target = gpMapper.selectTShuJuHzddmx(source); // 根据主键查询源库中的数据
String operator = "none";
Integer srowids = source.getId();
if (target != null) {
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateTShuJuHzddmx(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:TShuJuHzddmx:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectTShuJuHzddmxcheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:TShuJuHzddmx:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectSampleClosingProcessCheckUpdate() { public void selectSampleClosingProcessCheckUpdate() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......
...@@ -71,6 +71,7 @@ import com.huazheng.project.mssql2.model.SysSAPreturnNo; ...@@ -71,6 +71,7 @@ import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mssql2.model.TKeHu; import com.huazheng.project.mssql2.model.TKeHu;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZ; import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZ;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY; import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY;
import com.huazheng.project.mssql2.model.TShuJuHzddmx;
import com.huazheng.project.mssql2.model.TWuLiaoDB; import com.huazheng.project.mssql2.model.TWuLiaoDB;
import com.huazheng.project.mssql3.model.PersonComp; import com.huazheng.project.mssql3.model.PersonComp;
import com.huazheng.project.mssql4.model.Tasks; import com.huazheng.project.mssql4.model.Tasks;
...@@ -419,6 +420,35 @@ public class GPServiceImpl { ...@@ -419,6 +420,35 @@ public class GPServiceImpl {
} }
} }
public void processTShuJuHzddmx(String value, Collector<TShuJuHzddmx> arg2) {
try {
TShuJuHzddmx data = JSONUtil.toBean(value, TShuJuHzddmx.class);
TShuJuHzddmx exist = gpMapper.selectTShuJuHzddmx(data);
if (exist != null) {
data.setExist(true); // 已经在库
}
arg2.collect(data);
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:TShuJuHzddmx:error", "processTShuJuHzddmx", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void sinkTShuJuHzddmx(TShuJuHzddmx element) {
try {
if (element.isExist() == false) {
log.debug("GPServiceImpl.sinkTShuJuHzddmx()");
gpMapper.insertTShuJuHzddmx(element);
}
} catch (RuntimeException e) {
redis1Template.opsForHash().put("huazheng:TShuJuHzddmx:error", "sinkTShuJuHzddmx", getErrorInfoFromException(e));
log.error(e.getMessage());
} catch (Exception e) {
redis1Template.opsForHash().put("huazheng:TShuJuHzddmx:error", "sinkTShuJuHzddmx", getErrorInfoFromException(e));
log.error(e.getMessage());
}
}
public void processSampleApplicationProcess(String value, Collector<SampleApplicationProcess> arg2) { public void processSampleApplicationProcess(String value, Collector<SampleApplicationProcess> arg2) {
try { try {
SampleApplicationProcess data = JSONUtil.toBean(value, SampleApplicationProcess.class); SampleApplicationProcess data = JSONUtil.toBean(value, SampleApplicationProcess.class);
......
...@@ -71,6 +71,7 @@ import com.huazheng.project.mssql2.mapper.HzcrmMapper; ...@@ -71,6 +71,7 @@ import com.huazheng.project.mssql2.mapper.HzcrmMapper;
import com.huazheng.project.mssql2.model.SysSAPreturnNo; import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mssql2.model.TKeHu; import com.huazheng.project.mssql2.model.TKeHu;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY; import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY;
import com.huazheng.project.mssql2.model.TShuJuHzddmx;
import com.huazheng.project.mssql4.mapper.UltimusDBMapper; import com.huazheng.project.mssql4.mapper.UltimusDBMapper;
import com.huazheng.project.mssql4.model.Tasks; import com.huazheng.project.mssql4.model.Tasks;
import com.huazheng.project.mysql.mapper.TmsMapper; import com.huazheng.project.mysql.mapper.TmsMapper;
...@@ -1336,6 +1337,32 @@ public class JobServiceImpl { ...@@ -1336,6 +1337,32 @@ public class JobServiceImpl {
} }
} }
public void selectTShuJuHzddmxNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap4send.lua")));
List<String> keys = Arrays.asList("huazheng:TShuJuHzddmx:sendcount", "huazheng:TShuJuHzddmx:id", "huazheng:list:TShuJuHzddmx");
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:TShuJuHzddmx:sendcount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:TShuJuHzddmx:receivecount", "0"); // 不存在则创建,存在则么有操作
opsForValue.setIfAbsent("huazheng:TShuJuHzddmx:rowids", "0"); // 不存在则创建,存在则么有操作
Long sendcount = Long.valueOf(opsForValue.get("huazheng:TShuJuHzddmx:sendcount"));
Long receivecount = Long.valueOf(opsForValue.get("huazheng:TShuJuHzddmx:receivecount"));
if (sendcount - receivecount <= 20) { // 如果发送数和消费数的差小于5则往队列中写数据
String rowids = opsForValue.get("huazheng:TShuJuHzddmx:rowids"); // 标记id
TShuJuHzddmx scp = TShuJuHzddmx.builder().rowids(Long.valueOf(rowids)).build();
List<TShuJuHzddmx> list = hzcrmMapper.selectTShuJuHzddmxNew(scp);
if (!list.isEmpty()) {
list.forEach(item -> {
JSONObject json = JSONUtil.parseObj(item, false);
String execute = redis1Template.execute(script, keys, item.getRowids().toString(), json.toString());
log.info("标记时间回写 --> " + execute);
});
}
}
}
public void selectSalesInquiryProcessMXNew() { public void selectSalesInquiryProcessMXNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
......
package com.huazheng.project.greenplum.source.mssql2;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Service
public class TShuJuHzddmxSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:TShuJuHzddmx:sendcount", "huazheng:TShuJuHzddmx:flagDate", "huazheng:list:TShuJuHzddmx", "huazheng:TShuJuHzddmx:receivecount");
while (true) {
try {
String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("==========");
String checkString = values[0];
String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check);
ctx.collect(values[1]);
} else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
} catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:TShuJuHzddmx:error", "receivecount_elseerror", getErrorInfoFromException(e));
}
}
}
@Override
public void cancel() {
}
}
...@@ -7,6 +7,7 @@ import com.huazheng.project.mssql2.model.SysSAPreturnNo; ...@@ -7,6 +7,7 @@ import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mssql2.model.TKeHu; import com.huazheng.project.mssql2.model.TKeHu;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZ; import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZ;
import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY; import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY;
import com.huazheng.project.mssql2.model.TShuJuHzddmx;
import com.huazheng.project.mssql2.model.TWuLiaoDB; import com.huazheng.project.mssql2.model.TWuLiaoDB;
public interface HzcrmMapper { public interface HzcrmMapper {
...@@ -14,11 +15,15 @@ public interface HzcrmMapper { ...@@ -14,11 +15,15 @@ public interface HzcrmMapper {
public List<TKeHuXiaoShouZZYeWuY> selectTKeHuXiaoShouZZYeWuYNew(TKeHuXiaoShouZZYeWuY tkeHuXiaoShouZZYeWuY); public List<TKeHuXiaoShouZZYeWuY> selectTKeHuXiaoShouZZYeWuYNew(TKeHuXiaoShouZZYeWuY tkeHuXiaoShouZZYeWuY);
public List<TKeHu> selectTKeHuNew(TKeHu tkeHu); public List<TKeHu> selectTKeHuNew(TKeHu tkeHu);
public List<SysSAPreturnNo> selectSysSAPreturnNoNew(SysSAPreturnNo sysSAPreturnNo); public List<SysSAPreturnNo> selectSysSAPreturnNoNew(SysSAPreturnNo sysSAPreturnNo);
public List<TShuJuHzddmx> selectTShuJuHzddmxNew(TShuJuHzddmx tShuJuHzddmx);
public List<TKeHuXiaoShouZZ> selectTKeHuXiaoShouZZ(TKeHu tkeHu); public List<TKeHuXiaoShouZZ> selectTKeHuXiaoShouZZ(TKeHu tkeHu);
public List<TKeHuXiaoShouZZYeWuY> selectTKeHuXiaoShouZZYeWuY(TKeHu tkeHu); public List<TKeHuXiaoShouZZYeWuY> selectTKeHuXiaoShouZZYeWuY(TKeHu tkeHu);
public List<TWuLiaoDB> selectTWuLiaoDB(TKeHu tkeHu); public List<TWuLiaoDB> selectTWuLiaoDB(TKeHu tkeHu);
public List<TShuJuHzddmx> selectTShuJuHzddmxCheckByUpdate(TShuJuHzddmx sip);
public TShuJuHzddmx selectTShuJuHzddmxById(TShuJuHzddmx target);
public SysSAPreturnNo selectSysSAPreturnNoById(SysSAPreturnNo target); public SysSAPreturnNo selectSysSAPreturnNoById(SysSAPreturnNo target);
public TKeHu selectTKeHuById(TKeHu target); public TKeHu selectTKeHuById(TKeHu target);
public TKeHuXiaoShouZZ selectTKeHuXiaoShouZZById(TKeHuXiaoShouZZ target); public TKeHuXiaoShouZZ selectTKeHuXiaoShouZZById(TKeHuXiaoShouZZ target);
......
package com.huazheng.project.mssql2.model;
import java.io.Serializable;
import org.springframework.format.annotation.DateTimeFormat;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
/**
* 销售订单主表
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Builder
public class TShuJuHzddmx implements Serializable {
private static final long serialVersionUID = 1L;
private Integer id; // 主键
private String DingDan; // 销售订单
private String HangXiangM; // 行项目
private String DanJia; // 单价
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss") // 页面写入数据库时格式化
@JSONField(format="yyyy-MM-dd HH:mm:ss") // 数据库导出页面时json格式化
private String ChuangJianSJ; // 创建时间
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss") // 页面写入数据库时格式化
@JSONField(format="yyyy-MM-dd HH:mm:ss") // 数据库导出页面时json格式化
private String GengXinSJ; // 更新时间
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss") // 页面写入数据库时格式化
@JSONField(format="yyyy-MM-dd HH:mm:ss") // 数据库导出页面时json格式化
private String ShanChuSJ; // 删除时间
private Long rowids; // sap那边的rowid
private boolean exist; // 用于标记,不是字段
private String hashResult; // 数据hash标记
private String rowNum; // 用于标记,不是字段
}
drop table TShuJuHZDDMX;
CREATE TABLE TShuJuHZDDMX (
ID integer,
DingDan text,
HangXiangM text,
DanJia text,
ChuangJianSJ timestamp,
GengXinSJ timestamp,
ShanChuSJ timestamp,
PRIMARY KEY (id)
)
Distributed by (id);
alter table TShuJuHZDDMX add column hashResult text;
alter table TShuJuHZDDMX add column rowNum serial;
...@@ -418,6 +418,26 @@ ...@@ -418,6 +418,26 @@
select max(rowNum) from SampleClosingProcess select max(rowNum) from SampleClosingProcess
</select> </select>
<select id="selectTShuJuHzddmx" parameterType="com.huazheng.project.mssql2.model.TShuJuHzddmx" resultType="com.huazheng.project.mssql2.model.TShuJuHzddmx">
select * from TShuJuHzddmx where id = #{id}
</select>
<insert id="insertTShuJuHzddmx" parameterType="com.huazheng.project.mssql2.model.TShuJuHzddmx">
insert into TShuJuHzddmx (id,DingDan,HangXiangM,DanJia,ChuangJianSJ,GengXinSJ,ShanChuSJ,hashResult)
values(#{id},#{DingDan},#{HangXiangM},#{DanJia},#{ChuangJianSJ},#{GengXinSJ},#{ShanChuSJ},#{hashResult})
</insert>
<delete id="deleteTShuJuHzddmx" parameterType="com.huazheng.project.mssql2.model.TShuJuHzddmx">
delete from TShuJuHzddmx where id = #{id}
</delete>
<update id="updateTShuJuHzddmx" parameterType="com.huazheng.project.mssql2.model.TShuJuHzddmx">
update TShuJuHzddmx set
DingDan=#{DingDan},HangXiangM=#{HangXiangM},DanJia=#{DanJia},
ChuangJianSJ=#{ChuangJianSJ},GengXinSJ=#{GengXinSJ},ShanChuSJ=#{ShanChuSJ},hashResult=#{hashResult}
where id = #{id}
</update>
<select id="selectTShuJuHzddmxCheck" parameterType="com.huazheng.project.mssql2.model.TShuJuHzddmx" resultType="com.huazheng.project.mssql2.model.TShuJuHzddmx">
select * from TShuJuHzddmx where rownum &gt; #{rowNum} order by rownum limit 20
</select>
......
...@@ -24,6 +24,24 @@ ...@@ -24,6 +24,24 @@
) a where a.rowids &gt; #{rowids} ) a where a.rowids &gt; #{rowids}
</select> </select>
<select id="selectTShuJuHzddmxNew" parameterType="TShuJuHzddmx" resultType="TShuJuHzddmx">
select top 20 a.* from (
select id as rowids,
id,DingDan,HangXiangM,DanJia,ChuangJianSJ,GengXinSJ,ShanChuSJ
from T_ShuJuHZDDMX
) a where a.rowids &gt; #{rowids}
</select>
<select id="selectTShuJuHzddmxCheckByUpdate" parameterType="TShuJuHzddmx" resultType="TShuJuHzddmx">
select top 20
id,DingDan,HangXiangM,DanJia,ChuangJianSJ,GengXinSJ,ShanChuSJ
from T_ShuJuHZDDMX
where id &gt; #{id}
and GengXinSJ is not null and CONVERT(varchar(10),GengXinSJ,120) = CONVERT(varchar(10),GETDATE(),120)
</select>
<select id="selectTShuJuHzddmxById" parameterType="TShuJuHzddmx" resultType="TShuJuHzddmx">
select id,DingDan,HangXiangM,DanJia,ChuangJianSJ,GengXinSJ,ShanChuSJ from T_ShuJuHZDDMX
</select>
......
...@@ -591,6 +591,17 @@ ...@@ -591,6 +591,17 @@
</property> </property>
<property name="cronExpression" value="* * * * * ?" /> <property name="cronExpression" value="* * * * * ?" />
</bean> </bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="jobServiceImpl" />
<property name="targetMethod" value="selectTShuJuHzddmxNew" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"> <bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail"> <property name="jobDetail">
<bean parent="methodJobDetail"> <bean parent="methodJobDetail">
...@@ -1056,6 +1067,16 @@ ...@@ -1056,6 +1067,16 @@
<property name="jobDetail"> <property name="jobDetail">
<bean parent="methodJobDetail"> <bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" /> <property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectTShuJuHzddmxCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectTasksCheckUpdate" /> <property name="targetMethod" value="selectTasksCheckUpdate" />
</bean> </bean>
</property> </property>
...@@ -1426,6 +1447,16 @@ ...@@ -1426,6 +1447,16 @@
<property name="jobDetail"> <property name="jobDetail">
<bean parent="methodJobDetail"> <bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" /> <property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectTShuJuHzddmxCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectVbepCheckByDelete" /> <property name="targetMethod" value="selectVbepCheckByDelete" />
</bean> </bean>
</property> </property>
...@@ -1469,7 +1500,7 @@ ...@@ -1469,7 +1500,7 @@
<property name="cronExpression" value="* * * * * ?" /> <property name="cronExpression" value="* * * * * ?" />
</bean> </bean>
<!-- 以上77个任务 --> <!-- 以上80个任务 -->
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论