提交 374d340d 作者: guofeng

1

上级 3350aa6f
...@@ -47,6 +47,8 @@ import com.huazheng.project.mssql.model.SampleClosingProcess; ...@@ -47,6 +47,8 @@ import com.huazheng.project.mssql.model.SampleClosingProcess;
import com.huazheng.project.mssql.model.SampleClosingProcessMingXi; import com.huazheng.project.mssql.model.SampleClosingProcessMingXi;
import com.huazheng.project.mssql2.mapper.HzcrmMapper; import com.huazheng.project.mssql2.mapper.HzcrmMapper;
import com.huazheng.project.mssql2.model.SysSAPreturnNo; import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mssql4.mapper.UltimusDBMapper;
import com.huazheng.project.mssql4.model.Tasks;
import com.huazheng.project.mysql.mapper.TmsMapper; import com.huazheng.project.mysql.mapper.TmsMapper;
import com.huazheng.project.mysql.model.Handover; import com.huazheng.project.mysql.model.Handover;
import com.huazheng.project.mysql.model.HandoverTask; import com.huazheng.project.mysql.model.HandoverTask;
...@@ -74,6 +76,8 @@ public class CheckDeleteServiceImpl { ...@@ -74,6 +76,8 @@ public class CheckDeleteServiceImpl {
private TmsMapper tmsMapper; private TmsMapper tmsMapper;
@Autowired @Autowired
private HzcrmMapper hzcrmMapper; private HzcrmMapper hzcrmMapper;
@Autowired
private UltimusDBMapper ultimusDBMapper;
public void checkJob7() { public void checkJob7() {
selectAufkCheckByDelete(); selectAufkCheckByDelete();
...@@ -1276,4 +1280,36 @@ public class CheckDeleteServiceImpl { ...@@ -1276,4 +1280,36 @@ public class CheckDeleteServiceImpl {
} }
} }
public void selectTasksCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Tasks:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Tasks:rowNum");
Tasks build = Tasks.builder().rowNum(rowNum).build();
List<Tasks> list = gpMapper.selectTasksCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:Tasks:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Tasks source = ultimusDBMapper.selectTasksById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteTasks(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Tasks:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectTaskscheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Tasks:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
} }
...@@ -54,6 +54,8 @@ import com.huazheng.project.mssql.model.SampleClosingProcess; ...@@ -54,6 +54,8 @@ import com.huazheng.project.mssql.model.SampleClosingProcess;
import com.huazheng.project.mssql.model.SampleClosingProcessMingXi; import com.huazheng.project.mssql.model.SampleClosingProcessMingXi;
import com.huazheng.project.mssql2.mapper.HzcrmMapper; import com.huazheng.project.mssql2.mapper.HzcrmMapper;
import com.huazheng.project.mssql2.model.SysSAPreturnNo; import com.huazheng.project.mssql2.model.SysSAPreturnNo;
import com.huazheng.project.mssql4.mapper.UltimusDBMapper;
import com.huazheng.project.mssql4.model.Tasks;
import com.huazheng.project.mysql.mapper.TmsMapper; import com.huazheng.project.mysql.mapper.TmsMapper;
import com.huazheng.project.mysql.model.Handover; import com.huazheng.project.mysql.model.Handover;
import com.huazheng.project.mysql.model.HandoverTask; import com.huazheng.project.mysql.model.HandoverTask;
...@@ -86,6 +88,8 @@ public class CheckUpdateServiceImpl { ...@@ -86,6 +88,8 @@ public class CheckUpdateServiceImpl {
private TmsMapper tmsMapper; private TmsMapper tmsMapper;
@Autowired @Autowired
private HzcrmMapper hzcrmMapper; private HzcrmMapper hzcrmMapper;
@Autowired
private UltimusDBMapper ultimusDBMapper;
private ExecutorService service = Executors.newFixedThreadPool(15); private ExecutorService service = Executors.newFixedThreadPool(15);
...@@ -1982,4 +1986,55 @@ public class CheckUpdateServiceImpl { ...@@ -1982,4 +1986,55 @@ public class CheckUpdateServiceImpl {
} }
public void selectTasksCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Tasks:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Tasks:rowids"));
Tasks build = Tasks.builder().rowids(rowids).build();
List<Tasks> slist = ultimusDBMapper.selectTasksCheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Tasks:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Tasks target = gpMapper.selectTasks(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateTasks(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());
ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
} else { // 不能自增,在更新的时候写入数据
gpMapper.insertTasks(source);
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Tasks:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectTaskscheckUpdate --> rowNum:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Tasks:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
} }
...@@ -42,8 +42,6 @@ import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY; ...@@ -42,8 +42,6 @@ import com.huazheng.project.mssql2.model.TKeHuXiaoShouZZYeWuY;
import com.huazheng.project.mssql2.model.TWuLiaoDB; import com.huazheng.project.mssql2.model.TWuLiaoDB;
import com.huazheng.project.mssql3.mapper.Cinderellaw2Mapper; import com.huazheng.project.mssql3.mapper.Cinderellaw2Mapper;
import com.huazheng.project.mssql3.model.PersonComp; import com.huazheng.project.mssql3.model.PersonComp;
import com.huazheng.project.mssql4.mapper.UltimusDBMapper;
import com.huazheng.project.mssql4.model.Tasks;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
...@@ -68,8 +66,8 @@ public class DeleteUpdateJobServiceImpl { ...@@ -68,8 +66,8 @@ public class DeleteUpdateJobServiceImpl {
private GPMapper gpMapper; private GPMapper gpMapper;
@Autowired @Autowired
private HzcrmMapper hzcrmMapper; private HzcrmMapper hzcrmMapper;
@Autowired // @Autowired
private UltimusDBMapper ultimusDBMapper; // private UltimusDBMapper ultimusDBMapper;
@Autowired @Autowired
private Cinderellaw2Mapper cinderellaw2Mapper; private Cinderellaw2Mapper cinderellaw2Mapper;
@Autowired @Autowired
...@@ -792,48 +790,48 @@ public class DeleteUpdateJobServiceImpl { ...@@ -792,48 +790,48 @@ public class DeleteUpdateJobServiceImpl {
selectPersonCompCheck(); selectPersonCompCheck();
} }
//独立的 //独立的
public void selectTasksCheck() { // public void selectTasksCheck() {
try { // try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); // ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Tasks:rowNum", "0"); // opsForValue.setIfAbsent("huazheng:check:Tasks:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Tasks:rowNum"); // String rowNum = opsForValue.get("huazheng:check:Tasks:rowNum");
Tasks build = Tasks.builder().rowNum(rowNum).build(); // Tasks build = Tasks.builder().rowNum(rowNum).build();
List<Tasks> list = gpMapper.selectTasksCheck(build); // 从数仓中查询一组数据 // List<Tasks> list = gpMapper.selectTasksCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Tasks:rowNum", "0"); // 计数器复位 // redis1Template.opsForValue().set("huazheng:check:Tasks:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 // ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} // }
list.forEach(target -> { // 遍历要检查的数据 // list.forEach(target -> { // 遍历要检查的数据
Tasks source = ultimusDBMapper.selectTasksById(target); // 根据主键查询源库中的数据 // Tasks source = ultimusDBMapper.selectTasksById(target); // 根据主键查询源库中的数据
String operator = "none"; // String operator = "none";
if (source == null) { // 如果源库中没有数据 // if (source == null) { // 如果源库中没有数据
gpMapper.deleteTasks(target); // 删除数仓中的数据 // gpMapper.deleteTasks(target); // 删除数仓中的数据
operator = "delete"; // operator = "delete";
} else { // 源库中有数据 // } else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果 // String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果 // String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致 // if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash); // source.setHashResult(shash);
while (true) { // while (true) {
try { // try {
gpMapper.updateTasks(source); // 更新数据到数仓中 // gpMapper.updateTasks(source); // 更新数据到数仓中
break; // break;
} catch (RuntimeException e) { // } catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500); // log.error(e.getMessage());ThreadUtil.safeSleep(500);
} // }
} // }
ThreadUtil.safeSleep(500); // ThreadUtil.safeSleep(500);
} // }
} // }
redis1Template.opsForValue().set("huazheng:check:Tasks:rowNum", target.getRowNum()); // redis1Template.opsForValue().set("huazheng:check:Tasks:rowNum", target.getRowNum());
if (!operator.equals("none")) { // if (!operator.equals("none")) {
log.info(String.format("selectTasksCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator)); // log.info(String.format("selectTasksCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
} // }
}); // });
} catch (Exception e) { // } catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Tasks:rowNum", getErrorInfoFromException(e)); // redis1Template.opsForValue().set("huazheng:checkError:Tasks:rowNum", getErrorInfoFromException(e));
} // }
} // }
private void selectPersonCompCheck() { private void selectPersonCompCheck() {
try { try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue(); ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......
...@@ -9,5 +9,5 @@ public interface UltimusDBMapper { ...@@ -9,5 +9,5 @@ public interface UltimusDBMapper {
public List<Tasks> selectTasksNew(Tasks tasks); public List<Tasks> selectTasksNew(Tasks tasks);
public Tasks selectTasksById(Tasks target); public Tasks selectTasksById(Tasks target);
public List<Tasks> selectTasksCheckByUpdate(Tasks tasks);
} }
...@@ -17,4 +17,14 @@ ...@@ -17,4 +17,14 @@
from tasks where RTRIM(taskId) = #{taskId} from tasks where RTRIM(taskId) = #{taskId}
</select> </select>
<select id="selectTasksCheckByUpdate" parameterType="Tasks" resultType="Tasks">
select top 20 a.* from (
select ROW_NUMBER() OVER(ORDER BY taskid) as rowids,
RTRIM(taskId) as taskId, RTRIM(processName) as processName, incident, startTime, status,
SUBSTRING (taskUser,9,8) as taskUser
from tasks
where RTRIM(processName) = '样品结案流程' and LEN(taskUser) = 16
) a where a.rowids &gt; #{rowids}
</select>
</mapper> </mapper>
...@@ -801,7 +801,7 @@ ...@@ -801,7 +801,7 @@
</property> </property>
<property name="cronExpression" value="* * * * * ?" /> <property name="cronExpression" value="* * * * * ?" />
</bean> </bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"> <!-- <bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail"> <property name="jobDetail">
<bean parent="methodJobDetail"> <bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" /> <property name="targetObject" ref="deleteUpdateJobServiceImpl" />
...@@ -809,7 +809,7 @@ ...@@ -809,7 +809,7 @@
</bean> </bean>
</property> </property>
<property name="cronExpression" value="* * * * * ?" /> <property name="cronExpression" value="* * * * * ?" />
</bean> </bean> -->
<!-- ========================== 独立的任务 ========================== --> <!-- ========================== 独立的任务 ========================== -->
...@@ -1007,7 +1007,15 @@ ...@@ -1007,7 +1007,15 @@
</property> </property>
<property name="cronExpression" value="* * * * * ?" /> <property name="cronExpression" value="* * * * * ?" />
</bean> </bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectTasksCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- 删除流程 --> <!-- 删除流程 -->
...@@ -1367,7 +1375,18 @@ ...@@ -1367,7 +1375,18 @@
</property> </property>
<property name="cronExpression" value="* * * * * ?" /> <property name="cronExpression" value="* * * * * ?" />
</bean> </bean>
<!-- 以上66个任务 -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectTasksCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- 以上68个任务 -->
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论