
Installing and Testing a Kafka Security Patch

Posted by blue


Kafka 0.8.2 does not pay much attention to security. The good news is that the next release, 0.9, will add a large set of security features. For work I needed IP address filtering in Kafka, so I implemented it based on an existing patch.

1. Patch Overview

https://issues.apache.org/jira/browse/KAFKA-1810

While longer-term goals of security in Kafka are on the roadmap there exists some value for the ability to restrict connection to Kafka brokers based on IP address. This is not intended as a replacement for security but more of a precaution against misconfiguration and to provide some level of control to Kafka administrators about who is reading/writing to their cluster.

1) In some organizations software administration vs o/s systems administration and network administration is disjointed and not well choreographed. Providing software administrators the ability to configure their platform relatively independently (after initial configuration) from Systems administrators is desirable.

2) Configuration and deployment is sometimes error prone and there are situations when test environments could erroneously read/write to production environments

3) An additional precaution against reading sensitive data is typically welcomed in most large enterprise deployments.

The issue page shows Resolution: Won't Fix, meaning the patch was never merged into trunk. Testing confirmed that the patch, written against an older version of the code, cannot be applied directly to the current trunk.

Instead, the Kafka source has to be modified by hand, following the contents of the patch file.

2. Downloading the Kafka Source

First fork the Kafka repository into your own account on GitHub, then clone it:

git clone https://github.com/blue20080/kafka.git

3. Installing Gradle

Kafka is built and packaged with Gradle. If it is not installed on your machine:

wget https://services.gradle.org/distributions/gradle-2.5-all.zip

Add Gradle's bin directory to the PATH environment variable and reload the environment:

source .bash_profile 
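For instance, assuming the archive is unpacked under /opt (a hypothetical location; adjust the paths to your machine), the whole setup looks like:

unzip gradle-2.5-all.zip -d /opt
echo 'export PATH=$PATH:/opt/gradle-2.5/bin' >> ~/.bash_profile
source ~/.bash_profile
gradle -v    # verify the installation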

4. Setting Up the Source Environment

Go into the kafka directory. On a fresh checkout, bootstrap the Gradle wrapper by running the installed Gradle once:

gradle

When that finishes, generate the IntelliJ IDEA project files:

./gradlew idea

Once it completes, open IntelliJ IDEA and import the project.

(screenshot: importing the project)

After the import, refresh the project with Gradle so that it downloads all required dependencies.

(screenshot: refreshing the project)

5. Testing the Source Environment

With the dependencies in place, edit the \kafka\config\server.properties configuration file:

zookeeper.connect=yourZookeeperIp:2181

Set this entry to your own ZooKeeper address, then click Run---Edit Configurations.

Add an Application run configuration and fill in the settings; when done, click OK.
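The fields would look roughly like this (kafka.Kafka is the broker's main class; the module and path names below are assumptions to adapt to your checkout):

Main class: kafka.Kafka
Program arguments: config/server.properties
Working directory: <path to the kafka checkout>
Use classpath of module: core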

Click Run---Run Kafka to start the project; it fails with an error:

log4j:WARN No appenders could be found for logger (kafka.server.KafkaConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
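The warning only means that no log4j configuration is on the classpath. A minimal log4j.properties along these lines is enough (a sketch; any console appender will do):

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n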

Following the hint, place the log4j configuration file in the \core\build\classes\main directory and run the project again:

[2015-07-30 15:21:43,694] INFO Completed load of log test5-0 with log end offset 0 (kafka.log.Log)
[2015-07-30 15:21:43,706] INFO Logs loading complete. (kafka.log.LogManager)
[2015-07-30 15:21:43,707] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2015-07-30 15:21:43,709] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2015-07-30 15:21:43,769] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2015-07-30 15:21:43,771] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer)
[2015-07-30 15:21:43,821] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2015-07-30 15:21:43,850] INFO [ConsumerCoordinator 0]: Starting up. (kafka.coordinator.ConsumerCoordinator)
[2015-07-30 15:21:43,867] INFO [Offset Manager on Broker 0]: Removed 0 expired offsets in 7 milliseconds. (kafka.server.OffsetManager)
[2015-07-30 15:21:43,867] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2015-07-30 15:21:43,885] INFO [ConsumerCoordinator 0]: Startup complete. (kafka.coordinator.ConsumerCoordinator)
[2015-07-30 15:21:43,891] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2015-07-30 15:21:43,891] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2015-07-30 15:21:43,896] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2015-07-30 15:21:43,924] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(liwenjun-PC,9092,PLAINTEXT) (kafka.utils.ZkUtils$)
[2015-07-30 15:21:43,937] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

The broker reports a successful start, and Kafka Web Console confirms it:

(screenshot: Kafka Web Console)

6. Porting the Patch

Download the patch file:

wget https://issues.apache.org/jira/secure/attachment/12690887/KAFKA-1810.patch
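A dry run confirms that the patch no longer applies cleanly to the current trunk (which is why the changes are ported by hand below); git apply --check reports the conflicts without touching the tree:

git apply --check KAFKA-1810.patch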

The review request at https://reviews.apache.org/r/29714/diff/3/ shows that the patch touches the following code:

  • core/src/main/scala/kafka/network/IPFilter.scala
  • core/src/main/scala/kafka/network/SocketServer.scala
  • core/src/main/scala/kafka/server/KafkaConfig.scala
  • core/src/main/scala/kafka/server/KafkaServer.scala

The corresponding test classes must be modified as well, or compilation will fail:

  • core/src/test/scala/unit/kafka/network/IpFilterTest.scala
  • core/src/test/scala/unit/kafka/network/SocketServerTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Modify the code accordingly, following the contents of the patch file.

7. Packaging the Project

Go into the Kafka directory and run:

gradlew clean
gradlew releaseTarGz -x signArchives

The build prints a few warnings, which can be ignored. The packaged archive ends up in the core/build/distributions/ directory; upload it to the cluster test environment.
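A quick check that the archive was produced (the file name matches the snapshot version used throughout this post):

ls core/build/distributions/
kafka_2.10-0.8.3-SNAPSHOT.tgz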

The test cluster consists of three hosts with the following IPs:

  • 192.168.100.19 master200
  • 192.168.100.20 slave201
  • 192.168.100.22 slave202

JDK: java version "1.7.0_80"

OS: CentOS release 6.4 (Final) x64

8. Testing the Patch

The test plan is:

1. Add the IPs of master200 and slave201 to the whitelist and start the Kafka cluster.

2. Start a producer on master200 and produce messages.

3. On slave201, check whether the messages can be consumed.

4. On slave202, check whether messages can be consumed or produced.

5. Add slave202's IP to the whitelist and check again whether it can consume and produce messages.

Step by step:

Unpack the archive with tar -xvf kafka_2.10-0.8.3-SNAPSHOT.tgz, go into the config directory, and add the new IP security settings to the server.properties file:

#security list
security.ip.filter.rule.type=allow
security.ip.filter.list=192.168.100.19/32,192.168.100.20/32

Note: the addresses in security.ip.filter.list are written in CIDR notation (Classless Inter-Domain Routing), i.e. address/prefix-length. For example, 192.168.100.1/24 designates the 192.168.100.0/24 network, whose usable host addresses run from 192.168.100.1 to 192.168.100.254; a /32, as used above, matches exactly one address.

A converter such as http://www.ab126.com/goju/1840.html can be used for the calculation.
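Alternatively, the ipcalc utility that ships with CentOS can do the conversion locally (output shown as produced by the RHEL-style ipcalc):

ipcalc -n 192.168.100.1/24
NETWORK=192.168.100.0
ipcalc -b 192.168.100.1/24
BROADCAST=192.168.100.255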

This configuration means:

  • IP filter rule type: allow (whitelist)
  • IP whitelist: 192.168.100.19, 192.168.100.20

Start Kafka:

bin/kafka-server-start.sh  config/server.properties &

Because the package was built on Windows, the startup script fails with:

[root@master200 kafka_2.10-0.8.3-SNAPSHOT]# -bash: bin/kafka-server-start.sh: /bin/bash^M: bad interpreter: No such file or directory

Convert the scripts with dos2unix and start the broker again. Then copy the distribution to slave201 and start Kafka there the same way.
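For example, converting every script in one pass:

dos2unix bin/*.sh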

Kafka Web Console shows that both brokers are up.

On master200, create a topic:

bin/kafka-topics.sh --create --zookeeper master200:2181,slave201:2181,slave202:2181 --replication-factor 1 --partitions 1 --topic test6
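Optionally verify the topic before producing:

bin/kafka-topics.sh --describe --zookeeper master200:2181 --topic test6

Then start a console producer and send a couple of test messages: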

bin/kafka-console-producer.sh --broker-list master200:9092,slave201:9092 --topic test6
hello
world

On slave201, start a consumer; the messages come through:

bin/kafka-console-consumer.sh --zookeeper master200:2181,slave201:2181,slave202:2181 --topic test6 --from-beginning
hello
world

As expected, slave201 consumes the messages normally.

On slave202, start a consumer:

bin/kafka-console-consumer.sh --zookeeper master200:2181,slave201:2181,slave202:2181 --topic test6 --from-beginning

The program throws the following error:

[2015-07-30 02:00:37,781] WARN Fetching topic metadata with correlation id 0 for topics [Set(test6)] from broker [BrokerEndPoint(2,slave201,9092)] failed (kafka.client.ClientUtils$)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at org.apache.kafka.common.network.NetworkReceive.readCompletely(NetworkReceive.java:114)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:121)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:76)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73)
at kafka.producer.SyncProducer.send(SyncProducer.scala:114)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Meanwhile, the Kafka consoles on master200 and slave201 log this warning:

[2015-07-30 02:00:37,745] WARN Rejected connection from /192.168.100.22 due to IP filter rules (IP not in whitelist) (kafka.network.Acceptor)

Conclusion: slave202 is not in the IP whitelist and cannot consume messages.

Next, try to produce messages from slave202:

[root@slave202 kafka_2.10-0.8.3-SNAPSHOT]# bin/kafka-console-producer.sh --broker-list master200:9092,slave201:9092 --topic test6
1
[2015-07-30 02:32:28,593] WARN Fetching topic metadata with correlation id 0 for topics [Set(test6)] from broker [BrokerEndPoint(0,master200,9092)] failed (kafka.client.ClientUtils$)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at org.apache.kafka.common.network.NetworkReceive.readCompletely(NetworkReceive.java:114)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:121)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:76)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73)
at kafka.producer.SyncProducer.send(SyncProducer.scala:114)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

The producer fails with the error above, and master200 logs the following warning on its console:

[2015-07-30 02:32:12,329] WARN Rejected connection from /192.168.100.22 due to IP filter rules (IP not in whitelist) (kafka.network.Acceptor)

Conclusion: slave202 is not in the IP whitelist and cannot produce messages either.

Edit the server.properties configuration file to add slave202 to the IP whitelist, then restart the Kafka cluster.
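The whitelist line now also carries slave202's address:

security.ip.filter.list=192.168.100.19/32,192.168.100.20/32,192.168.100.22/32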

Produce messages again from master200:

[root@slave201 kafka_2.10-0.8.3-SNAPSHOT]# bin/kafka-console-producer.sh --broker-list master200:9092,slave201:9092 --topic test6
info

On slave202, consume the messages:

[root@slave202 kafka_2.10-0.8.3-SNAPSHOT]# bin/kafka-console-consumer.sh --zookeeper master200:2181,slave201:2181,slave202:2181 --topic test6 --from-beginning
info

Conclusion: once slave202 is added to the IP whitelist, it can consume messages.

Now produce from slave202:

[root@slave202 kafka_2.10-0.8.3-SNAPSHOT]# bin/kafka-console-producer.sh --broker-list master200:9092,slave201:9092 --topic test6
procedure message

Watching the consumer on slave201, we see that the message produced by slave202 comes through:

[root@slave201 kafka_2.10-0.8.3-SNAPSHOT]# bin/kafka-console-consumer.sh --zookeeper master200:2181,slave201:2181,slave202:2181 --topic test6 --from-beginning
hello
world
procedure message

Conclusion: once slave202 is added to the IP whitelist, it can produce messages as well.

9. Notes

A Kafka tree with this patch already applied is available at: https://github.com/blue20080/kafka