提交 129373b1 作者: guofeng

历史数据

上级 a7d7ab44
......@@ -145,6 +145,12 @@
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
<build>
......
package com.huazheng.project.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.log4j.Log4j2;
/**
 * Spring configuration that wires this application into the xxl-job scheduler.
 * All values are injected from the {@code xxl.job.*} section of the application config.
 */
@Log4j2
@Configuration
public class XxlJobConfig {

    /** Address(es) of the xxl-job admin console the executor registers with. */
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    /** Executor app name shown in the admin console (defaults to spring.application.name). */
    @Value("${xxl.job.executor.appname}")
    private String appName;

    /** Port the embedded executor server listens on. */
    @Value("${xxl.job.executor.port}")
    private int port;

    /**
     * Builds the executor bean that registers this instance with the admin server
     * and receives job trigger callbacks.
     *
     * @return the configured xxl-job Spring executor
     */
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
        executor.setAdminAddresses(adminAddresses);
        executor.setAppname(appName);
        executor.setPort(port);
        return executor;
    }
}
......@@ -3,5 +3,5 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.huazheng.project.greenplum.model.GpTest1;
/**
 * MyBatis-Plus mapper for the Greenplum {@code test1} table.
 * Inherits the generic CRUD statements from {@link BaseMapper};
 * declares no custom statements of its own.
 */
public interface GpTest1Mapper extends BaseMapper<GpTest1> {
}
\ No newline at end of file
}
......@@ -4,12 +4,10 @@ import java.util.List;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.huazheng.project.hana.model.HanaTest1;
/**
 * MyBatis mapper for reading from the SAP HANA {@code test1} table
 * (statements defined in HanaMapper.xml).
 */
public interface HanaMapper {
// NOTE(review): MyBatis resolves statements by method name only, so these two
// overloads both map to the statement id "selectHanaTest1List" and cannot
// coexist at runtime. The Wrapper variant appears superseded by the rowid
// variant in this change — confirm and remove one of them.
List<HanaTest1> selectHanaTest1List(@Param(Constants.WRAPPER) Wrapper<HanaTest1> queryWrapper);
// Fetches the next batch of rows whose HANA "$rowid$" is greater than the given
// checkpoint value — presumably paging through historical data; see HanaMapper.xml.
List<HanaTest1> selectHanaTest1List(@Param("rowids") Long rowids);
}
......@@ -45,6 +45,8 @@ public class HanaTest1 extends Model<HanaTest1> {
@TableField("times")
private Date times;
private Long rowids; // sap那边的rowid
@Override
protected Serializable pkVal() {
return this.id;
......
package com.huazheng.project.service.impl;
import java.util.List;
import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.huazheng.project.greenplum.mapper.GpTest1Mapper;
import com.huazheng.project.greenplum.model.GpTest1;
import com.huazheng.project.hana.mapper.HanaMapper;
import com.huazheng.project.hana.model.HanaTest1;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
......@@ -21,6 +33,15 @@ public class GpTest1ServiceImpl {
@Autowired
private GpTest1Mapper mapper;
@Autowired
private HanaMapper hanaMapper;
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private RedisTemplate<String, String> redis1Template;
@KafkaListener(topics = "test_topic_2")
public void hanaData(ConsumerRecord<?, ?> record) {
Object value = record.value();
......@@ -50,4 +71,37 @@ public class GpTest1ServiceImpl {
}
}
// Approach 2: scheduler-driven backfill. Reads historical rows past the last
// processed HANA rowid (checkpoint kept in Redis) and pushes each row into Kafka.
/**
 * xxl-job handler that incrementally replays historical HANA rows into Kafka.
 * The last processed rowid is checkpointed in Redis under
 * {@code huazheng:full:HanaTest1:rowids} so each trigger resumes where the
 * previous one stopped.
 *
 * @param param unused xxl-job parameter
 * @return {@link ReturnT#SUCCESS} always (errors propagate as exceptions)
 */
@XxlJob("hanaFullData")
public ReturnT<String> hanaFullData(String param) {
    ValueOperations<String, String> ops = redis1Template.opsForValue();
    // Seed the checkpoint only on first run; setIfAbsent is a no-op when the key exists.
    ops.setIfAbsent("huazheng:full:HanaTest1:rowids", "0");
    // Guard against the key being deleted between the two calls above.
    String checkpoint = ops.get("huazheng:full:HanaTest1:rowids");
    long rowids = Long.parseLong(checkpoint == null ? "0" : checkpoint);
    List<HanaTest1> list = hanaMapper.selectHanaTest1List(rowids);
    if (list.isEmpty()) {
        // TODO stop the job here instead of resetting
        ops.set("huazheng:full:HanaTest1:rowids", "0"); // reset the checkpoint
        return ReturnT.SUCCESS; // nothing left to replay this round
    }
    for (HanaTest1 object : list) {
        String times = DateUtil.format(object.getTimes(), "yyyy-MM-dd HH:mm:ss");
        // CSV-style payload: "id","pswd","times"
        StringBuilder sb = new StringBuilder();
        sb.append("\"" + object.getId() + "\",");
        sb.append("\"" + object.getPswd() + "\",");
        sb.append("\"" + times + "\"");
        String payload = sb.toString();
        kafkaTemplate.send("test_topic_2_merge", payload);
        log.info("hana merge --> {}", payload);
        // Advance the checkpoint after every record so a crash resumes from the last sent row.
        ops.set("huazheng:full:HanaTest1:rowids", object.getRowids().toString());
    }
    return ReturnT.SUCCESS;
}
// 用于测试,发布前注释掉
// @Scheduled(cron = "* * * * * ?")
// public void hanaFullDataTest() {
// hanaFullData(null);
// }
}
{"properties": [{
"name": "druid",
"type": "java.util.Map",
"description": "Druid connection-pool configuration properties."
},{
"name": "xxl",
"type": "java.util.Map",
"description": "xxl-job scheduler/executor configuration properties."
},{
"name": "spring.datasource.hana",
"type": "java.util.Map",
"description": "SAP HANA datasource configuration properties."
},{
"name": "spring.datasource.mssql",
"type": "java.util.Map",
"description": "SQL Server datasource configuration properties."
},{
"name": "spring.datasource.mysql",
"type": "java.util.Map",
"description": "MySQL datasource configuration properties."
},{
"name": "spring.datasource.greenplum",
"type": "java.util.Map",
"description": "Greenplum datasource configuration properties."
}]}
\ No newline at end of file
#生产
xxl:
job:
admin:
addresses: http://192.168.80.52:8080/xxl-job-admin
executor:
appname: ${spring.application.name}
port: 31025
spring:
application:
name: huazheng-project-quartz # NOTE(review): duplicate 'name' key in the rendered diff ('cloud-data-center' vs this); kept the later value — confirm which one is current
redis:
host: 192.168.80.55
database: 1
......
......@@ -3,7 +3,9 @@
<mapper namespace="com.huazheng.project.hana.mapper.HanaMapper">
<select id="selectHanaTest1List" resultType="HanaTest1">
<!-- NOTE(review): the rendered diff fused the old body ("select * from test1 ${ew.customSqlSegment}")
     with its replacement; only the checkpointed query below is kept.
     "$rowid$" is HANA's internal row id, used as the backfill cursor. -->
select top 20 "$rowid$" as rowids, * from test1
where "$rowid$" &gt; #{rowids}
order by "$rowid$"
</select>
</mapper>
......@@ -7,7 +7,15 @@ public class Test {
/**
 * Ad-hoc scratch main: serializes a blank {@code HanaTest1} to JSON (so its field
 * names form the trigger payload template) and prints a HANA DELETE-trigger DDL
 * statement assembled from that template.
 */
public static void main(String[] args) {
    JSONObject snapshot = JSONUtil.parseObj(new HanaTest1(), false);
    System.out.println(snapshot);
    // %s placeholders: trigger-name prefix, source table, log table, sequence prefix, JSON payload.
    String template =
            "CREATE TRIGGER %s_TRIGGER_VAR_DELETE AFTER DELETE ON %s REFERENCING OLD ROW myoldrow BEGIN INSERT INTO "
                    + "%s(ID,\"TYPE\",\"BEFORE\") VALUES(%s_log_SEQ.NEXTVAL,'d','%s'); END;";
    String ddl = String.format(template, "TEST2", "TEST2", "TEST2_log", "TEST1", snapshot.toString());
    System.out.println(ddl);
}
// CREATE TRIGGER TEST2_TRIGGER_VAR_DELETE AFTER DELETE ON test2 REFERENCING OLD ROW myoldrow BEGIN INSERT INTO test2_log(ID,"TYPE","BEFORE") VALUES(test2_log_SEQ.NEXTVAL,'d','{"id":'||:myoldrow.id||',"user":"'||:myoldrow.user||'","pswd":"'||:myoldrow.pswd||'","times":"'||:myoldrow.times||'"}'); END;
// CREATE TRIGGER TEST2_TRIGGER_VAR_DELETE AFTER DELETE ON TEST2 REFERENCING OLD ROW myoldrow BEGIN INSERT INTO TEST2(ID,"TYPE","BEFORE") VALUES(TEST1_log_SEQ.NEXTVAL,'d','{"rowids":null,"pswd":null,"times":null,"id":null,"user":null}'); END;
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论