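I recently needed a Kafka setup with authentication. Most tutorials online use the confluentinc Kafka images, but those were not available in our intranet environment, so I used wurstmeister/zookeeper and wurstmeister/kafka to build a Kafka with password authentication (SASL/PLAIN). This post briefly records the setup process.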
1 Configuring ZooKeeper
1.1 Create a directory for the configuration files
/usr/xxx/kafka-sasl/conf/
1.2 Create a new ZooKeeper configuration file, zoo.cfg, in that directory
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper-3.4.13/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper.sasl.client=true
1.3 Create the authentication configuration file, server_jaas.conf
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="12345678";
};
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="12345678"
    user_super="12345678"
    user_admin="12345678";
};
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345678"
    user_admin="12345678";
};
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345678";
};
1.4 Start command
docker run -d --name zookeeper_sasl -p 2181:2181 --rm -it \
  -e SERVER_JVMFLAGS="-Djava.security.auth.login.config=/opt/zookeeper-3.4.13/secrets/server_jaas.conf" \
  -v /usr/xxx/kafka-sasl/conf:/opt/zookeeper-3.4.13/conf \
  -v /usr/xxx/kafka-sasl/conf:/opt/zookeeper-3.4.13/secrets/ \
  wurstmeister/zookeeper
Note that this mount replaces ZooKeeper's conf directory inside the container, so the original log4j.properties and configuration.xsl from the image must be copied into /usr/xxx/kafka-sasl/conf/ before starting.
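One way to obtain those two files is to copy them out of a throwaway container created from the same image. The following is a minimal sketch, not the only way to do it: it assumes the image keeps its config under /opt/zookeeper-3.4.13/conf (the path used in the start command above), and the function name copy_default_conf and container name zk_conf_tmp are hypothetical.

```shell
#!/bin/sh
# Copy the image's default ZooKeeper config files into a host directory.
# Assumptions: config lives in /opt/zookeeper-3.4.13/conf inside the image;
# "zk_conf_tmp" is a hypothetical temporary container name.
copy_default_conf() {
    dest_dir="$1"
    tmp_name="zk_conf_tmp"
    # Create (but do not start) a container so that docker cp can read from it.
    docker create --name "$tmp_name" wurstmeister/zookeeper >/dev/null || return 1
    for f in log4j.properties configuration.xsl; do
        docker cp "$tmp_name:/opt/zookeeper-3.4.13/conf/$f" "$dest_dir/$f"
    done
    # Remove the temporary container again.
    docker rm "$tmp_name" >/dev/null
}

# Usage: copy_default_conf /usr/xxx/kafka-sasl/conf
```

Run the function once before starting the zookeeper_sasl container, so that the mounted directory already contains all the files ZooKeeper expects.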
2 Configuring Kafka
Kafka uses the same server_jaas.conf file created above as its authentication configuration.
2.1 Start commands
First broker
docker run -d --name kafka_sasl -p 32887:32887 \
  --link zookeeper_sasl:zookeeper_sasl \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ADVERTISED_LISTENERS=SASL_PLAINTEXT://47.16.142.13:32887 \
  -e KAFKA_LISTENERS=SASL_PLAINTEXT://0.0.0.0:32887 \
  -e KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SASL_PLAINTEXT \
  -e KAFKA_PORT=32887 \
  -e KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN \
  -e KAFKA_SASL_ENABLED_MECHANISMS=PLAIN \
  -e KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.auth.SimpleAclAuthorizer \
  -e KAFKA_SUPER_USERS=User:admin \
  -e KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=false \
  -e KAFKA_ZOOKEEPER_CONNECT='zookeeper_sasl:2181' \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/secrets/server_jaas.conf \
  -v /usr/xxx/kafka-sasl/conf/:/opt/kafka/secrets/ \
  --rm -it wurstmeister/kafka
Start two more brokers to form a cluster. Each broker needs its own container name, a unique KAFKA_BROKER_ID, and its own port.
Second broker
docker run -d --name kafka_sasl_1 -p 32888:32888 \
  --link zookeeper_sasl:zookeeper_sasl \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_ADVERTISED_LISTENERS=SASL_PLAINTEXT://47.16.142.13:32888 \
  -e KAFKA_LISTENERS=SASL_PLAINTEXT://0.0.0.0:32888 \
  -e KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SASL_PLAINTEXT \
  -e KAFKA_PORT=32888 \
  -e KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN \
  -e KAFKA_SASL_ENABLED_MECHANISMS=PLAIN \
  -e KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.auth.SimpleAclAuthorizer \
  -e KAFKA_SUPER_USERS=User:admin \
  -e KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=false \
  -e KAFKA_ZOOKEEPER_CONNECT='zookeeper_sasl:2181' \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/secrets/server_jaas.conf \
  -v /usr/xxx/kafka-sasl/conf/:/opt/kafka/secrets/ \
  --rm -it wurstmeister/kafka
Third broker
docker run -d --name kafka_sasl_2 -p 32889:32889 \
  --link zookeeper_sasl:zookeeper_sasl \
  -e KAFKA_BROKER_ID=2 \
  -e KAFKA_ADVERTISED_LISTENERS=SASL_PLAINTEXT://47.16.142.13:32889 \
  -e KAFKA_LISTENERS=SASL_PLAINTEXT://0.0.0.0:32889 \
  -e KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SASL_PLAINTEXT \
  -e KAFKA_PORT=32889 \
  -e KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN \
  -e KAFKA_SASL_ENABLED_MECHANISMS=PLAIN \
  -e KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.auth.SimpleAclAuthorizer \
  -e KAFKA_SUPER_USERS=User:admin \
  -e KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=false \
  -e KAFKA_ZOOKEEPER_CONNECT='zookeeper_sasl:2181' \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/secrets/server_jaas.conf \
  -v /usr/xxx/kafka-sasl/conf/:/opt/kafka/secrets/ \
  --rm -it wurstmeister/kafka
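Since the broker commands differ only in broker id and port, they can be wrapped in a small helper to avoid copy-paste mistakes. This is a rough sketch under stated assumptions: the function name start_broker, the HOST_IP and CONF_DIR variables, and the container naming scheme kafka_sasl_<id> are all hypothetical; the environment variables themselves are the ones used in the commands above.

```shell
#!/bin/sh
# Start one SASL/PLAIN broker; only the broker id and the port vary.
# HOST_IP, CONF_DIR and the kafka_sasl_<id> naming scheme are assumptions.
HOST_IP="${HOST_IP:-47.16.142.13}"
CONF_DIR="${CONF_DIR:-/usr/xxx/kafka-sasl/conf}"

start_broker() {
    broker_id="$1"
    port="$2"
    # -it is left out here because the container runs detached.
    docker run -d --rm --name "kafka_sasl_${broker_id}" \
        -p "${port}:${port}" \
        --link zookeeper_sasl:zookeeper_sasl \
        -e KAFKA_BROKER_ID="${broker_id}" \
        -e KAFKA_ADVERTISED_LISTENERS="SASL_PLAINTEXT://${HOST_IP}:${port}" \
        -e KAFKA_LISTENERS="SASL_PLAINTEXT://0.0.0.0:${port}" \
        -e KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SASL_PLAINTEXT \
        -e KAFKA_PORT="${port}" \
        -e KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN \
        -e KAFKA_SASL_ENABLED_MECHANISMS=PLAIN \
        -e KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.auth.SimpleAclAuthorizer \
        -e KAFKA_SUPER_USERS=User:admin \
        -e KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=false \
        -e KAFKA_ZOOKEEPER_CONNECT=zookeeper_sasl:2181 \
        -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
        -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
        -e KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/secrets/server_jaas.conf \
        -v "${CONF_DIR}/:/opt/kafka/secrets/" \
        wurstmeister/kafka
}

# Usage (three brokers with distinct ids and ports):
#   start_broker 0 32887
#   start_broker 1 32888
#   start_broker 2 32889
```

Deriving the container name, published port, listeners, and KAFKA_PORT from the same two arguments keeps them consistent, which is the part most easily broken when the long command is duplicated by hand.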
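3 Verification
3.1 Create a topic
List the existing topics, create a test topic with 3 partitions and 3 replicas, then describe it. The ZooKeeper port is the one published in step 1.4 (2181).
./bin/kafka-topics.sh --zookeeper 47.16.142.13:2181 --list
./bin/kafka-topics.sh --zookeeper 47.16.142.13:2181 --create --topic A_TEST_TOPIC --partitions 3 --replication-factor 3
./bin/kafka-topics.sh --zookeeper 47.16.142.13:2181 --describe --topic A_TEST_TOPIC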
3.2 Produce a message
./bin/kafka-console-producer.sh \
  --broker-list 47.16.142.13:32887,47.16.142.13:32888,47.16.142.13:32889 \
  --topic A_TEST_TOPIC \
  --producer.config config/producer.properties
3.3 Consume a message
./bin/kafka-console-consumer.sh \
  --bootstrap-server 47.16.142.13:32887,47.16.142.13:32888,47.16.142.13:32889 \
  --topic A_TEST_TOPIC --from-beginning \
  --consumer.config config/consumer.properties
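The producer and consumer commands above read their SASL settings from config/producer.properties and config/consumer.properties, whose contents are not shown in this post. A minimal sketch of what they might contain follows. It assumes the admin/12345678 credentials from server_jaas.conf and a Kafka client version that supports the sasl.jaas.config property; the CLIENT_CONF_DIR variable is hypothetical. Note that it overwrites any existing files with these names.

```shell
#!/bin/sh
# Write minimal SASL/PLAIN client settings for the console producer and consumer.
# Assumptions: credentials match server_jaas.conf; the client supports
# sasl.jaas.config. Existing files with the same names are overwritten.
CLIENT_CONF_DIR="${CLIENT_CONF_DIR:-config}"
mkdir -p "$CLIENT_CONF_DIR"

for name in producer consumer; do
    cat > "$CLIENT_CONF_DIR/$name.properties" <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="12345678";
EOF
done
```

If the client tools already run with KAFKA_OPTS pointing at a JAAS file that has a KafkaClient section, the sasl.jaas.config line should not be needed, and the first two properties are likely enough.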