04 xxl-job如何实现任务的分片
xxl-job 实现了分片任务的功能,可以将一个任务分片执行,每个执行器运行其中的一片任务。
实现分片任务的原理是:任务注册时,可以设置分片参数,指定任务分片的总数和当前执行器的分片序号。
在任务执行器获取任务时,通过判断当前执行器的分片序号来决定是否执行该任务。
执行器只会执行当前分片序号符合的任务片段,实现任务的分片执行
示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| @Slf4j @Component public class ShardingTaskDemo {
@Autowired private DataService dataService;
@XxlJob("shardingDataJob") public ReturnT<String> shardingDataJob(String param) { int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
RangeParam range = JSON.parseObject(param, RangeParam.class);
Pair<Integer, Integer> shardRange = calculateRange(range.getStartId(), range.getEndId(), shardIndex, shardTotal);
processDataByBatch(shardRange.getLeft(), shardRange.getRight(), 1000);
return ReturnT.SUCCESS; }
private Pair<Integer, Integer> calculateRange(int startId, int endId, int currentShard, int totalShards) { int totalData = endId - startId + 1; int step = totalData / totalShards; int remainder = totalData % totalShards;
int rangeStart = startId + currentShard * step; int rangeEnd = (currentShard == totalShards - 1) ? (rangeStart + step + remainder - 1) : (rangeStart + step - 1);
return Pair.of(rangeStart, rangeEnd); }
private void processDataByBatch(int startId, int endId, int batchSize) { int current = startId; while (current <= endId) { int batchEnd = Math.min(current + batchSize - 1, endId); List<Data> batchData = dataService.queryDataByIdRange(current, batchEnd); dataService.processBatch(batchData); XxlJobHelper.log("处理分片数据:{} - {}", current, batchEnd); current = batchEnd + 1; } }
@Data @NoArgsConstructor @AllArgsConstructor public static class RangeParam { private int startId; private int endId; } }
|
需要注意业务逻辑的幂等处理,避免重复或失败重试导致数据幂等性问题