Kafka Avro Serialization and the S3 Parquet Sink

At work we needed to use Kafka Connect to move JSON data from Kafka to S3 and store it as Parquet. This post records a small demo.

Older versions of Kafka Connect do not support the S3 Parquet sink; we are using Confluent 5.5.5.

Avro Serialization

When using the Parquet sink, the official documentation requires the AvroConverter; the JsonSchemaConverter leads to runtime errors.


To use Avro we have to define a schema for the data; in fact both the Avro and Parquet file formats carry the schema in their headers. The Kafka ecosystem also provides a service for defining schemas: the Schema Registry.

Schema registry

The Schema Registry exposes a REST API for defining schemas; producers and consumers can use it to register and validate the data schema when writing and reading.
Once the Schema Registry is started it listens on port 8081. You can run multiple Schema Registry instances and they will coordinate with each other; a topic named "_schemas" is created in Kafka to store and manage the schema information.
Here are a few of the most basic REST calls:

# Register a schema
# Note: the record "name" in the payload must not contain '-'
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"topic_name\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"}]}"}' \
  http://localhost:8081/subjects/topic_name/versions
# Query schemas
curl -X GET http://localhost:8081/subjects/topic_name/versions
curl -X GET http://localhost:8081/subjects
curl -X GET http://localhost:8081/subjects/topic_name/versions/3
curl -X GET http://localhost:8081/subjects/topic_name/versions/latest
curl -X GET http://localhost:8081/schemas/ids/266
# Delete schemas
curl -X DELETE http://localhost:8081/subjects/topic_name/versions/1
curl -X DELETE http://localhost:8081/subjects/topic_name
# Test whether the provided schema is compatible with the latest registered version
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"topic_name\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"}]}"}' \
  http://localhost:8081/compatibility/subjects/topic_name/versions/latest

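A successful registration returns the global id that the registry assigned to the schema, for example {"id": 1}; the GET calls above return the registered subjects, version numbers, and schema definitions as JSON.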

Kafka Code

POJO

public class User implements Serializable {
    private static final long serialVersionUID = -8441975211075128605L;
    private String name;
    private Integer age;

    public User(String name, Integer age) {
        this.name = name;
        this.age = age;
    }
}

Producer

public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    Producer<String, Object> producer = null;
    try {
        producer = new KafkaProducer<>(properties);
        String topicName = "topic_name";
        String userSchema = "{\"type\":\"record\",\"name\":\"topic_name\","
                + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        User u1 = new User("zs", 1000000000);
        // Serialize the POJO with ReflectDatumWriter and read it back as a GenericRecord,
        // because KafkaAvroSerializer expects an Avro record rather than an arbitrary POJO.
        ReflectDatumWriter<User> reflectDatumWriter = new ReflectDatumWriter<>(schema);
        GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        reflectDatumWriter.write(u1, EncoderFactory.get().directBinaryEncoder(bytes, null));
        GenericRecord avroRecord = (GenericRecord) genericRecordReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));

        producer.send(new ProducerRecord<String, Object>(topicName, avroRecord)
                    , new Callback() {
                        @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                System.out.println(e.getMessage());
                            }
                        }
                    }
            );
    } catch (Exception e) {
        System.out.println(e.getMessage());
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}
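The ReflectDatumWriter/GenericDatumReader round-trip above is only one way to obtain the GenericRecord that KafkaAvroSerializer expects. A minimal alternative sketch (using the same schema object) builds the record directly with the Avro GenericData API:

GenericRecord avroRecord = new GenericData.Record(schema);   // org.apache.avro.generic.GenericData
avroRecord.put("name", "zs");
avroRecord.put("age", 1000000000);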

Consumer

public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka:9092");
    props.put("group.id", "group-id");
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", KafkaAvroDeserializer.class);
    props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    // KafkaAvroDeserializer returns GenericRecord by default (no specific reader is configured)
    final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    String topic = "topic_name";
    consumer.subscribe(Arrays.asList(topic));
    try {
        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, GenericRecord> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
            }
        }
    } finally {
        consumer.close();
    }
}
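Because the values arrive as GenericRecord, individual fields are read back by name inside the poll loop. A small sketch, assuming the producer schema above where age is an int (note that Avro deserializes strings as org.apache.avro.util.Utf8, so convert explicitly):

GenericRecord value = record.value();
String name = value.get("name").toString();   // Utf8 -> String
int age = (Integer) value.get("age");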

Config of Connect

{
  "name": "connector-name",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "errors.log.include.messages": "true",
    "s3.region": "region",
    "topics.dir": "folder",
    "flush.size": "300",
    "tasks.max": "1",
    "timezone": "UTC",
    "s3.part.size": "5242880",
    "enhanced.avro.schema.support": "true",
    "rotate.interval.ms": "6000",
    "locale": "US",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "s3.part.retries": "18",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "errors.log.enable": "true",
    "s3.bucket.name": "bucket",
    "partition.duration.ms": "3600000",
    "topics": "topics",
    "batch.max.rows": "100",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "value.converter.schemas.enable": "true",
    "name": "connector-name",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "rotate.schedule.interval.ms": "6000",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.registry.url": "http://schema-registry:8081",
    "path.format": "'log_year'=YYYY/'log_month'=MM/'log_day'=dd/'log_hour'=HH"
  }
}
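The connector is then created through the Kafka Connect REST interface. A sketch, assuming the Connect worker uses the default REST port 8083 and the JSON above is saved as s3-sink.json (a hypothetical file name):

curl -X POST -H "Content-Type: application/json" \
  --data @s3-sink.json \
  http://localhost:8083/connectors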

A more complex data structure:

Schema registry

{"schema": "{\"type\":\"record\",\"name\":\"name_1\",\"fields\":[{\"name\":\"name\",\"type\":[\"string\",\"null\"],\"default\" : null},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"nickName\",\"type\":[\"null\",\"string\"],\"default\" : null},{\"name\":\"jsonData\",\"type\":{\"type\":\"record\",\"name\":\"jsonData\",\"fields\":[{\"name\":\"jsonK1\",\"type\":\"string\"},{\"name\":\"jsonK2\",\"type\":\"int\"}]}},{\"name\":\"arrayStringData\",\"type\":{\"type\": \"array\", \"items\": \"string\"}},{\"name\":\"arrayJsonData\",\"type\":{\"type\": \"array\", \"items\": {\"type\":\"record\",\"name\":\"arrayJsonData\",\"fields\":[{\"name\":\"arrayKsonK1\",\"type\":\"string\"},{\"name\":\"arrayKsonK2\",\"type\":\"int\"}]}}}]}"}

POJO

public class UserArrayJsonItemData implements Serializable {
    private static final long serialVersionUID = -3621940426286065831L;
    private String arrayKsonK1;
    private int arrayKsonK2;

    public UserArrayJsonItemData(String arrayKsonK1, int arrayKsonK2) {
        this.arrayKsonK1 = arrayKsonK1;
        this.arrayKsonK2 = arrayKsonK2;
    }
}
public class UserJsonData implements Serializable {
    private static final long serialVersionUID = 5226807403457803391L;
    private String jsonK1;
    private int jsonK2;

    public UserJsonData(String jsonK1, int jsonK2) {
        this.jsonK1 = jsonK1;
        this.jsonK2 = jsonK2;
    }
}
public class User2 implements Serializable {
    private static final long serialVersionUID = 5976712933804910638L;
    private String name;
    private int age;
    private String nickName;
    private UserJsonData jsonData;
    private List<String> arrayStringData;
    private List<UserArrayJsonItemData> arrayJsonData;
    public User2(String name, int age, String nickName, UserJsonData jsonData,
                 List<String> arrayStringData, List<UserArrayJsonItemData> arrayJsonData) {
        this.name = name;
        this.age = age;
        this.nickName = nickName;
        this.jsonData = jsonData;
        this.arrayStringData = arrayStringData;
        this.arrayJsonData = arrayJsonData;
    }
}

Producer

public static void main(String[] args) {

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    Producer<String, Object> producer = null;
    try {
        producer = new KafkaProducer<>(properties);
        String topicName = "topic_name";
        String userSchema = "{\"type\":\"record\",\"name\":\"topic_name\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"age\",\"type\":\"int\"},"
                + "{\"name\":\"nickName\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"jsonData\",\"type\":{\"type\":\"record\",\"name\":\"jsonData\",\"fields\":[{\"name\":\"jsonK1\",\"type\":\"string\"},{\"name\":\"jsonK2\",\"type\":\"int\"}]}},"
                + "{\"name\":\"arrayStringData\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},"
                + "{\"name\":\"arrayJsonData\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"arrayJsonData\",\"fields\":[{\"name\":\"arrayKsonK1\",\"type\":\"string\"},{\"name\":\"arrayKsonK2\",\"type\":\"int\"}]}}}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        User2 u1 = new User2("zs", 1000000000, null,
                new UserJsonData("jk-1", 50),
                Arrays.asList("array-1", "array-2"),
                Arrays.asList(new UserArrayJsonItemData("array-json-item-1", 53),
                        new UserArrayJsonItemData("array-json-item-2", 63)));
        // Same trick as before: serialize the POJO via reflection, then read it back as a GenericRecord
        ReflectDatumWriter<User2> reflectDatumWriter = new ReflectDatumWriter<>(schema);
        GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        reflectDatumWriter.write(u1, EncoderFactory.get().directBinaryEncoder(bytes, null));
        GenericRecord avroRecord = (GenericRecord) genericRecordReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
        producer.send(new ProducerRecord<String, Object>(topicName, avroRecord)
                    , new Callback() {
                        @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                System.out.println(e.getMessage());
                            }
                        }
                    }
            );
    } catch (Exception e) {
        System.out.println(e.getMessage());
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}
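The direct GenericData approach from the first example extends to the nested schema as well; a sketch using the field names of the schema above:

GenericRecord jsonData = new GenericData.Record(schema.getField("jsonData").schema());
jsonData.put("jsonK1", "jk-1");
jsonData.put("jsonK2", 50);

Schema itemSchema = schema.getField("arrayJsonData").schema().getElementType();
GenericRecord item = new GenericData.Record(itemSchema);
item.put("arrayKsonK1", "array-json-item-1");
item.put("arrayKsonK2", 53);

GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "zs");
avroRecord.put("age", 1000000000);
avroRecord.put("nickName", null);
avroRecord.put("jsonData", jsonData);
avroRecord.put("arrayStringData", Arrays.asList("array-1", "array-2"));
avroRecord.put("arrayJsonData", Arrays.asList(item));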

Consumer

public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka:9092");
    props.put("group.id", "group-id");
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", KafkaAvroDeserializer.class);
    props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    String topic1 = "topic_name";
    consumer.subscribe(Arrays.asList(topic1));
    try {
        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, GenericRecord> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
            }
        }
    } finally {
        consumer.close();
    }
}