1 Kafka client types and Maven coordinates
1.1 Kafka client types
- AdminClient: the administrative client, used to manage and inspect topics, brokers, and other Kafka objects
- Producer: publishes messages to a given topic
- Consumer: subscribes to topics and consumes messages
- Streams API: efficiently transforms input streams into output streams
- Connector API: pulls data from source systems or applications into Kafka
1.2 Maven coordinates
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
2 Learning the AdminClient
2.1 Creating an AdminClient
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.Test;

import java.util.Properties;

public class CreateAdminClientTest {

    @Test
    public void test() {
        Properties prop = new Properties();
        // Kafka broker addresses; separate multiple addresses with commas
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(prop);
    }
}
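An AdminClient holds a network thread and should be closed when you are done with it. A minimal sketch (the class and method names here are illustrative, not from the original article), relying on the fact that AdminClient implements AutoCloseable:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class AdminClientLifecycle {

    // Build an AdminClient for the given broker list
    public static AdminClient create(String bootstrapServers) {
        Properties prop = new Properties();
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return AdminClient.create(prop);
    }

    public static void main(String[] args) {
        // try-with-resources closes the client and releases its network thread,
        // even if an admin request throws
        try (AdminClient adminClient = create("localhost:9092")) {
            // issue admin requests here ...
        }
    }
}
```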
AdminClientConfig is a constants class provided by Kafka that defines the names of the client configuration parameters; refer to it to see which settings the client supports.
For example, BOOTSTRAP_SERVERS_CONFIG used above names the broker address setting. When the client is created it logs its effective configuration:
bootstrap.servers = [localhost:9092] ## the parameter we just set
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
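Most of the keys in the dump above also have constants in AdminClientConfig. A short sketch setting a few of them (the class name and chosen values are illustrative; no broker is needed just to build the Properties):

```java
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class AdminClientConfigExample {

    public static Properties buildProps() {
        Properties prop = new Properties();
        // bootstrap.servers: comma-separated broker list
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // client.id: logical name for this client, visible in broker logs and metrics
        prop.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, "admin-demo");
        // request.timeout.ms: how long to wait for a broker response
        prop.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "120000");
        // retries: how many times a failed request is retried
        prop.setProperty(AdminClientConfig.RETRIES_CONFIG, "5");
        return prop;
    }
}
```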
2.2 Creating a topic
package study.wyy.kafka.java.admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Properties;

public class TopicTest {

    private AdminClient client;

    @Before
    public void createClient() {
        Properties prop = new Properties();
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        client = AdminClient.create(prop);
    }

    @Test
    public void testCreate() throws Exception {
        short replicationFactor = 1;
        /**
         * First argument: topic name
         * Second argument: number of partitions
         * Third argument: replication factor
         */
        NewTopic newTopic = new NewTopic("java-api-study", 1, replicationFactor);
        // createTopics takes a collection of NewTopic, so several topics can be created in one call
        CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
        // Sleep briefly so the client does not disconnect before the topic is created
        Thread.sleep(500);
        // Read back the partition count of the new topic
        System.out.println(result.numPartitions("java-api-study").get());
    }
}
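Besides the (name, partitions, replication factor) constructor used above, NewTopic also accepts an explicit partition-to-broker assignment and per-topic config overrides. A sketch (the class name, topic name, and retention value are illustrative):

```java
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NewTopicExample {

    public static NewTopic buildTopic() {
        // Explicit assignment: partition 0 is placed on broker 0 only
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        assignment.put(0, Collections.singletonList(0));
        NewTopic topic = new NewTopic("java-api-study-manual", assignment);
        // Per-topic config overrides, e.g. a 1-day retention
        topic.configs(Collections.singletonMap("retention.ms", "86400000"));
        // Pass the result to client.createTopics(...) as before
        return topic;
    }
}
```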
2.3 Listing topics
- Listing all topics
@Test
public void testList() throws ExecutionException, InterruptedException {
    ListTopicsResult listTopicsResult = client.listTopics();
    // Topic names only
    KafkaFuture<Set<String>> names = listTopicsResult.names();
    names.get().forEach(System.out::println);
    System.out.println("========================");
    // Full listings, including whether each topic is internal
    KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
    listings.get().forEach(System.out::println);
}
Output:
java-api-study
========================
(name=java-api-study, internal=false)
- Listing with options
@Test
public void testList2() throws ExecutionException, InterruptedException {
    ListTopicsOptions options = new ListTopicsOptions();
    // Also include Kafka's internal topics in the listing
    options.listInternal(true);
    ListTopicsResult listTopicsResult = client.listTopics(options);
    // Topic names only
    KafkaFuture<Set<String>> names = listTopicsResult.names();
    names.get().forEach(System.out::println);
    System.out.println("========================");
    KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
    listings.get().forEach(System.out::println);
}
Output:
java-api-study
__consumer_offsets
========================
(name=java-api-study, internal=false)
(name=__consumer_offsets, internal=true)
__consumer_offsets is a Kafka-internal topic that records the offsets of consumed messages.
2.4 Deleting a topic
@Test
public void testDel() throws ExecutionException, InterruptedException {
    Collection<String> topics = Arrays.asList("java-api-study");
    DeleteTopicsResult deleteTopicsResult = client.deleteTopics(topics);
    // Block until the deletion has actually completed on the broker
    deleteTopicsResult.all().get();
}
2.5 Describing a topic
@Test
public void testDescribeTopics() throws ExecutionException, InterruptedException {
    Collection<String> topics = Arrays.asList("java-api-study");
    DescribeTopicsResult describeTopicsResult = client.describeTopics(topics);
    Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();
    descriptionMap.forEach((k, v) -> {
        System.out.println("topicName: " + k);
        System.out.println("topicDesc: " + v);
    });
}
Output:
topicName: java-api-study
topicDesc: (name=java-api-study,
internal=false,
partitions=(partition=0, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)), authorizedOperations=[])
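Instead of printing the raw toString shown above, the TopicDescription can be unpacked programmatically: partitions() returns one TopicPartitionInfo per partition, each exposing its leader, replicas, and in-sync replicas. A sketch (the helper class and output format are illustrative):

```java
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.ArrayList;
import java.util.List;

public class DescribeHelper {

    // Summarize each partition of a TopicDescription as one line of text
    public static List<String> summarize(TopicDescription desc) {
        List<String> lines = new ArrayList<>();
        for (TopicPartitionInfo p : desc.partitions()) {
            Node leader = p.leader();
            lines.add("partition=" + p.partition()
                    + " leader=" + (leader == null ? "none" : leader.idString())
                    + " replicas=" + p.replicas().size()
                    + " isr=" + p.isr().size());
        }
        return lines;
    }
}
```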