• 首页
  • 博客
  • 问答
  • 快讯
  • 社区
  • 商城
  • 客户端
文章
文章用户商铺专辑快讯圈子网址导航问答程序定制

{{userData.name}}已认证

文章

评论

关注

粉丝

¥{{role.user_data.money}}
{{role.user_data.credit}}
您已完成今天任务的
  • 私信列表所有往来私信

  • 财富管理余额、积分管理

  • 推广中心推广有奖励

    NEW
  • 任务中心每日任务

    NEW
  • 成为会员购买付费会员

  • 认证服务申请认证

    NEW
  • 小黑屋关进小黑屋的人

    NEW
  • 我的订单查看我的订单

  • 我的设置编辑个人资料

  • 进入后台管理

  • 首页
  • 后端
  • 前端
  • 人工智能
  • Android
  • 鸿蒙
  • 代码人生
  • 阅读
  • 科技
  • 学到
  • 青训营
  • 读书会
  • 福利
  • 公益
  • 新鲜事
  • 树洞
投稿

Kafka消息积压了,同事跑路了:从零开始的救火指南

  • 后端
  • 25年12月23日
  • 编辑
JienDaPHP程序员

一、事故现场:当Kafka积压遇上同事跑路

1.1 那个不寻常的早晨

周一早上9点,我像往常一样走进办公室,准备开始新一周的工作。然而,迎接我的不是同事们的问候,而是监控大屏上刺眼的红色告警:

[CRITICAL] Kafka消费者组order-consumer积压消息数:1,234,567
[CRITICAL] Kafka消费者组payment-consumer积压消息数:987,654
[WARNING] Kafka集群磁盘使用率:85%

我下意识地看向旁边工位——空的。昨天还在和我讨论如何优化Kafka消费性能的小王,今天没来。我拿起手机,准备给他发消息,却发现他已经退出了所有的工作群聊。

“小王跑路了。”我意识到问题的严重性。

1.2 为什么偏偏是Kafka?

Kafka作为我们系统的核心消息中间件,承载着订单创建、支付回调、库存扣减等所有核心业务的消息流转。一旦Kafka出现问题,整个系统都会瘫痪。

更糟糕的是,小王是团队里唯一对Kafka有深入研究的同事。他负责Kafka集群的运维、监控和性能优化,而我虽然也了解一些Kafka的基础知识,但从未真正处理过生产环境的紧急问题。

二、Kafka消息积压:认识你的敌人

2.1 什么是消息积压?

消息积压(Message Backlog)是指Kafka中未被消费者及时消费的消息数量。当生产者生产消息的速度超过消费者消费消息的速度时,就会产生积压。

积压的计算公式:

积压消息数 = 分区最新偏移量(Log End Offset) - 消费者当前偏移量(Current Offset)

2.2 积压的危害

业务影响:

  • 订单创建延迟,用户无法及时收到订单确认
  • 支付回调延迟,导致订单状态不一致
  • 库存扣减延迟,可能造成超卖
  • 用户无法及时收到通知消息

系统影响:

  • Kafka磁盘空间告急,可能触发数据删除策略
  • 消费者重启后需要处理大量历史消息,恢复时间变长
  • 监控告警持续,影响团队工作效率

2.3 积压的常见原因

生产者问题:

  • 生产者发送消息速度过快
  • 生产者批量发送配置不合理
  • 网络问题导致消息重试

消费者问题:

  • 消费者处理逻辑复杂,消费速度慢
  • 消费者单线程消费,无法充分利用资源
  • 消费者频繁重启或宕机
  • 消费者代码存在bug,导致消息处理失败

Kafka集群问题:

  • Broker节点负载过高
  • 磁盘IO瓶颈
  • 网络带宽不足
  • 分区数量不足,无法水平扩展

三、紧急响应:第一步做什么?

3.1 保持冷静,不要慌张

面对紧急情况,最重要的是保持冷静。慌乱只会让事情变得更糟。我深吸一口气,开始制定应急计划。

3.2 第一步:评估影响范围

我立即登录监控系统,查看各个业务系统的状态:

# 查看Kafka集群状态
kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe

# 查看消费者组状态
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group order-consumer
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group payment-consumer

# 查看积压消息数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --topic order-topic --time -1
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --topic order-topic --time -2

通过以上命令,我快速了解了当前的情况:

  • order-consumer组积压约120万条消息
  • payment-consumer组积压约100万条消息
  • 积压主要集中在order-topic和payment-topic两个主题

3.3 第二步:通知相关人员

我立即在团队群里发送紧急通知:

【紧急通知】Kafka消息积压告警
影响范围:订单、支付、库存等核心业务
当前状态:积压约220万条消息,业务处理延迟
处理人:我
预计恢复时间:待评估

同时,我联系了运维团队,请求协助监控Kafka集群状态和磁盘使用情况。

3.4 第三步:临时解决方案

为了缓解积压压力,我采取了以下临时措施:

增加消费者实例:

# 临时启动额外的消费者实例
nohup java -jar order-consumer.jar --spring.profiles.active=prod &
nohup java -jar payment-consumer.jar --spring.profiles.active=prod &

调整消费者配置:

# 增加消费者并发数
spring:
  kafka:
    consumer:
      concurrency: 5  # 从3增加到5
      max-poll-records: 500  # 增加每次拉取的消息数
      fetch-max-wait: 500ms  # 减少等待时间
      fetch-min-size: 1MB  # 增加最小拉取大小

调整生产者配置:

# 临时降低生产者发送速度
spring:
  kafka:
    producer:
      linger-ms: 100  # 增加延迟时间
      batch-size: 16384  # 减少批量大小
      acks: 1  # 降低确认级别

四、深入分析:找到根本原因

4.1 查看消费者日志

我登录到消费者服务器,查看最近的日志:

# 查看消费者日志
tail -f /var/log/order-consumer/application.log

在日志中发现了大量错误信息:

2023-12-23 10:05:23.123 ERROR [order-consumer-1] c.e.o.h.OrderHandler : Failed to process message: order_1234567890
org.springframework.dao.DataIntegrityViolationException: Could not execute statement; SQL [n/a]; constraint [uk_order_no]; nested exception is org.hibernate.exception.ConstraintViolationException: Could not execute statement

4.2 分析错误原因

从日志可以看出,消费者在处理消息时遇到了数据库唯一约束冲突。具体来说,是订单号(order_no)重复导致的插入失败。

我立即检查了数据库中的订单表:

SELECT COUNT(*) FROM `order` WHERE order_no = 'order_1234567890';
-- 返回结果:1

SELECT * FROM `order` WHERE order_no = 'order_1234567890';
-- 发现该订单已经存在,状态为"待支付"

4.3 问题定位

经过分析,我发现了问题的根本原因:

消息重复消费:由于Kafka的at-least-once语义,在消费者处理消息后,如果提交偏移量失败,消息会被重新消费。而我们的订单创建逻辑没有做幂等处理,导致重复消息创建了重复订单。

根本原因:小王的代码在处理订单创建时,没有实现幂等性,导致消息重复消费时产生数据库唯一约束冲突,消费者处理失败,消息重新入队,形成死循环。

4.4 验证猜想

为了验证我的猜想,我查看了消费者组的偏移量提交情况:

kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group order-consumer

输出显示,消费者的偏移量几乎没有前进,说明消息一直在被重复消费。

五、解决方案:从临时到根治

5.1 临时修复:跳过问题消息

为了快速恢复业务,我决定先跳过有问题的消息:

@Component
public class OrderHandler {
    
    @KafkaListener(topics = "order-topic", groupId = "order-consumer")
    public void handleOrderMessage(String message) {
        try {
            OrderDTO order = parseOrder(message);
            createOrder(order);
        } catch (DataIntegrityViolationException e) {
            // 如果是唯一约束冲突,说明订单已存在,直接跳过
            log.warn("订单已存在,跳过处理: {}", order.getOrderNo());
        } catch (Exception e) {
            log.error("处理订单消息失败", e);
            throw e; // 其他异常继续抛出,触发重试
        }
    }
}

注意:这只是临时方案,不能作为长期解决方案,因为:

  • 可能丢失其他类型的错误信息
  • 无法处理其他原因导致的重复消息
  • 掩盖了真正的业务问题

5.2 根本解决:实现幂等性

方案一:数据库幂等表

CREATE TABLE `idempotent_record` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `biz_type` varchar(50) NOT NULL COMMENT '业务类型',
  `biz_id` varchar(100) NOT NULL COMMENT '业务唯一标识',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_biz_type_biz_id` (`biz_type`, `biz_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
@Service
public class IdempotentService {
    
    @Autowired
    private IdempotentRecordMapper idempotentRecordMapper;
    
    public boolean checkAndRecord(String bizType, String bizId) {
        try {
            IdempotentRecord record = new IdempotentRecord();
            record.setBizType(bizType);
            record.setBizId(bizId);
            idempotentRecordMapper.insert(record);
            return false; // 插入成功,说明未处理过
        } catch (DuplicateKeyException e) {
            return true; // 插入失败,说明已处理过
        }
    }
}

方案二:Redis幂等控制

@Service
public class RedisIdempotentService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String IDEMPOTENT_PREFIX = "idempotent:";
    private static final long EXPIRE_TIME = 24 * 60 * 60; // 24小时
    
    public boolean checkAndSet(String bizType, String bizId) {
        String key = IDEMPOTENT_PREFIX + bizType + ":" + bizId;
        return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", EXPIRE_TIME, TimeUnit.SECONDS));
    }
}

方案三:业务逻辑幂等

@Service
public class OrderService {
    
    @Transactional
    public void createOrder(OrderDTO order) {
        // 先查询订单是否已存在
        Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo());
        if (existingOrder != null) {
            log.info("订单已存在,跳过创建: {}", order.getOrderNo());
            return;
        }
        
        // 创建订单
        orderMapper.insert(order);
    }
}

5.3 最终选择

考虑到性能和实现复杂度,我选择了方案三:业务逻辑幂等,因为:

  • 不需要引入额外的表或缓存
  • 业务逻辑清晰,易于维护
  • 性能影响最小

修改后的订单创建逻辑:

@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Transactional
    public void createOrder(OrderDTO order) {
        // 检查订单是否已存在
        Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo());
        if (existingOrder != null) {
            log.info("订单已存在,跳过创建: {}", order.getOrderNo());
            return;
        }
        
        // 创建订单
        Order newOrder = convertToEntity(order);
        orderMapper.insert(newOrder);
        
        log.info("订单创建成功: {}", order.getOrderNo());
    }
}

六、恢复验证:确保问题解决

6.1 验证消费者恢复

修改代码并重新部署后,我监控消费者的处理情况:

# 查看消费者组状态
while true; do
    kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group order-consumer | grep -E "TOPIC|LAG"
    sleep 10
done

输出显示,积压消息数正在逐渐减少:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-topic     0          1234567         2345678         1111111
order-topic     1          987654          1987654         1000000

# 10秒后
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-topic     0          1234689         2345678         1110989
order-topic     1          987789          1987654         999865

6.2 验证业务正确性

我检查了数据库中的订单数据,确认没有重复订单:

-- 检查是否有重复订单
SELECT order_no, COUNT(*) as count 
FROM `order` 
GROUP BY order_no 
HAVING count > 1;
-- 返回结果:空

6.3 验证告警恢复

我查看了监控系统,确认Kafka积压告警已经消失:

[INFO] Kafka消费者组order-consumer积压消息数:123
[INFO] Kafka消费者组payment-consumer积压消息数:89
[INFO] Kafka集群磁盘使用率:45%

七、复盘总结:从事故中学习

7.1 事故原因总结

直接原因:

  • 消费者代码没有实现幂等性,导致重复消息处理失败
  • 消息重复消费形成死循环,导致积压

根本原因:

  • 代码审查不严格,幂等性问题被忽略
  • 缺乏完善的监控和告警机制
  • 团队成员技能单一,Kafka知识储备不足
  • 缺乏应急预案和文档

7.2 改进措施

技术层面:

  1. 完善幂等性处理:在所有消息处理逻辑中增加幂等性检查
  2. 优化消费者配置:调整max.poll.records、fetch.min.bytes等参数
  3. 增加重试机制:实现带退避策略的重试机制,避免无限重试
  4. 引入死信队列:将处理失败的消息转移到死信队列,避免阻塞正常消费

监控层面:

  1. 完善监控告警:增加Kafka积压、消费者延迟、错误率等监控指标
  2. 设置多级告警:根据积压程度设置不同级别的告警
  3. 增加业务监控:监控订单创建成功率、支付成功率等业务指标

流程层面:

  1. 加强代码审查:建立严格的代码审查流程,重点关注幂等性、异常处理等
  2. 完善文档:编写Kafka使用规范、常见问题处理文档
  3. 定期演练:定期进行故障演练,提高应急响应能力
  4. 技能培训:组织Kafka技术培训,提升团队整体能力

7.3 个人成长

这次事故让我深刻认识到:

  • 技术深度的重要性:不能只停留在表面,要深入理解每个组件的原理
  • 应急响应能力:面对紧急情况要保持冷静,按照预案有序处理
  • 团队协作:技术问题往往不是一个人的事,需要团队共同努力
  • 持续学习:技术更新换代很快,要保持学习的态度

八、写给跑路的同事

虽然你已经离开了,但我想对你说:

感谢你:感谢你在团队期间的付出,虽然最后留下了这个烂摊子,但你也为团队做了很多贡献。

希望你:在未来的工作中,能够更加负责任,遇到问题不要逃避,勇敢面对和解决。

提醒你:技术能力固然重要,但责任心和团队协作同样重要。希望你能在新的环境中成长,成为更好的自己。

九、结语

Kafka消息积压是一个常见但严重的问题,处理起来需要冷静、专业和系统化的方法。通过这次事故,我不仅解决了技术问题,更重要的是学会了如何在压力下保持冷静,如何系统化地分析和解决问题。

记住:技术问题不可怕,可怕的是没有解决问题的能力和勇气。每一次事故都是一次成长的机会,关键是要从中学习,不断进步。

最后,给所有正在处理Kafka积压问题的你:

  • 保持冷静,不要慌张
  • 先评估影响,再制定方案
  • 优先恢复业务,再解决根本问题
  • 做好复盘,避免类似问题再次发生

希望这篇文章能帮助你在遇到类似问题时,能够从容应对,快速解决!

版权声明:本文为JienDa博主的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
若内容若侵犯到您的权益,请发送邮件至:platform_service@jienda.com我们将第一时间处理!
所有资源仅限于参考和学习,版权归JienDa作者所有,更多请访问JienDa首页。

给TA赞助
共{{data.count}}人
人已赞助
后端

从零到一:外卖系统架构设计与实战

2025-12-23 10:05:12

后端

阿里Java开发岗超级详细八股文

2025-12-23 10:19:48

1 条回复 A文章作者 M管理员
  1. JienDa
    JienDaAMPHP程序员 初中lv2
    1月1日

❯

解锁会员权限

开通会员

解锁海量优质VIP资源

立刻开通

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索
客服
  • 扫码打开当前页

  • 微信小程序

  • App下载

  • 百度小程序

  • 抖音小程序

  • 微信客服

返回顶部
幸运之星正在降临...
点击领取今天的签到奖励!
恭喜!您今天获得了{{mission.data.mission.credit}}积分

今日签到

连续签到

  • {{item.credit}}
  • 连续{{item.count}}天
查看所有
我的优惠劵
  • ¥优惠劵
    使用时效:无法使用
    使用时效:

    之前

    使用时效:永久有效
    优惠劵ID:
    ×
    限制以下商品使用: 限制以下商品分类使用: 不限制使用:
    [{{ct.name}}]
    所有商品和商品类型均可使用
没有优惠劵可用!

购物车
  • ×
    删除
购物车空空如也!

清空购物车 前往结算
您有新的私信
没有新私信
写新私信 查看全部

关于我们

JienDa是一个专注于PHP技术生态的开发者社区平台,通过技术分享、实战案例和行业洞察,为PHP开发者提供全方位的成长支持。

分类

JienDa团队

  • JienDa
  • 海之云
  • 阿伟
  • 简答

最近

  • ThinkAdmin v6 后台操作规范使用流程(完整版)
  • ThinkAdmin v6 + 前端评论系统 完整使用文档

热门推荐

相似站点

  • HaiOOS

    HaiOOS专注于虚拟商品交易系统开发,正版提供商城系统PHP源码。 搭建HaiOOS虚拟物品交易平台,帮助企业自建私有化部署源代码,HaiOOS为平台实现精准功能定制,各种运营插件可支持虚拟物品行业全场景的业务线上化。

  • 海之云

    海之云网络诞生于2018年,是一个专注于高端品牌网站建设的工作室,2018年正式注册“莆田市海之云网络技术有限公司”公司化运营(简称海之云网络),是国内专业的网站设计及网站建设服务商,我们是一个高品质的技术团队,凝聚了一批年轻且有活力的80、90后成员。我们拥有丰富的网站开发经验,设计上不仅追求美观更讲究用户的实用性,经过多年的实战经验,我公司与百度、360、阿里云、西数、安全联盟等多家企业深度合…

Copyright © 2026 JienDa
・闽ICP备2023000041号-21 ・闽公网安备35030202001165号
查询 108 次,耗时 1.3394 秒
首页专题认证
搜索菜单我的