Kafka消息积压了,同事跑路了:从零开始的救火指南

一、事故现场:当Kafka积压遇上同事跑路

1.1 那个不寻常的早晨

周一早上9点,我像往常一样走进办公室,准备开始新一周的工作。然而,迎接我的不是同事们的问候,而是监控大屏上刺眼的红色告警:

[CRITICAL] Kafka消费者组order-consumer积压消息数:1,234,567
[CRITICAL] Kafka消费者组payment-consumer积压消息数:987,654
[WARNING] Kafka集群磁盘使用率:85%

我下意识地看向旁边工位——空的。昨天还在和我讨论如何优化Kafka消费性能的小王,今天没来。我拿起手机,准备给他发消息,却发现他已经退出了所有的工作群聊。

“小王跑路了。”我意识到问题的严重性。

1.2 为什么偏偏是Kafka?

Kafka作为我们系统的核心消息中间件,承载着订单创建、支付回调、库存扣减等所有核心业务的消息流转。一旦Kafka出现问题,整个系统都会瘫痪。

更糟糕的是,小王是团队里唯一对Kafka有深入研究的同事。他负责Kafka集群的运维、监控和性能优化,而我虽然也了解一些Kafka的基础知识,但从未真正处理过生产环境的紧急问题。

二、Kafka消息积压:认识你的敌人

2.1 什么是消息积压?

消息积压(Message Backlog)是指Kafka中未被消费者及时消费的消息数量。当生产者生产消息的速度超过消费者消费消息的速度时,就会产生积压。

积压的计算公式

积压消息数 = 分区最新偏移量(Log End Offset) - 消费者当前偏移量(Current Offset)

2.2 积压的危害

业务影响

  • 订单创建延迟,用户无法及时收到订单确认
  • 支付回调延迟,导致订单状态不一致
  • 库存扣减延迟,可能造成超卖
  • 用户无法及时收到通知消息

系统影响

  • Kafka磁盘空间告急,可能触发数据删除策略
  • 消费者重启后需要处理大量历史消息,恢复时间变长
  • 监控告警持续,影响团队工作效率

2.3 积压的常见原因

生产者问题

  • 生产者发送消息速度过快
  • 生产者批量发送配置不合理
  • 网络问题导致消息重试

消费者问题

  • 消费者处理逻辑复杂,消费速度慢
  • 消费者单线程消费,无法充分利用资源
  • 消费者频繁重启或宕机
  • 消费者代码存在bug,导致消息处理失败

Kafka集群问题

  • Broker节点负载过高
  • 磁盘IO瓶颈
  • 网络带宽不足
  • 分区数量不足,无法水平扩展

三、紧急响应:第一步做什么?

3.1 保持冷静,不要慌张

面对紧急情况,最重要的是保持冷静。慌乱只会让事情变得更糟。我深吸一口气,开始制定应急计划。

3.2 第一步:评估影响范围

我立即登录监控系统,查看各个业务系统的状态:

# 查看Kafka集群状态
kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe

# 查看消费者组状态
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group order-consumer
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group payment-consumer

# 查看积压消息数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --topic order-topic --time -1
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --topic order-topic --time -2

通过以上命令,我快速了解了当前的情况:

  • order-consumer组积压约120万条消息
  • payment-consumer组积压约100万条消息
  • 积压主要集中在order-topic和payment-topic两个主题

3.3 第二步:通知相关人员

我立即在团队群里发送紧急通知:

【紧急通知】Kafka消息积压告警
影响范围:订单、支付、库存等核心业务
当前状态:积压约220万条消息,业务处理延迟
处理人:我
预计恢复时间:待评估

同时,我联系了运维团队,请求协助监控Kafka集群状态和磁盘使用情况。

3.4 第三步:临时解决方案

为了缓解积压压力,我采取了以下临时措施:

增加消费者实例

# 临时启动额外的消费者实例
nohup java -jar order-consumer.jar --spring.profiles.active=prod &
nohup java -jar payment-consumer.jar --spring.profiles.active=prod &

调整消费者配置

# 增加消费者并发数
spring:
  kafka:
    consumer:
      concurrency: 5  # 从3增加到5
      max-poll-records: 500  # 增加每次拉取的消息数
      fetch-max-wait: 500ms  # 减少等待时间
      fetch-min-size: 1MB  # 增加最小拉取大小

调整生产者配置

# 临时降低生产者发送速度
spring:
  kafka:
    producer:
      linger-ms: 100  # 增加延迟时间
      batch-size: 16384  # 减少批量大小
      acks: 1  # 降低确认级别

四、深入分析:找到根本原因

4.1 查看消费者日志

我登录到消费者服务器,查看最近的日志:

# 查看消费者日志
tail -f /var/log/order-consumer/application.log

在日志中发现了大量错误信息:

2023-12-23 10:05:23.123 ERROR [order-consumer-1] c.e.o.h.OrderHandler : Failed to process message: order_1234567890
org.springframework.dao.DataIntegrityViolationException: Could not execute statement; SQL [n/a]; constraint [uk_order_no]; nested exception is org.hibernate.exception.ConstraintViolationException: Could not execute statement

4.2 分析错误原因

从日志可以看出,消费者在处理消息时遇到了数据库唯一约束冲突。具体来说,是订单号(order_no)重复导致的插入失败。

我立即检查了数据库中的订单表:

SELECT COUNT(*) FROM `order` WHERE order_no = 'order_1234567890';
-- 返回结果:1

SELECT * FROM `order` WHERE order_no = 'order_1234567890';
-- 发现该订单已经存在,状态为"待支付"

4.3 问题定位

经过分析,我发现了问题的根本原因:

消息重复消费:由于Kafka的at-least-once语义,在消费者处理消息后,如果提交偏移量失败,消息会被重新消费。而我们的订单创建逻辑没有做幂等处理,导致重复消息创建了重复订单。

根本原因:小王的代码在处理订单创建时,没有实现幂等性,导致消息重复消费时产生数据库唯一约束冲突,消费者处理失败,消息重新入队,形成死循环。

4.4 验证猜想

为了验证我的猜想,我查看了消费者组的偏移量提交情况:

kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group order-consumer

输出显示,消费者的偏移量几乎没有前进,说明消息一直在被重复消费。

五、解决方案:从临时到根治

5.1 临时修复:跳过问题消息

为了快速恢复业务,我决定先跳过有问题的消息:

@Component
public class OrderHandler {
    
    @KafkaListener(topics = "order-topic", groupId = "order-consumer")
    public void handleOrderMessage(String message) {
        try {
            OrderDTO order = parseOrder(message);
            createOrder(order);
        } catch (DataIntegrityViolationException e) {
            // 如果是唯一约束冲突,说明订单已存在,直接跳过
            log.warn("订单已存在,跳过处理: {}", order.getOrderNo());
        } catch (Exception e) {
            log.error("处理订单消息失败", e);
            throw e; // 其他异常继续抛出,触发重试
        }
    }
}

注意:这只是临时方案,不能作为长期解决方案,因为:

  • 可能丢失其他类型的错误信息
  • 无法处理其他原因导致的重复消息
  • 掩盖了真正的业务问题

5.2 根本解决:实现幂等性

方案一:数据库幂等表

CREATE TABLE `idempotent_record` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `biz_type` varchar(50) NOT NULL COMMENT '业务类型',
  `biz_id` varchar(100) NOT NULL COMMENT '业务唯一标识',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_biz_type_biz_id` (`biz_type`, `biz_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
@Service
public class IdempotentService {
    
    @Autowired
    private IdempotentRecordMapper idempotentRecordMapper;
    
    public boolean checkAndRecord(String bizType, String bizId) {
        try {
            IdempotentRecord record = new IdempotentRecord();
            record.setBizType(bizType);
            record.setBizId(bizId);
            idempotentRecordMapper.insert(record);
            return false; // 插入成功,说明未处理过
        } catch (DuplicateKeyException e) {
            return true; // 插入失败,说明已处理过
        }
    }
}

方案二:Redis幂等控制

@Service
public class RedisIdempotentService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String IDEMPOTENT_PREFIX = "idempotent:";
    private static final long EXPIRE_TIME = 24 * 60 * 60; // 24小时
    
    public boolean checkAndSet(String bizType, String bizId) {
        String key = IDEMPOTENT_PREFIX + bizType + ":" + bizId;
        return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", EXPIRE_TIME, TimeUnit.SECONDS));
    }
}

方案三:业务逻辑幂等

@Service
public class OrderService {
    
    @Transactional
    public void createOrder(OrderDTO order) {
        // 先查询订单是否已存在
        Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo());
        if (existingOrder != null) {
            log.info("订单已存在,跳过创建: {}", order.getOrderNo());
            return;
        }
        
        // 创建订单
        orderMapper.insert(order);
    }
}

5.3 最终选择

考虑到性能和实现复杂度,我选择了方案三:业务逻辑幂等,因为:

  • 不需要引入额外的表或缓存
  • 业务逻辑清晰,易于维护
  • 性能影响最小

修改后的订单创建逻辑:

@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Transactional
    public void createOrder(OrderDTO order) {
        // 检查订单是否已存在
        Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo());
        if (existingOrder != null) {
            log.info("订单已存在,跳过创建: {}", order.getOrderNo());
            return;
        }
        
        // 创建订单
        Order newOrder = convertToEntity(order);
        orderMapper.insert(newOrder);
        
        log.info("订单创建成功: {}", order.getOrderNo());
    }
}

六、恢复验证:确保问题解决

6.1 验证消费者恢复

修改代码并重新部署后,我监控消费者的处理情况:

# 查看消费者组状态
while true; do
    kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group order-consumer | grep -E "TOPIC|LAG"
    sleep 10
done

输出显示,积压消息数正在逐渐减少:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-topic     0          1234567         2345678         1111111
order-topic     1          987654          1987654         1000000

# 10秒后
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-topic     0          1234689         2345678         1110989
order-topic     1          987789          1987654         999865

6.2 验证业务正确性

我检查了数据库中的订单数据,确认没有重复订单:

-- 检查是否有重复订单
SELECT order_no, COUNT(*) as count 
FROM `order` 
GROUP BY order_no 
HAVING count > 1;
-- 返回结果:空

6.3 验证告警恢复

我查看了监控系统,确认Kafka积压告警已经消失:

[INFO] Kafka消费者组order-consumer积压消息数:123
[INFO] Kafka消费者组payment-consumer积压消息数:89
[INFO] Kafka集群磁盘使用率:45%

七、复盘总结:从事故中学习

7.1 事故原因总结

直接原因

  • 消费者代码没有实现幂等性,导致重复消息处理失败
  • 消息重复消费形成死循环,导致积压

根本原因

  • 代码审查不严格,幂等性问题被忽略
  • 缺乏完善的监控和告警机制
  • 团队成员技能单一,Kafka知识储备不足
  • 缺乏应急预案和文档

7.2 改进措施

技术层面

  1. 完善幂等性处理:在所有消息处理逻辑中增加幂等性检查
  2. 优化消费者配置:调整max.poll.records、fetch.min.bytes等参数
  3. 增加重试机制:实现带退避策略的重试机制,避免无限重试
  4. 引入死信队列:将处理失败的消息转移到死信队列,避免阻塞正常消费

监控层面

  1. 完善监控告警:增加Kafka积压、消费者延迟、错误率等监控指标
  2. 设置多级告警:根据积压程度设置不同级别的告警
  3. 增加业务监控:监控订单创建成功率、支付成功率等业务指标

流程层面

  1. 加强代码审查:建立严格的代码审查流程,重点关注幂等性、异常处理等
  2. 完善文档:编写Kafka使用规范、常见问题处理文档
  3. 定期演练:定期进行故障演练,提高应急响应能力
  4. 技能培训:组织Kafka技术培训,提升团队整体能力

7.3 个人成长

这次事故让我深刻认识到:

  • 技术深度的重要性:不能只停留在表面,要深入理解每个组件的原理
  • 应急响应能力:面对紧急情况要保持冷静,按照预案有序处理
  • 团队协作:技术问题往往不是一个人的事,需要团队共同努力
  • 持续学习:技术更新换代很快,要保持学习的态度

八、写给跑路的同事

虽然你已经离开了,但我想对你说:

感谢你:感谢你在团队期间的付出,虽然最后留下了这个烂摊子,但你也为团队做了很多贡献。

希望你:在未来的工作中,能够更加负责任,遇到问题不要逃避,勇敢面对和解决。

提醒你:技术能力固然重要,但责任心和团队协作同样重要。希望你能在新的环境中成长,成为更好的自己。

九、结语

Kafka消息积压是一个常见但严重的问题,处理起来需要冷静、专业和系统化的方法。通过这次事故,我不仅解决了技术问题,更重要的是学会了如何在压力下保持冷静,如何系统化地分析和解决问题。

记住:技术问题不可怕,可怕的是没有解决问题的能力和勇气。每一次事故都是一次成长的机会,关键是要从中学习,不断进步。

最后,给所有正在处理Kafka积压问题的你

  • 保持冷静,不要慌张
  • 先评估影响,再制定方案
  • 优先恢复业务,再解决根本问题
  • 做好复盘,避免类似问题再次发生

希望这篇文章能帮助你在遇到类似问题时,能够从容应对,快速解决!

版权声明:本文为JienDa博主的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
若内容若侵犯到您的权益,请发送邮件至:platform_service@jienda.com我们将第一时间处理!
所有资源仅限于参考和学习,版权归JienDa作者所有,更多请访问JienDa首页。

给TA赞助
共{{data.count}}人
人已赞助
后端

从零到一:外卖系统架构设计与实战

2025-12-23 10:05:12

后端

阿里Java开发岗超级详细八股文

2025-12-23 10:19:48

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索