Spring Batch 实操 Demo
·
/*
* Copyright yangjunxiong 2021 Wicrenet, Inc. All rights reserved.
*/
package com.aegonthtf.fate.service.batch;
import com.aegonthtf.fate.constant.BatchConvertEnum;
import com.aegonthtf.fate.constant.CommonConstant;
import com.aegonthtf.fate.dto.JobBehaviorParamesDto;
import com.aegonthtf.fate.util.exception.SZException;
import com.alibaba.fastjson.JSON;
import org.joda.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
/**
 * Abstract launcher for the behaviour-record export batch jobs.
 *
 * <p>Holds the injected Spring Batch infrastructure (launcher and builder factories)
 * and maps a {@link BatchConvertEnum} onto the concrete {@link Job} that a subclass builds.
 *
 * @author yangjunxiong
 * @date 2021/5/14 15:55
 **/
@Configuration
public abstract class AbsExportBatchJob {
    private static final Logger logger = LoggerFactory.getLogger(AbsExportBatchJob.class);

    /** Launcher used to run the job built by {@link #itemReaderFromDBJob}. */
    @Autowired
    public JobLauncher jobLauncher;

    /** Factory used by subclasses to create jobs. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory used by subclasses to create steps. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * @return the injected job builder factory
     */
    public JobBuilderFactory getJobBuilderFactory() {
        return this.jobBuilderFactory;
    }

    /**
     * @return the injected step builder factory
     */
    public StepBuilderFactory getStepBuilderFactory() {
        return this.stepBuilderFactory;
    }

    /**
     * Builds the job selected by {@code batchConvertEnum} and runs it through the launcher.
     *
     * <p>The whole {@code jobParameters} DTO is serialised to JSON and passed to the job as the
     * single string parameter {@link CommonConstant#JOB_PARAMETERS}. Any failure while building
     * or launching the job is logged together with its cause and is not rethrown.
     *
     * @param jobParameters    run parameters; its start date-time selects the job/step name
     * @param batchConvertEnum which export job to run
     */
    @Async("behaviorTaskExecutor")
    public void asyncStart(JobBehaviorParamesDto jobParameters, BatchConvertEnum batchConvertEnum) {
        try {
            jobLauncher.run(this.itemReaderFromDBJob(jobParameters.getStaratDateTime(), batchConvertEnum),
                    new JobParametersBuilder()
                            // job-wide parameter: the full DTO as a JSON string
                            .addString(CommonConstant.JOB_PARAMETERS, JSON.toJSONString(jobParameters))
                            .toJobParameters());
        } catch (Exception e) {
            // The method returns void, so the log is the only place a launch failure becomes
            // visible; keep the stack trace instead of printing it to stderr.
            logger.error("Failed to launch batch job, batchConvertEnum={}", batchConvertEnum, e);
        }
    }

    /**
     * Resolves the {@link Job} for the given export type.
     *
     * @param staratDateTime   batch start date-time handed to the subclass builder
     * @param batchConvertEnum export type to resolve
     * @return the job built by the matching subclass method
     * @throws SZException when no job is wired for {@code batchConvertEnum}
     */
    public Job itemReaderFromDBJob(LocalDateTime staratDateTime, BatchConvertEnum batchConvertEnum) {
        switch (batchConvertEnum) {
            case ATH_ACT_EXPORT:
                return this.athActExportSub(staratDateTime, batchConvertEnum);
            case ESALES_ACT_EXPORT:
                return this.esalesActExportSub(staratDateTime, batchConvertEnum);
            case TU_D_SUBMISSION_DAILY:
            case APOLLO_ACT_EXPORT:
            default:
                // no job is wired for these types yet
                break;
        }
        throw new SZException("该Job启动器不存在!");
    }

    /**
     * Builds the batch-sync job for the ATH_ACT_EXPORT table (Athena2_0 behaviour records).
     *
     * @param staratDateTime   batch start date-time
     * @param batchConvertEnum export type the job is built for
     * @return the job to launch
     */
    protected abstract Job athActExportSub(LocalDateTime staratDateTime, BatchConvertEnum batchConvertEnum);

    /**
     * Builds the batch-sync job for the ESALES_ACT_EXPORT table (ESales online-sales behaviour records).
     *
     * @param staratDateTime   batch start date-time
     * @param batchConvertEnum export type the job is built for
     * @return the job to launch
     */
    protected abstract Job esalesActExportSub(LocalDateTime staratDateTime, BatchConvertEnum batchConvertEnum);
}
/*
* Copyright yangjunxiong 2021 Wicrenet, Inc. All rights reserved.
*/
package com.aegonthtf.fate.service.batch;
import com.aegonthtf.fate.constant.BatchConvertEnum;
import com.aegonthtf.fate.constant.CommonConstant;
import com.aegonthtf.fate.entity.behavior.AthActExport;
import com.aegonthtf.fate.entity.behavior.EsalesActExport;
import com.aegonthtf.fate.entity.user.FateSalesAct;
import com.aegonthtf.fate.service.batch.listener.StartJobExecutionListener;
import com.aegonthtf.fate.service.batch.processor.AthActExportItemProcessor;
import com.aegonthtf.fate.service.batch.processor.EsalesActExportItemProcessor;
import com.aegonthtf.fate.service.batch.reader.AthActExportReader;
import com.aegonthtf.fate.service.batch.reader.EaalesActExportReader;
import com.aegonthtf.fate.service.batch.writer.FateSalesActWriter;
import org.joda.time.LocalDateTime;
import org.springframework.batch.core.Job;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Concrete job definitions for the behaviour-record exports.
 *
 * <p>Each job has a single chunk-oriented step (reader, processor, writer) and registers
 * {@link StartJobExecutionListener} on the job.
 *
 * @author yangjunxiong
 * @date 2021/5/14 15:55
 **/
@Component
public class ExportBatchRunJob extends AbsExportBatchJob {
    @Autowired
    private StartJobExecutionListener startJobExecutionListener;
    @Autowired
    private AthActExportReader athActExportReader;
    @Autowired
    private AthActExportItemProcessor athActExportItemProcessor;
    @Autowired
    private EaalesActExportReader eaalesActExportReader;
    @Autowired
    private EsalesActExportItemProcessor esalesActExportItemProcessor;
    @Autowired
    private FateSalesActWriter fateSalesActWriter;

    /**
     * Builds the batch-sync job for the ATH_ACT_EXPORT table (Athena2_0 behaviour records).
     *
     * <p>Job name: {@code <input table>:Job:<start time>}; step name:
     * {@code <input table>:Step<start time>}, with the start time formatted by
     * {@link CommonConstant#yyyyMMddHHmmss}.
     *
     * @param staratDateTime   batch start date-time, used only for naming here
     * @param batchConvertEnum export type supplying the input table name
     * @return the assembled job
     */
    @Override
    protected Job athActExportSub(LocalDateTime staratDateTime, BatchConvertEnum batchConvertEnum) {
        final String stamp = staratDateTime.toString(CommonConstant.yyyyMMddHHmmss);
        final String jobName = batchConvertEnum.getiTableName() + ":Job" + ":" + stamp;
        final String stepName = batchConvertEnum.getiTableName() + ":Step" + stamp;
        return this.getJobBuilderFactory()
                .get(jobName)
                .start(this.getStepBuilderFactory()
                        .get(stepName)
                        .<AthActExport, FateSalesAct>chunk(CommonConstant.BEHAVIOR_BATCH_SIZE)
                        // NOTE(review): null is passed on purpose; the reader method is a step-scoped
                        // @Bean, so the real value is presumably injected at step start - confirm.
                        .reader(athActExportReader.itemReaderFromDB(CommonConstant.NULL))
                        .processor(athActExportItemProcessor)
                        .writer(fateSalesActWriter.writer())
                        .build())
                .listener(startJobExecutionListener)
                .build();
    }

    /**
     * Builds the batch-sync job for the ESALES_ACT_EXPORT table (ESales online-sales behaviour records).
     *
     * <p>Naming follows the same scheme as {@link #athActExportSub}.
     *
     * @param staratDateTime   batch start date-time, used only for naming here
     * @param batchConvertEnum export type supplying the input table name
     * @return the assembled job
     */
    @Override
    protected Job esalesActExportSub(LocalDateTime staratDateTime, BatchConvertEnum batchConvertEnum) {
        final String stamp = staratDateTime.toString(CommonConstant.yyyyMMddHHmmss);
        final String jobName = batchConvertEnum.getiTableName() + ":Job" + ":" + stamp;
        final String stepName = batchConvertEnum.getiTableName() + ":Step" + stamp;
        return this.getJobBuilderFactory()
                .get(jobName)
                .start(this.getStepBuilderFactory()
                        .get(stepName)
                        .<EsalesActExport, FateSalesAct>chunk(CommonConstant.BEHAVIOR_BATCH_SIZE)
                        // NOTE(review): null placeholder, see athActExportSub - confirm late binding.
                        .reader(eaalesActExportReader.itemReaderFromDB(CommonConstant.NULL))
                        .processor(esalesActExportItemProcessor)
                        .writer(fateSalesActWriter.writer())
                        .build())
                .listener(startJobExecutionListener)
                .build();
    }
}
/*
* Copyright 2021 Wicrenet, Inc. All rights reserved.
*/
package com.aegonthtf.fate.service.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.JobInstance;
import org.springframework.stereotype.Component;
/**
 * Job listener that logs the start and the end of every job execution.
 *
 * @author yangjunxiong
 * @date 2021/5/14 16:46
 **/
@Component
public class StartJobExecutionListener implements JobExecutionListener {
    private static final Logger logger = LoggerFactory.getLogger(StartJobExecutionListener.class);

    /** Log template written before the job runs. */
    private static final String STARTED =
            "job 开始, JOB_INSTANCE_ID={},JOB_EXECUTION_ID={}, jobName={}, parameters={}";
    /** Log template written after the job has finished. */
    private static final String FINISHED =
            "job 结束, JOB_INSTANCE_ID={},JOB_EXECUTION_ID={}, jobName={}, parameters={}";

    /** Logs instance id, execution id, job name and parameters when the job starts. */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        this.report(STARTED, jobExecution);
    }

    /** Logs instance id, execution id, job name and parameters when the job ends. */
    @Override
    public void afterJob(JobExecution jobExecution) {
        this.report(FINISHED, jobExecution);
    }

    /** Writes one info line for the execution using the given template. */
    private void report(String template, JobExecution execution) {
        JobInstance instance = execution.getJobInstance();
        logger.info(template, instance.getId(), execution.getId(), instance.getJobName(), execution.getJobParameters());
    }
}
/*
* Copyright 2021 Wicrenet, Inc. All rights reserved.
*/
package com.aegonthtf.fate.service.batch.reader;
import com.aegonthtf.fate.constant.BatchConvertEnum;
import com.aegonthtf.fate.constant.CommonConstant;
import com.aegonthtf.fate.constant.EventTypeEnum;
import com.aegonthtf.fate.dto.JobBehaviorParamesDto;
import com.aegonthtf.fate.entity.behavior.AthActExport;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.Date;
/**
 * Paging reader for the Athena2_0 behaviour-record table (ATH_ACT_EXPORT).
 *
 * @author yangjunxiong
 * Created on 2021/5/15 13:39
 */
@Configuration
public class AthActExportReader {
    @Autowired
    @Qualifier("ThirdDataSource")
    private DataSource dataSource;

    /** Source table descriptor; per the original note ATH_ACT_EXPORT holds event types E1, E2, E3 and E7. */
    private final BatchConvertEnum batchConvertEnum = BatchConvertEnum.ATH_ACT_EXPORT;

    /**
     * Creates the step-scoped paging reader for one batch run.
     *
     * <p>The query window comes from {@link EventTypeEnum#getCreateTime} applied to the start
     * date-time carried in the JSON job parameter. Rows are read by column label so the mapping
     * does not depend on the order of the SELECT list, and datetime columns are read with
     * {@code getTimestamp} because {@code getDate} returns a date-only {@code java.sql.Date}.
     *
     * @param jobParameters JSON form of {@link JobBehaviorParamesDto}, bound from the job parameters
     * @return reader over ATH_ACT_EXPORT ordered by ID
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<AthActExport> itemReaderFromDB(@Value("#{jobParameters[jobParameters]}") String jobParameters) {
        JobBehaviorParamesDto paramesDto = JSON.parseObject(jobParameters, JobBehaviorParamesDto.class);
        ImmutablePair<Date, Date> createTime = EventTypeEnum.getCreateTime(paramesDto.getStaratDateTime());
        return new JdbcPagingItemReaderBuilder<AthActExport>()
                .dataSource(dataSource)
                .name(batchConvertEnum.getiTableName())
                .fetchSize(CommonConstant.BEHAVIOR_BATCH_SIZE)
                .parameterValues(ImmutableMap.of(
                        "delFlag", CommonConstant._1,
                        "startTime", createTime.left,
                        "endTime", createTime.right
                ))
                .pageSize(CommonConstant.BEHAVIOR_BATCH_SIZE)
                // NOTE(review): assumes the entity's time setters accept java.util.Date - confirm.
                .rowMapper((resultSet, rowNum) -> {
                    AthActExport entity = new AthActExport();
                    entity.setId(resultSet.getInt("ID"));
                    entity.setInfoKind(resultSet.getString("INFO_KIND"));
                    entity.setEventType(resultSet.getString("EVENT_TYPE"));
                    entity.setTransId(resultSet.getString("TRANS_ID"));
                    entity.setTransDate(resultSet.getString("TRANS_DATE"));     // YYYYMMDD string
                    entity.setTransTime(resultSet.getTimestamp("TRANS_TIME"));  // datetime
                    entity.setAgentCode(resultSet.getString("AGENT_CODE"));
                    entity.setAgentName(resultSet.getString("AGENT_NAME"));
                    entity.setCustOpenid(resultSet.getString("CUST_OPENID"));
                    entity.setOpenTime(resultSet.getTimestamp("OPEN_TIME"));    // datetime
                    entity.setSubTime(resultSet.getTimestamp("SUB_TIME"));      // datetime
                    entity.setAthId(resultSet.getString("ATH_ID"));
                    entity.setCustName(resultSet.getString("CUST_NAME"));
                    entity.setCustPhone(resultSet.getString("CUST_PHONE"));
                    entity.setCustGender(resultSet.getString("CUST_GENDER"));
                    entity.setInfoTitle(resultSet.getString("INFO_TITLE"));
                    entity.setInfoSource(resultSet.getString("INFO_SOURCE"));
                    entity.setCreatetime(resultSet.getTimestamp("CREATETIME"));
                    entity.setCreater(resultSet.getString("CREATER"));
                    entity.setUpdatetime(resultSet.getTimestamp("UPDATETIME"));
                    entity.setDelFlag(resultSet.getString("DEL_FLAG"));
                    return entity;
                })
                .queryProvider(articleProvider())
                .build();
    }

    /**
     * Builds the MySQL paging query: rows matching the delete flag whose CREATETIME lies in
     * {@code [startTime, endTime)}, sorted by ID ascending.
     */
    private PagingQueryProvider articleProvider() {
        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause("ID, INFO_KIND, EVENT_TYPE, TRANS_ID, TRANS_DATE, TRANS_TIME, AGENT_CODE, AGENT_NAME, CUST_OPENID, OPEN_TIME, SUB_TIME, ATH_ID, CUST_NAME, CUST_PHONE, CUST_GENDER, INFO_TITLE, INFO_SOURCE, CREATETIME, CREATER, UPDATETIME, DEL_FLAG");
        provider.setFromClause("FROM " + batchConvertEnum.getiTableName());
        provider.setWhereClause("DEL_FLAG = :delFlag AND CREATETIME >= :startTime AND CREATETIME < :endTime");
        provider.setSortKeys(ImmutableMap.of("ID", Order.ASCENDING));
        return provider;
    }
}
/*
* Copyright 2021 Wicrenet, Inc. All rights reserved.
*/
package com.aegonthtf.fate.service.batch.reader;
import com.aegonthtf.fate.constant.BatchConvertEnum;
import com.aegonthtf.fate.constant.CommonConstant;
import com.aegonthtf.fate.constant.EventTypeEnum;
import com.aegonthtf.fate.dto.JobBehaviorParamesDto;
import com.aegonthtf.fate.entity.behavior.EsalesActExport;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.Date;
/**
 * Paging reader for the ESales online-sales behaviour-record table (ESALES_ACT_EXPORT).
 *
 * @author yangjunxiong
 * Created on 2021/5/15 13:43
 */
@Configuration
public class EaalesActExportReader {
    /**
     * Columns selected from ESALES_ACT_EXPORT, matching the fields the row mapper populates.
     * The previous SELECT list was the Athena column list, which has no AGENT_OPENID and
     * shifted every positional read from the ninth column onwards.
     */
    private static final String SELECT_COLUMNS =
            "ID, INFO_KIND, EVENT_TYPE, TRANS_ID, TRANS_DATE, TRANS_TIME, AGENT_CODE, AGENT_NAME, "
                    + "AGENT_OPENID, CUST_OPENID, OPEN_TIME, INFO_TITLE, INFO_SOURCE, CREATETIME, CREATER, "
                    + "UPDATETIME, DEL_FLAG";

    @Autowired
    @Qualifier("ThirdDataSource")
    private DataSource dataSource;

    /** Source table descriptor; per the original note ESALES_ACT_EXPORT holds event types E1 and E2. */
    private final BatchConvertEnum batchConvertEnum = BatchConvertEnum.ESALES_ACT_EXPORT;

    /**
     * Creates the step-scoped paging reader for one batch run.
     *
     * <p>The query window comes from {@link EventTypeEnum#getCreateTime} applied to the start
     * date-time carried in the JSON job parameter. Rows are read by column label so the mapper
     * and the SELECT list cannot drift apart, and datetime columns are read with
     * {@code getTimestamp} because {@code getDate} returns a date-only {@code java.sql.Date}.
     *
     * @param jobParameters JSON form of {@link JobBehaviorParamesDto}, bound from the job parameters
     * @return reader over ESALES_ACT_EXPORT ordered by ID
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<EsalesActExport> itemReaderFromDB(@Value("#{jobParameters[jobParameters]}") String jobParameters) {
        JobBehaviorParamesDto paramesDto = JSON.parseObject(jobParameters, JobBehaviorParamesDto.class);
        // derive the [start, end) CREATETIME window for this run
        ImmutablePair<Date, Date> createTime = EventTypeEnum.getCreateTime(paramesDto.getStaratDateTime());
        return new JdbcPagingItemReaderBuilder<EsalesActExport>()
                .dataSource(dataSource)
                .name(batchConvertEnum.getiTableName())
                .fetchSize(CommonConstant.BEHAVIOR_BATCH_SIZE)
                .parameterValues(ImmutableMap.of(
                        "delFlag", CommonConstant._1,
                        "startTime", createTime.left,
                        "endTime", createTime.right
                ))
                .pageSize(CommonConstant.BEHAVIOR_BATCH_SIZE)
                // NOTE(review): assumes the entity's time setters accept java.util.Date - confirm.
                .rowMapper((resultSet, rowNum) -> {
                    EsalesActExport entity = new EsalesActExport();
                    entity.setId(resultSet.getInt("ID"));
                    entity.setInfoKind(resultSet.getString("INFO_KIND"));
                    entity.setEventType(resultSet.getString("EVENT_TYPE"));
                    entity.setTransId(resultSet.getString("TRANS_ID"));
                    entity.setTransDate(resultSet.getString("TRANS_DATE"));     // YYYYMMDD string
                    entity.setTransTime(resultSet.getTimestamp("TRANS_TIME"));  // datetime
                    entity.setAgentCode(resultSet.getString("AGENT_CODE"));
                    entity.setAgentName(resultSet.getString("AGENT_NAME"));
                    entity.setAgentOpenid(resultSet.getString("AGENT_OPENID"));
                    entity.setCustOpenid(resultSet.getString("CUST_OPENID"));
                    entity.setOpenTime(resultSet.getTimestamp("OPEN_TIME"));    // datetime
                    entity.setInfoTitle(resultSet.getString("INFO_TITLE"));
                    entity.setInfoSource(resultSet.getString("INFO_SOURCE"));
                    entity.setCreatetime(resultSet.getTimestamp("CREATETIME"));
                    entity.setCreater(resultSet.getString("CREATER"));
                    entity.setUpdatetime(resultSet.getTimestamp("UPDATETIME"));
                    entity.setDelFlag(resultSet.getString("DEL_FLAG"));
                    return entity;
                })
                .queryProvider(articleProvider())
                .build();
    }

    /**
     * Builds the MySQL paging query: rows matching the delete flag whose CREATETIME lies in
     * {@code [startTime, endTime)}, sorted by ID ascending.
     */
    private PagingQueryProvider articleProvider() {
        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause(SELECT_COLUMNS);
        provider.setFromClause("FROM " + batchConvertEnum.getiTableName());
        provider.setWhereClause("DEL_FLAG = :delFlag AND CREATETIME >= :startTime AND CREATETIME < :endTime");
        provider.setSortKeys(ImmutableMap.of("ID", Order.ASCENDING));
        return provider;
    }
}
package com.aegonthtf.fate.service.batch.processor;///*
import cn.com.common.util.util.StringUtils;
import com.aegonthtf.fate.constant.CommonConstant;
import com.aegonthtf.fate.constant.EventTypeEnum;
import com.aegonthtf.fate.dto.JobBehaviorParamesDto;
import com.aegonthtf.fate.entity.behavior.AthActExport;
import com.aegonthtf.fate.entity.user.FateSalesAct;
import com.alibaba.fastjson.JSON;
import org.joda.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.Optional;
/**
 * Data-cleansing processor that converts an Athena behaviour record into a {@link FateSalesAct}.
 *
 * <p>NOTE(review): the bean keeps per-step state in fields; if it is a shared singleton and
 * several jobs run concurrently, one step may see another step's parameters - confirm the scope.
 *
 * @author yangjunxiong
 * @date 2021/5/14 16:46
 **/
@Component
public class AthActExportItemProcessor implements ItemProcessor<AthActExport, FateSalesAct> {
    private static final Logger logger = LoggerFactory.getLogger(AthActExportItemProcessor.class);

    /** Job parameters captured in {@link #beforeStep}. */
    JobParameters jobParameters;

    /** Parsed form of the JSON job parameter; filled lazily so it is parsed once per step, not per item. */
    private JobBehaviorParamesDto cachedParames;

    /**
     * Captures the job parameters of the step about to run and drops any previously parsed DTO.
     *
     * @param stepExecution the step execution supplying the job parameters
     */
    @BeforeStep
    public void beforeStep(final StepExecution stepExecution) {
        jobParameters = stepExecution.getJobParameters();
        cachedParames = null;
        logger.info("jobParameters: {}", jobParameters);
    }

    /**
     * Maps one source row to the target entity.
     *
     * @param item source row
     * @return the populated entity, or {@code null} (item filtered out) when the item is null, its
     *         event type is blank or unknown, or the event type is not one of E1, E2, E3, E7
     */
    @Override
    public FateSalesAct process(AthActExport item) throws Exception {
        JobBehaviorParamesDto paramesDto = this.resolveParames();
        if (Objects.isNull(item) || StringUtils.isBlank(item.getEventType())) {
            return null;
        }
        EventTypeEnum eventTypeEnum = EventTypeEnum.toByCode(item.getEventType());
        if (eventTypeEnum == null) {
            return null;
        }
        FateSalesAct entity = new FateSalesAct();
        // fields shared by every event type
        Optional.ofNullable(item.getInfoKind()).ifPresent(entity::setInfoKind);     // INFO_KIND
        Optional.ofNullable(item.getEventType()).ifPresent(entity::setEventType);   // EVENT_TYPE
        Optional.ofNullable(item.getAgentCode()).ifPresent(entity::setAgentCode);   // AGENT_CODE
        Optional.ofNullable(item.getAgentName()).ifPresent(entity::setAgentName);   // AGENT_NAME
        Optional.ofNullable(item.getInfoTitle()).ifPresent(entity::setInfoTitle);   // INFO_TITLE
        Optional.ofNullable(item.getInfoSource()).ifPresent(entity::setInfoSource); // INFO_SOURCE
        Optional.ofNullable(item.getCreatetime()).ifPresent(entity::setCreatetime); // CREATETIME
        Optional.ofNullable(item.getCreater()).ifPresent(entity::setCreater);       // CREATER
        Optional.ofNullable(item.getDelFlag()).ifPresent(entity::setDelFlag);       // DEL_FLAG
        // the update time is the batch start time, not the source row's own value
        Optional.ofNullable(paramesDto.getStaratDateTime())
                .map(LocalDateTime::toDate)
                .ifPresent(entity::setUpdatatime);
        // fields specific to the event type
        switch (eventTypeEnum) {
            case E1:
                this.copyTransFields(item, entity);
                break;
            case E2:
                this.copyTransFields(item, entity);
                this.copyOpenFields(item, entity);
                break;
            case E3:
                this.copyTransFields(item, entity);
                this.copyOpenFields(item, entity);
                Optional.ofNullable(item.getSubTime()).ifPresent(entity::setSubTime);       // SUB_TIME
                Optional.ofNullable(item.getCustName()).ifPresent(entity::setCustName);     // CUST_NAME
                Optional.ofNullable(item.getCustPhone()).ifPresent(entity::setCustPhone);   // CUST_PHONE
                Optional.ofNullable(item.getCustGender()).ifPresent(entity::setCustGender); // CUST_GENDER
                break;
            case E7:
                Optional.ofNullable(item.getCustOpenid()).ifPresent(entity::setCustOpenid); // CUST_OPENID
                Optional.ofNullable(item.getSubTime()).ifPresent(entity::setSubTime);       // SUB_TIME
                Optional.ofNullable(item.getAthId()).ifPresent(entity::setAthId);           // ATH_ID
                Optional.ofNullable(item.getCustName()).ifPresent(entity::setCustName);     // CUST_NAME
                Optional.ofNullable(item.getCustPhone()).ifPresent(entity::setCustPhone);   // CUST_PHONE
                break;
            default:
                return null;
        }
        return entity;
    }

    /** Returns the DTO parsed from the JSON job parameter, parsing it on first use after each step start. */
    private JobBehaviorParamesDto resolveParames() {
        if (cachedParames == null) {
            cachedParames = JSON.parseObject(jobParameters.getString(CommonConstant.JOB_PARAMETERS), JobBehaviorParamesDto.class);
        }
        return cachedParames;
    }

    /** Copies the non-null forwarding fields (TRANS_ID, TRANS_DATE, TRANS_TIME). */
    private void copyTransFields(AthActExport item, FateSalesAct entity) {
        Optional.ofNullable(item.getTransId()).ifPresent(entity::setTransId);
        Optional.ofNullable(item.getTransDate()).ifPresent(entity::setTransDate);
        Optional.ofNullable(item.getTransTime()).ifPresent(entity::setTransTime);
    }

    /** Copies the non-null page-open fields (CUST_OPENID, OPEN_TIME). */
    private void copyOpenFields(AthActExport item, FateSalesAct entity) {
        Optional.ofNullable(item.getCustOpenid()).ifPresent(entity::setCustOpenid);
        Optional.ofNullable(item.getOpenTime()).ifPresent(entity::setOpenTime);
    }
}
package com.aegonthtf.fate.service.batch.processor;///*
import cn.com.common.util.util.StringUtils;
import com.aegonthtf.fate.constant.CommonConstant;
import com.aegonthtf.fate.constant.EventTypeEnum;
import com.aegonthtf.fate.dto.JobBehaviorParamesDto;
import com.aegonthtf.fate.entity.behavior.EsalesActExport;
import com.aegonthtf.fate.entity.user.FateSalesAct;
import com.alibaba.fastjson.JSON;
import org.joda.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.Optional;
/**
 * Data-cleansing processor that converts an ESales behaviour record into a {@link FateSalesAct}.
 *
 * <p>NOTE(review): the bean keeps per-step state in fields; if it is a shared singleton and
 * several jobs run concurrently, one step may see another step's parameters - confirm the scope.
 *
 * @author yangjunxiong
 * @date 2021/5/14 16:46
 **/
@Component
public class EsalesActExportItemProcessor implements ItemProcessor<EsalesActExport, FateSalesAct> {
    private static final Logger logger = LoggerFactory.getLogger(EsalesActExportItemProcessor.class);

    /** Job parameters captured in {@link #beforeStep}. */
    JobParameters jobParameters;

    /** Parsed form of the JSON job parameter; filled lazily so it is parsed once per step, not per item. */
    private JobBehaviorParamesDto cachedParames;

    /**
     * Captures the job parameters of the step about to run and drops any previously parsed DTO.
     *
     * @param stepExecution the step execution supplying the job parameters
     */
    @BeforeStep
    public void beforeStep(final StepExecution stepExecution) {
        jobParameters = stepExecution.getJobParameters();
        cachedParames = null;
        logger.info("jobParameters: {}", jobParameters);
    }

    /**
     * Maps one source row to the target entity.
     *
     * @param item source row
     * @return the populated entity, or {@code null} (item filtered out) when the item is null, its
     *         event type is blank or unknown, or the event type is neither E1 nor E2
     */
    @Override
    public FateSalesAct process(EsalesActExport item) throws Exception {
        JobBehaviorParamesDto paramesDto = this.resolveParames();
        if (Objects.isNull(item) || StringUtils.isBlank(item.getEventType())) {
            return null;
        }
        EventTypeEnum eventTypeEnum = EventTypeEnum.toByCode(item.getEventType());
        if (eventTypeEnum == null) {
            return null;
        }
        FateSalesAct entity = new FateSalesAct();
        // fields shared by every event type
        Optional.ofNullable(item.getInfoKind()).ifPresent(entity::setInfoKind);     // INFO_KIND
        Optional.ofNullable(item.getEventType()).ifPresent(entity::setEventType);   // EVENT_TYPE
        Optional.ofNullable(item.getAgentCode()).ifPresent(entity::setAgentCode);   // AGENT_CODE
        Optional.ofNullable(item.getAgentName()).ifPresent(entity::setAgentName);   // AGENT_NAME
        Optional.ofNullable(item.getInfoTitle()).ifPresent(entity::setInfoTitle);   // INFO_TITLE
        Optional.ofNullable(item.getInfoSource()).ifPresent(entity::setInfoSource); // INFO_SOURCE
        Optional.ofNullable(item.getCreatetime()).ifPresent(entity::setCreatetime); // CREATETIME
        Optional.ofNullable(item.getCreater()).ifPresent(entity::setCreater);       // CREATER
        Optional.ofNullable(item.getDelFlag()).ifPresent(entity::setDelFlag);       // DEL_FLAG
        // the update time is the batch start time, not the source row's own value
        Optional.ofNullable(paramesDto.getStaratDateTime())
                .map(LocalDateTime::toDate)
                .ifPresent(entity::setUpdatatime);
        // fields specific to the event type
        switch (eventTypeEnum) {
            case E1:
                this.copyTransFields(item, entity);
                break;
            case E2:
                this.copyTransFields(item, entity);
                Optional.ofNullable(item.getCustOpenid()).ifPresent(entity::setCustOpenid); // CUST_OPENID
                Optional.ofNullable(item.getOpenTime()).ifPresent(entity::setOpenTime);     // OPEN_TIME
                break;
            default:
                return null;
        }
        return entity;
    }

    /** Returns the DTO parsed from the JSON job parameter, parsing it on first use after each step start. */
    private JobBehaviorParamesDto resolveParames() {
        if (cachedParames == null) {
            cachedParames = JSON.parseObject(jobParameters.getString(CommonConstant.JOB_PARAMETERS), JobBehaviorParamesDto.class);
        }
        return cachedParames;
    }

    /** Copies the non-null forwarding fields (TRANS_ID, TRANS_DATE, TRANS_TIME). */
    private void copyTransFields(EsalesActExport item, FateSalesAct entity) {
        Optional.ofNullable(item.getTransId()).ifPresent(entity::setTransId);
        Optional.ofNullable(item.getTransDate()).ifPresent(entity::setTransDate);
        Optional.ofNullable(item.getTransTime()).ifPresent(entity::setTransTime);
    }
}
/*
* Copyright 2021 Wicrenet, Inc. All rights reserved.
*/
package com.aegonthtf.fate.service.batch.writer;
import com.aegonthtf.fate.entity.user.FateSalesAct;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
/**
 * Batch writer that persists sales-behaviour rows into {@code fate_sales_act}.
 *
 * @author yangjunxiong
 * Created on 2021/5/15 10:14
 */
@Configuration
public class FateSalesActWriter {
    /** Insert statement; each named parameter is resolved from the matching bean property of the item. */
    private static final String INSERT_SQL =
            "INSERT INTO fate_sales_act (ID, INFO_KIND, INFO_SOURCE, INFO_TITLE, EVENT_TYPE, TRANS_ID, TRANS_DATE, TRANS_TIME, AGENT_CODE, AGENT_NAME, AGENT_OPENID, CUST_OPENID, OPEN_DATE, OPEN_TIME, SUB_DATE, SUB_TIME, ATH_ID, SIS_ID, CUST_ID, CUST_NAME, CUST_PHONE, CUST_GENDER, CUST_AGE, CUST_OPR, CUST_EMAIL, RISK_NAME, FATE_APE, PAY_TIME, FATE_FYP, CREATEDATE, CREATETIME, CREATER, UPDATATIME, DEL_FLAG, POLICY_NO) VALUES (" +
                    ":id ,\n" +
                    ":infoKind ,\n" +
                    ":infoSource ,\n" +
                    ":infoTitle ,\n" +
                    ":eventType ,\n" +
                    ":transId ,\n" +
                    ":transDate ,\n" +
                    ":transTime ,\n" +
                    ":agentCode ,\n" +
                    ":agentName ,\n" +
                    ":agentOpenid ,\n" +
                    ":custOpenid ,\n" +
                    ":openDate ,\n" +
                    ":openTime ,\n" +
                    ":subDate ,\n" +
                    ":subTime ,\n" +
                    ":athId ,\n" +
                    ":sisId ,\n" +
                    ":custId ,\n" +
                    ":custName ,\n" +
                    ":custPhone ,\n" +
                    ":custGender ,\n" +
                    ":custAge ,\n" +
                    ":custOpr ,\n" +
                    ":custEmail ,\n" +
                    ":riskName ,\n" +
                    ":fateApe ,\n" +
                    ":payTime ,\n" +
                    ":fateFyp ,\n" +
                    ":createdate ,\n" +
                    ":createtime ,\n" +
                    ":creater ,\n" +
                    ":updatatime,\n" +
                    ":delFlag,\n" +
                    ":policyNo\n)";

    @Autowired
    @Qualifier("primaryDataSource")
    private DataSource dataSource;

    /**
     * Creates the JDBC batch writer for {@link FateSalesAct} items.
     *
     * @return writer bound to the primary data source and the insert statement above
     */
    @Bean
    public JdbcBatchItemWriter<FateSalesAct> writer() {
        JdbcBatchItemWriter<FateSalesAct> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(dataSource);
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        itemWriter.setSql(INSERT_SQL);
        return itemWriter;
    }
}
/*
* Copyright yangjunxiong 2021 Wicrenet, Inc. All rights reserved.
*/
package com.aegonthtf.fate.constant;
import com.aegonthtf.fate.util.verify.Asserts;
import java.util.Arrays;
import java.util.stream.Collectors;
/**
 * Mapping between a batch source table and the table it is converted into.
 *
 * @author yangjunxiong
 * Created on 2021/5/14 18:10
 */
public enum BatchConvertEnum {
    TU_D_SUBMISSION_DAILY(0, "tu_d_submission_daily", "fate_sales_act", null),
    ATH_ACT_EXPORT(1, "ath_act_export", "fate_sales_act", InfoSourceEnum.Athena2_0),
    ESALES_ACT_EXPORT(2, "esales_act_export", "fate_sales_act", InfoSourceEnum.ESales),
    APOLLO_ACT_EXPORT(3, "apollo_act_export", "fate_sales_act", InfoSourceEnum.Apollo);

    /** Numeric identifier; must be unique across the constants. */
    private final int value;
    /** Name of the table the batch reads from. */
    private final String iTableName;
    /** Name of the table the batch writes to. */
    private final String oTableName;
    /** Source system the rows come from; {@code null} for TU_D_SUBMISSION_DAILY. */
    private final InfoSourceEnum infoSourceEnum;

    static {
        // Fail at class initialisation if two constants share the same value.
        long distinctValues = Arrays.stream(BatchConvertEnum.values())
                .map(BatchConvertEnum::getValue)
                .distinct()
                .count();
        Asserts.state(distinctValues == BatchConvertEnum.values().length, "BatchConvertEnum.value 重复定义");
    }

    BatchConvertEnum(int value, String iTableName, String oTableName, InfoSourceEnum infoSourceEnum) {
        this.value = value;
        this.iTableName = iTableName;
        this.oTableName = oTableName;
        this.infoSourceEnum = infoSourceEnum;
    }

    /**
     * Looks a constant up by its numeric value.
     *
     * @param value numeric identifier
     * @return the matching constant
     * @throws IllegalStateException when no constant carries {@code value}
     */
    public static BatchConvertEnum toByValue(int value) {
        return Arrays.stream(BatchConvertEnum.values())
                .filter(candidate -> candidate.value == value)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("枚举类型转换失败"));
    }

    public int getValue() {
        return value;
    }

    public String getiTableName() {
        return iTableName;
    }

    public String getoTableName() {
        return oTableName;
    }

    public InfoSourceEnum getInfoSourceEnum() {
        return infoSourceEnum;
    }
}
/*
* Copyright yangjunxiong 2021 Wicrenet, Inc. All rights reserved.
*/
package com.aegonthtf.fate.constant;
import com.aegonthtf.fate.dto.JobBehaviorParamesDto;
import com.aegonthtf.fate.util.JodaTimeUtils;
import com.aegonthtf.fate.util.verify.Asserts;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.joda.time.LocalDateTime;
import java.util.Arrays;
import java.util.Date;
import java.util.stream.Collectors;
/**
 * Behaviour event types.
 *
 * @author yangjunxiong
 * Created on 2021/5/13 13:24
 */
public enum EventTypeEnum {
    /**
     * Reference table (event type / query type, content, table name). Only some of these
     * are declared as constants below.
     * E1  agent forwards a page link
     * E2  customer opens the page
     * E3  customer submits after leaving contact details
     * E4  agent adds prospect information
     * E5  visit itinerary records an appointment action
     * E6  visit itinerary records a first-visit action
     * E7  risk report completed
     * E8  proposal completed
     * E9  agent completes customer information
     * E10 visit itinerary records a needs-analysis action
     * E11 agent shares a proposal link
     * E12 customer opens the proposal link
     * E13 visit itinerary records a proposal-presentation action
     * E14 insurance application completed
     * E15 agent forwards a page link
     * E16 recruit prospect opens the page
     * E17 recruit prospect submits after leaving contact details
     * E18 agent adds recruit-prospect information
     * E19 itinerary records the first interview
     * E20 recruit-prospect information completed
     * E22 CC assessment report generated
     * E23 itinerary records the attraction interview
     * E25 agent sends an interview invitation link (E25 is listed twice in the original table - TODO confirm numbering)
     * E25 recruit prospect opens the invitation link
     * E26 agent adds the interview conclusion
     * E27 recruit prospect joins the newcomer class
     * E28 itinerary records the selection interview
     * E29 itinerary records the feasibility interview
     * E30 itinerary records the growth interview
     * E31 itinerary records the commitment interview
     * E32 recruit prospect completes contract signing
     * SEL_VISIT 0  appointment                          FATE_ENUM
     * SEL_VISIT 1  visit without discussing insurance   FATE_ENUM
     * SEL_VISIT 2  face-to-face needs analysis          FATE_ENUM
     * SEL_VISIT 3  presenting a paper proposal          FATE_ENUM
     * SEL_VISIT 4  policy delivery                      FATE_ENUM
     * SEL_VISIT 5  other customer service               FATE_ENUM
     * SEL_VISIT 6  first interview                      FATE_ENUM
     * SEL_VISIT 7  attraction interview                 FATE_ENUM
     * SEL_VISIT 8  selection assessment                 FATE_ENUM
     * SEL_VISIT 9  feasibility interview                FATE_ENUM
     * SEL_VISIT 10 growth interview                     FATE_ENUM
     * SEL_VISIT 11 commitment interview                 FATE_ENUM
     **/
    E1(1, "E1", "", InfoSourceEnum.Athena2_0, "营销员转发页面链接"),
    E2(2, "E2", "", InfoSourceEnum.Athena2_0, "客户打开页面"),
    E3(3, "E3", "", InfoSourceEnum.Athena2_0, "客户留资后提交"),
    // E4-E6 are not declared yet.
    E7(7, "E7", "", InfoSourceEnum.Athena2_0, "风险报告完成"),
    // E8 is not declared yet.
    E14(14, "E14", "", InfoSourceEnum.Athena2_0, "投保完成"),
    E32(32, "E32", "", InfoSourceEnum.Athena2_0, "准增员完成签约"),
    // The remaining event types from the table above are not declared yet.
    ;

    /** Numeric event type; must be unique across the constants. */
    private final int value;
    /** Event code as stored in EVENT_TYPE, e.g. "E1". */
    private final String code;
    /** Query type. */
    private final String select;
    /** Source system. */
    private final InfoSourceEnum infoSource;
    /** Human-readable description. */
    private final String declare;

    static {
        // Fail at class initialisation if two constants share the same value.
        long distinctValues = Arrays.stream(EventTypeEnum.values())
                .map(EventTypeEnum::getValue)
                .distinct()
                .count();
        Asserts.state(distinctValues == EventTypeEnum.values().length, "EventTypeEnum.value 重复定义");
    }

    EventTypeEnum(int value, String code, String select, InfoSourceEnum infoSource, String declare) {
        this.value = value;
        this.code = code;
        this.select = select;
        this.infoSource = infoSource;
        this.declare = declare;
    }

    /**
     * Looks a constant up by its numeric value.
     *
     * @param value numeric event type
     * @return the matching constant
     * @throws IllegalStateException when no constant carries {@code value}
     */
    public static EventTypeEnum toByValue(int value) {
        return Arrays.stream(EventTypeEnum.values())
                .filter(candidate -> candidate.value == value)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("枚举类型转换失败"));
    }

    /**
     * Looks a constant up by its code.
     *
     * @param code event code, may be {@code null}
     * @return the matching constant, or {@code null} when none matches
     */
    public static EventTypeEnum toByCode(String code) {
        return Arrays.stream(EventTypeEnum.values())
                .filter(candidate -> candidate.code.equals(code))
                .findFirst()
                .orElse(null);
    }

    public int getValue() {
        return value;
    }

    public String getCode() {
        return code;
    }

    public String getDeclare() {
        return declare;
    }

    public String getSelect() {
        return select;
    }

    public InfoSourceEnum getInfoSource() {
        return infoSource;
    }

    /**
     * Returns job run parameters; currently an empty DTO (still marked TODO by the author).
     *
     * @param staratDateTime batch start date-time (not used yet)
     * @return a new, unpopulated DTO
     */
    public JobBehaviorParamesDto getJobparames(LocalDateTime staratDateTime) {
        // TODO yangjunxiong 2021/5/14 11:32
        return new JobBehaviorParamesDto();
    }

    /**
     * Computes the start/end times of the behaviour-data window for one batch run.
     *
     * <p>When the hour of {@code staratDateTime} is 0, the end is
     * {@code JodaTimeUtils.maxTime} of the previous day and the start is that end minus one hour
     * plus one second. Otherwise start and end are the previous hour and the current hour,
     * round-tripped through the {@code yyyyMMddHH} pattern and {@code JodaTimeUtils.toDate}.
     *
     * <p>NOTE(review): the readers use {@code CREATETIME < endTime}; in the hour-0 branch this
     * presumably leaves the last instant of the previous day outside every window - confirm
     * against {@code JodaTimeUtils.maxTime}.
     *
     * @param staratDateTime batch start date-time, must not be null
     * @return pair with the start time on the left and the end time on the right
     * @author yangjunxiong
     * @date 2021/5/14 17:38
     **/
    public static ImmutablePair<Date, Date> getCreateTime(LocalDateTime staratDateTime) {
        Asserts.notNull(staratDateTime, "staratDateTime不能为空!");
        if (CommonConstant.ZERO.equals(staratDateTime.toString("H"))) {
            // special case: take the window from the previous day
            Date previousDayEnd = JodaTimeUtils.maxTime(staratDateTime.minusDays(CommonConstant._1).toLocalDate()).toDate();
            Date windowStart = new LocalDateTime(previousDayEnd)
                    .minusHours(CommonConstant._1)
                    .plusSeconds(CommonConstant._1)
                    .toDate();
            return ImmutablePair.of(windowStart, previousDayEnd);
        }
        Date previousHour = JodaTimeUtils.toDate(staratDateTime.minusHours(CommonConstant._1).toString(CommonConstant.yyyy_MM_dd_HH));
        Date currentHour = JodaTimeUtils.toDate(staratDateTime.toString(CommonConstant.yyyy_MM_dd_HH));
        return ImmutablePair.of(previousHour, currentHour);
    }
}
package com.aegonthtf.fate.constant;
/**
 * Project-wide constants.
 *
 * <p>Every field is {@code final}: the date patterns and the agent status were previously
 * reassignable by any caller, which would silently change formatting everywhere.
 */
public class CommonConstant {
    /**
     * Package scanned for Swagger controllers.
     */
    public static final String BASE_CONTROLLER_PACKAGE = "com.aegonthtf.fate.controller";
    public static final String N = "N";
    public static final String Y = "Y";
    public static final String LIMIT1 = "LIMIT 1";
    public static final String ZERO = "0";
    public static final int BATCH_SIZE = 1000;
    /** Chunk, page and fetch size used by the behaviour-export jobs. */
    public static final int BEHAVIOR_BATCH_SIZE = 500;
    /** Typed {@code null}, used as a placeholder argument for late-bound parameters. */
    public static final String NULL = null;
    public static final String CREATERBATCH = "batch";
    /** Key of the job parameter that carries the JSON-serialised run parameters. */
    public static final String JOB_PARAMETERS = "jobParameters";

    // Date/time patterns.
    public static final String yyyyMMdd = "yyyyMMdd";
    public static final String yyyyMM = "yyyyMM";
    public static final String yyyy_MM_dd = "yyyy-MM-dd";
    public static final String yyyyMMddHHmmss = "yyyyMMddHHmmss";
    /** Despite the name, the pattern has no separators: {@code yyyyMMddHH}. */
    public static final String yyyy_MM_dd_HH = "yyyyMMddHH";
    public static final String HHmm = "HHmm";

    public static final String AGENT_STATUS_ACTIVE = "Active";

    // Named integer literals 1..50.
    public static final int _1 = 1;
    public static final int _2 = 2;
    public static final int _3 = 3;
    public static final int _4 = 4;
    public static final int _5 = 5;
    public static final int _6 = 6;
    public static final int _7 = 7;
    public static final int _8 = 8;
    public static final int _9 = 9;
    public static final int _10 = 10;
    public static final int _11 = 11;
    public static final int _12 = 12;
    public static final int _13 = 13;
    public static final int _14 = 14;
    public static final int _15 = 15;
    public static final int _16 = 16;
    public static final int _17 = 17;
    public static final int _18 = 18;
    public static final int _19 = 19;
    public static final int _20 = 20;
    public static final int _21 = 21;
    public static final int _22 = 22;
    public static final int _23 = 23;
    public static final int _24 = 24;
    public static final int _25 = 25;
    public static final int _26 = 26;
    public static final int _27 = 27;
    public static final int _28 = 28;
    public static final int _29 = 29;
    public static final int _30 = 30;
    public static final int _31 = 31;
    public static final int _32 = 32;
    public static final int _33 = 33;
    public static final int _34 = 34;
    public static final int _35 = 35;
    public static final int _36 = 36;
    public static final int _37 = 37;
    public static final int _38 = 38;
    public static final int _39 = 39;
    public static final int _40 = 40;
    public static final int _41 = 41;
    public static final int _42 = 42;
    public static final int _43 = 43;
    public static final int _44 = 44;
    public static final int _45 = 45;
    public static final int _46 = 46;
    public static final int _47 = 47;
    public static final int _48 = 48;
    public static final int _49 = 49;
    public static final int _50 = 50;
}
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)