回答这个问题可以分三个方面,首先第一点要保证producer消息能达到broker,其次broker收到对应的消息要确保持久化磁盘,最后需要确保consumer能成功消费这条消息;
针对producer:
我们知道producer发送消息可以异步也可以同步,我们需要将其改成同步发送,这样就能知道消息发送的结果,且我们可以添加一个监听器,监听消息发送的结果,如果发送失败可以在回调方法中进行消息重试;另外如果因为网络问题或者broker发生了故障,producer提供了retries机制,可以自动重试
针对broker:
对于broker而言,批量异步刷盘是kafka为了提高吞出量的一个机制,它没有同步刷盘的机制;那么针对这个问题,需要Partition的副本机制,acks机制来解决。Partition的副本机制是针对每个数据分区的高可用策略,每个副本集会包含唯一的一个leader和多个follower,leader负责处理事务类型的请求,follower负责同步leader的数据。
而Kafka提供了一个acks的参数,Producer可以设置这个参数,去结合broker的副本机制来共同保障数据的可靠性。
- acks=0 表示producer不需要等待broker的响应,就认为消息发送成功了(可能存在数据丢失)
- acks=1 表示broker的leader和Partition收到消息之后,不等待其他的follower Partition的同步就给Producer发一个确认,假设leader和Partition挂了(可能存在数据丢失)
- acks=-1 表示broker的leader和Partition收到消息之后 并且等待 ISR列表中的follower同步完成,再给Producer返回一个确认(保证数据不丢失)
针对consumer:
其实只要Producer和Broker的消息能得到保证,消费端不太可能出现不能消费的问题
除非Consumer没有消费完这个消息就提交了offset,这个我们可以重新调整offset的值,来实现重新消费