【Kafka】基本概念-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

文章目录

一、消息队列的流派

1.1 有Broker

这个流派通常有⼀台服务器作为 Broker所有的消息都通过它中转。⽣产者把消息发送给它就结束⾃⼰的任务了Broker 则把消息主动推送给消费者或者消费者主动轮询

1.1.1 重topic

生产者生产的消息有topic消费者订阅topic在重topic的消息队列⾥必然需要topic的存在
在这里插入图片描述

1.1.2 轻topic

这种的代表是 RabbitMQ或者说是 AMQP。⽣产者发送 key 和数据消费者订阅队列Broker 收到数据之后会通过⼀定的逻辑计算出key 对应的队列然后把数据交给队列。topic只是其中⼀种中转模式
在这里插入图片描述

1.2 无Broker

⽆ Broker 的 MQ 的代表是 ZeroMQ。该作者⾮常睿智他⾮常敏锐的意识到——MQ 是更⾼级的 Socket它是解决通讯问题的

在这里插入图片描述

节点之间通讯的消息都是发送到彼此的队列中每个节点都既是⽣产者⼜是消费者。ZeroMQ做的事情就是封装出⼀套类似于 Socket 的 API 可以完成发送数据读取数据

二、kafka安装

  • 部署⼀台zookeeper服务器
  • 安装jdk
  • 下载kafka的安装包http://kafka.apache.org/downloads
  • 上传kafka到服务器 /usr/local/kafka
  • 解压缩压缩包
  • 进⼊到config⽬录内修改server.properties
#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
#kafka部署的机器ip和提供服务的端⼝号
listeners=PLAINTEXT://192.168.65.60:9092 
#kafka的消息存储⽂件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
  • 进⼊到bin⽬录内执⾏以下命令来启动kafka服务器带着配置⽂件
./kafka-server-start.sh -daemon ../config/server.properties
  • 校验kafka是否启动成功进⼊到zk内查看是否有kafka的节点 /brokers/ids/0

三、kafka基本术语

在这里插入图片描述

创建topic这个topic只有⼀个partition并且备份因⼦也设置为1

/kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replicationfactor 1 --partitions 1 --topic test

查看当前kafka内有哪些topic

./kafka-topics.sh --list --zookeeper 172.16.253.35:2181

四、发送消息

./kafka-console-producer.sh --broker-list 172.16.253.38:9092 --topic test

五、消费消息

默认是消费最新的消息使⽤kafka的消费者消息的客户端从指定kafka服务器的指定
topic中消费消息

在这里插入图片描述

  • ⽅式⼀从最后⼀条消息的偏移量+1开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test
  • ⽅式⼆从头开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test

⼏个注意点消息是顺序存储的、有偏移量的、消费时可以指明偏移量进⾏消费、消费之后依然保存在日志文件中

六、单播消息

无论是几个消费者、几个消费者组都只有⼀个消费者可以收到订阅的topic中的消息

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup --topic test

七、多播消息

不同的消费组订阅同⼀个topic同一消费者组只有⼀个消费者能收到消息多个消费组中的多个消费者可以收到同⼀个消息

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup2 --topic test

八、查看消费组的详细信息

通过以下命令可以查看到消费组的详细信息

./kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup

在这里插入图片描述

九、主题topic

主题topic在kafka中是⼀个逻辑的概念kafka通过topic将消息进⾏分类不同的topic会被订阅该topic的消费者消费。

但是有⼀个问题如果说这个topic中的消息⾮常⾮常多多到需要⼏T来存因为消息是会被保存到log⽇志⽂件中的。为了解决这个⽂件过⼤的问题kafka提出了Partition分区的概念

十、分区

通过partition将⼀个topic中的消息分区来存储这样的好处有多个

  • 分区存储可以解决存储⽂件过⼤的问题
  • 提供了读写的吞吐量读和写可以同时在多个分区中进⾏
    在这里插入图片描述

创建多分区的主题

./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 2 --topic test1

十一、kafka中消息⽇志⽂件中保存的内容

  • 00000.log这个⽂件中保存的就是消息
  • __consumer_offsets-49kafka内部⾃⼰创建了__consumer_offsets主题包含了50个分区这个主题⽤来存放消费者消费某个topic的偏移量key就是consumerGroupId+topic+分区号value就是消费的偏移量 。kafka会维护每个消费者组的消费者消费不同topic以及不同分区的偏移量consumer_offsets
  • kafka为了提升这个主题的并发性默认设置了50个分区。
    • 提交到哪个分区通过hash函数hash(consumerGroupId) % __consumer_offsets主题的分区数
    • 提交到该主题中的内容是key是consumerGroupId+topic+分区号value就是当前offset的值
  • ⽂件中保存的消息默认保存7天
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6