At work we need Kafka Connect to move JSON data from Kafka to S3 and store it as Parquet. This post records a small demo of the setup.
Older versions of Kafka Connect do not support the S3 Parquet sink; we use Confluent 5.5.5.
Avro Serialization
When using the Parquet sink, the official documentation requires the AvroConverter; the JsonSchemaConverter leads to runtime errors.
With Avro we have to define a schema for the data; in fact both Avro and Parquet embed the schema in the file itself (Avro in the file header, Parquet in the footer). The Confluent platform also provides a service for defining and managing schemas: the Schema Registry.
Schema registry
The Schema Registry exposes a REST API for defining schemas; both producers and consumers can use it to register and validate the schema of the data they write and read.
Once the Schema Registry starts correctly it serves on port 8081. You can run several Schema Registry instances and they will coordinate with each other; in Kafka a topic "_schemas" is created, which stores and manages the schema information.
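Under the hood, every message produced with KafkaAvroSerializer is framed in Confluent's wire format: a magic byte 0x0, the 4-byte schema id assigned by the Schema Registry, then the Avro binary payload. A minimal sketch for pulling the schema id out of a raw message value (the class and method names here are ours, not from any library):

import java.nio.ByteBuffer;

public class WireFormat {
    // Returns the Schema Registry id embedded in a KafkaAvroSerializer payload.
    public static int schemaIdOf(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        if (buf.get() != 0x0) {
            throw new IllegalArgumentException("not in Confluent wire format");
        }
        return buf.getInt(); // big-endian 4-byte schema id
    }
}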
Here are some of the simplest REST calls:
# Register a schema
# Note: the record name in the payload must not contain '-'
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"topic_name\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"}]}"}' \
http://localhost:8081/subjects/topic_name/versions
# Query schemas
curl -X GET http://localhost:8081/subjects/topic_name/versions
curl -X GET http://localhost:8081/subjects
curl -X GET http://localhost:8081/subjects/topic_name/versions/3
curl -X GET http://localhost:8081/subjects/topic_name/versions/latest
curl -X GET http://localhost:8081/schemas/ids/266
# Delete a schema version, or a whole subject
curl -X DELETE http://localhost:8081/subjects/topic_name/versions/1
curl -X DELETE http://localhost:8081/subjects/topic_name
# Test whether the provided schema is compatible with the latest version
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"topic_name\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"}]}"}' \
http://localhost:8081/compatibility/subjects/topic_name/versions/latest
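The same operations are also available programmatically. A minimal sketch using the CachedSchemaRegistryClient that ships with Confluent 5.5 (note that KafkaAvroSerializer registers schemas under the subject "<topic>-value" by default, not the bare topic name; the Avro-Schema overloads used here are deprecated in favor of ParsedSchema in later versions):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;

public class RegistryClientDemo {
    public static void main(String[] args) throws Exception {
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"topic_name\",\"fields\":["
              + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}");
        int id = client.register("topic_name-value", schema);           // register a version
        SchemaMetadata latest = client.getLatestSchemaMetadata("topic_name-value");
        System.out.println("id=" + id + ", latest version=" + latest.getVersion());
    }
}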
Kafka Code
POJO
public class User implements Serializable {
    private static final long serialVersionUID = -8441975211075128605L;
    private String name;
    private Integer age;
    // all-args constructor and getters/setters omitted for brevity
}
Producer
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    // note: no leading space in the URL
    properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    Producer<String, Object> producer = null;
    try {
        producer = new KafkaProducer<>(properties);
        String topicName = "topic_name";
        String userSchema = "{\"type\":\"record\",\"name\":\"topic_name\","
                + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        User u1 = new User("zs", 1000000000);
        // Write the POJO with the reflect API, then read the bytes back as a
        // GenericRecord so KafkaAvroSerializer can serialize it.
        ReflectDatumWriter<User> reflectDatumWriter = new ReflectDatumWriter<>(schema);
        GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        reflectDatumWriter.write(u1, EncoderFactory.get().directBinaryEncoder(bytes, null));
        GenericRecord avroRecord = (GenericRecord) genericRecordReader.read(null,
                DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
        producer.send(new ProducerRecord<String, Object>(topicName, avroRecord),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            System.out.println(e.getMessage());
                        }
                    }
                });
    } catch (Exception e) {
        System.out.println(e.getMessage());
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}
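The Reflect-write/Generic-read round-trip above works, but the GenericRecord can also be built directly against the parsed schema, skipping the intermediate encode/decode. A sketch of what could replace the round-trip inside the try block (same schema, topicName, and producer as above; GenericData comes from org.apache.avro.generic):

GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "zs");
avroRecord.put("age", 1000000000);
producer.send(new ProducerRecord<String, Object>(topicName, avroRecord));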
Consumer
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka:9092");
    props.put("group.id", "group-id");
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", KafkaAvroDeserializer.class);
    props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    // KafkaAvroDeserializer yields GenericRecord unless specific.avro.reader is enabled
    final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    String topic = "topic_name";
    consumer.subscribe(Arrays.asList(topic));
    try {
        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, GenericRecord> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    } finally {
        consumer.close();
    }
}
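If you generate SpecificRecord classes from the schema (for example with the Avro Maven plugin), the deserializer can return them directly instead of GenericRecord. The extra setting, sketched under that assumption:

// import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
// the consumer can then be typed Consumer<String, User> with a generated User class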
Connect Config
{
  "name": "connector-name",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "errors.log.include.messages": "true",
    "s3.region": "region",
    "topics.dir": "folder",
    "flush.size": "300",
    "tasks.max": "1",
    "timezone": "UTC",
    "s3.part.size": "5242880",
    "enhanced.avro.schema.support": "true",
    "rotate.interval.ms": "6000",
    "locale": "US",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "s3.part.retries": "18",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "errors.log.enable": "true",
    "s3.bucket.name": "bucket",
    "partition.duration.ms": "3600000",
    "topics": "topics",
    "batch.max.rows": "100",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "value.converter.schemas.enable": "true",
    "name": "connector-name",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "rotate.schedule.interval.ms": "6000",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.registry.url": "http://schema-registry:8081",
    "path.format": "'log_year'=YYYY/'log_month'=MM/'log_day'=dd/'log_hour'=HH"
  }
}
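This JSON is submitted to the Connect worker's REST API (port 8083 by default). A sketch using the Java 11 HTTP client, assuming the config above is saved as s3-sink.json and the worker runs on localhost (file name and host are ours):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class DeployConnector {
    public static void main(String[] args) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("s3-sink.json")))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}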
A more complex data structure:
Schema registry
{"schema": "{\"type\":\"record\",\"name\":\"name_1\",\"fields\":[{\"name\":\"name\",\"type\":[\"string\",\"null\"],\"default\" : null},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"nickName\",\"type\":[\"null\",\"string\"],\"default\" : null},{\"name\":\"jsonData\",\"type\":{\"type\":\"record\",\"name\":\"jsonData\",\"fields\":[{\"name\":\"jsonK1\",\"type\":\"string\"},{\"name\":\"jsonK2\",\"type\":\"int\"}]}},{\"name\":\"arrayStringData\",\"type\":{\"type\": \"array\", \"items\": \"string\"}},{\"name\":\"arrayJsonData\",\"type\":{\"type\": \"array\", \"items\": {\"type\":\"record\",\"name\":\"arrayJsonData\",\"fields\":[{\"name\":\"arrayKsonK1\",\"type\":\"string\"},{\"name\":\"arrayKsonK2\",\"type\":\"int\"}]}}}]}"}
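For readability, the same schema unescaped (note that strict Avro expects a null default to list "null" first in the union, as nickName does; the name field here lists "string" first, which some parsers reject):

{
  "type": "record",
  "name": "name_1",
  "fields": [
    {"name": "name", "type": ["string", "null"], "default": null},
    {"name": "age", "type": "int"},
    {"name": "nickName", "type": ["null", "string"], "default": null},
    {"name": "jsonData", "type": {"type": "record", "name": "jsonData", "fields": [
      {"name": "jsonK1", "type": "string"},
      {"name": "jsonK2", "type": "int"}]}},
    {"name": "arrayStringData", "type": {"type": "array", "items": "string"}},
    {"name": "arrayJsonData", "type": {"type": "array", "items": {"type": "record", "name": "arrayJsonData", "fields": [
      {"name": "arrayKsonK1", "type": "string"},
      {"name": "arrayKsonK2", "type": "int"}]}}}
  ]
}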
POJO
public class UserArrayJsonItemData implements Serializable {
    private static final long serialVersionUID = -3621940426286065831L;
    private String arrayKsonK1;
    private int arrayKsonK2;
    // all-args constructor and getters/setters omitted for brevity
}
public class UserJsonData implements Serializable {
    private static final long serialVersionUID = 5226807403457803391L;
    private String jsonK1;
    private int jsonK2;
    // all-args constructor and getters/setters omitted for brevity
}
public class User2 implements Serializable {
    private static final long serialVersionUID = 5976712933804910638L;
    private String name;
    private int age;
    private String nickName;
    private UserJsonData jsonData;
    private List<String> arrayStringData;
    private List<UserArrayJsonItemData> arrayJsonData;
    // all-args constructor and getters/setters omitted for brevity
}
Producer
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    Producer<String, Object> producer = null;
    try {
        producer = new KafkaProducer<>(properties);
        String topicName = "topic_name";
        // the quotes must be escaped inside the Java string literal
        String userSchema = "{\"type\":\"record\",\"name\":\"topic_name\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"age\",\"type\":\"int\"},"
                + "{\"name\":\"nickName\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"jsonData\",\"type\":{\"type\":\"record\",\"name\":\"jsonData\",\"fields\":["
                + "{\"name\":\"jsonK1\",\"type\":\"string\"},{\"name\":\"jsonK2\",\"type\":\"int\"}]}},"
                + "{\"name\":\"arrayStringData\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},"
                + "{\"name\":\"arrayJsonData\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\","
                + "\"name\":\"arrayJsonData\",\"fields\":[{\"name\":\"arrayKsonK1\",\"type\":\"string\"},"
                + "{\"name\":\"arrayKsonK2\",\"type\":\"int\"}]}}}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        User2 u1 = new User2("zs", 1000000000, null,
                new UserJsonData("jk-1", 50),
                Arrays.asList("array-1", "array-2"),
                Arrays.asList(new UserArrayJsonItemData("array-json-item-1", 53),
                        new UserArrayJsonItemData("array-json-item-2", 63)));
        ReflectDatumWriter<User2> reflectDatumWriter = new ReflectDatumWriter<>(schema);
        GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        reflectDatumWriter.write(u1, EncoderFactory.get().directBinaryEncoder(bytes, null));
        GenericRecord avroRecord = (GenericRecord) genericRecordReader.read(null,
                DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
        producer.send(new ProducerRecord<String, Object>(topicName, avroRecord),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            // only touch e when a failure actually happened
                            System.out.println(e.getMessage());
                        }
                    }
                });
    } catch (Exception e) {
        System.out.println(e.getMessage());
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}
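For the nested schema, the GenericRecord can again be built directly; the sub-records and arrays are created from the field schemas extracted off the parsed top-level schema. A sketch that could replace the round-trip inside the try block above (same schema, topicName, and producer):

Schema jsonDataSchema = schema.getField("jsonData").schema();
Schema itemSchema = schema.getField("arrayJsonData").schema().getElementType();

GenericRecord jsonData = new GenericData.Record(jsonDataSchema);
jsonData.put("jsonK1", "jk-1");
jsonData.put("jsonK2", 50);

GenericRecord item = new GenericData.Record(itemSchema);
item.put("arrayKsonK1", "array-json-item-1");
item.put("arrayKsonK2", 53);

GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "zs");
avroRecord.put("age", 1000000000);
avroRecord.put("nickName", null);
avroRecord.put("jsonData", jsonData);
avroRecord.put("arrayStringData", Arrays.asList("array-1", "array-2"));
avroRecord.put("arrayJsonData", Arrays.asList(item));
producer.send(new ProducerRecord<String, Object>(topicName, avroRecord));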
Consumer
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka:9092");
    props.put("group.id", "group-id");
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", KafkaAvroDeserializer.class);
    props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    // GenericRecord again, since specific.avro.reader is not enabled
    final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    String topic1 = "topic_name";
    consumer.subscribe(Arrays.asList(topic1));
    try {
        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, GenericRecord> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    } finally {
        consumer.close();
    }
}