Kafka-生产者分区

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

一、分区的好处

  • 便于合理使用存储资源每个Partition在一个Broker上存储可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务可以实现负载均衡的效果。
  • 提高并行度生产者可以以分区为单位发送数据消费者可以以分区为单位进行消费数据。

分区的作用就是提供负载均衡的能力或者说对数据进行分区的主要原因就是为了实现系统的高伸缩性Scalability。不同的分区能够被放置到不同节点的机器上而数据的读写操作也都是针对分区这个粒度而进行的这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且我们还可以通过添加新的节点机器来增加整体系统的吞吐量。

生产者分区.png

kafka默认的分区器DefaultPartitioner

package org.apache.kafka.clients.producer.internals;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
 *
 * See KIP-480 for details about sticky partitioning.
 */
public class DefaultPartitioner implements Partitioner {

二.分区策略

1.随机策略

指明partition的情况下直接将指明的值作为partition值

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {}
public ProducerRecord(String topic, Integer partition, K key, V value) {}

例如partition=1所有数据写入分区1

// 指定发送到1号分区
kafkaProducer.send(new ProducerRecord<>("first", 1, "", "record" + i),
        (recordMetadata, exception) -> {
            if (exception == null) {
                System.out.println("主题" + recordMetadata.topic() + ";分区" + recordMetadata.partition());
            }
        });

实现随机策略版的 partition

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        return ThreadLocalRandom.current().nextInt(partitions.size());
    }
      @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

2.按消息键保序策略

没有指明partition值但有key的情况下将 key的hash值与topic的partition数进行取余得到partition值

public ProducerRecord(String topic, K key, V value) {}

例如key1的 hash值=5key2的 hash值=6 topic的 partition数 =2那么key1 对应的value1写入1号分区key2对应的value2写入0号分区。

// 指定发送到1号分区
kafkaProducer.send(new ProducerRecord<>("first", "a", "record" + i),
        (recordMetadata, exception) -> {
            if (exception == null) {
                System.out.println("主题" + recordMetadata.topic() + ";分区" + recordMetadata.partition());
            }
        });

3.轮询策略

既没有partition值又没有key值的情况下Kafka采用Sticky Partition黏性分区器会随机选择一个分区并尽可能一直使用该分区待该分区的batch已满或者已完成Kafka再随机一个分区进行使用和上一次的分区不同。

public ProducerRecord(String topic, V value) {}

例如第一次随机选择0号分区等0号分区当前批次满了默认16k或者linger.ms设置的时间到 Kafka再随机一个分区进行使用如果还是0会继续随机。

kafkaProducer.send(new ProducerRecord<>("first","record" + i),
        (recordMetadata, exception) -> {
            if (exception == null) {
                System.out.println("主题" + recordMetadata.topic() + ";分区" + recordMetadata.partition());
            }
        });

轮询策略有非常优秀的负载均衡表现它总是能保证消息最大限度地被平均分配到所有分区上故默认情况下它是最合理的分区策略也是我们最常用的分区策略之一。

三、自定义分区器

1.需求

实现一个分区器实现发送过来的数据中如果包含 tracy就发往0号分区不包含就发往1号分区。

2.实现

  • 定义类实现 Partitioner 接口。
  • 重写 partition()方法。
  • 在生产者的配置中添加分区器参数。

MyPartitioner

package kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @Title: MyPartitioner.java
 * @Package kafka
 * @Description: 自定义分区器
 * @Author: hongcaixia
 * @Date: 2023/1/21 21:24
 * @Version V1.0
 */
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取消息
        String msgValue = value.toString();
        // 创建 partition
        int partition;
        // 判断消息是否包含 tracy
        if (msgValue.contains("tracy")) {
            partition = 0;
        } else {
            partition = 1;
        }
        // 返回分区号
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

MyProducerPartition

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @Title: MyProducer.java
 * @Package kafka
 * @Description: 生产者使用自定义分区器
 * @Author: hongcaixia
 * @Date: 2023/1/20 21:24
 * @Version V1.0
 */
public class MyProducerPartition {

    public static void main(String[] args) {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息bootstrap.servers
        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.2:9092");
        // 指定序列化类型 key,value 序列化必须key.serializervalue.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        //设置自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.MyPartitioner");

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "record" + i));
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}

实现基于地理位置的分区策略

这种策略一般只针对那些大规模的 Kafka 集群特别是跨城市、跨国家甚至是跨大洲的集群。
假设集群中有一部分机器在北京另外一部分机器在广州。
某机构计划为每个新注册用户提供一份注册礼品比如南方的用户注册可以免费得到一碗“甜豆腐脑”而北方的新注册用户可以得到一碗“咸豆腐脑”。如果用 Kafka 来实现则很简单只需要创建一个双分区的主题然后再创建两个消费者程序分别处理南北方注册用户逻辑即可。
但是需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房中因为处理这些消息的消费者程序只可能在某一个机房中启动着。换句话说送甜豆腐脑的消费者程序只在广州机房启动着而送咸豆腐脑的程序只在北京的机房中如果你向广州机房中的 Broker 发送北方注册用户的消息那么这个用户将无法得到礼品
可以根据 Broker 所在的 IP 地址实现定制化的分区策略

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

从所有分区中找出那些 Leader 副本在南方的所有分区然后随机挑选一个进行消息发送。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6