06 结合kafka的exactly-once说明如何实现一个无锁的幂等方案
在佣金系统的价保消息改造中,您提到兼容新旧消息并采用Redis分布式锁防重消费。若消费者集群规模扩大到100个节点,Redis锁可能成为性能瓶颈,如何设计无锁的幂等性方案?请结合消息的重复投递机制(如Kafka的exactly-once)说明?
无锁幂等方案设计
1 2 3 4 5 6
| CREATE TABLE message_processed ( id BIGINT AUTO_INCREMENT, biz_id VARCHAR(64) NOT NULL COMMENT '订单ID+操作类型', msg_id VARCHAR(64) NOT NULL, UNIQUE KEY uk_biz_msg (biz_id, msg_id) );
|
java操作流程
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void processMessage(Message msg) { String bizId = msg.getOrderId() + "_" + msg.getActionType(); try { jdbcTemplate.update("INSERT INTO message_processed (biz_id, msg_id) VALUES (?, ?)", bizId, msg.getId()); applyPriceProtection(msg.getOrderId()); } catch (DuplicateKeyException e) { log.info("Message already processed: {}", msg.getId()); } }
|
优势:
- 无锁:依赖数据库唯一约束,天然幂等。
- 业务语义明确:biz_id可结合订单ID和操作类型(如”价保申请”),避免不同业务动作的ID冲突
Kafka的exactly-once语义为消息不会丢失,也不会重复,只会被处理一次;
借助kafka的事务机制来实现
前提条件,配置如下:
1 2 3 4 5
| # 生产者配置 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "price-protection-producer"); # 消费者配置 props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
|
然后业务代码中,在业务处理代码前需要先开启事务,业务处理后再提交偏移量,最后再提交事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singleton("price-protection-topic"));
KafkaProducer producer = new KafkaProducer<>(props); producer.initTransactions();
while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); try { producer.beginTransaction();
for (ConsumerRecord record : records) { applyPriceProtection(record.key());
ProducerRecord<String, String> outputRecord = new ProducerRecord<>("commission-result-topic", record.key(), "SUCCESS"); producer.send(outputRecord); }
producer.sendOffsetsToTransaction(offsets, "price-protection-consumer-group"); producer.commitTransaction(); } catch (Exception e) { producer.abortTransaction(); } }
|
核心机制:
- 生产者幂等性:通过transactional.id保证单个生产者发送的消息Exactly-Once。
- 事务性跨分区写:业务处理与消息发送到下游Topic的原子性。
- 消费者位移事务提交:消费位移的提交与业务处理结果绑定,避免重复消费。