【Kafka系列】(二)Kafka的基本使用

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

有的时候博客内容会有变动首发博客是最新的其他博客地址可能会未同步,认准https://blog.zysicyj.top

首发博客地址[1]

文章更新计划[2]

系列文章地址[3]


Kafka 线上集群部署方案怎么做

操作系统

先说结论Kafka 部署在 Linux 上要比 Windows 和 Mac 上性能高的多主要是以下几个原因

  1. 操作系统优化Linux 操作系统在网络和文件系统性能方面通常比 Windows 和 Mac 更优秀。Linux 内核对网络和磁盘 I/O 的处理更高效能够更好地利用硬件资源从而提高 Kafka 的性能。

  2. 文件系统选择Linux 上常用的文件系统如 ext4、XFS 等对大规模数据处理和高并发读写有更好的支持。而 Windows 上的 NTFS 文件系统在处理大量小文件和高并发读写时性能相对较差。

  3. 网络栈性能Linux 的网络栈在处理高并发连接和大规模数据传输时表现更出色。Linux 内核对网络协议栈的优化更多能够更好地处理网络数据包提高 Kafka 的吞吐量和响应速度。

  4. 硬件资源管理Linux 操作系统对硬件资源的管理更加灵活和高效。Linux 上的进程调度、内存管理等机制能够更好地利用多核处理器和大内存提高 Kafka 的并发处理能力。

  5. 社区支持度

I/O 模型

主流的 I/O 模型通常有以下五种类型

  1. 阻塞 I/OBlocking I/O在进行 I/O 操作时应用程序会被阻塞直到数据准备好或者操作完成。这种模型是最简单的但是会导致应用程序的性能下降因为在等待 I/O 完成时CPU 无法处理其他任务。

  2. 非阻塞 I/ONon-blocking I/O在进行 I/O 操作时应用程序可以继续执行其他任务而不会被阻塞。但是如果数据还没有准备好或者操作还没有完成应用程序需要不断地轮询来检查状态这会导致 CPU 的资源浪费。

  3. I/O 多路复用I/O Multiplexing通过使用 select、poll 或者 epoll 等系统调用应用程序可以同时监视多个文件描述符的状态当任何一个文件描述符准备好进行 I/O 操作时应用程序就可以进行相应的读写操作。这种模型可以有效地处理多个连接提高系统的并发性能。

  4. 信号驱动 I/OSignal-driven I/O应用程序通过注册信号处理函数在数据准备好时接收到一个信号然后进行相应的读写操作。这种模型相比于非阻塞 I/O减少了轮询的开销但是仍然需要应用程序不断地检查数据是否准备好。

  5. 异步 I/OAsynchronous I/O应用程序发起一个 I/O 操作后可以继续执行其他任务当数据准备好或者操作完成时操作系统会通知应用程序进行相应的读写操作。这种模型是最高效的因为应用程序不需要进行轮询或者阻塞等待可以充分利用 CPU 的资源。

你不必详细了解每一种模型的实现细节通常情况下我们认为后一种模型会比前一种模型要高级比如 epoll 就比 select 要好了解到这一程度应该足以应付我们下面的内容了。

说了这么多I/O 模型与 Kafka 的关系又是什么呢实际上 Kafka 客户端底层使用了 Java 的 selectorselector 在 Linux 上的实现机制是 epoll而在 Windows 平台上的实现机制是 select。因此在这一点上将 Kafka 部署在 Linux 上是有优势的因为能够获得更高效的 I/O 性能

零拷贝

Kafka 在 Linux 上支持零拷贝Zero-copy的主要原因是 Linux 操作系统提供了一些特性和系统调用使得零拷贝成为可能。而 Windows 操作系统在设计上与 Linux 有所不同因此不直接支持零拷贝。

零拷贝是一种优化技术可以减少数据在内核空间和用户空间之间的拷贝次数提高数据传输的效率。在传统的拷贝方式中数据从磁盘读取到内核缓冲区然后再从内核缓冲区拷贝到用户空间的应用程序缓冲区这涉及到多次数据拷贝操作增加了 CPU 和内存的开销。

在 Linux 上零拷贝的实现主要依赖以下几个特性和系统调用

  1. 文件描述符File DescriptorLinux 使用文件描述符来表示打开的文件通过文件描述符可以进行文件的读写操作。

  2. 内核缓冲区Kernel BufferLinux 内核提供了一块内存区域作为内核缓冲区用于存放从磁盘读取的数据。

  3. sendfile 系统调用sendfile 系统调用可以在内核空间和用户空间之间直接传输数据而无需经过用户空间缓冲区。

通过使用 sendfile 系统调用Kafka 可以直接将数据从磁盘读取到内核缓冲区然后通过网络发送给消费者避免了数据在内核空间和用户空间之间的多次拷贝。

而在 Windows 上没有类似于 Linux 的 sendfile 系统调用因此无法直接实现零拷贝。在 Windows 上数据需要经过内核空间和用户空间之间的多次拷贝导致性能上的损失。

一句话总结一下在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。

磁盘

先说结论

  • 追求性价比的公司可以不搭建 RAID使用普通磁盘组成存储空间即可
  • 使用机械磁盘完全能够胜任 Kafka 线上环境

为什么说 Kafka 可以不搭建 RAID 环境

  1. 分布式架构 Kafka 采用分布式架构将消息分散存储在多个 Broker 节点上。每个 Broker 节点都是独立的它们之间相互复制消息实现数据的冗余和高可用性。因此即使某个节点的磁盘发生故障其他节点仍然可以提供服务不会导致数据丢失。

  2. 数据复制 Kafka 使用副本机制来保证数据的可靠性。每个分区都可以配置多个副本这些副本分布在不同的 Broker 节点上。当消息写入到 Leader 副本后Kafka 会将消息复制到其他副本确保数据的冗余存储。如果某个副本所在的磁盘发生故障Kafka 会自动选择其他副本作为 Leader保证数据的可用性。

  3. 持久化存储 Kafka 将消息持久化存储在磁盘上而不是仅保存在内存中。这样即使 Broker 节点发生故障消息也不会丢失。Kafka 使用顺序写入的方式将消息追加到磁盘上的日志文件中这种方式对磁盘的要求相对较低不需要特别高的磁盘性能。

  4. 水平扩展Kafka 支持水平扩展可以通过增加 Broker 节点来提高系统的吞吐量和容量。在扩展过程中可以选择在新节点上添加磁盘而不需要对现有节点进行改动。这种方式可以灵活地根据需求来调整磁盘的配置和容量。

为什么说使用机械磁盘完全能够胜任 Kafka 线上环境

Kafka 是一个高吞吐量、低延迟的分布式消息系统它的性能和稳定性对于线上环境非常重要。虽然 SSD固态硬盘在性能方面有明显的优势但机械磁盘仍然可以胜任 Kafka 线上环境的原因如下

  1. 顺序写入Kafka 的特点之一是顺序写入即消息按照顺序追加到日志文件中。机械磁盘在顺序写入方面的性能表现通常比较好因为它们具有较大的磁道和扇区可以更好地支持连续写入操作。

  2. 容量成本低相比于 SSD机械磁盘的容量成本更低。对于大规模的 Kafka 集群存储成本是一个重要的考虑因素。机械磁盘可以提供更大的存储容量使得 Kafka 能够存储更多的消息数据。

  3. 持久性和可靠性机械磁盘通常具有更高的持久性和可靠性。它们在写入数据时通常会将数据缓存在磁盘的缓存区中然后再进行持久化写入。这种写入方式可以提供更好的数据可靠性即使在断电等异常情况下数据也不容易丢失。

  4. 成熟稳定机械磁盘是一种成熟的存储设备已经在各种应用场景中得到广泛应用。相比之下SSD 虽然在性能方面有优势但在稳定性和寿命方面可能存在一些问题。对于对数据可靠性要求较高的线上环境机械磁盘可能更受信任。

需要注意的是尽管机械磁盘可以胜任 Kafka 线上环境但在某些特定场景下如对延迟要求非常高的应用或者对存储容量要求非常大的应用可能需要考虑使用 SSD 等更高性能的存储设备。此外随着技术的发展未来可能会有更多新型存储设备出现对于 Kafka 的存储需求可能会有不同的选择。

磁盘容量

总结下磁盘容量需要考虑的因素

  • 新增消息数
  • 消息留存时间
  • 平均消息大小
  • 备份数
  • 是否启用压缩

我举一个简单的例子来说明该如何思考这个问题。假设你所在公司有个业务每天需要向 Kafka 集群发送 1 亿条消息每条消息保存两份以防止数据丢失另外消息默认保存两周时间。现在假设消息的平均大小是 1KB那么你能说出你的 Kafka 集群需要为这个业务预留多少磁盘空间吗

我们来计算一下每天 1 亿条 1KB 大小的消息保存两份且留存两周的时间那么总的空间大小就等于 1 亿 * 1KB * 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息数据还有其他类型的数据比如索引数据等故我们再为这些数据预留出 10% 的磁盘空间因此总的存储容量就是 220GB。既然要保存两周那么整体容量即为 220GB _ 14大约 3TB 左右。Kafka 支持数据的压缩假设压缩比是 0.75那么最后你需要规划的存储空间就是 0.75 _ 3 = 2.25TB

带宽

举个例子我们使用以下假设和计算

  1. 带宽资源假设机房环境是千兆网络1Gbps即每秒处理 1Gb 的数据。
  2. 带宽利用率假设 Kafka 服务器最多使用 70%的带宽资源即每秒最多使用 700Mb 的带宽。
  3. 预留资源为了避免网络丢包我们额外预留了 2/3 的带宽资源即单台服务器使用带宽 240Mbps。
  4. 数据处理目标在 1 小时内处理 1TB 的业务数据即每秒需要处理 2336Mb 的数据。

根据以上计算我们得出了需要 10 台 Kafka 服务器来完成这个业务目标。这个计算还没有考虑到消息的复制如果消息需要额外复制两份那么总的服务器台数还要乘以 3即需要 30 台服务器。

在实际部署中你可以根据自己的网络环境和业务需求进行调整和优化。

另外如果你的环境中还涉及跨机房传输那么带宽资源的瓶颈可能会更加明显。在跨机房传输的情况下网络延迟和带宽限制都会对性能产生影响。你可能需要考虑使用更高带宽的网络或者采取其他优化措施来解决这个问题。

总结起来对于带宽资源的规划你需要考虑以下几个因素

  1. 网络带宽根据网络环境确定每秒处理的数据量。
  2. 带宽利用率根据实际情况确定 Kafka 服务器使用的带宽比例。
  3. 预留资源为了避免网络丢包额外预留一部分带宽资源。
  4. 数据处理目标根据业务需求确定每秒需要处理的数据量。
  5. 消息复制如果消息需要复制考虑复制的数量。

根据以上因素你可以计算出所需的 Kafka 服务器数量并根据实际情况进行调整和优化。

小结

alt

集群参数配置

静态参数和动态参数

静态参数是指在 Kafka 启动时配置的参数一旦设置后只能通过重启 Kafka 来更改。这些参数通常是对 Kafka 整体行为的全局设置例如 Kafka 的监听端口、日志目录、副本数量等。静态参数的配置通常在 Kafka 的配置文件如 server.properties中进行。

动态参数是指在 Kafka 运行时可以动态修改的参数而无需重启 Kafka。这些参数通常是对 Kafka 的某个特定组件或功能进行细粒度的调整。动态参数可以通过 Kafka 的命令行工具或 API 进行修改。

Broker

磁盘相关

在 Kafka 中Broker 是消息队列的核心组件负责接收、存储和转发消息。为了配置存储信息我们需要设置一些重要的参数。

  1. log.dirs这是一个非常重要的参数用于指定 Broker 使用的文件目录路径。这个参数没有默认值因此必须由用户自己指定。在生产环境中建议为 log.dirs 配置多个路径以提高读写性能和实现故障转移。具体格式是一个 CSV 格式多个路径之间用逗号分隔例如/home/kafka1,/home/kafka2,/home/kafka3。如果有条件的话最好将这些目录挂载到不同的物理磁盘上以提高性能和可靠性。

  2. log.dir这是 log.dirs 的补充参数用于指定单个路径。在实际使用中我们只需要设置 log.dirs 参数即可不需要设置 log.dir。

为什么要为 log.dirs 配置多个路径呢这是因为多块物理磁盘同时读写数据可以提高吞吐量同时也能实现故障转移。在 Kafka 1.1 版本之前如果 Broker 使用的任何一块磁盘挂掉了整个 Broker 进程都会关闭。但是从 Kafka 1.1 版本开始引入了 Failover 功能坏掉的磁盘上的数据会自动转移到其他正常的磁盘上Broker 仍然可以正常工作。这个改进使得我们不再依赖 RAID 来提供数据的可靠性而是通过多块磁盘的故障转移来实现。

需要注意的是如果使用了多个路径Kafka 会根据一定的策略将消息分配到不同的路径上以实现负载均衡。同时Kafka 也会自动管理磁盘空间当某个路径的磁盘空间不足时会自动将消息转移到其他路径上。

总结一下为了配置存储信息我们需要设置 log.dirs 参数为其配置多个路径最好挂载到不同的物理磁盘上。这样可以提高读写性能和实现故障转移。同时Kafka 会自动管理磁盘空间和实现负载均衡。这些配置可以在 Kafka 的配置文件中进行设置。

Zookeeper 相关

ZooKeeper 是一个分布式协调框架用于协调和管理 Kafka 集群的元数据信息。它负责保存 Kafka 集群的配置信息例如 Broker 的运行状态、Topic 的创建情况、分区信息以及 Leader 副本的位置等。

在 Kafka 中与 ZooKeeper 相关的最重要的参数是zookeeper.connect。这个参数是一个 CSV 格式的字符串用于指定连接到 ZooKeeper 集群的地址和端口。例如zk1:2181,zk2:2181,zk3:2181表示连接到三个 ZooKeeper 节点默认端口为 2181。

如果要让多个 Kafka 集群共享同一个 ZooKeeper 集群可以使用chroot参数来进行区分。chroot是 ZooKeeper 的概念类似于别名。假设有两个 Kafka 集群分别命名为 kafka1 和 kafka2那么可以将zookeeper.connect参数设置为zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2。这样就可以通过chroot来区分不同的 Kafka 集群。

需要注意的是chroot只需要在参数中指定一次并且应该添加到最后。有时候会遇到这样的错误格式zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3这是不正确的。

总结一下设置与 ZooKeeper 相关的参数时需要注意以下几点

  1. zookeeper.connect参数是一个 CSV 格式的字符串用于指定连接到 ZooKeeper 集群的地址和端口。
  2. 如果多个 Kafka 集群共享同一个 ZooKeeper 集群可以使用 chroot参数来进行区分。
  3. chroot参数只需要在参数中指定一次并且应该添加到最后。

下面是一个示例配置

zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1

这个配置表示连接到三个 ZooKeeper 节点并使用kafka1作为chroot

Broker 相关

在 Kafka 中listeners 参数用于指定外部连接者通过什么协议访问 Kafka 服务。它是一个逗号分隔的三元组列表每个三元组由协议名称、主机名和端口号组成。协议名称可以是标准的协议如 PLAINTEXT 表示明文传输SSL 表示使用 SSL 或 TLS 加密传输等也可以是自定义的协议名称。

举个例子如果你定义了一个名为 CONTROLLER 的自定义协议你可以在 listeners 参数中添加 CONTROLLER://localhost:9092表示该协议通过 localhost 的 9092 端口进行通信。

需要注意的是如果你自定义了协议名称你还需要通过 listener.security.protocol.map 参数告诉 Kafka 使用哪种安全协议。比如如果你定义了 CONTROLLER 协议并且该协议使用明文传输数据你需要设置 listener.security.protocol.map=CONTROLLER:PLAINTEXT

另外主机名和端口号比较直观不需要过多解释。但是需要注意的是建议在 Broker 端和客户端应用的配置中都使用主机名而不是 IP 地址。因为在 Kafka 的源代码中也是使用主机名进行连接的。如果你在某些地方使用了 IP 地址进行连接可能会导致连接失败的问题。

总结一下listeners 参数用于指定 Kafka 服务的监听器告诉外部连接者通过什么协议访问 Kafka。它是一个三元组列表每个三元组由协议名称、主机名和端口号组成。建议使用主机名而不是 IP 地址进行配置。如果使用自定义协议还需要通过 listener.security.protocol.map参数指定安全协议。

Topic 相关

auto.create.topics.enable

该参数用于控制是否允许自动创建 Topic。建议将该参数设置为 false即不允许自动创建 Topic。

在线上环境中如果该参数被设置为 true可能会导致出现很多名字稀奇古怪的 Topic。例如当我们想要为名为 test 的 Topic 发送事件时由于拼写错误将 test 写成了 tst启动生产者程序后一个名为 tst 的 Topic 就会被自动创建。这种情况下好的运维应该防止这种情况的发生特别是对于大公司而言每个部门被分配的 Topic 应该由运维严格把控不允许自行创建任何 Topic。

unclean.leader.election.enable

该参数用于控制是否允许 Unclean Leader 选举。Unclean Leader 选举是指在 Kafka 中当保存数据较多的副本都挂掉时是否允许从保存数据较少的副本中选举出新的 Leader。

在 Kafka 中每个分区都有多个副本来提供高可用性其中只有一个副本对外提供服务即 Leader 副本。只有保存数据较多的副本才有资格竞选 Leader而那些落后进度太多的副本没有资格竞选。

如果设置unclean.leader.election.enable为 false那么 Kafka 将坚持之前的原则坚决不允许那些落后太多的副本竞选 Leader。这样做的后果是该分区将不可用因为没有 Leader。

如果设置unclean.leader.election.enable为 true那么 Kafka 允许从那些保存数据较少的副本中选举出新的 Leader。这样做的后果是数据有可能丢失因为这些副本保存的数据本来就不全当成为 Leader 后它本身就变得膨胀了认为自己的数据才是权威的。

需要注意的是该参数在最新版的 Kafka 中默认为 false。但是由于社区对该参数的默认值进行了多次更改所以建议在使用时显式地将其设置为 false。

auto.leader.rebalance.enable

该参数用于控制是否允许 Kafka 定期进行 Leader 选举。建议将该参数设置为 false。

设置auto.leader.rebalance.enable为 true 表示允许 Kafka 定期对一些 Topic 分区进行 Leader 重选举。需要满足一定的条件才会触发 Leader 重选举。

unclean.leader.election.enable参数不同的是auto.leader.rebalance.enable并不是选举新的 Leader而是更换现有的 Leader。例如如果 Leader A 一直表现良好但是当auto.leader.rebalance.enable为 true 时经过一段时间后Leader A 可能会被强制卸任换成 Leader B。

需要注意的是Leader 的更换代价很高。原本向 Leader A 发送请求的所有客户端都需要切换成向 Leader B 发送请求。而且这种 Leader 的更换本质上没有任何性能收益。

因此在生产环境中建议将auto.leader.rebalance.enable设置为 false避免不必要的 Leader 更换。

数据留存方面

在 Kafka 中有一组参数用于控制数据的留存。下面我将逐个介绍这些参数。

  1. log.retention.{hours|minutes|ms}这是一组参数用于控制消息数据在 Kafka 中保存的时间。这三个参数分别是以小时hours、分钟minutes和毫秒ms为单位的时间间隔。优先级上ms 设置最高minutes 次之hours 最低。通常情况下我们会设置较长的时间间隔比如 log.retention.hours=168 表示默认保存 7 天的数据自动删除 7 天前的数据。如果将 Kafka 用作存储系统那么这个值可能需要相应调大。

  2. log.retention.bytes这个参数用于指定 Broker 在磁盘上保存的消息数据的总容量大小。默认值为-1表示没有容量限制即可以保存任意大小的数据。这个参数在构建云上的多租户 Kafka 集群时发挥作用。假设你要提供一个云上的 Kafka 服务每个租户只能使用 100GB 的磁盘空间为了避免某个租户占用过多的磁盘空间设置这个参数就非常重要了。

  3. message.max.bytes这个参数用于控制 Broker 能够接收的最大消息大小。默认值为 1000012即不到 1MB。然而在实际场景中超过 1MB 的消息是很常见的。因此在生产环境中将这个值设置得比较大是比较保险的做法。这个参数只是一个标尺仅仅衡量 Broker 能够处理的最大消息大小即使设置得大一点也不会占用太多磁盘空间。

需要注意的是这些参数都是可配置的可以根据实际需求进行调整。在配置文件中可以通过设置对应的属性来修改这些参数的值。例如可以在server.properties文件中添加以下配置来修改log.retention.hours参数的值

log.retention.hours=168

这样就将消息数据的保存时间设置为 7 天。

总结一下这些参数在 Kafka 中起到了重要的作用可以根据实际需求来调整以满足不同的业务场景。

Topic 级别参数

Topic 级别的参数在 Kafka 中非常重要它允许我们为每个 Topic 设置特定的参数值这些参数会覆盖全局 Broker 参数的值。这样做的好处是可以根据不同的业务需求为不同的 Topic 设置不同的参数提高系统的灵活性和效率。

下面我将详细介绍几个重要的 Topic 级别参数按照用途分组。

  1. 保存消息方面的参数

    • retention.ms规定了该 Topic 消息被保存的时长。默认值是 7 天即该 Topic 只保存最近 7 天的消息。如果设置了这个值它会覆盖 Broker 端的全局参数值。通过设置不同的 retention.ms 值我们可以根据业务需求来控制消息的保存时长避免无效的数据占用过多的存储空间。
    • retention.bytes规定了为该 Topic 预留的磁盘空间大小。和全局参数的作用类似这个值在多租户的 Kafka 集群中非常有用。默认值是-1表示可以无限使用磁盘空间。通过设置不同的 retention.bytes 值我们可以根据不同的 Topic 的数据量来合理分配磁盘空间避免存储空间不足的问题。
  2. 处理消息大小方面的参数

    • max.message.bytes决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。在很多公司中Kafka 作为基础架构组件运行承载了大量的业务数据。如果在全局层面上无法给出一个合适的最大消息值那么允许不同的业务部门自行设定 Topic 级别的 max.message.bytes参数就显得非常必要了。通过设置不同的 max.message.bytes值我们可以根据不同的业务需求来控制消息的大小确保系统能够正常处理各种大小的消息。

通过设置 Topic 级别的参数我们可以根据不同的业务需求来灵活地调整 Kafka 的配置提高系统的性能和可用性。同时这也是 Kafka 作为一个高性能分布式消息系统的重要特性之一。

需要注意的是Topic 级别的参数只对该 Topic 中的消息生效不会影响其他 Topic。如果没有为某个 Topic 设置特定的参数值那么将会使用全局 Broker 参数的默认值。

除了上述介绍的参数Kafka 还有其他一些 Topic 级别的参数如cleanup.policycompression.type等它们都可以根据具体的业务需求进行设置。在实际应用中我们可以根据不同的场景和需求灵活地使用这些参数来优化 Kafka 集群的性能和可靠性。

总结一下Topic 级别的参数允许我们为每个 Topic 设置特定的参数值覆盖全局 Broker 参数的值。通过设置不同的参数值我们可以根据业务需求来控制消息的保存时长、磁盘空间使用和消息大小等提高系统的灵活性和效率。这是 Kafka 作为一个高性能分布式消息系统的重要特性之一。

在 Kafka 中可以通过两种方式来设置 Topic 级别的参数在创建 Topic 时设置和修改已存在的 Topic 时设置。

1. 创建 Topic 时设置参数

在创建 Topic 时可以通过--config参数来设置 Topic 级别的参数。例如我们要创建一个名为transaction的 Topic并设置retention.ms为 15552000000max.message.bytes为 5242880可以使用以下命令

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880

在上述命令中--config后面的参数用于指定要设置的 Topic 级别参数。

2. 修改已存在的 Topic 时设置参数

可以使用kafka-configs命令来修改已存在的 Topic 的参数。假设我们要将transaction Topic 的max.message.bytes修改为 10485760可以使用以下命令

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

在上述命令中--entity-type topics表示要修改的实体类型为 Topic--entity-name transaction表示要修改的 Topic 名称--alter表示要进行修改操作--add-config后面的参数用于指定要修改的 Topic 级别参数及其新值。

个人建议

个人建议始终坚持使用第二种方式来设置 Topic 级别参数并且在未来Kafka 社区很有可能统一使用kafka-configs脚本来调整 Topic 级别参数。这样做的好处是统一了设置参数的方式减少了学习成本和混淆同时也更加方便管理和维护。

JVM 参数

JVM 参数对于 Kafka 集群的性能和稳定性非常重要。在设置 JVM 参数之前首先需要确定 Java 版本。对于 Kafka 来说不推荐在 Java 6 或 7 的环境上运行建议至少使用 Java 8。

在 JVM 参数设置中堆大小是一个关键参数。尽管后面我们还会讨论如何调优 Kafka 性能的问题但是现在我想给出一个通用的建议将 JVM 堆大小设置为 6GB这是目前业界普遍认可的一个合理值。很多人使用默认的堆大小来运行 Kafka但是默认的 1GB 有点小因为 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例堆大小不能太小。

另一个重要的 JVM 参数是垃圾回收器GC的设置。如果你仍在使用 Java 7可以根据以下规则选择合适的垃圾回收器如果 Broker 所在机器的 CPU 资源非常充裕建议使用 CMSConcurrent Mark Sweep收集器启用方法是指定-XX:+UseConcMarkSweepGC。否则使用吞吐量收集器启用方法是指定-XX:+UseParallelGC。如果你使用 Java 8可以手动设置使用 G1Garbage First收集器。在没有任何调优的情况下G1 表现要比 CMS 更出色主要体现在更少的 Full GC 和需要调整的参数更少等方面所以使用 G1 就可以了。

现在我们确定了要设置的 JVM 参数接下来我们来为 Kafka 进行设置。奇怪的是这个问题在 Kafka 官网上居然没有被提及。实际上设置的方法非常简单你只需要设置下面这两个环境变量即可

  • KAFKA_HEAP_OPTS指定堆大小。例如你可以这样设置 export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
  • KAFKA_JVM_PERFORMANCE_OPTS指定 GC 参数。例如你可以这样设置 export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"

在启动 Kafka Broker 之前先设置好这两个环境变量然后执行启动命令例如

export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
$ bin/kafka-server-start.sh config/server.properties

这样就完成了 JVM 参数的设置。通过合理的设置可以提高 Kafka 集群的性能和稳定性。需要注意的是具体的参数设置可能因环境和需求而有所不同可以根据实际情况进行调整。

操作系统参数

Kafka 集群通常需要设置一些操作系统参数来优化性能和稳定性。下面是一些常见的操作系统参数设置

  1. 文件描述符限制ulimit -n文件描述符是操作系统用于跟踪打开文件的标识符。Kafka 集群需要同时打开大量的文件描述符因此需要增加文件描述符限制。默认情况下操作系统的文件描述符限制较低可能会导致 Kafka 进程无法打开足够的文件描述符从而影响性能。建议将文件描述符限制设置为一个较大的值例如 ulimit -n 1000000。

  2. 文件系统类型的选择Kafka 集群的性能和稳定性受到文件系统的影响。根据官方测试报告XFS 文件系统的性能要优于 ext4 文件系统。因此在生产环境中最好选择 XFS 文件系统。最近也有一些关于 Kafka 使用 ZFS 文件系统的报告显示其性能更强劲如果条件允许可以尝试使用 ZFS 文件系统。

  3. Swap 的调优Swap 是操作系统用于将内存中不常用的数据暂时存储在磁盘上的机制。一些文章建议将 Swap 设置为 0完全禁用 Swap以防止 Kafka 进程使用 Swap 空间。然而个人认为最好不要将 Swap 设置为 0而是设置为一个较小的值。这是因为当物理内存耗尽时操作系统会触发 OOM killer 组件随机选择一个进程并杀死它而不给用户任何预警。如果将 Swap 设置为一个较小的值当开始使用 Swap 空间时你至少能观察到 Broker 性能的急剧下降从而有时间进行进一步的调优和问题诊断。建议将 swappiness 设置为接近 0 但不为 0 的值例如 1。

  4. 提交时间Flush 落盘时间Kafka 发送数据时并不需要等待数据被写入磁盘才认为成功只要数据被写入操作系统的页缓存Page Cache即可。操作系统会根据 LRU 算法定期将页缓存上的“脏”数据写入物理磁盘。提交时间决定了这个定期的间隔默认为 5 秒。通常情况下这个时间间隔可能太频繁可以适当增加提交时间间隔来降低物理磁盘的写操作。需要注意的是如果数据在写入磁盘之前发生机器宕机数据将会丢失。但由于 Kafka 在软件层面提供了多副本的冗余机制因此可以适当增加提交时间间隔以换取性能。

需要注意的是以上参数设置是一般情况下的建议具体的设置还需要根据实际情况和硬件配置进行调整。另外不同的操作系统和版本可能会有不同的参数设置方式请参考相应的操作系统文档或官方建议进行设置。

下面是一个示例展示如何在 Linux 系统上设置 ulimit -n 参数

# 查看当前文件描述符限制
ulimit -n

# 修改文件描述符限制为 1000000
ulimit -n 1000000

# 验证修改是否生效
ulimit -n

请注意以上示例仅适用于 Linux 系统其他操作系统可能有不同的设置方式。

参考资料

[1]

首发博客地址: https://blog.zysicyj.top/

[2]

文章更新计划: https://blog.zysicyj.top/update_plan/

[3]

系列文章地址: https://blog.zysicyj.top/categories/技术文章/后端技术/系列文章/Kafka/

本文由 mdnice 多平台发布

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

“【Kafka系列】(二)Kafka的基本使用” 的相关文章