一、解決什麼問題
一個任務中心技術實現的參考案例,可以快速部署實現且僅需關註業務個性落庫邏輯實現,其他如任務狀態維護、數據解析及異常包裝、結果導出均由工具自動實現。
二、基本原理
![](https://news.xinpengboligang.com/upload/keji/1c3f522d2eb2ea306a16acee9ae8bb54.jpeg)
圖1 請求示意圖
異步任務中心共分三個模塊:
1)任務初始化, 將目標導入文件上傳至雲存儲後得到目標文件url按任務類型(如類目導入、商品導入等)入庫任務表並返回前臺提交成功,任務初始狀態為"待處理";
2)任務調度,使用開源調度組件xxlJob開箱即用。傳送門: xxlJob
3)任務Worker執行器核心組成:
1.任務並行分片拉取
分片廣播模式下,每個worker按index取模 獲取應執行的任務id,參考sql :
from task where status in ('PENDING','FAILURE') and errCnt <= MAX_RETRY_CNT and mod(id,#總worker數量) = 當前worker index
2.根據任務類型命中執行器策略
任務類型: 即導入業務的枚舉字段,如類目導入CATE_IMPORT、商品導入PRODUCT_IMPORT等
業務執行器: 執行excel批量導入解析落庫的載體,下文介紹。
策略如何命中: 業務執行器class類增加@JobExecutor註解並指明註解值為對應任務類型; 拉取任務後尋找有@JobExecutor修飾的類且其註解值等於任務記錄任務類型即為命中目標執行器
3.執行器設計
A.抽象任務接口並定義行為 -> BaseJob<T>
•accept() 接受任務,實現後置任務狀態為"處理中"
•parse() 解析任務, 負責解析目標文件(zip、xlsx)為List<Bean>,並實現數據校驗
•run() 將業務數據List<Bean>數據落庫
•export() 生成導入結果文件,上傳至雲存儲並更新到任務記錄結果列
•errHandle() 異常處理,置任務狀態為"失敗",累計任務失敗次數,觸發業務報警
B.基礎抽象實現類 -> BaseExecutableAbsJob implements BaseJob
accept() 、export()、errHandle() 步驟因其業務無關性故在此抽象類中做通用默認實現;
parse() 有一定通用性,默認實現為excel解析(easyExcel實現)
run() 業務相關不做默認實現,由繼承方實現
C.一次性解析抽象實現 -> DisposableAbsJob extends BaseExecutableAbsJob
特征:
解析規則為一次性解析excel所有記錄,不適用超大excel解析job
可以在落庫前獲得全部業務實體信息
導出結果可以顯示原始輸入
D.分批解析通用實現類 -> BatchableAbsJob extends BaseExecutableAbsJob
特征:
解析規則為按BATCH_CNT來分段操作數據解析及入庫,適用於大excel導入場景的使用
解析完畢前拿不到記錄總數
導出結果不顯示原始輸入,僅顯示MAX_ERROR_CNT數量以內的錯誤記錄原始信息及錯誤信息。
三、快速使用
業務類按場景選擇繼承DisposableAbsJob或BatchableAbsJob,
僅需重寫落庫方法,其他如拉取、解析、導出結果步驟均由系統自動執行。如需特殊解析邏輯(比如解析zip按特定規則拼裝bean)重寫parse()方法即可
舉個栗子,現需求場景為批量類目信息導入, 則開發過程為:
步驟一 : 落庫任務類型為
TaskBizTypeEnum.CATE_BATCH_PUBLISH的記錄到任務表中,並記錄前臺上傳的excel導入文件url(常規crud本案例不做封裝,自行實現即可)
步驟二 : 定義類目Excel導入實體Bean
/**
* 類目導入實體
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class ImportCateExcelDTO extends BaseWorkerDTO {
/** 類目級別*/
@ExcelProperty(index = 0,converter = CateLevelConverter.class,value = "類目級別")
private Integer cateLevel;
/** 類目中文名*/
@ExcelProperty(index = 1 ,value = "類目中文名")
private String cateName;
/** 類目排序*/
@ExcelProperty(index = 2 ,value = "類目排序")
private Integer sort;
/** 上級類目id*/
@ExcelProperty(index = 3 ,value = "上級管理類目id")
private Long parentCateId;
/** 狀態*/
@ExcelProperty(index = 4,converter = StatusConverter.class ,value = "狀態")
private Integer status;
}
步驟三 : 編寫業務實現類,並自行實現run落庫方法.
/**
* 類目批量導入(一次性解析全部excel)
*/
@Service
@Slf4j
@JobExecutor(taskBizType = TaskBizTypeEnum.CATE_BATCH_PUBLISH) // 策略註解,枚舉類型全局唯一。 不加該註解則任務調度找不到策略
public class DisposableCateImportHandler extends DisposableAbsJob<ImportCateExcelDTO> {
@Resource
private XXXXService xxxxService;
@Override
public void run(TaskDTO<ImportCategoryExcelDTO> task){
try{
if(CollectionUtils.isNotEmpty(task.getTarget())){
xxxxService.save(task.getTarget())
}
}catch (BaseImportException e){
errHandle(task);
}
}
}
至此開發部分結束,任務執行器會自動調度拉取CATE_BATCH_PUBLISH類型的任務 -> 解析到List<Bean> -> 調用你的run()方法實現落庫 -> 將結果流上傳到雲存儲並將結果鏈接更新到任務表中
四、源碼
1. TaskDispatcher - 任務調度派發
/**
* 任務調度派發
*/
@Component
@Slf4j
public class TaskDispatcher {
@Resource
private TaskMangeService taskMangeService;
@Resource
private ApplicationContext applicationContext;
@SneakyThrows
@XxlJob("iscWorker")
public ReturnT<String> iscWorker(String param) {
TaskDTO task = taskMangeService.pullTask();
if(task!=null){
BaseJob executor = getExecutor(task.getTask().getBizType());
if(null!=executor){
executor.of(task).start();
log.info("iscWorker 執行完畢:{} " , JSON.toJSONString(task));
}
}
return ReturnT.SUCCESS;
}
//獲取執行器
public BaseJob getExecutor(TaskBizTypeEnum taskBizType){
Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(JobExecutor.class);
if(beanMap.isEmpty()){
return null;
}
log.info("TaskDispatcher.getExecutor class list:{}" , beanMap.keySet());
for (Map.Entry<String,Object> entry : beanMap.entrySet()) {
try {
JobExecutor ano = AnnotationUtil.getAnnotation(entry.getValue().getClass(), JobExecutor.class);
if(taskBizType.equals(ano.taskBizType()) && entry.getValue() instanceof BaseJob){
log.info("TaskDispatcher.getExecutor 當前任務:{}命中執行策略job:{}" , taskBizType, entry.getValue());
return (BaseJob) entry.getValue();
}
}catch (Exception e){
e.printStackTrace();
}
}
return null;
}
}
2. DisposableAbsJob - 一次性解析任務執行器
/**
* 一次性解析任務執行器,解析規則為一次性解析所有excel記錄,不適用超大excel解析job
* 使用方法: 1.使用方繼承DisposableAbsJob類,並根據需要重寫parse方法(當前默認是按excel解析)
* 2.重寫run方法,將解析好的list<Bean>推入數據庫
*/
@Component
@Slf4j
public abstract class DisposableAbsJob<T extends BaseWorkerDTO> extends BaseExecutableAbsJob<T> {
//自有個性邏輯,默認就是空邏輯
}
3. BatchableAbsJob - 分段解析任務執行器
/**
* 批次解析任務執行器,解析規則為分批解析excel記錄,適用超大excel解析job
* 使用方法: 1.使用方繼承BatchableAbsJob類,重寫saveOrUpdate方法和excel2Po方法,
*/
@Component
@Slf4j
public abstract class BatchableAbsJob<T extends BaseWorkerDTO,K> extends BaseExecutableAbsJob<T> {
/**
* 批次解析邏輯
* @param task
*/
@Override
public void parse(TaskDTO<T> task){
if(TaskCreateTypeEnum.IMPORT.equals(task.getTaskCreateType())){
log.info("BaseExecutableAbsJob.import parse {} ",task.getTaskId());
BaseBatchExcelDataListener<T,K> listener = new BaseBatchExcelDataListener<>(this);
EasyExcel.read(task.getTargetInputFile().getObjectContent(), getTargetClass(), listener).sheet().doRead();
task.setErrDataList(listener.errDataList);
}
}
/** 批次解析結果邏輯,僅導出有問題的記錄(上限100條) */
@Override
public void export(TaskDTO<T> task){
if(task!=null){
log.info("BatchableAbsJob.export {}", task.getTaskId());
if(CollectionUtils.isEmpty(task.getErrDataList())){
taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS));
log.info("BatchableAbsJob.export 任務{}全部執行成功" , task.getTaskId());
return;
}
String resultName = task.getFileName() Constant.UNDER_LINE System.currentTimeMillis() ".xlsx";
ByteArrayOutputStream targetOutputStream = new ByteArrayOutputStream();
try (ExcelWriter excelWriter = EasyExcel.write(targetOutputStream).build()) {
if (CollectionUtils.isNotEmpty(task.getErrDataList())) {
excelWriter.write(task.getErrDataList(), EasyExcel.writerSheet(0, "result").head(BatchResultDTO.class).build());
}
task.setEndTime(System.currentTimeMillis());
excelWriter.finish();
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(targetOutputStream.toByteArray())) {
task.setResultUrl(s3Utils.upload(inputStream, FileTypeEnum.BATCH_FILE.getCode(),resultName));
taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS, task.getResultUrl()));
}
} catch (Exception e) {
log.error("BaseExecutableAbsJob.export error, target:{} ", task.getTaskId(), e);
throw new TaskExportException(task.getTaskId() e.getMessage());
} finally {
log.info("BaseExecutableAbsJob.export 任務「{}」執行完畢:{},文件地址:{}", task.getTaskId(), task.getOssPutMd5(), task.getResultUrl());
}
}
}
public List<BatchResultDTO> saveOrUpdate(Map<Integer, K> k) {
return null;
}
public Map<Integer,K> excel2Po(Map<Integer, T> excel) {
return null;
}
}
4. BaseExecutableAbsJob - 通用抽象任務執行器
/**
* 通用抽象任務執行器
*/
@Component
@Slf4j
public abstract class BaseExecutableAbsJob<T extends BaseWorkerDTO> implements BaseJob<T> {
@Resource
public S3Utils s3Utils;
@Resource
public TaskMangeService taskMangeService;
public final static String RESULT_FOLDER = "xxx";
@Override
public void accept(TaskDTO<T> task){
//導入類任務
if(TaskCreateTypeEnum.IMPORT.equals(task.getTask().getCreateType())){
task.setTargetInputFile(s3Utils.download(task.getTask().getReqParam()));
task.setFileName(task.getTask().getName());
//導出類任務
}else if(TaskCreateTypeEnum.EXPORT.equals(task.getTask().getCreateType())){
// 方式1. 保存 前臺勾選的記錄id到任務入參中
// 方式2. 根據前臺勾選的查詢條件命中記錄id,再保存到任務入參中<限制總導出記錄數>
String req = task.getTask().getReqParam();
if(StringUtils.isNotBlank(req)){
task.setKey(Arrays.stream(req.split(Constant.COMMA)).map(Long::valueOf).collect(Collectors.toSet()));
}
}
task.setTaskBizTypeEnum(task.getTask().getBizType());
task.setTaskId(task.getTask().getId());
task.setStartTime(System.currentTimeMillis());
//更新任務狀態
taskMangeService.update(new TaskVO(task.getTaskId(),TaskStatusEnum.PROCESSING));
}
/**
* 通用解析邏輯
* @param task
*/
@Override
public void parse(TaskDTO<T> task){
if(TaskCreateTypeEnum.IMPORT.equals(task.getTaskCreateType())){
if(task.getTargetInputFile()!=null && task.getTargetInputFile().getObjectContent()!=null){
List<T> target = EasyExcel.read(task.getTargetInputFile().getObjectContent(), getTargetClass() ,
new PageReadListener<T>(dataList -> {})).sheet(0).headRowNumber(1).doReadSync();
task.setTarget(target);
}
}
}
/**
* 導入通用落庫邏輯/導出構建list<Bean>邏輯
* @param task
*/
@Override
public void run(TaskDTO<T> task){ }
@Override
public void export(TaskDTO<T> task){
if(task!=null){
if(CollectionUtils.isEmpty(task.getTarget())){
taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS));
log.info("BaseExecutableAbsJob.export 空任務{},跳過執行" , task.getTaskId());
return;
}
String resultName = RESULT_FOLDER task.getTaskBizTypeEnum().getName() Constant.UNDER_LINE System.currentTimeMillis() ".xlsx";
ByteArrayOutputStream targetOutputStream = new ByteArrayOutputStream();
try (ExcelWriter excelWriter = EasyExcel.write(targetOutputStream).build()) {
if (CollectionUtils.isNotEmpty(task.getTarget())) {
excelWriter.write(task.getTarget(), EasyExcel.writerSheet(0, "result").head(getTargetClass()).build());
}
task.setEndTime(System.currentTimeMillis());
excelWriter.finish();
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(targetOutputStream.toByteArray())) {
task.setResultUrl(s3Utils.upload(inputStream, FileTypeEnum.BATCH_FILE.getCode(),resultName));
taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS, task.getResultUrl()));
}
} catch (Exception e) {
log.error("BaseExecutableAbsJob.export error, target:{} ", task.getTaskId(), e);
throw new TaskExportException(task.getTaskId() e.getMessage());
} finally {
log.info("BaseExecutableAbsJob.export 任務「{}」執行完畢:{},文件地址:{}", task.getTaskId(), task.getOssPutMd5(), task.getResultUrl());
}
}
}
@Override
public void errHandle(TaskDTO<T> taskDTO,Exception e){
taskMangeService.errHandle(taskDTO,e.toString());
}
public Class<T> getTargetClass(){
Type res = getClass().getGenericSuperclass();
if(res instanceof ParameterizedType){
ParameterizedType pRes = (ParameterizedType) res;
Type[] type = pRes.getActualTypeArguments();
if(type.length>0){
if(type[0] instanceof Class){
Type typeE = type[0];
return (Class<T>)typeE;
}
}
}
return null;
}
}
5.
BaseBatchExcelDataListener - 批處理excel解析監聽器
/**
* 批處理excel解析監聽器
* @param <T> Excel DTO
* @param <K> 落庫 PO
*/
@Slf4j
public class BaseBatchExcelDataListener<T extends BaseWorkerDTO,K> implements ReadListener<T> {
private static final int BATCH_COUNT = 100;
private static final int MAX_ERROR_COUNT = 100;
/** 業務服務*/
private final BatchableAbsJob<T,K> batchableAbsJob;
/** 每批待處理業務數據*/
private Map<Integer,T> cachedDataList = Maps.newHashMapWithExpectedSize(BATCH_COUNT);
/** 業務處理失敗數據,行號&錯誤報文 */
public List<BatchResultDTO> errDataList = Lists.newArrayListWithExpectedSize(MAX_ERROR_COUNT) ;
public BaseBatchExcelDataListener(BatchableAbsJob<T,K> batchableAbsJob) {
this.batchableAbsJob = batchableAbsJob;
}
@Override
public void invoke(T data, AnalysisContext context) {
cachedDataList.put(context.readRowHolder().getRowIndex(),data);
if (cachedDataList.size() >= BATCH_COUNT) {
saveData();
cachedDataList = Maps.newHashMapWithExpectedSize(BATCH_COUNT);
}
}
@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
saveData();
}
/** 持久化 */
private void saveData() {
Map<Integer, K> po = batchableAbsJob.excel2Po(cachedDataList);
if(po!=null && !po.isEmpty()){
List<BatchResultDTO> errRes = batchableAbsJob.saveOrUpdate(po);
if(errDataList.size()<MAX_ERROR_COUNT && CollectionUtils.isNotEmpty(errRes)){
errDataList.addAll(errRes);
}
}
}
}
6. BaseJob - 任務接口
public interface BaseJob<T> {
void accept(TaskDTO<T> task);
void parse(TaskDTO<T> task);
void run(TaskDTO<T> task);
void export(TaskDTO<T> task);
void errHandle(TaskDTO<T> task,Exception e);
default AbsExecutor<Void> of(TaskDTO<T> task){
return () -> {
try {
accept(task);
try {
parse(task);
}finally {
if(task.getTargetInputFile()!=null){
task.getTargetInputFile().close();
}
}
run(task);
export(task);
}catch (Exception e){
errHandle(task,e);
}
return null;
};
}
}
7. JobExecutor- 策略註解
/**
* 任務執行器
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface JobExecutor {
//任務業務類型
TaskBizTypeEnum taskBizType() ;
}
8. TaskMangeService- 任務執行類
/**
* 任務讀寫服務
*/
@Service
@Slf4j
public class TaskMangeServiceImpl extends BaseManageSupportService<TaskVO, TaskPO> implements TaskMangeService {
private final static Integer MAX_ERR_CNT = 2;
private final static Long LIMIT = 1L;
@Override
public TaskPO saveOrUpdate(TaskVO taskVO) {
return taskService.save(input);
}
@Override
public Page<TaskPO> hashList(TaskReqVO taskReqVO) {
Page<TaskPO> page = Page.of(taskReqVO.getIndex(), taskReqVO.getSize());
LambdaQueryWrapper<TaskPO> wrapper = Wrappers.<TaskPO>lambdaQuery()
.in(CollectionUtils.isNotEmpty(taskReqVO.getStatus()), TaskPO::getStatus, taskReqVO.getStatus())
.eq(taskReqVO.getBizType() != null, TaskPO::getBizType, taskReqVO.getBizType())
.le(taskReqVO.getErrCnt() != null, TaskPO::getErrCnt, taskReqVO.getErrCnt())
.apply("mod(id," taskReqVO.getShardTotal() ") =" taskReqVO.getShardIndex() " ")
.orderByAsc(TaskPO::getCreateTime);
return taskService.page(page, wrapper);
}
private TaskVO getTask(String fileName,String pin, String key,TaskBizTypeEnum bizType,TaskCreateTypeEnum taskCreateType){
// build task
return res;
}
@Override
public TaskDTO pullTask(){
TaskDTO target = null;
ShardingUtil.ShardingVO shardingVo = ShardingUtil.getShardingVo();
log.info("iscWorker.pullTask workerIndex: {}, total:{}" , shardingVo.getIndex(),shardingVo.getTotal());
TaskReqVO queryDTO = new TaskReqVO();
queryDTO.setShardIndex(shardingVo.getIndex());
queryDTO.setShardTotal(shardingVo.getTotal());
queryDTO.setStatus(Lists.newArrayList(TaskStatusEnum.PENDING,TaskStatusEnum.FAILURE));
queryDTO.setErrCnt(MAX_ERR_CNT);
queryDTO.setIndex(0L);
queryDTO.setSize(LIMIT);
Page<TaskPO> targetList = hashList(queryDTO);
if(CollectionUtils.isNotEmpty(targetList.getRecords())){
log.info("PublishMkuBySkuWorker.pullTask 準備執行:{}" , JSON.toJSONString(targetList));
target = new TaskDTO<>(targetList.getRecords().get(0));
}
return target;
}
@Override
public Boolean error(TaskVO taskInfo) {
return task.update(taskInfo);
}
/** 失敗處理*/
@Override
public void errHandle(TaskDTO task, String errMsg){
error(new TaskVO(task.getTaskId()));
Profiler.businessAlarm(UmpKeyConstant.BUSINESS_KEY_TASK_WARNING,("excel批量導入-任務執行異常:" errMsg task.getTaskId()));
log.info("TaskMangeServiceImpl.errHandle 任務Id{}執行失敗:{}", task.getTaskId(),errMsg);
}
}
五、類圖
![](https://news.xinpengboligang.com/upload/keji/05b6603a381c85d255dd6165fa2589cf.jpeg)
圖2 類圖
作者:京東工業 於洋
來源:京東雲開發者社區 轉載請註明來源