提交 4a02f8b8 作者: guofeng

1、Mseg表追加字段:KDAUF、kdpos、MATNR 2、设计Mseg、Mkpf、Aufm、Afru表的一次性任务

上级 38058400
......@@ -322,7 +322,6 @@ redis-cli -n 1 --raw keys "huazheng*ikp*" | xargs redis-cli -n 1 del
}
public static void main(String[] args) throws Exception {
// 初始化运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10l));
......
......@@ -111,24 +111,32 @@ public interface GPMapper {
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.rueck+','+#p0.rmzhl", unless="#result == null")
public Afru selectAfru(Afru afru); // 查询替代删除
public void insertAfru(Afru element);
@CacheEvict(key = "'selectAfru'+':'+#p0.mandt+','+#p0.rueck+','+#p0.rmzhl")
public void deleteAfru(Afru item);
@CacheEvict(key = "'selectAfru'+':'+#p0.mandt+','+#p0.rueck+','+#p0.rmzhl")
public void updateAfru(Afru element);
public List<Afru> selectAfruCheck(Afru build);
public Long selectAfruMaxRowNum();
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.mblnr+','+#p0.mjahr+','+#p0.zeile", unless="#result == null")
public Mseg selectMseg(Mseg mseg); // 查询替代删除
public void insertMseg(Mseg element);
// @CacheEvict(key = "'selectMseg'+':'+#p0.mandt+','+#p0.mblnr+','+#p0.mjahr+','+#p0.zeile")
// public void deleteMseg(Mseg item);
// @CacheEvict(key = "'selectMseg'+':'+#p0.mandt+','+#p0.mblnr+','+#p0.mjahr+','+#p0.zeile")
// public void updateMseg(Mseg element);
// public List<Mseg> selectMsegCheck(Mseg build);
@CacheEvict(key = "'selectMseg'+':'+#p0.mandt+','+#p0.mblnr+','+#p0.mjahr+','+#p0.zeile")
public void deleteMseg(Mseg item);
@CacheEvict(key = "'selectMseg'+':'+#p0.mandt+','+#p0.mblnr+','+#p0.mjahr+','+#p0.zeile")
public void updateMseg(Mseg element);
public List<Mseg> selectMsegCheck(Mseg build);
public Long selectMsegMaxRowNum();
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.mblnr+','+#p0.mjahr", unless="#result == null")
public Mkpf selectMkpf(Mkpf mkpf); // 查询替代删除
public void insertMkpf(Mkpf element);
// @CacheEvict(key = "'selectMkpf'+':'+#p0.mandt+','+#p0.mblnr+','+#p0.mjahr")
// public void deleteMkpf(Mkpf item);
// @CacheEvict(key = "'selectMkpf'+':'+#p0.mandt+','+#p0.mblnr+','+#p0.mjahr")
// public void updateMkpf(Mkpf element);
// public List<Mkpf> selectMkpfCheck(Mkpf build);
@CacheEvict(key = "'selectMkpf'+':'+#p0.mandt+','+#p0.mblnr+','+#p0.mjahr")
public void deleteMkpf(Mkpf item);
@CacheEvict(key = "'selectMkpf'+':'+#p0.mandt+','+#p0.mblnr+','+#p0.mjahr")
public void updateMkpf(Mkpf element);
public List<Mkpf> selectMkpfCheck(Mkpf build);
public Long selectMkpfMaxRowNum();
@Cacheable(key = "#root.method.name+':'+#p0.mandt+','+#p0.aufnr", unless="#result == null")
public Aufk selectAufk(Aufk aufk); // 查询替代删除
......@@ -160,6 +168,12 @@ public interface GPMapper {
@Cacheable(key = "#root.method.name+':'+#p0.mblnr+','+#p0.mandt+','+#p0.mjahr+','+#p0.zeile", unless="#result == null")
public Aufm selectAufm(Aufm aufm); // 查询替代删除
public void insertAufm(Aufm element);
@CacheEvict(key = "'selectAufm'+':'+#p0.mblnr+','+#p0.mandt+','+#p0.mjahr+','+#p0.zeile")
public void deleteAufm(Aufm aufm);
@CacheEvict(key = "'selectAufm'+':'+#p0.mblnr+','+#p0.mandt+','+#p0.mjahr+','+#p0.zeile")
public void updateAufm(Aufm aufm);
public List<Aufm> selectAufmCheck(Aufm build);
public Long selectAufmMaxRowNum();
@Cacheable(key = "#root.method.name+':'+#p0.kunnr+','+#p0.mandt", unless="#result == null")
public Kna1 selectKna1(Kna1 kna1); // 查询替代删除
......@@ -719,4 +733,5 @@ public interface GPMapper {
public void updateCAVThisMonth(CAVThisMonth item); // 更新客户每月回款结果
}
......@@ -156,4 +156,9 @@ public interface SapMapper {
public Vbap selectVbapById(Vbap target);
public Tvkbt selectTvkbtById(Tvkbt target);
public Mseg selectMsegById(Mseg target);
public Mkpf selectMkpfById(Mkpf target);
public Afru selectAfruById(Afru target);
public Aufm selectAufmById(Aufm target);
}
......@@ -41,6 +41,9 @@ public class Mseg implements Serializable {
private String cputm_mkpf; // 输入时间
private String aufnr; // 订单号
private String kdauf; // 销售订单数
private String kdpos; // 销售订单中的项目编号
private String matnr; // 物料号
private Long rowids; // sap那边的rowid
private boolean exist; // 用于标记,不是字段
......
......@@ -2,22 +2,10 @@ select count(1) from sapabap1.kna1 where updat = '00000000' union all
select count(1) from sapabap1.kna1 where updat != '00000000' union all
select count(1) from sapabap1.kna1 where updat is null;
select count(1) from sapabap1.vbap where mandt = '800';
select "$rowid$",* from sapabap1.vbap where mandt = '800' and vbeln = '0010074232' and posnr = '000130';
select top 20 "$rowid$" as rowids,
p.vbeln, p.posnr, p.matnr, p.mandt, p.matkl, p.spart, p.kwmeng, p.netwr, p.mwsbp, p.waerk, p.vrkme, p.abgru,
p.uebto, p.untto, p.werks, p.vstel, p.mvgr1, p.plavo, p.mvgr4, p.mvgr5, p.ulxh, p.zma_ftb_02, p.zma_ftb_03,
p.zma_ftb_45, p.zma_ftb_06, p.zma_ftb_07, p.zma_ftb_08, p.zma_jyb_02, p.zma_jyb_03, p.zma_jyb_45,
p.zma_jyb_06, p.zma_jyb_07, p.zma_ljb_03, p.zma_ljb_04, p.zma_ljb_06, p.zma_ljb_07, p.zma_ljb_08,
p.zma_ljb_11, p.zma_ljb_12, p.zma_ljb_13, p.zma_yhbg_02, p.zma_yhbg_04, p.zma_yhbg_05, p.zma_yhbg_06,
p.tbpz, p.ws, p.cu_maktx, p.cu_pcdst, p.ztype, p.barcode, p.gdno, p.lh, p.dgxx, p.zklx, p.erdat, p.erzet, p.netpr, p.aedat,q.pernr
from sapabap1.vbap p
left join (
select a.vbeln,a.posnr,a.pernr from sapabap1.vbpa a
left join sapabap1.vbak b on a.vbeln = b.vbeln
where a.mandt = '800' and a.pernr != '00000000' and a.parvw = 'VE'
group by a.vbeln,a.posnr,a.pernr
) q on p.vbeln = q.vbeln and p.posnr = q.posnr
where p."$rowid$" > 10 and p.mandt = '800'
order by p."$rowid$"
mandt, pwerk, aufnr, reason
from sapabap1.Zpo_edit
where "$rowid$" > '20593' and mandt = '800'
order by "$rowid$";
select count(1) from sapabap1.Zpo_edit where mandt = '800';
现在的情况:
现在的情况:
现在有65个表,有三种类型的表任务
1、不需要更新的表(有两种任务):新增任务、加字段任务(一次性全量更新任务)
2、增量更新的表(有四种任务):新增任务、增量更新任务、加字段任务(一次性全量更新任务)、删除任务
3、全更新的表(有三种任务):新增任务、全量更新任务、删除任务
一次性全量更新任务,目前还没有开发
所以现在的问题是
1、目前的程序运行起来有350个线程左右,如果再加上新开发的 一次性全量更新任务,那么线程数超过400,相对的数据库连接数基本要到200。
2、现在程序运行时线程数很高,所在在结构上需要进行拆分,目前考虑按照 :新增任务、更新任务、删除任务、一次性更新任务,拆分4个项目工程
3、因为更新数据机制是轮询,所以数据库处于高并发的读写状态,这种压力下我们现在的数据库是1个主节点2个数据节点,负载都在50%左右。
4、现在的情况并没有算上前端帆软的查询,处理数据的并发这么高,一定影响到查询的。查询很慢,并且后端处理数据的时候已经出现很多慢查询了,都是10多秒以上的。
后续必要的调研和处理
1、外部表,调研过了,基本不行,链接过来表的数据需要二次解析
2、sap的kafka连接器,可以实现sap中新增的数据转发到kafka,我们的程序监听kafka就可以处理新增的数据,这样新增任务的轮询就不需要了。这个技术需要调研。
3、数据库扩容,解决负载问题,目前只有1个主节点2个数据节点负载50%。需要试验环境,5个ip,规划1个主节点,4个数据节点,每个数据节点50个segment。
第1点,已经调研过了
第2点,继续调研
第3点,资源需要申请
......@@ -16,3 +16,7 @@ PRIMARY KEY (mandt,mblnr,mjahr,zeile)
Distributed by (mandt,mblnr,mjahr,zeile);
alter table mseg add column aufnr text;
alter table mseg add column kdauf text;
alter table mseg add column kdpos text;
alter table mseg add column matnr text;
......@@ -541,7 +541,24 @@
values(#{mblnr}, #{mjahr}, #{zeile}, #{bldat}, #{budat}, #{bwart}, #{matnr}, #{werks}, #{lgort},
#{charg}, #{sobkz}, #{lifnr}, #{kdauf}, #{kdpos}, #{shkzg}, #{menge}, #{bwtar}, #{aufnr}, #{bldat1}, #{budat1}, #{mandt}, #{hashResult})
</insert>
<delete id="deleteAufm" parameterType="com.huazheng.project.hana.model.Aufm">
delete from aufm where mblnr = #{mblnr} and mandt = #{mandt} and mjahr = #{mjahr} and zeile = #{zeile}
</delete>
<update id="updateAufm" parameterType="com.huazheng.project.hana.model.Aufm">
update Aufm set
mblnr = #{mblnr}, mjahr = #{mjahr}, zeile = #{zeile}, bldat = #{bldat}, budat = #{budat}, bwart = #{bwart}, matnr = #{matnr},
werks = #{werks}, lgort = #{lgort}, charg = #{charg}, sobkz = #{sobkz}, lifnr = #{lifnr}, kdauf = #{kdauf}, kdpos = #{kdpos},
shkzg = #{shkzg}, menge = #{menge}, bwtar = #{bwtar}, aufnr = #{aufnr}, bldat1 = #{bldat1}, budat1 = #{budat1}, mandt = #{mandt},
hashResult = #{hashResult}
where mblnr = #{mblnr} and mandt = #{mandt} and mjahr = #{mjahr} and zeile = #{zeile}
</update>
<select id="selectAufmCheck" parameterType="com.huazheng.project.hana.model.Aufm" resultType="com.huazheng.project.hana.model.Aufm">
select * from Aufm where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectAufmMaxRowNum" resultType="long">
select max(rowNum) from Aufm
</select>
<select id="selectKna1" parameterType="com.huazheng.project.hana.model.Kna1" resultType="com.huazheng.project.hana.model.Kna1">
select * from kna1 where kunnr = #{kunnr} and mandt = #{mandt}
</select>
......@@ -663,7 +680,21 @@
insert into Afru (mandt, rueck, rmzhl, ersda, wablnr, aufnr, stokz, stzhl, hashResult)
values(#{mandt},#{rueck},#{rmzhl},#{ersda},#{wablnr},#{aufnr},#{stokz},#{stzhl},#{hashResult})
</insert>
<delete id="deleteAfru" parameterType="com.huazheng.project.hana.model.Afru">
delete from Afru where mandt = #{mandt} and rueck = #{rueck} and rmzhl = #{rmzhl}
</delete>
<update id="updateAfru" parameterType="com.huazheng.project.hana.model.Afru">
update Afru set
mandt = #{mandt}, rueck = #{rueck}, rmzhl = #{rmzhl}, ersda = #{ersda}, wablnr = #{wablnr}, aufnr = #{aufnr}, stokz = #{stokz}, stzhl = #{stzhl}, hashResult = #{hashResult}
where mandt = #{mandt} and rueck = #{rueck} and rmzhl = #{rmzhl}
</update>
<select id="selectAfruCheck" parameterType="com.huazheng.project.hana.model.Afru" resultType="com.huazheng.project.hana.model.Afru">
select * from Afru where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectAfruMaxRowNum" resultType="long">
select max(rowNum) from Afru
</select>
<select id="selectTspat" parameterType="com.huazheng.project.hana.model.Tspat" resultType="com.huazheng.project.hana.model.Tspat">
select * from Tspat where mandt = #{mandt} and spras = #{spras} and spart = #{spart}
</select>
......@@ -707,20 +738,24 @@
select * from Mseg where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr} and zeile = #{zeile}
</select>
<insert id="insertMseg" parameterType="com.huazheng.project.hana.model.Mseg">
insert into Mseg (mandt, mblnr, mjahr, zeile, bwart, charg, menge, budat_mkpf, cputm_mkpf, aufnr, hashResult)
values(#{mandt},#{mblnr},#{mjahr},#{zeile},#{bwart},#{charg},#{menge},#{budat_mkpf},#{cputm_mkpf},#{aufnr},#{hashResult})
insert into Mseg (mandt, mblnr, mjahr, zeile, bwart, charg, menge, budat_mkpf, cputm_mkpf, aufnr,kdauf, kdpos, matnr, hashResult)
values(#{mandt},#{mblnr},#{mjahr},#{zeile},#{bwart},#{charg},#{menge},#{budat_mkpf},#{cputm_mkpf},#{aufnr},#{kdauf}, #{kdpos}, #{matnr},#{hashResult})
</insert>
<!-- <delete id="deleteMseg" parameterType="com.huazheng.project.hana.model.Mseg"> -->
<!-- delete from Mseg where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr} and zeile = #{zeile} -->
<!-- </delete> -->
<!-- <update id="updateMseg" parameterType="com.huazheng.project.hana.model.Mseg"> -->
<!-- update Mseg set -->
<!-- mandt = #{mandt}, mblnr = #{mblnr}, mjahr = #{mjahr}, zeile = #{zeile}, bwart = #{bwart}, charg = #{charg}, menge = #{menge}, budat_mkpf = #{budat_mkpf}, cputm_mkpf = #{cputm_mkpf}, aufnr = #{aufnr}, hashResult = #{hashResult} -->
<!-- where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr} and zeile = #{zeile} -->
<!-- </update> -->
<!-- <select id="selectMsegCheck" parameterType="com.huazheng.project.hana.model.Mseg" resultType="com.huazheng.project.hana.model.Mseg"> -->
<!-- select * from Mseg where rownum &gt; #{rowNum} order by rownum limit 20 -->
<!-- </select> -->
<delete id="deleteMseg" parameterType="com.huazheng.project.hana.model.Mseg">
delete from Mseg where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr} and zeile = #{zeile}
</delete>
<update id="updateMseg" parameterType="com.huazheng.project.hana.model.Mseg">
update Mseg set
mandt = #{mandt}, mblnr = #{mblnr}, mjahr = #{mjahr}, zeile = #{zeile}, bwart = #{bwart}, charg = #{charg}, menge = #{menge},
budat_mkpf = #{budat_mkpf}, cputm_mkpf = #{cputm_mkpf}, aufnr = #{aufnr},kdauf=#{kdauf}, kdpos=#{kdpos}, matnr=#{matnr}, hashResult = #{hashResult}
where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr} and zeile = #{zeile}
</update>
<select id="selectMsegCheck" parameterType="com.huazheng.project.hana.model.Mseg" resultType="com.huazheng.project.hana.model.Mseg">
select * from Mseg where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectMsegMaxRowNum" resultType="long">
select max(rowNum) from mseg
</select>
<select id="selectMkpf" parameterType="com.huazheng.project.hana.model.Mkpf" resultType="com.huazheng.project.hana.model.Mkpf">
select * from Mkpf where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr}
......@@ -729,18 +764,21 @@
insert into Mkpf (mandt,mblnr,mjahr,bldat,cputm,hashResult)
values(#{mandt},#{mblnr},#{mjahr},#{bldat},#{cputm},#{hashResult})
</insert>
<!-- <delete id="deleteMkpf" parameterType="com.huazheng.project.hana.model.Mkpf"> -->
<!-- delete from Mkpf where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr} -->
<!-- </delete> -->
<!-- <update id="updateMkpf" parameterType="com.huazheng.project.hana.model.Mkpf"> -->
<!-- update Mkpf set -->
<!-- mandt = #{mandt}, mblnr = #{mblnr}, mjahr = #{mjahr}, bldat = #{bldat}, cputm = #{cputm}, hashResult = #{hashResult} -->
<!-- where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr} -->
<!-- </update> -->
<!-- <select id="selectMkpfCheck" parameterType="com.huazheng.project.hana.model.Mkpf" resultType="com.huazheng.project.hana.model.Mkpf"> -->
<!-- select * from Mkpf where rownum &gt; #{rowNum} order by rownum limit 20 -->
<!-- </select> -->
<delete id="deleteMkpf" parameterType="com.huazheng.project.hana.model.Mkpf">
delete from Mkpf where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr}
</delete>
<update id="updateMkpf" parameterType="com.huazheng.project.hana.model.Mkpf">
update Mkpf set
mandt = #{mandt}, mblnr = #{mblnr}, mjahr = #{mjahr}, bldat = #{bldat}, cputm = #{cputm}, hashResult = #{hashResult}
where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr}
</update>
<select id="selectMkpfCheck" parameterType="com.huazheng.project.hana.model.Mkpf" resultType="com.huazheng.project.hana.model.Mkpf">
select * from Mkpf where rownum &gt; #{rowNum} order by rownum limit 20
</select>
<select id="selectMkpfMaxRowNum" resultType="long">
select max(rowNum) from Mkpf
</select>
<select id="selectKonv" parameterType="com.huazheng.project.hana.model.Konv" resultType="com.huazheng.project.hana.model.Konv">
select * from Konv
where mandt = #{mandt} and knumv = #{knumv} and kposn = #{kposn} and stunr = #{stunr} and zaehk = #{zaehk}
......
......@@ -230,7 +230,7 @@
</select>
<select id="selectMsegNew" parameterType="Mseg" resultType="Mseg">
select top 20 "$rowid$" as rowids,
mandt, mblnr, mjahr, zeile, bwart, charg, menge, budat_mkpf, cputm_mkpf, aufnr
mandt, mblnr, mjahr, zeile, bwart, charg, menge, budat_mkpf, cputm_mkpf, aufnr,kdauf, kdpos, matnr
from ${hana_user}.Mseg
where "$rowid$" &gt; #{rowids} ${hana_mandt}
order by "$rowid$"
......@@ -774,4 +774,28 @@
where mandt = '800' and spras = #{spras} and vkbur = #{vkbur}
</select>
<select id="selectMsegById" parameterType="Mseg" resultType="Mseg">
select mandt, mblnr, mjahr, zeile, bwart, charg, menge, budat_mkpf, cputm_mkpf, aufnr,kdauf, kdpos, matnr
from ${hana_user}.Mseg
where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr} and zeile = #{zeile}
</select>
<select id="selectMkpfById" parameterType="Mkpf" resultType="Mkpf">
select mandt, mblnr, mjahr, bldat, cputm
from ${hana_user}.Mkpf
where mandt = #{mandt} and mblnr = #{mblnr} and mjahr = #{mjahr}
</select>
<select id="selectAfruById" parameterType="Afru" resultType="Afru">
select mandt, rueck, rmzhl, ersda, wablnr, aufnr, stokz, stzhl
from ${hana_user}.Afru
where mandt = #{mandt} and rueck = #{rueck} and rmzhl = #{rmzhl}
</select>
<select id="selectAufmById" parameterType="Aufm" resultType="Aufm">
select
mblnr, mjahr, zeile, bldat, budat, bwart, matnr, werks, lgort,
charg, sobkz, lifnr, kdauf, kdpos, shkzg, menge, bwtar, aufnr, mandt
from ${hana_user}.aufm
where mblnr = #{mblnr} and mandt = #{mandt} and mjahr = #{mjahr} and zeile = #{zeile}
</select>
</mapper>
......@@ -1206,6 +1206,51 @@
</bean>
<!-- 以上66个任务 -->
<!-- 一次性任务 -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="name" value="selectMsegCheck_once"/>
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectMsegCheck" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="name" value="selectMkpfCheck_once"/>
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectMkpfCheck" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="name" value="selectAufmCheck_once"/>
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectAufmCheck" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="name" value="selectAfruCheck_once"/>
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectAfruCheck" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
</list>
</constructor-arg>
</bean>
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论