1. Core Problems of Reliable Message Delivery
In a distributed system, reliable message delivery is key to keeping data consistent across services. When a message queue is used for asynchronous communication, three typical problems can arise:
1. Message loss: the producer sends a message but the MQ never receives it; the MQ receives it but fails to deliver it to a consumer; or the consumer processes the message but never acknowledges it correctly.
2. Message duplication: the producer's retry mechanism resends a message; the consumer times out and the MQ redelivers it; or a network glitch loses the acknowledgment.
3. Message disorder: multiple producers send concurrently, or multi-threaded consumption scrambles the processing order.
2. The Three Delivery Semantics
2.1 At Most Once
Messages may be lost but are never delivered twice. Suitable for scenarios with loose consistency requirements, such as log collection and metrics.
2.2 At Least Once
Messages are never lost but may be delivered more than once. This is the most common mode and requires the consumer to be idempotent.
2.3 Exactly Once
Messages are neither lost nor duplicated; each one is delivered exactly once. This is the ideal mode, but it is the most expensive to implement.
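For reference, Kafka approximates exactly-once on the producer side with its idempotent/transactional producer. The following is only a minimal sketch under that assumption; the topic name and transactional.id are placeholders:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");                           // required for the idempotent producer
props.put("enable.idempotence", "true");            // broker deduplicates retried sends
props.put("transactional.id", "demo-tx-producer");  // placeholder transactional id
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();
try {
    producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
    producer.commitTransaction(); // all records in the transaction become visible atomically
} catch (Exception e) {
    producer.abortTransaction();  // discard everything sent since beginTransaction
}
End-to-end exactly-once additionally requires consumers to read with isolation.level=read_committed and to commit their offsets within the same transaction.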
3. Reliable Delivery in RabbitMQ
3.1 Producer Confirm Mechanism
RabbitMQ offers two acknowledgment mechanisms on the producer side: AMQP transactions and publisher confirms. Confirms perform much better and are the recommended choice; a sketch of the transaction mechanism is included below for comparison.
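For contrast only, the transaction mechanism wraps publishes in txSelect/txCommit/txRollback (a minimal sketch; every commit blocks for a broker round trip, which is why confirms are preferred):
// Transaction mechanism (slow; shown only for comparison)
channel.txSelect();                 // put the channel into transactional mode
try {
    channel.basicPublish(exchange, routingKey, props, message.getBytes());
    channel.txCommit();             // the message is only routed/persisted after commit
} catch (Exception e) {
    channel.txRollback();           // discard everything published since txSelect
}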
// Enable confirm mode on the channel
channel.confirmSelect();
// Publish a message
channel.basicPublish(exchange, routingKey, props, message.getBytes());
// Option 1: block until the broker confirms
if (channel.waitForConfirms()) {
    System.out.println("Message confirmed by the broker");
} else {
    System.out.println("Message was nack-ed by the broker");
}
// Option 2: asynchronous confirm callbacks
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) {
        System.out.println("Delivery confirmed: " + deliveryTag);
    }
    @Override
    public void handleNack(long deliveryTag, boolean multiple) {
        System.out.println("Delivery failed: " + deliveryTag);
    }
});
3.2 Message Persistence
Persistence must be configured on both the queue and the individual messages:
// Declare a durable queue
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// Publish a persistent message
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2) // 2 = persistent
        .build();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
3.3 Consumer Acknowledgments
The consumer should acknowledge a message only after it has been processed:
// Turn off auto-ack and acknowledge manually
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            // Process the message
            String message = new String(body, "UTF-8");
            System.out.println("Received: " + message);
            // Acknowledge after successful processing
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            // Processing failed: reject the message (the requeue flag controls redelivery)
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
});
4. Reliable Delivery in RocketMQ
4.1 Synchronous Send
// Synchronous send: block until the broker returns a SendResult
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println("Send result: " + sendResult);
producer.shutdown();
4.2 Asynchronous Send
// Asynchronous send: the callback fires when the broker responds
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("Send succeeded: " + sendResult);
    }
    @Override
    public void onException(Throwable e) {
        System.out.println("Send failed: " + e.getMessage());
    }
});
4.3 Ordered Messages
// Producer: route all messages with the same business key to the same queue
MessageQueueSelector selector = new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
};
producer.send(msg, selector, orderId);
// Consumer: MessageListenerOrderly consumes each queue with a single thread
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                               ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println("Received ordered message: " + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
4.4 Transactional Messages
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // Execute the local transaction
            boolean success = doLocalTransaction();
            return success ? LocalTransactionState.COMMIT_MESSAGE :
                             LocalTransactionState.ROLLBACK_MESSAGE;
        } catch (Exception e) {
            // Unknown outcome: the broker will call checkLocalTransaction later
            return LocalTransactionState.UNKNOW;
        }
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // Broker-initiated check for transactions stuck in the UNKNOW state
        return checkTransactionStatus(msg.getTransactionId());
    }
});
producer.start(); // the producer must be started before sending
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
5. Reliable Delivery in Kafka
5.1 Producer Configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");  // wait for all in-sync replicas to acknowledge
props.put("retries", 3);   // retry transient send failures
props.put("max.in.flight.requests.per.connection", 1); // preserve ordering across retries
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("Send succeeded, offset: " + metadata.offset());
        }
    }
});
producer.close();
5.2 Consumer Configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // disable auto-commit; commit offsets manually
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received: " + record.value());
        }
        // Commit offsets only after the whole batch has been processed
        consumer.commitSync();
    }
} finally {
    consumer.close();
}
6. Idempotent Consumption
6.1 Database Unique Index
CREATE TABLE message_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
message_id VARCHAR(64) NOT NULL UNIQUE,
content TEXT,
status TINYINT DEFAULT 0,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
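The unique index on message_id turns duplicate processing into a constraint violation. A minimal sketch of how a consumer might use it, assuming Spring's JdbcTemplate; processBusiness is a hypothetical handler:
public void consume(String messageId, String content) {
    try {
        // The UNIQUE constraint on message_id rejects a second insert of the same message
        jdbcTemplate.update(
                "INSERT INTO message_log (message_id, content, status) VALUES (?, ?, 1)",
                messageId, content);
    } catch (DuplicateKeyException e) {
        // Already processed (or being processed): safely ignore the duplicate
        return;
    }
    processBusiness(content); // hypothetical business handler
}
In practice the insert and the business update should share one local transaction, so a failed handler does not leave the message marked as done.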
6.2 Idempotency with Redis
public boolean checkMessageId(String messageId) {
    String key = "message:" + messageId;
    // SET NX with a TTL; Boolean.TRUE.equals guards against a null return value
    return Boolean.TRUE.equals(
            redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofMinutes(30)));
}
public void processMessage(String messageId, String content) {
    if (!checkMessageId(messageId)) {
        throw new RuntimeException("Duplicate message, already processed");
    }
    try {
        // Business logic
        doBusinessLogic(content);
    } catch (Exception e) {
        // Processing failed: remove the idempotency marker so the message can be retried
        redisTemplate.delete("message:" + messageId);
        throw e;
    }
}
6.3 Distributed Lock
public void processWithLock(String messageId, String content) {
    String lockKey = "lock:message:" + messageId;
    boolean locked = false;
    try {
        // setIfAbsent may return null, so unwrap defensively
        locked = Boolean.TRUE.equals(
                redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofSeconds(30)));
        if (!locked) {
            throw new RuntimeException("Message is already being processed");
        }
        // Skip if the message has already been handled
        if (isMessageProcessed(messageId)) {
            return;
        }
        // Business logic
        doBusinessLogic(content);
        // Mark as processed
        markMessageProcessed(messageId);
    } finally {
        if (locked) {
            redisTemplate.delete(lockKey);
        }
    }
}
7. Guaranteeing Message Order
7.1 Ordered Messages in RabbitMQ
// Producer: publish messages in sequence
for (int i = 0; i < 10; i++) {
    String message = "ordered-message-" + i;
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .messageId(UUID.randomUUID().toString())
            .build();
    channel.basicPublish("", QUEUE_NAME, props, message.getBytes());
}
// Consumer: single-threaded consumption
channel.basicQos(1); // prefetch only one unacked message at a time
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            String message = new String(body, "UTF-8");
            System.out.println("Received: " + message);
            Thread.sleep(1000); // simulate processing time
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
});
7.2 Partition-Based Ordering
// Producer: route messages with the same business ID to the same partition
public void sendOrderMessage(String orderId, String message) {
    // floorMod stays non-negative even when hashCode() is Integer.MIN_VALUE
    int partition = Math.floorMod(orderId.hashCode(), partitionCount);
    ProducerRecord<String, String> record = new ProducerRecord<>(
            "order-topic", partition, orderId, message
    );
    producer.send(record);
}
// Consumer: a single thread consumes the assigned partition
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(Arrays.asList(new TopicPartition("order-topic", 0))); // pin to a specific partition
8. Dead-Letter Queues and Message Retries
8.1 RabbitMQ Dead-Letter Queue
// Declare the dead-letter exchange and queue
channel.exchangeDeclare("dlx.exchange", "direct");
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routingkey");
// Declare the business queue and attach the dead-letter exchange
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.routingkey");
args.put("x-message-ttl", 10000); // messages expire after 10 seconds
channel.queueDeclare("business.queue", true, false, false, args);
8.2 Retry Mechanism
public void consumeMessage(String message) throws InterruptedException {
    int maxRetry = 3;
    int retryCount = 0;
    while (retryCount < maxRetry) {
        try {
            // Process the message
            processMessage(message);
            break; // success, exit the loop
        } catch (Exception e) {
            retryCount++;
            if (retryCount >= maxRetry) {
                // Max retries reached: route the message to the dead-letter queue
                sendToDlq(message);
                break;
            }
            // Back off before retrying (linearly increasing delay)
            Thread.sleep(1000L * retryCount);
        }
    }
}
9. Transactional Messages and Eventual Consistency
9.1 Local Message Table
@Transactional
public void processOrder(Order order) {
    // 1. Create the order
    orderMapper.insert(order);
    // 2. Record the message in the local message table, within the same transaction
    MessageLog messageLog = new MessageLog();
    messageLog.setMessageId(UUID.randomUUID().toString());
    messageLog.setContent(JSON.toJSONString(order));
    messageLog.setStatus(0);
    messageLogMapper.insert(messageLog);
    // 3. Send the message to the MQ
    sendMessage(messageLog.getMessageId(), messageLog.getContent());
}
// Scheduled task: rescan and resend unconfirmed messages
@Scheduled(fixedRate = 10000)
public void retrySendMessage() {
    List<MessageLog> unconfirmedMessages = messageLogMapper.selectUnconfirmed();
    for (MessageLog message : unconfirmedMessages) {
        try {
            sendMessage(message.getMessageId(), message.getContent());
            messageLogMapper.updateStatus(message.getId(), 1);
        } catch (Exception e) {
            log.error("Failed to resend message", e);
        }
    }
}
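To close the loop, the broker's publish confirmation can flip the row's status instead of blindly resending. A sketch assuming Spring AMQP with publisher confirms enabled and the message ID carried in the CorrelationData; updateStatusByMessageId is a hypothetical mapper method:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack && correlationData != null) {
        // Broker accepted the message: mark the local row as sent (status = 1)
        messageLogMapper.updateStatusByMessageId(correlationData.getId(), 1); // hypothetical mapper method
    } else {
        // Nack or missing correlation data: leave status = 0 so the scheduled task retries
        log.warn("Message not confirmed, cause: {}", cause);
    }
});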
9.2 TCC Compensation
public void createOrder(Order order) {
    try {
        // Try phase: reserve resources
        boolean tryResult = orderService.tryCreateOrder(order);
        if (!tryResult) {
            throw new RuntimeException("Resource reservation failed");
        }
        // Confirm phase: commit the reservation
        orderService.confirmCreateOrder(order);
    } catch (Exception e) {
        // Cancel phase: release the reservation
        orderService.cancelCreateOrder(order);
        throw e;
    }
}
10. Monitoring and Alerting
10.1 Backlog Monitoring
@Component
public class MessageBacklogMonitor {
    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Scheduled(fixedRate = 60000)
    public void checkBacklog() {
        QueueInformation queueInfo = rabbitAdmin.getQueueInfo("business.queue");
        if (queueInfo != null && queueInfo.getMessageCount() > 1000) {
            // Trigger an alert
            sendAlert("Message backlog alert", "Messages queued: " + queueInfo.getMessageCount());
        }
    }
}
10.2 Processing-Failure Monitoring
@Aspect
@Component
public class MessageProcessAspect {
    @Pointcut("execution(* com.example.service..*.*(..))")
    public void servicePointcut() {}

    @Around("servicePointcut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        long startTime = System.currentTimeMillis();
        try {
            return joinPoint.proceed();
        } catch (Exception e) {
            // Log the failure
            log.error("Message processing failed", e);
            // Trigger an alert
            sendAlert("Message processing failure", e.getMessage());
            throw e;
        } finally {
            long costTime = System.currentTimeMillis() - startTime;
            if (costTime > 5000) {
                // Alert on slow processing
                sendAlert("Message processing timeout", "Elapsed: " + costTime + "ms");
            }
        }
    }
}
11. Summary
Reliable message delivery is a core concern in distributed system design and has to be addressed at the producer, the broker, and the consumer. By combining publisher confirms, persistence, idempotent consumption, dead-letter queues, and transactional messages, you can build a highly available messaging system. In a real project, pick the mechanisms that fit the business scenario and balance consistency, availability, and performance.