06 结合kafka的exactly-once说明如何实现一个无锁的幂等方案

vvEcho 2025-02-19 18:36:44
Categories: > Tags:

在佣金系统的价保消息改造中,您提到兼容新旧消息并采用Redis分布式锁防重消费。若消费者集群规模扩大到100个节点,Redis锁可能成为性能瓶颈,如何设计无锁的幂等性方案?请结合消息的重复投递机制(如Kafka的exactly-once)说明?

无锁幂等方案设计

1
2
3
4
5
6
CREATE TABLE message_processed (
id BIGINT AUTO_INCREMENT,
biz_id VARCHAR(64) NOT NULL COMMENT '订单ID+操作类型', -- 如 "ORDER_12345_APPLY_PRICE_PROTECTION"
msg_id VARCHAR(64) NOT NULL,
UNIQUE KEY uk_biz_msg (biz_id, msg_id) -- 联合唯一索引
);

java操作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
public void processMessage(Message msg) {
String bizId = msg.getOrderId() + "_" + msg.getActionType();
try {
// 插入成功即表示未处理
jdbcTemplate.update("INSERT INTO message_processed (biz_id, msg_id) VALUES (?, ?)",
bizId, msg.getId());
// 执行实际业务逻辑
applyPriceProtection(msg.getOrderId());
} catch (DuplicateKeyException e) {
// 唯一键冲突,直接跳过
log.info("Message already processed: {}", msg.getId());
}
}

优势:

Kafka的exactly-once语义为消息不会丢失,也不会重复,只会被处理一次;‌

借助kafka的事务机制来实现‌

前提条件,配置如下:

1
2
3
4
5
# 生产者配置
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 开启幂等
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "price-protection-producer"); // 事务ID
# 消费者配置
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 只读取已提交的消息

然后业务代码中,在业务处理代码前需要先开启事务,业务处理后再提交偏移量,最后再提交事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("price-protection-topic"));

// 开启事务
KafkaProducer producer = new KafkaProducer<>(props);
producer.initTransactions();

while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
try {
producer.beginTransaction();

for (ConsumerRecord record : records) {
// 业务处理(幂等)
applyPriceProtection(record.key());

// 将处理结果发送到下游Topic(如佣金计算完成通知)
ProducerRecord<String, String> outputRecord =
new ProducerRecord<>("commission-result-topic", record.key(), "SUCCESS");
producer.send(outputRecord);
}

// 提交事务(同时提交消费位移)
producer.sendOffsetsToTransaction(offsets, "price-protection-consumer-group");
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction(); // 回滚事务,消费位移不提交
}
}

核心机制:‌