一、事故现场:当Kafka积压遇上同事跑路
1.1 那个不寻常的早晨
周一早上9点,我像往常一样走进办公室,准备开始新一周的工作。然而,迎接我的不是同事们的问候,而是监控大屏上刺眼的红色告警:
[CRITICAL] Kafka消费者组order-consumer积压消息数:1,234,567
[CRITICAL] Kafka消费者组payment-consumer积压消息数:987,654
[WARNING] Kafka集群磁盘使用率:85%
我下意识地看向旁边工位——空的。昨天还在和我讨论如何优化Kafka消费性能的小王,今天没来。我拿起手机,准备给他发消息,却发现他已经退出了所有的工作群聊。
“小王跑路了。”我意识到问题的严重性。
1.2 为什么偏偏是Kafka?
Kafka作为我们系统的核心消息中间件,承载着订单创建、支付回调、库存扣减等所有核心业务的消息流转。一旦Kafka出现问题,整个系统都会瘫痪。
更糟糕的是,小王是团队里唯一对Kafka有深入研究的同事。他负责Kafka集群的运维、监控和性能优化,而我虽然也了解一些Kafka的基础知识,但从未真正处理过生产环境的紧急问题。
二、Kafka消息积压:认识你的敌人
2.1 什么是消息积压?
消息积压(Message Backlog)是指Kafka中未被消费者及时消费的消息数量。当生产者生产消息的速度超过消费者消费消息的速度时,就会产生积压。
积压的计算公式:
积压消息数 = 分区最新偏移量(Log End Offset) - 消费者当前偏移量(Current Offset)
2.2 积压的危害
业务影响:
- 订单创建延迟,用户无法及时收到订单确认
- 支付回调延迟,导致订单状态不一致
- 库存扣减延迟,可能造成超卖
- 用户无法及时收到通知消息
系统影响:
- Kafka磁盘空间告急,可能触发数据删除策略
- 消费者重启后需要处理大量历史消息,恢复时间变长
- 监控告警持续,影响团队工作效率
2.3 积压的常见原因
生产者问题:
- 生产者发送消息速度过快
- 生产者批量发送配置不合理
- 网络问题导致消息重试
消费者问题:
- 消费者处理逻辑复杂,消费速度慢
- 消费者单线程消费,无法充分利用资源
- 消费者频繁重启或宕机
- 消费者代码存在bug,导致消息处理失败
Kafka集群问题:
- Broker节点负载过高
- 磁盘IO瓶颈
- 网络带宽不足
- 分区数量不足,无法水平扩展
三、紧急响应:第一步做什么?
3.1 保持冷静,不要慌张
面对紧急情况,最重要的是保持冷静。慌乱只会让事情变得更糟。我深吸一口气,开始制定应急计划。
3.2 第一步:评估影响范围
我立即登录监控系统,查看各个业务系统的状态:
# 查看Kafka集群状态
kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe
# 查看消费者组状态
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group order-consumer
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group payment-consumer
# 查看积压消息数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --topic order-topic --time -1
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --topic order-topic --time -2
通过以上命令,我快速了解了当前的情况:
- order-consumer组积压约120万条消息
- payment-consumer组积压约100万条消息
- 积压主要集中在order-topic和payment-topic两个主题
3.3 第二步:通知相关人员
我立即在团队群里发送紧急通知:
【紧急通知】Kafka消息积压告警
影响范围:订单、支付、库存等核心业务
当前状态:积压约220万条消息,业务处理延迟
处理人:我
预计恢复时间:待评估
同时,我联系了运维团队,请求协助监控Kafka集群状态和磁盘使用情况。
3.4 第三步:临时解决方案
为了缓解积压压力,我采取了以下临时措施:
增加消费者实例:
# 临时启动额外的消费者实例
nohup java -jar order-consumer.jar --spring.profiles.active=prod &
nohup java -jar payment-consumer.jar --spring.profiles.active=prod &
调整消费者配置:
# 增加消费者并发数
spring:
kafka:
consumer:
concurrency: 5 # 从3增加到5
max-poll-records: 500 # 增加每次拉取的消息数
fetch-max-wait: 500ms # 减少等待时间
fetch-min-size: 1MB # 增加最小拉取大小
调整生产者配置:
# 临时降低生产者发送速度
spring:
kafka:
producer:
linger-ms: 100 # 增加延迟时间
batch-size: 16384 # 减少批量大小
acks: 1 # 降低确认级别
四、深入分析:找到根本原因
4.1 查看消费者日志
我登录到消费者服务器,查看最近的日志:
# 查看消费者日志
tail -f /var/log/order-consumer/application.log
在日志中发现了大量错误信息:
2023-12-23 10:05:23.123 ERROR [order-consumer-1] c.e.o.h.OrderHandler : Failed to process message: order_1234567890
org.springframework.dao.DataIntegrityViolationException: Could not execute statement; SQL [n/a]; constraint [uk_order_no]; nested exception is org.hibernate.exception.ConstraintViolationException: Could not execute statement
4.2 分析错误原因
从日志可以看出,消费者在处理消息时遇到了数据库唯一约束冲突。具体来说,是订单号(order_no)重复导致的插入失败。
我立即检查了数据库中的订单表:
SELECT COUNT(*) FROM `order` WHERE order_no = 'order_1234567890';
-- 返回结果:1
SELECT * FROM `order` WHERE order_no = 'order_1234567890';
-- 发现该订单已经存在,状态为"待支付"
4.3 问题定位
经过分析,我发现了问题的根本原因:
消息重复消费:由于Kafka的at-least-once语义,在消费者处理消息后,如果提交偏移量失败,消息会被重新消费。而我们的订单创建逻辑没有做幂等处理,导致重复消息创建了重复订单。
根本原因:小王的代码在处理订单创建时,没有实现幂等性,导致消息重复消费时产生数据库唯一约束冲突,消费者处理失败,消息重新入队,形成死循环。
4.4 验证猜想
为了验证我的猜想,我查看了消费者组的偏移量提交情况:
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group order-consumer
输出显示,消费者的偏移量几乎没有前进,说明消息一直在被重复消费。
五、解决方案:从临时到根治
5.1 临时修复:跳过问题消息
为了快速恢复业务,我决定先跳过有问题的消息:
@Component
public class OrderHandler {
@KafkaListener(topics = "order-topic", groupId = "order-consumer")
public void handleOrderMessage(String message) {
try {
OrderDTO order = parseOrder(message);
createOrder(order);
} catch (DataIntegrityViolationException e) {
// 如果是唯一约束冲突,说明订单已存在,直接跳过
log.warn("订单已存在,跳过处理: {}", order.getOrderNo());
} catch (Exception e) {
log.error("处理订单消息失败", e);
throw e; // 其他异常继续抛出,触发重试
}
}
}
注意:这只是临时方案,不能作为长期解决方案,因为:
- 可能丢失其他类型的错误信息
- 无法处理其他原因导致的重复消息
- 掩盖了真正的业务问题
5.2 根本解决:实现幂等性
方案一:数据库幂等表
CREATE TABLE `idempotent_record` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`biz_type` varchar(50) NOT NULL COMMENT '业务类型',
`biz_id` varchar(100) NOT NULL COMMENT '业务唯一标识',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_biz_type_biz_id` (`biz_type`, `biz_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
@Service
public class IdempotentService {
@Autowired
private IdempotentRecordMapper idempotentRecordMapper;
public boolean checkAndRecord(String bizType, String bizId) {
try {
IdempotentRecord record = new IdempotentRecord();
record.setBizType(bizType);
record.setBizId(bizId);
idempotentRecordMapper.insert(record);
return false; // 插入成功,说明未处理过
} catch (DuplicateKeyException e) {
return true; // 插入失败,说明已处理过
}
}
}
方案二:Redis幂等控制
@Service
public class RedisIdempotentService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String IDEMPOTENT_PREFIX = "idempotent:";
private static final long EXPIRE_TIME = 24 * 60 * 60; // 24小时
public boolean checkAndSet(String bizType, String bizId) {
String key = IDEMPOTENT_PREFIX + bizType + ":" + bizId;
return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", EXPIRE_TIME, TimeUnit.SECONDS));
}
}
方案三:业务逻辑幂等
@Service
public class OrderService {
@Transactional
public void createOrder(OrderDTO order) {
// 先查询订单是否已存在
Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo());
if (existingOrder != null) {
log.info("订单已存在,跳过创建: {}", order.getOrderNo());
return;
}
// 创建订单
orderMapper.insert(order);
}
}
5.3 最终选择
考虑到性能和实现复杂度,我选择了方案三:业务逻辑幂等,因为:
- 不需要引入额外的表或缓存
- 业务逻辑清晰,易于维护
- 性能影响最小
修改后的订单创建逻辑:
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Transactional
public void createOrder(OrderDTO order) {
// 检查订单是否已存在
Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo());
if (existingOrder != null) {
log.info("订单已存在,跳过创建: {}", order.getOrderNo());
return;
}
// 创建订单
Order newOrder = convertToEntity(order);
orderMapper.insert(newOrder);
log.info("订单创建成功: {}", order.getOrderNo());
}
}
六、恢复验证:确保问题解决
6.1 验证消费者恢复
修改代码并重新部署后,我监控消费者的处理情况:
# 查看消费者组状态
while true; do
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group order-consumer | grep -E "TOPIC|LAG"
sleep 10
done
输出显示,积压消息数正在逐渐减少:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
order-topic 0 1234567 2345678 1111111
order-topic 1 987654 1987654 1000000
# 10秒后
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
order-topic 0 1234689 2345678 1110989
order-topic 1 987789 1987654 999865
6.2 验证业务正确性
我检查了数据库中的订单数据,确认没有重复订单:
-- 检查是否有重复订单
SELECT order_no, COUNT(*) as count
FROM `order`
GROUP BY order_no
HAVING count > 1;
-- 返回结果:空
6.3 验证告警恢复
我查看了监控系统,确认Kafka积压告警已经消失:
[INFO] Kafka消费者组order-consumer积压消息数:123
[INFO] Kafka消费者组payment-consumer积压消息数:89
[INFO] Kafka集群磁盘使用率:45%
七、复盘总结:从事故中学习
7.1 事故原因总结
直接原因:
- 消费者代码没有实现幂等性,导致重复消息处理失败
- 消息重复消费形成死循环,导致积压
根本原因:
- 代码审查不严格,幂等性问题被忽略
- 缺乏完善的监控和告警机制
- 团队成员技能单一,Kafka知识储备不足
- 缺乏应急预案和文档
7.2 改进措施
技术层面:
- 完善幂等性处理:在所有消息处理逻辑中增加幂等性检查
- 优化消费者配置:调整max.poll.records、fetch.min.bytes等参数
- 增加重试机制:实现带退避策略的重试机制,避免无限重试
- 引入死信队列:将处理失败的消息转移到死信队列,避免阻塞正常消费
监控层面:
- 完善监控告警:增加Kafka积压、消费者延迟、错误率等监控指标
- 设置多级告警:根据积压程度设置不同级别的告警
- 增加业务监控:监控订单创建成功率、支付成功率等业务指标
流程层面:
- 加强代码审查:建立严格的代码审查流程,重点关注幂等性、异常处理等
- 完善文档:编写Kafka使用规范、常见问题处理文档
- 定期演练:定期进行故障演练,提高应急响应能力
- 技能培训:组织Kafka技术培训,提升团队整体能力
7.3 个人成长
这次事故让我深刻认识到:
- 技术深度的重要性:不能只停留在表面,要深入理解每个组件的原理
- 应急响应能力:面对紧急情况要保持冷静,按照预案有序处理
- 团队协作:技术问题往往不是一个人的事,需要团队共同努力
- 持续学习:技术更新换代很快,要保持学习的态度
八、写给跑路的同事
虽然你已经离开了,但我想对你说:
感谢你:感谢你在团队期间的付出,虽然最后留下了这个烂摊子,但你也为团队做了很多贡献。
希望你:在未来的工作中,能够更加负责任,遇到问题不要逃避,勇敢面对和解决。
提醒你:技术能力固然重要,但责任心和团队协作同样重要。希望你能在新的环境中成长,成为更好的自己。
九、结语
Kafka消息积压是一个常见但严重的问题,处理起来需要冷静、专业和系统化的方法。通过这次事故,我不仅解决了技术问题,更重要的是学会了如何在压力下保持冷静,如何系统化地分析和解决问题。
记住:技术问题不可怕,可怕的是没有解决问题的能力和勇气。每一次事故都是一次成长的机会,关键是要从中学习,不断进步。
最后,给所有正在处理Kafka积压问题的你:
- 保持冷静,不要慌张
- 先评估影响,再制定方案
- 优先恢复业务,再解决根本问题
- 做好复盘,避免类似问题再次发生
希望这篇文章能帮助你在遇到类似问题时,能够从容应对,快速解决!
若内容若侵犯到您的权益,请发送邮件至:platform_service@jienda.com我们将第一时间处理!
所有资源仅限于参考和学习,版权归JienDa作者所有,更多请访问JienDa首页。





