Kafka生产问题总结及性能优化实践

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

Kafka Manager

Kafka可视化管理工具。

具体的安装及使用请参考如下文章。
传送门https://blog.csdn.net/u010355502/article/details/132448713

线上环境规划

概述

  • 80%的数据一般集中在20%的时间内产生
  • 一般需要在QPS均值基础上放大5~10倍以应对瞬时高峰流量
  • Topic一般设置副本为2
  • 高并发环境下一般使用物理机
  • 由于Kafka是顺序读写用物理磁盘即可无需SSD固态盘固态盘在随机读写场景下首选
  • Kafka启动Broker内部会有很多线程同时运作一般建议机器CPU核数尽量多点。Kafka写消息到磁盘会用到大量的os cache内存也要多预留些
  • 合理考虑网卡千兆/万兆网卡根据实际压缩情况选择

JVM参数设置

kafka是scala语言开发运行在JVM上需要对JVM参数合理设置参看JVM调优专题。
修改bin/kafka-start-server.sh中的jvm设置假设机器是32G内存可以如下设置

export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"

这种大内存的情况一般都要用G1垃圾收集器因为年轻代内存比较大用G1可以设置GC最大停顿时间不至于一次minor gc就花费太长时间。
当然因为像kafkarocketmqes这些中间件写数据到磁盘会用到操作系统的page cache所以JVM内存不宜分配过大需要给操作系统的缓存留出几个G。

线上问题及优化

消息丢失

消息发送端
1acks=0 表示producer不需要等待任何broker确认收到消息的回复就可以继续发送下一条消息。性能最高但是最容易丢消息。大数据统计报表场景对性能要求很高对数据丢失不敏感的情况可以用这种。
2acks=1 至少要等待leader已经成功将数据写入本地log但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下如果follower没有成功备份数据而此时leader又挂掉则消息会丢失。
3acks=-1或all 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别或跟钱打交道的场景才会使用这种配置。当然如果min.insync.replicas配置的是1则也可能丢消息跟acks=1情况类似。

消息消费端
如果消费这边配置的是自动提交万一消费到数据还没处理完就自动提交offset了但是此时你consumer直接宕机了未处理完的数据丢失了下次也消费不到了。

消息重复消费

消息发送端
发送消息如果配置了重试机制比如网络抖动时间过长导致发送端发送超时实际broker可能已经接收到消息但发送方会重新发送消息。

消息消费端
如果消费这边配置的是自动提交刚拉取了一批数据处理了一部分但还没来得及提交服务挂了下次重启又会拉取相同的一批数据重复处理。
一般消费端都是要做消费幂等处理的。

消息乱序

如果发送端配置了重试机制kafka不会等之前那条消息完全发送成功才去发送下一条消息这样可能会出现发送了123条消息第一条超时了后面两条发送成功再重试发送第1条消息这时消息在broker端的顺序就是231了。
所以是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息当然acks不能设置为0这样也能保证消息发送的有序。
kafka保证全链路消息顺序消费需要从发送端开始将所有有序消息发送到同一个分区然后用一个消费者去消费但是这种性能比较低可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个)一个内存队列开启一个线程顺序处理消息。

消息积压

1线上有时因为发送方发送消息速度过快或者消费方处理消息过慢可能会导致broker积压大量未消费消息。
此种情况如果积压了上百万未消费消息需要紧急处理可以修改消费端程序让其将收到的消息快速转发到其他topic(可以设置很多分区)然后再启动多个消费者同时消费新主题的不同分区。
2由于消息数据格式变动或消费者程序有bug导致消费者一直消费不成功也可能导致broker积压大量未消费消息。
此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列)后面再慢慢分析死信队列里的消息处理问题。

延时队列

延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后并不想让消费者立刻获取而是等待特定的时间后消费者才能获取这个消息进行消费延时队列的使用场景有很多 比如
1在订单系统中 一个用户下单之后通常有30分钟的时间进行支付如果30分钟之内没有支付成功那么这个订单将进行异常处理这时就可以使用延时队列来处理这些订单了。
2订单完成1小时后通知用户进行评价。

实现思路
发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中topic_1stopic_5stopic_10s…topic_2h这个一般不能支持任意时间段的延时然后通过定时器进行轮训消费这些topic查看消息是否到期如果到期就把这个消息发送到具体业务处理的topic中。队列中消息越靠前的到期时间越早具体来说就是定时器在一次消费过程中对消息的发送时间做判断看下是否延迟到对应时间了如果到了就转发如果还没到这一次定时任务就可以提前结束了。

消息回溯

如果某段时间对已消费消息计算的结果觉得有问题可能是由于程序bug导致的计算错误当程序bug修复后这时可能需要对之前已消费的消息重新消费可以指定从多久之前的消息回溯消费这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费。

分区数越多吞吐量越高吗

可以用kafka压测工具自己测试分区数不同各种情况下的吞吐量。

# 往test里发送一百万消息每条设置1KB
# throughput用来进行限流控制当设定的值小于0时不限流当设定的值大于0时当发送的吞吐量大于该值时就会被阻塞一段时间
bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.0.84:9092 acks=1

image.png
网络上很多资料都说分区数越多吞吐量越高但从压测结果来看分区数到达某个值吞吐量反而开始下降实际上很多事情都会有一个临界值当超过这个临界值之后很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。
当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。
注意
如果分区数设置过大比如设置10000可能会设置不成功后台会报错java.io.IOException : Too many open files
异常中最关键的信息是Too many open flies这是一种常见的Linux系统错误通常意味着文件描述符不足它一般发生在创建线程、创建Socket、打开文件这些场景下 。在Linux系统的默认设置下这个文件描述符的个数不是很多通过ulimit -n命令可以查看一般默认是1024可以将该值增大比如ulimit -n 65535

消息传递保障

  1. at most once(消费者最多收到一次消息0–1次)acks = 0 可以实现。
  2. at least once(消费者至少收到一次消息1–多次)ack = all 可以实现。
  3. exactly once(消费者刚好收到一次消息)at least once 加上消费者幂等性可以实现还可以用kafka生产者的幂等性来实现。

kafka生产者的幂等性因为发送端重试导致的消息重复发送问题kafka的幂等性可以保证重复发送的消息只接收一次只需在生产者加上参数props.put(“enable.idempotence”, true) 即可默认是false不开启。
具体实现原理是kafka每次发送消息会生成PID和Sequence Number并将这两个属性一起发送给brokerbroker会将PID和Sequence Number跟消息绑定一起存起来下次如果生产者重发相同消息broker会检查PID和Sequence Number如果相同不会再接收。

PID每个新的Producer在初始化的时候会被分配一个唯一的PID这个PID对用户完全是透明的。生产者如果重启则会生成新的PID。
Sequence Number对于每个PID该Producer发送到每个Partition的数据都有对应的序列号这些序列号是从0开始单调递增的。

Kafka的事务

Kafka的事务不同于RocketMQRocketMQ是保障本地事务比如数据库与MQ消息发送的事务一致性Kafka的事务主要是保障一次发送多条消息的事务一致性要么同时成功要么同时失败一般在Kafka的流式计算场景用得多一点。比如Kafka需要对一个Topic里的消息做不同的流式计算处理处理完分别发到不同的Topic里这些Topic分别被不同的下游系统消费比如HBaseRedisES等)这种我们肯定希望系统发送到多个Topic的数据保持事务一致性。Kafka要实现类似RocketMQ的分布式事务需要额外开发功能。

Kafka的事务处理可以参考官方文档https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

Kafka事务示例

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
// 初始化事务
producer.initTransactions();

try {
    // 开启事务
    producer.beginTransaction();
    for (int i = 0; i < 100; i++){
        // 发到不同的主题的不同分区
        producer.send(new ProducerRecord<>("hdfs-topic", Integer.toString(i), Integer.toString(i)));
        producer.send(new ProducerRecord<>("es-topic", Integer.toString(i), Integer.toString(i)));
        producer.send(new ProducerRecord<>("redis-topic", Integer.toString(i), Integer.toString(i)));
    }
    // 提交事务
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    // 回滚事务
    producer.abortTransaction();
}
producer.close();

Kafka高性能的原因

  • 磁盘顺序读写

kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读kafka的消息写入文件都是追加在文件末尾不会写入文件中的某个位置随机写保证了磁盘顺序写。
磁盘是由磁道组成的寻址就是通过磁头找到对应的磁道中的数据位置如果是顺序写磁头就可以一直向前寻址而不用来回跳跃降低性能。

  • 数据传输的零拷贝

image.png

  • 读写数据的批量batch处理以及压缩传输
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6