1 Producer API
1.1 构建Producer
@Test
public void testBuildProducer() {
Properties properties = new Properties();
// 设置kafka的地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 重试次数, 默认值就是0
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
// producer 将试图批处理消息记录,以减少请求 次数。这将改善 client 与 server 之间的性能。 这项配置控制默认的批量处理消息字节数。
// 默认值16384
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
// 关键字的序列化类。如果没给与这项,默认 情况是和消息一致
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(properties);
}
复制代码
Producer还有很多配置项,可以设置
1.1 发送异步消息
kafka的消息都是异步发布的
@Test
public void testSend() throws ExecutionException, InterruptedException {
// 构建消息对象:ProducerRecord
// 参数一:topic的名字
// 参数二: 消息的key
// 参数三: 消息的值
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC, "key", "hello world");
Future<RecordMetadata> send = producer.send(producerRecord);
// 返回值是个Future,所以get方法会阻塞
RecordMetadata recordMetadata = send.get();
// 等待消息发送成功
TimeUnit.SECONDS.sleep(5);
producer.close();
}
复制代码
1.2 异步回调
@Test
public void testSend2() throws ExecutionException, InterruptedException {
// 构建消息对象:ProducerRecord
// 参数一:topic的名字
// 参数二: 消息的key
// 参数三: 消息的值
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC, "key", "hello world");
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
System.out.println(
"partition : " + recordMetadata.partition() + " , offset : " + recordMetadata.offset());
}
});
// 等待消息发送成功
TimeUnit.SECONDS.sleep(5);
producer.close();
}
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END