这是我参与更文挑战的第5天,活动详情查看:更文挑战
消息 ProducerRecord
在 send()
发送 broker
过程中,需要经过:
- 拦截器(Interceptor)
- 序列化器(Serializer)
- 分区器(Partitioner)
一系列作用之后才能被真正地发往 broker。Interceptor
不是必须的,Serializer
是必需的。
消息经过序列化后,就需要确定它发送的分区,这个时候就涉及到 分区策略 了。
String msg = i + " This is matt's blog.";
// 可以设置发送的分区:
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, msg);
producer.send(record);
复制代码
上述在构造 ProducerRecord
的构造方法中,可以设置要发送的分区:
// 意思是在构造消息的时候进行分区选定,因为你都要发送了,肯定是在构造消息指定
public ProducerRecord(topic, partition, timestamp, K key, V value, headers) {}
复制代码
所有的 ProducerRecord
构造都会执行这个函数。如果消息 ProducerRecord
中指定了 partition
字段,那么就不需要 分区器 的作用。
分区器
如果没有指定 partition
,默认也得给 ProducerRecord
一个分区策略。这个就依赖内部默认分区器:
Kafka 中提供的默认分区器是
org.apache.kafka.clients.producer.internals.DefaultPartitioner
,它实现了org.apache.kafka.clients.producer.Partitioner
接口
先看看默认的分区器是怎么定义分区策略的:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
复制代码
key
为空,采用轮训算法,选择的是主题内的各个可用分区【注意是可用分区】key
不为空,对 key 进行哈希(采用 MurmurHash2 算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入同一个分区。
但是⚠️:在不改变主题分区数量的情况下,key 与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证 key 与分区之间的映射关系了。
自定义分区器
所以只要像 DefaultPartitioner
实现提供的 Partitioner
接口即可。然后要将其通过 config
注入到 kafka broker
:
// 这里将上述的3种分区策略,这边都自己实现一个版本
public class DemoPartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (null == keyBytes) {
return counter.getAndIncrement() % numPartitions;
} else return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override public void close() {}
@Override public void configure(Map<String, ?> configs) {}
}
// 1. Round-robin策略,顺序分配
// 按照默认的分区器就行,同时不指定 partition 以及 key
// 2. 随机策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
// 3. Key-ordering策略,按照指定消息键保序策略
// 不过强烈不要自己实现,kafka内部已经实现了,用原生提供的,hash碰撞还低很多
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
// 4. 自定义分区。根据地区【南北机房分区】,仓库【地点分区】
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host()))
.map(PartitionInfo::partition).findAny().get();
复制代码
然后注入:
Properties props = new Properties();
// 设置自定义分区器
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitioner.class.getName());
roducer<String, String> producer = new KafkaProducer<>(props);
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END