簡易異步任務中心&批量導入技術處理方案

2024年2月6日 20点热度 0人点赞

一、解決什麼問題

一個任務中心技術實現的參考案例,可以快速部署實現且僅需關註業務個性落庫邏輯實現,其他如任務狀態維護、數據解析及異常包裝、結果導出均由工具自動實現。

二、基本原理

圖1 請求示意圖

異步任務中心共分三個模塊:

1)任務初始化, 將目標導入文件上傳至雲存儲後得到目標文件url按任務類型(如類目導入、商品導入等)入庫任務表並返回前臺提交成功,任務初始狀態為"待處理";

2)任務調度,使用開源調度組件xxlJob開箱即用。傳送門: xxlJob

3)任務Worker執行器核心組成:

1.任務並行分片拉取

分片廣播模式下,每個worker按index取模 獲取應執行的任務id,參考sql :

from task where status in ('PENDING','FAILURE') and errCnt <= MAX_RETRY_CNT and mod(id,#總worker數量) = 當前worker index

2.根據任務類型命中執行器策略

任務類型: 即導入業務的枚舉字段,如類目導入CATE_IMPORT、商品導入PRODUCT_IMPORT等

業務執行器: 執行excel批量導入解析落庫的載體,下文介紹。

策略如何命中: 業務執行器class類增加@JobExecutor註解並指明註解值為對應任務類型; 拉取任務後尋找有@JobExecutor修飾的類且其註解值等於任務記錄任務類型即為命中目標執行器

3.執行器設計

A.抽象任務接口並定義行為 -> BaseJob<T>

accept() 接受任務,實現後置任務狀態為"處理中"

parse() 解析任務, 負責解析目標文件(zip、xlsx)為List<Bean>,並實現數據校驗

run() 將業務數據List<Bean>數據落庫

export() 生成導入結果文件,上傳至雲存儲並更新到任務記錄結果列

errHandle() 異常處理,置任務狀態為"失敗",累計任務失敗次數,觸發業務報警

B.基礎抽象實現類 -> BaseExecutableAbsJob implements BaseJob

accept()export()errHandle() 步驟因其業務無關性故在此抽象類中做通用默認實現;

parse() 有一定通用性,默認實現為excel解析(easyExcel實現)

run() 業務相關不做默認實現,由繼承方實現

C.一次性解析抽象實現 -> DisposableAbsJob extends BaseExecutableAbsJob

特征:

解析規則為一次性解析excel所有記錄,不適用超大excel解析job

可以在落庫前獲得全部業務實體信息

導出結果可以顯示原始輸入

D.分批解析通用實現類 -> BatchableAbsJob extends BaseExecutableAbsJob

特征:

解析規則為按BATCH_CNT來分段操作數據解析及入庫,適用於大excel導入場景的使用

解析完畢前拿不到記錄總數

導出結果不顯示原始輸入,僅顯示MAX_ERROR_CNT數量以內的錯誤記錄原始信息及錯誤信息。

三、快速使用

業務類按場景選擇繼承DisposableAbsJob或BatchableAbsJob,

僅需重寫落庫方法,其他如拉取、解析、導出結果步驟均由系統自動執行。如需特殊解析邏輯(比如解析zip按特定規則拼裝bean)重寫parse()方法即可

舉個栗子,現需求場景為批量類目信息導入, 則開發過程為:

步驟一 : 落庫任務類型為
TaskBizTypeEnum.CATE_BATCH_PUBLISH的記錄到任務表中,並記錄前臺上傳的excel導入文件url(常規crud本案例不做封裝,自行實現即可)

步驟二 : 定義類目Excel導入實體Bean

/**
 * 類目導入實體
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class ImportCateExcelDTO extends BaseWorkerDTO {
    /** 類目級別*/
    @ExcelProperty(index = 0,converter = CateLevelConverter.class,value = "類目級別")
    private Integer cateLevel;
    /** 類目中文名*/
    @ExcelProperty(index = 1 ,value = "類目中文名")
    private String cateName;
    /** 類目排序*/
    @ExcelProperty(index = 2 ,value = "類目排序")
    private Integer sort;
    /** 上級類目id*/
    @ExcelProperty(index = 3 ,value = "上級管理類目id")
    private Long parentCateId;
    /** 狀態*/
    @ExcelProperty(index = 4,converter = StatusConverter.class ,value = "狀態")
    private Integer status;
}

步驟三 : 編寫業務實現類,並自行實現run落庫方法.

/**
 * 類目批量導入(一次性解析全部excel)
 */
@Service
@Slf4j
@JobExecutor(taskBizType = TaskBizTypeEnum.CATE_BATCH_PUBLISH)  // 策略註解,枚舉類型全局唯一。 不加該註解則任務調度找不到策略
public class DisposableCateImportHandler extends DisposableAbsJob<ImportCateExcelDTO> {
    @Resource
    private XXXXService xxxxService;
    @Override
    public void run(TaskDTO<ImportCategoryExcelDTO> task){
        try{ 
           if(CollectionUtils.isNotEmpty(task.getTarget())){ 
               xxxxService.save(task.getTarget()) 
           }
        }catch (BaseImportException e){
            errHandle(task);
        }
    }
}

至此開發部分結束,任務執行器會自動調度拉取CATE_BATCH_PUBLISH類型的任務 -> 解析到List<Bean> -> 調用你的run()方法實現落庫 -> 將結果流上傳到雲存儲並將結果鏈接更新到任務表中

四、源碼

1. TaskDispatcher - 任務調度派發

/**
 * 任務調度派發
 */
@Component
@Slf4j
public class TaskDispatcher {
    @Resource
    private TaskMangeService taskMangeService;
    @Resource
    private ApplicationContext applicationContext;
    @SneakyThrows
    @XxlJob("iscWorker")
    public ReturnT<String> iscWorker(String param) {
        TaskDTO task = taskMangeService.pullTask();
        if(task!=null){
            BaseJob executor = getExecutor(task.getTask().getBizType());
            if(null!=executor){
                executor.of(task).start();
                log.info("iscWorker 執行完畢:{} " , JSON.toJSONString(task));
            }
        }
        return ReturnT.SUCCESS;
    }
    //獲取執行器
    public BaseJob getExecutor(TaskBizTypeEnum taskBizType){
        Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(JobExecutor.class);
        if(beanMap.isEmpty()){
            return null;
        }
        log.info("TaskDispatcher.getExecutor class list:{}" , beanMap.keySet());
        for (Map.Entry<String,Object> entry : beanMap.entrySet()) {
            try {
                JobExecutor ano = AnnotationUtil.getAnnotation(entry.getValue().getClass(), JobExecutor.class);
                if(taskBizType.equals(ano.taskBizType()) && entry.getValue() instanceof BaseJob){
                    log.info("TaskDispatcher.getExecutor 當前任務:{}命中執行策略job:{}" , taskBizType, entry.getValue());
                    return (BaseJob) entry.getValue();
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        return null;
    }
}

2. DisposableAbsJob - 一次性解析任務執行器

/**
 *  一次性解析任務執行器,解析規則為一次性解析所有excel記錄,不適用超大excel解析job
 *  使用方法: 1.使用方繼承DisposableAbsJob類,並根據需要重寫parse方法(當前默認是按excel解析)
 *           2.重寫run方法,將解析好的list<Bean>推入數據庫
 */
@Component
@Slf4j
public abstract class DisposableAbsJob<T extends BaseWorkerDTO> extends BaseExecutableAbsJob<T> {
        //自有個性邏輯,默認就是空邏輯
}

3. BatchableAbsJob - 分段解析任務執行器

/**
 *  批次解析任務執行器,解析規則為分批解析excel記錄,適用超大excel解析job
 *  使用方法: 1.使用方繼承BatchableAbsJob類,重寫saveOrUpdate方法和excel2Po方法,
 */
@Component
@Slf4j
public abstract class BatchableAbsJob<T extends BaseWorkerDTO,K> extends BaseExecutableAbsJob<T> {
    /**
     * 批次解析邏輯
     * @param task
     */
    @Override
    public void parse(TaskDTO<T> task){
        if(TaskCreateTypeEnum.IMPORT.equals(task.getTaskCreateType())){
            log.info("BaseExecutableAbsJob.import parse {} ",task.getTaskId());
            BaseBatchExcelDataListener<T,K> listener = new BaseBatchExcelDataListener<>(this);
            EasyExcel.read(task.getTargetInputFile().getObjectContent(), getTargetClass(), listener).sheet().doRead();
            task.setErrDataList(listener.errDataList);
        }
    }
    /** 批次解析結果邏輯,僅導出有問題的記錄(上限100條) */
    @Override
    public void export(TaskDTO<T> task){
        if(task!=null){
            log.info("BatchableAbsJob.export {}", task.getTaskId());
            if(CollectionUtils.isEmpty(task.getErrDataList())){
                taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS));
                log.info("BatchableAbsJob.export 任務{}全部執行成功" , task.getTaskId());
                return;
            }
            String resultName = task.getFileName()   Constant.UNDER_LINE   System.currentTimeMillis()   ".xlsx";
            ByteArrayOutputStream targetOutputStream = new ByteArrayOutputStream();
            try (ExcelWriter excelWriter = EasyExcel.write(targetOutputStream).build()) {
                if (CollectionUtils.isNotEmpty(task.getErrDataList())) {
                    excelWriter.write(task.getErrDataList(), EasyExcel.writerSheet(0, "result").head(BatchResultDTO.class).build());
                }
                task.setEndTime(System.currentTimeMillis());
                excelWriter.finish();
                try (ByteArrayInputStream inputStream = new ByteArrayInputStream(targetOutputStream.toByteArray())) {
                    task.setResultUrl(s3Utils.upload(inputStream, FileTypeEnum.BATCH_FILE.getCode(),resultName));
                    taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS, task.getResultUrl()));
                }
            } catch (Exception e) {
                log.error("BaseExecutableAbsJob.export error, target:{} ", task.getTaskId(), e);
                throw new TaskExportException(task.getTaskId()   e.getMessage());
            } finally {
                log.info("BaseExecutableAbsJob.export 任務「{}」執行完畢:{},文件地址:{}", task.getTaskId(), task.getOssPutMd5(), task.getResultUrl());
            }
        }
    }
    public List<BatchResultDTO> saveOrUpdate(Map<Integer, K> k) {
        return null;
    }
    public Map<Integer,K> excel2Po(Map<Integer, T> excel) {
        return null;
    }
}

4. BaseExecutableAbsJob - 通用抽象任務執行器

/**
 * 通用抽象任務執行器
 */
@Component
@Slf4j
public abstract class BaseExecutableAbsJob<T extends BaseWorkerDTO> implements BaseJob<T> {
    @Resource
    public S3Utils s3Utils;
    @Resource
    public TaskMangeService taskMangeService;
    public final static String RESULT_FOLDER = "xxx";
    @Override
    public void accept(TaskDTO<T> task){
        //導入類任務
        if(TaskCreateTypeEnum.IMPORT.equals(task.getTask().getCreateType())){
            task.setTargetInputFile(s3Utils.download(task.getTask().getReqParam()));
            task.setFileName(task.getTask().getName());
        //導出類任務
        }else if(TaskCreateTypeEnum.EXPORT.equals(task.getTask().getCreateType())){
            // 方式1. 保存 前臺勾選的記錄id到任務入參中
            // 方式2. 根據前臺勾選的查詢條件命中記錄id,再保存到任務入參中<限制總導出記錄數>
            String req = task.getTask().getReqParam();
            if(StringUtils.isNotBlank(req)){
                task.setKey(Arrays.stream(req.split(Constant.COMMA)).map(Long::valueOf).collect(Collectors.toSet()));
            }
        }
        task.setTaskBizTypeEnum(task.getTask().getBizType());
        task.setTaskId(task.getTask().getId());
        task.setStartTime(System.currentTimeMillis());
        //更新任務狀態
        taskMangeService.update(new TaskVO(task.getTaskId(),TaskStatusEnum.PROCESSING));
    }
    /**
     * 通用解析邏輯
     * @param task
     */
    @Override
    public void parse(TaskDTO<T> task){
        if(TaskCreateTypeEnum.IMPORT.equals(task.getTaskCreateType())){
            if(task.getTargetInputFile()!=null && task.getTargetInputFile().getObjectContent()!=null){
                List<T> target = EasyExcel.read(task.getTargetInputFile().getObjectContent(), getTargetClass() ,
                        new PageReadListener<T>(dataList -> {})).sheet(0).headRowNumber(1).doReadSync();
                task.setTarget(target);
            }
        }
    }
    /**
     * 導入通用落庫邏輯/導出構建list<Bean>邏輯
     * @param task
     */
    @Override
    public void run(TaskDTO<T> task){ }
    @Override
    public void export(TaskDTO<T> task){
        if(task!=null){
            if(CollectionUtils.isEmpty(task.getTarget())){
                taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS));
                log.info("BaseExecutableAbsJob.export 空任務{},跳過執行" , task.getTaskId());
                return;
            }
            String resultName = RESULT_FOLDER   task.getTaskBizTypeEnum().getName()   Constant.UNDER_LINE   System.currentTimeMillis()   ".xlsx";
            ByteArrayOutputStream targetOutputStream = new ByteArrayOutputStream();
            try (ExcelWriter excelWriter = EasyExcel.write(targetOutputStream).build()) {
                if (CollectionUtils.isNotEmpty(task.getTarget())) {
                    excelWriter.write(task.getTarget(), EasyExcel.writerSheet(0, "result").head(getTargetClass()).build());
                }
                task.setEndTime(System.currentTimeMillis());
                excelWriter.finish();
                try (ByteArrayInputStream inputStream = new ByteArrayInputStream(targetOutputStream.toByteArray())) {
                    task.setResultUrl(s3Utils.upload(inputStream, FileTypeEnum.BATCH_FILE.getCode(),resultName));
                    taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS, task.getResultUrl()));
                }
            } catch (Exception e) {
                log.error("BaseExecutableAbsJob.export error, target:{} ", task.getTaskId(), e);
                throw new TaskExportException(task.getTaskId()   e.getMessage());
            } finally {
                log.info("BaseExecutableAbsJob.export 任務「{}」執行完畢:{},文件地址:{}", task.getTaskId(), task.getOssPutMd5(), task.getResultUrl());
            }
        }
    }
    @Override
    public void errHandle(TaskDTO<T> taskDTO,Exception e){
        taskMangeService.errHandle(taskDTO,e.toString());
    }
    public Class<T> getTargetClass(){
            Type res = getClass().getGenericSuperclass();
            if(res instanceof ParameterizedType){
                ParameterizedType pRes = (ParameterizedType) res;
                Type[] type = pRes.getActualTypeArguments();
                if(type.length>0){
                    if(type[0] instanceof Class){
                        Type typeE = type[0];
                        return (Class<T>)typeE;
                    }
                }
            }
        return null;
    }
}

5.
BaseBatchExcelDataListener - 批處理excel解析監聽器

/**
 * 批處理excel解析監聽器
 * @param <T> Excel DTO
 * @param <K> 落庫 PO
 */
@Slf4j
public class BaseBatchExcelDataListener<T extends BaseWorkerDTO,K> implements ReadListener<T> {
    private static final int BATCH_COUNT = 100;
    private static final int MAX_ERROR_COUNT = 100;
    /** 業務服務*/
    private final BatchableAbsJob<T,K> batchableAbsJob;
    /** 每批待處理業務數據*/
    private Map<Integer,T> cachedDataList = Maps.newHashMapWithExpectedSize(BATCH_COUNT);
    /** 業務處理失敗數據,行號&錯誤報文 */
    public List<BatchResultDTO> errDataList = Lists.newArrayListWithExpectedSize(MAX_ERROR_COUNT) ;
    public BaseBatchExcelDataListener(BatchableAbsJob<T,K> batchableAbsJob) {
        this.batchableAbsJob = batchableAbsJob;
    }
    @Override
    public void invoke(T data, AnalysisContext context) {
        cachedDataList.put(context.readRowHolder().getRowIndex(),data);
        if (cachedDataList.size() >= BATCH_COUNT) {
            saveData();
            cachedDataList = Maps.newHashMapWithExpectedSize(BATCH_COUNT);
        }
    }
    @Override
    public void doAfterAllAnalysed(AnalysisContext analysisContext) {
        saveData();
    }
    /** 持久化 */
    private void saveData() {
        Map<Integer, K> po = batchableAbsJob.excel2Po(cachedDataList);
        if(po!=null && !po.isEmpty()){
            List<BatchResultDTO> errRes = batchableAbsJob.saveOrUpdate(po);
            if(errDataList.size()<MAX_ERROR_COUNT && CollectionUtils.isNotEmpty(errRes)){
                errDataList.addAll(errRes);
            }
        }
    }
}

6. BaseJob - 任務接口

public interface BaseJob<T> {
    void accept(TaskDTO<T> task);
    void parse(TaskDTO<T> task);
    void run(TaskDTO<T> task);
    void export(TaskDTO<T> task);
    void errHandle(TaskDTO<T> task,Exception e);
    default AbsExecutor<Void> of(TaskDTO<T> task){
        return () -> {
            try {
                accept(task);
                try {
                    parse(task);
                }finally {
                    if(task.getTargetInputFile()!=null){
                        task.getTargetInputFile().close();
                    }
                }
                run(task);
                export(task);
            }catch (Exception e){
                errHandle(task,e);
            }
            return null;
        };
    }
}

7. JobExecutor- 策略註解

/**
 * 任務執行器
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface JobExecutor {
    //任務業務類型
    TaskBizTypeEnum taskBizType() ;
}

8. TaskMangeService- 任務執行類

/**
 * 任務讀寫服務
 */
@Service
@Slf4j
public class TaskMangeServiceImpl extends BaseManageSupportService<TaskVO, TaskPO> implements TaskMangeService {
    private final static Integer MAX_ERR_CNT = 2;
    private final static Long LIMIT = 1L;
    @Override
    public TaskPO saveOrUpdate(TaskVO taskVO) {
        return taskService.save(input);
    }
    @Override
    public Page<TaskPO> hashList(TaskReqVO taskReqVO) {
        Page<TaskPO> page = Page.of(taskReqVO.getIndex(), taskReqVO.getSize());
        LambdaQueryWrapper<TaskPO> wrapper = Wrappers.<TaskPO>lambdaQuery()
                .in(CollectionUtils.isNotEmpty(taskReqVO.getStatus()), TaskPO::getStatus, taskReqVO.getStatus())
                .eq(taskReqVO.getBizType() != null, TaskPO::getBizType, taskReqVO.getBizType())
                .le(taskReqVO.getErrCnt() != null, TaskPO::getErrCnt, taskReqVO.getErrCnt())
                .apply("mod(id,"   taskReqVO.getShardTotal()   ") ="   taskReqVO.getShardIndex()   " ")
                .orderByAsc(TaskPO::getCreateTime);
        return taskService.page(page, wrapper);
    }
    private TaskVO getTask(String fileName,String pin, String key,TaskBizTypeEnum bizType,TaskCreateTypeEnum taskCreateType){
        // build task
        return res;
    }
    @Override
    public TaskDTO pullTask(){
        TaskDTO target = null;
        ShardingUtil.ShardingVO shardingVo = ShardingUtil.getShardingVo();
        log.info("iscWorker.pullTask workerIndex: {}, total:{}" ,  shardingVo.getIndex(),shardingVo.getTotal());
        TaskReqVO queryDTO = new TaskReqVO();
        queryDTO.setShardIndex(shardingVo.getIndex());
        queryDTO.setShardTotal(shardingVo.getTotal());
        queryDTO.setStatus(Lists.newArrayList(TaskStatusEnum.PENDING,TaskStatusEnum.FAILURE));
        queryDTO.setErrCnt(MAX_ERR_CNT);
        queryDTO.setIndex(0L);
        queryDTO.setSize(LIMIT);
        Page<TaskPO> targetList = hashList(queryDTO);
        if(CollectionUtils.isNotEmpty(targetList.getRecords())){
            log.info("PublishMkuBySkuWorker.pullTask 準備執行:{}" , JSON.toJSONString(targetList));
            target = new TaskDTO<>(targetList.getRecords().get(0));
        }
        return target;
    }
    @Override
    public Boolean error(TaskVO taskInfo) {
        return task.update(taskInfo);
    }
    /** 失敗處理*/
    @Override
    public void errHandle(TaskDTO task, String errMsg){
        error(new TaskVO(task.getTaskId()));
        Profiler.businessAlarm(UmpKeyConstant.BUSINESS_KEY_TASK_WARNING,("excel批量導入-任務執行異常:" errMsg task.getTaskId()));
        log.info("TaskMangeServiceImpl.errHandle 任務Id{}執行失敗:{}", task.getTaskId(),errMsg);
    }
}

五、類圖

圖2 類圖

作者:京東工業 於洋

來源:京東雲開發者社區 轉載請註明來源