Kafka note partitioner

这是我参与更文挑战的第5天,活动详情查看:更文挑战

消息 ProducerRecordsend() 发送 broker 过程中,需要经过:

  1. 拦截器(Interceptor)
  2. 序列化器(Serializer)
  3. 分区器(Partitioner)

一系列作用之后才能被真正地发往 broker。Interceptor 不是必须的,Serializer 是必需的。

消息经过序列化后,就需要确定它发送的分区,这个时候就涉及到 分区策略 了。

String msg = i + " This is matt's blog.";
// 可以设置发送的分区:
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, msg);
producer.send(record);
复制代码

上述在构造 ProducerRecord 的构造方法中,可以设置要发送的分区:

// 意思是在构造消息的时候进行分区选定,因为你都要发送了,肯定是在构造消息指定
public ProducerRecord(topic, partition, timestamp, K key, V value, headers) {}
复制代码

所有的 ProducerRecord 构造都会执行这个函数。如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要 分区器 的作用。

分区器

如果没有指定 partition ,默认也得给 ProducerRecord 一个分区策略。这个就依赖内部默认分区器:

Kafka 中提供的默认分区器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了 org.apache.kafka.clients.producer.Partitioner 接口

先看看默认的分区器是怎么定义分区策略的:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
复制代码
  1. key 为空,采用轮训算法,选择的是主题内的各个可用分区【注意是可用分区】
  2. key 不为空,对 key 进行哈希(采用 MurmurHash2 算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入同一个分区

但是⚠️:在不改变主题分区数量的情况下,key 与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证 key 与分区之间的映射关系了。

自定义分区器

所以只要像 DefaultPartitioner 实现提供的 Partitioner 接口即可。然后要将其通过 config 注入到 kafka broker

// 这里将上述的3种分区策略,这边都自己实现一个版本
public class DemoPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (null == keyBytes) {
            return counter.getAndIncrement() % numPartitions;
        } else return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}

// 1. Round-robin策略,顺序分配
// 按照默认的分区器就行,同时不指定 partition 以及 key

// 2. 随机策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

// 3. Key-ordering策略,按照指定消息键保序策略
// 不过强烈不要自己实现,kafka内部已经实现了,用原生提供的,hash碰撞还低很多
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

// 4. 自定义分区。根据地区【南北机房分区】,仓库【地点分区】
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host()))
  .map(PartitionInfo::partition).findAny().get();
复制代码

然后注入:

Properties props = new Properties();
// 设置自定义分区器
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitioner.class.getName());

roducer<String, String> producer = new KafkaProducer<>(props);
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享