Kafka架构组成及相关内容
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
主流的消息队列有ActiveMQ、RabbitMQ、RocketMQ、Kafka主要应用场景是解耦、异步、消峰可以参考链接
0. 主要参考
https://www.bilibili.com/video/BV1vr4y1677k
1. Kafka基础架构组成
主要由生产者(组)、Broke、消费者(组)、Topic主题、Partition分区 、Replica副本Leader副本、Follower副本
【架构组成】
- Broker一台 Kafka 服务器就是一个 brokerKafka集群就会有多个 broker
- Producer消息生产者就是向 broker 发消息的客户端
- Consumer消息消费者就是向 broker 取消息的客户端
- Topic每条发布到Kafka集群的消息都有一个类别这个类别被称为Topic
- Partition一个 Topic 可以分为多个 partition分区每个 partition 是一个有序的队列可以分布到多个 broker上
- Replica每个分区有多个副本每个分区都有一个Leader副本和若干Follower副本
`【消费者组-主题关系】
- 消费者组由多个消费者组成每一个消费者订阅的同一个Topic
- 一个消费者可以消费多个分区但一个分区只能被一个消费者组的消费者消费此外消费者组之间互不影响
`【Topic主题-Partition分区-Replica副本关系】
- 一个Topic有多个Partition每个分区有多个副本如上图一个TopicA有2个分区三个副本其中Broke1的分区0是主副本其他的为从副本
2. Kafka的一些操作命令
2.1.【启动】
- sh kafka-server-start.sh -daemon …/config/server.properties
·2.2.【创建Topic】
- bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic first
·
选项说明
– topic 定义 topic 名
–replication-factor 定义副本
–partitions 定义分区数
·2.3.【启动生产者】
- bin/kafka-console-producer.sh --bootstrap-server 192.169.182.128:9092 --topic first
·2.4.【启动消费者者消费主题】
- bin/kafka-console-consumer.sh --bootstrap-server 192.169.182.128:9092 --topic first
·2.5.【启动消费者者消费主题包括历史数据】
- bin/kafka-console-consumer.sh --bootstrap-server 192.169.182.128:9092 --from-beginning --topic first
·2.6. 【查看当前服务器中的所有 Topic】
- bin/kafka-topics.sh --bootstrap-server 192.169.182.128:9092 --list
`2.7. 【查看 first 主题的详情】
- bin/kafka-topics.sh --bootstrap-server 192.169.182.128:9092 --describe --topic first
·2.8.【修改分区数注意分区数只能增加不能减少】
- bin/kafka-topics.sh --bootstrap-server 192.169.182.128:9092 --alter --topic first --partitions 3
·2.9. 【删除 topic】
- bin/kafka-topics.sh --bootstrap-server 192.169.182.128:9092 --delete --topic first
3. Kafka 生产者消息发送流程
【发送原理】
- 1生产者实例Producer调用send方法发送消息
- 2先经过一层拦截器处理在经过kafka的序列化器兑key和value进行序列化然后经过分区器选择消息发送的分区
- 3消息会先发到RecordAccumulator缓存区默认32M由生产者配置buffer.memory 设置
- 3.1首先根据分区创建队列Deque<ProducerBatch>然后将消息追到队列批次ProducerBatch
- 3.2批次大小默认16K由生产者配置batch.size设置
- 4队列批次已满或者经过linger.ms等待时间默认0ms就唤醒sender线程进行消息发送到broker
- 4.1会先判断RecordAccumulator缓存区是否满足发送条件达到batch.size或linger.ms
- 4.2发送时会先过滤出可发送的broke节点然后进行封装成ClientRequest请求通过selector发送到broker
4.3发送时broker可以缓存最近的5个请求- 5请求发送到集群后Kafka集群会返回对应的acks应答发送成功则区移除缓存区的队列批次
- 5.1如果失败并允许重试配置retries重试次数则会进行重试发送不管最终成功还是失败都要移除队列批次
4. Kafka 的ack机制
Kafka 的ack机制是指生产者的消息发送确认机制有01-1可选值不同的值会影响消息发送的吞吐量和可靠性
- 通过acks进行配置properties.put(ProducerConfig.ACKS_CONFIG,“1”);
- 0生产者发送过来的数据不需要等数据落盘应答可靠性差消息没落盘leader挂了消息丢失但效率高。
- 1生产者发送过来的数据Leader收到数据落盘后应答可靠性和效率都适中
- -1all生产者发送过来的数据Leader+和isr队列里面的所有节点收齐数据后应答可靠性高效率低。
- 一般acks=0很少使用acks=1一般用于传输普通日志允许丢个别数据acks=-1一般用对可靠性要求比较高的场景比如转账。
5. Kafka 生产者消息发送模式同步/异步
【普通异步发送】
- 异步消息发送不需要等broke落盘确认便可以继续发送数据会一直先发到RecordAccumulator缓存区
- kafkaProducer.send(new ProducerRecord<>(“first”,“aaaa”));
- ·
【普通异步回调发送】
异步回调可以知道生产者发送异步消息有没有异常如果有异常exception不为null可以进行再处理
【同步发送】
- 同步消息发送需要等broke落盘确认才可以继续发送再确认之前下一批数据不会发到RecordAccumulator缓存区
- kafkaProducer.send(new ProducerRecord<>(“first”,“aaaa”)).get();
6. Kafka发送消息的分区策略
kafkaProducer.send(new ProducerRecord<>(“first”, 1,“”,“hello”)
7. Kafka消息发送可靠性保证消息不丢失
- 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
- 但会存在数据重复落盘的情况即消息都落盘后leader挂了然后选举新的leader消息重试重复落盘
8. Kafka消息发送去重
Kafka默认是开启幂等性的即enable.idempotence 默认为 true
8.1. 幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据Broker端都只会持久化一条保证了不重复。
- 重复数据的判断标准具有<PID, Partition, SeqNumber>相同主键的消息提交时Broker只会持久化一条。
- 比如上述ack=-1消息重试时SeqNumber是一致的就不能落盘
8.2. 生产者事务
【事务原理】
【发送代码】
·
9. Kafka的有序消息保证
【有序】在同一个分区是有序的但在多个分区是无法保证有序的
·
【有序消息保证】
- 如果要保证某一类业务有序可以通过自定义分区器实现Partitioner接口根据某一唯一key进行发送到指定broker分区
- 如果不区分业务要保证强有序只能使用一个分区但性能会大大降低
·
【重试乱序问题】
- 往同一分区发送abcdef六个消息生产者配置是重试retries比如c异常重试理论上会造成乱序abdefc;
- 【解决】开启幂等性enable.idempotence=truebroke缓存请求max.in.flight.requests.per.connection需要设置小于等于5
- 【原理】缓存请求5个时ab无异常正常broke数据落盘c异常重试def正常发送由于幂等性def还在内存中并不会落盘等c重试时在broke重新排序最后cdef落盘
- 如果c一直重试失败最后被异常那是不是cdef都发送失败
10. Kafka提高生产者吞吐量
可以增大缓冲区、批次大小或者稍微调大批次等待时间linger.ms以及对批次消息压缩
// 缓冲区大小 ,默认32M
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 批次大小默认16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms默认0ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 压缩默认不压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");