5 分布式事务怎么做的

vvEcho 2026-03-13 10:45:07
Categories: Tags:

常见的分布式有以下几种方案
1.2pc 2.TCC 3.本地消息表 4.消息的最终一致性

一般国内用seata来解决,但是其太重了;很多使用本地事务+消息表来解决

rocketMq事务消息

还有使用rocketMq 提供了事务消息
流程如下:
Producer

│ ①发送半消息

Broker (消息不可消费)

│ ②执行本地事务

Producer

│ ③提交 or 回滚

Broker

│ commit -> 投递消费者
│ rollback -> 删除消息

Consumer

RocketMQ 事务消息通过 Half Message + 本地事务 + Broker 回查机制实现最终一致性。Producer 先发送半消息到 Broker,此时消息不会被消费。随后执行本地事务,如果成功则向 Broker 提交 commit,消息才会被消费者消费;如果失败则 rollback 删除消息。如果 Producer 在提交前宕机,Broker 会通过事务回查机制主动询问 Producer 的事务状态,从而最终确定消息是提交还是回滚

java代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 事务消息
*/
@Service
public class OrderService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

public void createOrder(String orderId) {
Message<String> msg = MessageBuilder
.withPayload(orderId)
.build();
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction(
"order_tx_group",
"orderTopic",
msg,
orderId
);
}
}

/**
* 事务监听器
*/
@RocketMQTransactionListener(txProducerGroup = "order_tx_group")
public class OrderTxListener implements RocketMQLocalTransactionListener {

@Autowired
private OrderRepository orderRepository;

/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(
Message msg, Object arg) {

String orderId = (String) arg;
try {

// 本地事务
Order order = new Order(orderId);
orderRepository.save(order);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}

/**
* Broker回查本地事务
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderId = new String((byte[]) msg.getPayload());
boolean exists = orderRepository.existsById(orderId);
if (exists) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}

/**
* 消费端
*/
@Service
@RocketMQMessageListener(
topic = "orderTopic",
consumerGroup = "order_consumer_group"
)
public class OrderConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String orderId) {
System.out.println("收到订单消息:" + orderId);

// 扣库存
// 更新缓存
// 通知其他系统
}
}

以上的事务监听器中,回查本地事务什么时候触发呢?

回查流程
Broker

│ 扫描 half message

发现未提交事务

│ 发送 checkTransaction 请求

Producer

│ 调用 checkLocalTransaction()

返回事务状态


Broker

commit / rollback

Broker会发送一个 回查请求 RPC 给 Producer。
Producer 收到后执行:checkLocalTransaction();然后返回本地事务状态给broker 然后触发commit / rollback

最后broker commit后就会将这个半消息改为normal消息,这个时候将会把这个消息投递给消费者;如果rollback就会将这个消息删除

为什么RocketMQ不直接修改消息状态,而是重新写入 topic?

因为RocketMQ 的存储基于CommitLog顺序写来实现的,这样做的好处是磁盘顺序写性能极高,反之每次都要去查询对应的消息,然后修改状态浪费磁盘IO;

brocker commit不是修改half message,也不是简单复制,而是“重新写入一条新的消息到真实Topic“–指的是RocketMQ不是把整条half message物理复制,而是读取half message的内容,然后重新构造一条新的normal message写入CommitLog。