Kafka安全机制与授权(linux)

这是我参与8月更文挑战的第11天,活动详情查看:8月更文挑战

下载kafka

地址:https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz

下载:wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz

解压:tar zxvf kafka_2.12-2.5.0.tgz

进入目录:cd kafka_2.12-2.5.0/

[root@localhost kafka_2.12-2.5.0]# ll
总用量 56
drwxr-xr-x 3 root root  4096 4月   8 09:16 bin
drwxr-xr-x 2 root root  4096 4月   8 09:16 config
drwxr-xr-x 2 root root  8192 6月  30 13:01 libs
-rw-r--r-- 1 root root 32216 4月   8 09:13 LICENSE
-rw-r--r-- 1 root root   337 4月   8 09:13 NOTICE
drwxr-xr-x 2 root root    44 4月   8 09:16 site-docs
[root@localhost kafka_2.12-2.5.0]# 
复制代码

bin目录放的都是可执行文件,config目录放的都是配置文件。

首先我们先创建一个证书
创建证书

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin  

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=test],SCRAM-SHA-512=[password=test]' --entity-type users --entity-name test  
复制代码

验证证书

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin  

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name test  
复制代码

这地方的admin就是下面的admin

我们先进入config目录下,创建一个名为kafka_server_jaas.conf文件,内容如下:

KafkaServer {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"
  password="admin";
};
复制代码

具体操作

[root@localhost kafka_2.12-2.5.0]# cd config/
[root@localhost config]# touch kafka_server_jaas.conf
[root@localhost config]# vim kafka_server_jaas.conf 
[root@localhost config]# 
复制代码

创建好这个文件后,然后我们去修改server.properties

# 设置监听使用SASL而不是SSL
listeners=SASL_PLAINTEXT://192.168.0.98:9092
advertised.listeners=SASL_PLAINTEXT://192.168.0.98:9092

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256

# ACL
allow.everyone.if.no.acl.found=false
# 设置超级账号,如果是多个需要分号分割,例如:User:admin;User:root
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

#这里加上这句话就不需要kafka_server_jaas.conf这个文件了
#如果没加则需要执行
#export KAFKA_OPTS="-Djava.security.auth.login.config=/你的目录/kafka_2.12-2.5.0/config/kafka_server_jaas.conf"
listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="admin" \
    password="admin";
复制代码

启动zookeeper

cd bin/
./zookeeper-server-start.sh ../config/zookeeper.properties 
复制代码

启动kafka

./kafka-server-start.sh ../config/server.properties 
复制代码

创建Topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_test
复制代码

添加读写权限

# 添加读权限  
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --topic kafka_test  

# 添加写权限  
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Write --topic kafka_test  

# 添加消费者组权限  
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --group group_test   

# 查看权限列表
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list

复制代码

生产者

/**
 * 生产者配置
 */
@Configuration
public class KafkaProducerConfig {


    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

    public ProducerFactory<String,String> producerFactory() {
        return new DefaultKafkaProducerFactory(producerConfigs());
    }

    private Map<String, Object> producerConfigs() {

        HashMap<String, Object> properties = new HashMap<String, Object>();
        //配置的是kafka的端口
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.98:9092");
        //配置key的序列化类
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //配置value的序列化类
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
        //这有助于提升客户端和服务端之间的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        //0生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。
        //1只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。默认使用这个配置。
        //all或-1只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        //producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
        //
        //配置文件设置sasl_plaintext认证
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
        properties.put(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-256");
        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test\" password=\"test\";");
        return properties;
    }
    
}
复制代码

生产消息

@RestController
public class TestController {

    private static Logger logger = LoggerFactory.getLogger(TestController.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @GetMapping("test")
    public String test(){
        HashMap<String,String> hashMap = new HashMap<>();
        hashMap.put("name","zd");
        hashMap.put("age","18");
        //这地方如果有topic:testTopic会往这个topic添加消息,如果没有则创建这个testTopic然后往里面添加消息
        ListenableFuture<SendResult<String, String>> testTopic = kafkaTemplate.send("kafka_test", JSONObject.toJSONString(hashMap));

        //可有可无看自己需求
        testTopic.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.info("发送失败");
                //失败do something
            }

            @Override
            public void onSuccess(SendResult<String, String> integerStringSendResult) {
                logger.info("发送成功");
                //成功do something
            }
        });

        return "ok";
    }
}
复制代码

消费者

/**
 * 消费者配置
 */
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() { Map<String, Object> properties = new HashMap<>();

        //配置的是kafka的端口
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.98:9092");
        //消息反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
        //如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程挂掉时,由新的consumer使用,默认:true
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //consumer自动向zookeeper提交offset的频率,默认:5000
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,5000);
        //没有初始化的offset时,可以设置以下三种情况:(默认:latest)
        //earliest
        //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        //latest
        //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        //none
        //topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //消费者进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。。默认:""
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "zdpower");

        //配置文件设置sasl_plaintext认证
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
        properties.put(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-256");
        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test1\" password=\"test1\";");
        return properties;
    }
}
复制代码

消费消息

@Component
public class MyKafkaConsumer {

    public static Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);

    @KafkaListener(containerFactory="kafkaListenerContainerFactory",topics = {"kafka_test"})
    public void preCommandTicket1(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            logger.info("----------------- record =" + record);
            logger.info("------------------ message =" + message);
        }
    }
}
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享