04 xxl-job如何实现任务的分片

vvEcho 2025-02-27 18:44:56
Categories: Tags:

xxl-job 实现了分片任务的功能,可以将一个任务分片执行,每个执行器运行其中的一片任务。

实现分片任务的原理是:任务注册时,可以设置分片参数,指定任务分片的总数和当前执行器的分片序号。

在任务执行器获取任务时,通过判断当前执行器的分片序号来决定是否执行该任务。

执行器只会执行当前分片序号符合的任务片段,实现任务的分片执行

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@Slf4j
@Component
public class ShardingTaskDemo {

@Autowired
private DataService dataService;

/**
* 分片广播任务示例:处理100万条数据
*/
@XxlJob("shardingDataJob")
public ReturnT<String> shardingDataJob(String param) {
// 获取分片参数
int shardIndex = XxlJobHelper.getShardIndex(); // 当前分片序号(从0开始)
int shardTotal = XxlJobHelper.getShardTotal(); // 总分片数(等于执行器数量)

// 解析参数中的总数据范围(假设参数为JSON格式:{"startId":1, "endId":1000000})
RangeParam range = JSON.parseObject(param, RangeParam.class);

// 计算当前分片处理的数据范围
Pair<Integer, Integer> shardRange = calculateRange(range.getStartId(), range.getEndId(), shardIndex, shardTotal);

// 分批次处理数据(每次处理1000条)
processDataByBatch(shardRange.getLeft(), shardRange.getRight(), 1000);

return ReturnT.SUCCESS;
}

/**
* 分片范围计算逻辑
* 示例:总数据100万,分片数3 → 分片0处理1-333333,分片1处理333334-666666,分片2处理666667-1000000
*/
private Pair<Integer, Integer> calculateRange(int startId, int endId, int currentShard, int totalShards) {
int totalData = endId - startId + 1;
int step = totalData / totalShards;
int remainder = totalData % totalShards;

int rangeStart = startId + currentShard * step;
int rangeEnd = (currentShard == totalShards - 1) ? (rangeStart + step + remainder - 1) : (rangeStart + step - 1);

return Pair.of(rangeStart, rangeEnd);
}

/**
* 分批次处理数据
*/
private void processDataByBatch(int startId, int endId, int batchSize) {
int current = startId;
while (current <= endId) {
int batchEnd = Math.min(current + batchSize - 1, endId);
List<Data> batchData = dataService.queryDataByIdRange(current, batchEnd);
dataService.processBatch(batchData); // 实际业务处理
XxlJobHelper.log("处理分片数据:{} - {}", current, batchEnd);
current = batchEnd + 1;
}
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class RangeParam {
private int startId;
private int endId;
}
}

需要注意业务逻辑的幂等处理,避免重复或失败重试导致数据幂等性问题