5 分布式事务怎么做的
vvEcho
2026-03-13 10:45:07
常见的分布式有以下几种方案
1.2pc 2.TCC 3.本地消息表 4.消息的最终一致性
一般国内用seata来解决,但是其太重了;很多使用本地事务+消息表来解决
rocketMq事务消息
还有使用rocketMq 提供了事务消息
流程如下:
Producer
│
│ ①发送半消息
▼
Broker (消息不可消费)
│
│ ②执行本地事务
▼
Producer
│
│ ③提交 or 回滚
▼
Broker
│
│ commit -> 投递消费者
│ rollback -> 删除消息
▼
Consumer
RocketMQ 事务消息通过 Half Message + 本地事务 + Broker 回查机制实现最终一致性。Producer 先发送半消息到 Broker,此时消息不会被消费。随后执行本地事务,如果成功则向 Broker 提交 commit,消息才会被消费者消费;如果失败则 rollback 删除消息。如果 Producer 在提交前宕机,Broker 会通过事务回查机制主动询问 Producer 的事务状态,从而最终确定消息是提交还是回滚
java代码演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
|
@Service public class OrderService {
@Autowired private RocketMQTemplate rocketMQTemplate;
public void createOrder(String orderId) { Message<String> msg = MessageBuilder .withPayload(orderId) .build(); rocketMQTemplate.sendMessageInTransaction( "order_tx_group", "orderTopic", msg, orderId ); } }
@RocketMQTransactionListener(txProducerGroup = "order_tx_group") public class OrderTxListener implements RocketMQLocalTransactionListener {
@Autowired private OrderRepository orderRepository;
@Override public RocketMQLocalTransactionState executeLocalTransaction( Message msg, Object arg) {
String orderId = (String) arg; try {
Order order = new Order(orderId); orderRepository.save(order); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } }
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String orderId = new String((byte[]) msg.getPayload()); boolean exists = orderRepository.existsById(orderId); if (exists) { return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.ROLLBACK; } }
@Service @RocketMQMessageListener( topic = "orderTopic", consumerGroup = "order_consumer_group" ) public class OrderConsumer implements RocketMQListener<String> {
@Override public void onMessage(String orderId) { System.out.println("收到订单消息:" + orderId);
} }
|
以上的事务监听器中,回查本地事务什么时候触发呢?
- 1 Producer 没有返回 commit / rollback
- 2 Half Message超时,默认是transactionTimeout=6s,brocker 6s没有收到 commit 就会去调回查
回查流程
Broker
│
│ 扫描 half message
▼
发现未提交事务
│
│ 发送 checkTransaction 请求
▼
Producer
│
│ 调用 checkLocalTransaction()
▼
返回事务状态
│
▼
Broker
│
commit / rollback
Broker会发送一个 回查请求 RPC 给 Producer。
Producer 收到后执行:checkLocalTransaction();然后返回本地事务状态给broker 然后触发commit / rollback
最后broker commit后就会将这个半消息改为normal消息,这个时候将会把这个消息投递给消费者;如果rollback就会将这个消息删除
为什么RocketMQ不直接修改消息状态,而是重新写入 topic?
因为RocketMQ 的存储基于CommitLog顺序写来实现的,这样做的好处是磁盘顺序写性能极高,反之每次都要去查询对应的消息,然后修改状态浪费磁盘IO;
brocker commit不是修改half message,也不是简单复制,而是“重新写入一条新的消息到真实Topic“–指的是RocketMQ不是把整条half message物理复制,而是读取half message的内容,然后重新构造一条新的normal message写入CommitLog。