Java 开发日记:消息的可靠性投递深度解析

一、消息可靠性投递的核心问题

在分布式系统中,消息的可靠性投递是确保系统数据一致性的关键环节。当我们使用消息队列进行异步通信时,可能会遇到以下三种典型问题:

1. 消息丢失:生产者发送消息到MQ,但MQ未成功接收;或MQ接收后未成功投递给消费者;或消费者处理消息后未正确确认。

2. 消息重复:生产者重试机制导致消息重复发送;消费者处理超时,MQ重新投递;网络抖动导致确认消息丢失。

3. 消息乱序:多个生产者并发发送消息;消费者多线程处理导致顺序错乱。

二、消息投递的三种语义

2.1 At Most Once(最多一次)

消息可能丢失,但不会重复投递。适用于对数据一致性要求不高的场景,如日志收集、统计信息等。

2.2 At Least Once(至少一次)

消息不会丢失,但可能重复投递。这是最常见的模式,需要消费者实现幂等性。

2.3 Exactly Once(恰好一次)

消息不丢失、不重复,保证恰好投递一次。这是最理想的模式,但实现成本较高。

三、RabbitMQ 的可靠性投递机制

3.1 生产者确认机制(Confirm)

RabbitMQ提供了两种确认机制:事务机制和Confirm机制。Confirm机制性能更好,推荐使用。

// 开启Confirm模式
channel.confirmSelect();

// 发送消息
channel.basicPublish(exchange, routingKey, props, message.getBytes());

// 同步等待确认
if (channel.waitForConfirms()) {
    System.out.println("消息发送成功");
} else {
    System.out.println("消息发送失败");
}

// 异步确认回调
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) {
        System.out.println("消息投递成功:" + deliveryTag);
    }
    
    @Override
    public void handleNack(long deliveryTag, boolean multiple) {
        System.out.println("消息投递失败:" + deliveryTag);
    }
});

3.2 消息持久化

消息持久化需要同时配置队列持久化和消息持久化:

// 声明持久化队列
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

// 发送持久化消息
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .deliveryMode(2) // 2表示持久化
    .build();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());

3.3 消费者ACK机制

消费者需要手动确认消息处理完成:

// 关闭自动确认,开启手动确认
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, 
                               AMQP.BasicProperties properties, byte[] body) {
        try {
            // 处理消息
            String message = new String(body, "UTF-8");
            System.out.println("收到消息:" + message);
            
            // 手动确认消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            // 处理失败,拒绝消息(可配置是否重新入队)
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
});

四、RocketMQ 的可靠性投递机制

4.1 同步发送

DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println("发送结果:" + sendResult);

producer.shutdown();

4.2 异步发送

producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("发送成功:" + sendResult);
    }
    
    @Override
    public void onException(Throwable e) {
        System.out.println("发送失败:" + e.getMessage());
    }
});

4.3 顺序消息

// 发送顺序消息
MessageQueueSelector selector = new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
};

producer.send(msg, selector, orderId);

// 消费顺序消息
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, 
                                              ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println("收到顺序消息:" + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

4.4 事务消息

TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务
            boolean success = doLocalTransaction();
            return success ? LocalTransactionState.COMMIT_MESSAGE : 
                            LocalTransactionState.ROLLBACK_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.UNKNOW;
        }
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 检查本地事务状态
        return checkTransactionStatus(msg.getTransactionId());
    }
});

TransactionSendResult result = producer.sendMessageInTransaction(msg, null);

五、Kafka 的可靠性投递机制

5.1 生产者配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 所有副本确认
props.put("retries", 3); // 重试次数
props.put("max.in.flight.requests.per.connection", 1); // 保证顺序
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("发送成功:" + metadata.offset());
        }
    }
});

producer.close();

5.2 消费者配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("收到消息:" + record.value());
        }
        // 手动提交偏移量
        consumer.commitSync();
    }
} finally {
    consumer.close();
}

六、幂等性设计

6.1 数据库唯一索引

CREATE TABLE message_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    message_id VARCHAR(64) NOT NULL UNIQUE,
    content TEXT,
    status TINYINT DEFAULT 0,
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

6.2 Redis 幂等性实现

public boolean checkMessageId(String messageId) {
    String key = "message:" + messageId;
    return redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofMinutes(30));
}

public void processMessage(String messageId, String content) {
    if (!checkMessageId(messageId)) {
        throw new RuntimeException("消息重复处理");
    }
    
    try {
        // 处理业务逻辑
        doBusinessLogic(content);
    } catch (Exception e) {
        // 处理失败,删除幂等记录
        redisTemplate.delete("message:" + messageId);
        throw e;
    }
}

6.3 分布式锁方案

public void processWithLock(String messageId, String content) {
    String lockKey = "lock:message:" + messageId;
    boolean locked = false;
    try {
        locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofSeconds(30));
        if (!locked) {
            throw new RuntimeException("消息正在处理中");
        }
        
        // 检查是否已处理
        if (isMessageProcessed(messageId)) {
            return;
        }
        
        // 处理业务
        doBusinessLogic(content);
        
        // 标记已处理
        markMessageProcessed(messageId);
    } finally {
        if (locked) {
            redisTemplate.delete(lockKey);
        }
    }
}

七、消息顺序性保证

7.1 RabbitMQ 顺序消息

// 生产者:发送顺序消息
for (int i = 0; i < 10; i++) {
    String message = "顺序消息-" + i;
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .messageId(UUID.randomUUID().toString())
        .build();
    channel.basicPublish("", QUEUE_NAME, props, message.getBytes());
}

// 消费者:单线程消费
channel.basicQos(1); // 每次只处理一条消息
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, 
                               AMQP.BasicProperties properties, byte[] body) {
        try {
            String message = new String(body, "UTF-8");
            System.out.println("收到消息:" + message);
            Thread.sleep(1000); // 模拟处理耗时
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
});

7.2 基于分区的顺序消息

// 生产者:按业务ID路由到同一分区
public void sendOrderMessage(String orderId, String message) {
    int partition = Math.abs(orderId.hashCode()) % partitionCount;
    ProducerRecord<String, String> record = new ProducerRecord<>(
        "order-topic", partition, orderId, message
    );
    producer.send(record);
}

// 消费者:单线程消费分区
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(Arrays.asList(new TopicPartition("order-topic", 0))); // 指定分区

八、死信队列与消息重试

8.1 RabbitMQ 死信队列

// 声明死信交换机和队列
channel.exchangeDeclare("dlx.exchange", "direct");
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routingkey");

// 声明业务队列,绑定死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.routingkey");
args.put("x-message-ttl", 10000); // 消息10秒后过期
channel.queueDeclare("business.queue", true, false, false, args);

8.2 消息重试机制

public void consumeMessage(String message) {
    int maxRetry = 3;
    int retryCount = 0;
    
    while (retryCount < maxRetry) {
        try {
            // 处理消息
            processMessage(message);
            break; // 处理成功,跳出循环
        } catch (Exception e) {
            retryCount++;
            if (retryCount >= maxRetry) {
                // 达到最大重试次数,进入死信队列
                sendToDlq(message);
                break;
            }
            // 等待后重试
            Thread.sleep(1000 * retryCount);
        }
    }
}

九、事务消息与最终一致性

9.1 本地消息表方案

@Transactional
public void processOrder(Order order) {
    // 1. 创建订单
    orderMapper.insert(order);
    
    // 2. 记录消息到本地表
    MessageLog messageLog = new MessageLog();
    messageLog.setMessageId(UUID.randomUUID().toString());
    messageLog.setContent(JSON.toJSONString(order));
    messageLog.setStatus(0);
    messageLogMapper.insert(messageLog);
    
    // 3. 发送消息到MQ
    sendMessage(messageLog.getMessageId(), messageLog.getContent());
}

// 定时任务扫描未确认的消息
@Scheduled(fixedRate = 10000)
public void retrySendMessage() {
    List<MessageLog> unconfirmedMessages = messageLogMapper.selectUnconfirmed();
    for (MessageLog message : unconfirmedMessages) {
        try {
            sendMessage(message.getMessageId(), message.getContent());
            messageLogMapper.updateStatus(message.getId(), 1);
        } catch (Exception e) {
            log.error("重发消息失败", e);
        }
    }
}

9.2 TCC 补偿事务

public void createOrder(Order order) {
    try {
        // Try 阶段:预留资源
        boolean tryResult = orderService.tryCreateOrder(order);
        if (!tryResult) {
            throw new RuntimeException("资源预留失败");
        }
        
        // Confirm 阶段:确认执行
        orderService.confirmCreateOrder(order);
    } catch (Exception e) {
        // Cancel 阶段:回滚操作
        orderService.cancelCreateOrder(order);
        throw e;
    }
}

十、监控与告警

10.1 消息积压监控

@Component
public class MessageBacklogMonitor {
    
    @Autowired
    private RabbitAdmin rabbitAdmin;
    
    @Scheduled(fixedRate = 60000)
    public void checkBacklog() {
        QueueInformation queueInfo = rabbitAdmin.getQueueInfo("business.queue");
        if (queueInfo != null && queueInfo.getMessageCount() > 1000) {
            // 发送告警
            sendAlert("消息积压告警", "队列积压消息数:" + queueInfo.getMessageCount());
        }
    }
}

10.2 消息处理异常监控

@Aspect
@Component
public class MessageProcessAspect {
    
    @Pointcut("execution(* com.example.service..*.*(..))")
    public void servicePointcut() {}
    
    @Around("servicePointcut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        long startTime = System.currentTimeMillis();
        try {
            return joinPoint.proceed();
        } catch (Exception e) {
            // 记录异常日志
            log.error("消息处理异常", e);
            // 发送告警
            sendAlert("消息处理异常", e.getMessage());
            throw e;
        } finally {
            long costTime = System.currentTimeMillis() - startTime;
            if (costTime > 5000) {
                // 处理超时告警
                sendAlert("消息处理超时", "耗时:" + costTime + "ms");
            }
        }
    }
}

十一、总结

消息的可靠性投递是分布式系统设计的核心问题,需要从生产者、MQ、消费者三个层面综合考虑。通过确认机制、持久化、幂等性、死信队列、事务消息等多种技术手段的组合,可以构建出高可用的消息系统。在实际项目中,需要根据业务场景选择合适的方案,平衡一致性、可用性和性能之间的关系。

版权声明:本文为JienDa博主的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
若内容若侵犯到您的权益,请发送邮件至:platform_service@jienda.com我们将第一时间处理!
所有资源仅限于参考和学习,版权归JienDa作者所有,更多请访问JienDa首页。

给TA赞助
共{{data.count}}人
人已赞助
后端

Redis集群模式工作原理深度解析

2025-12-22 9:13:15

后端

SpringBoot集成Sa-Token权限校验框架深度解析

2025-12-22 9:27:34

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索