发布于 

Kafka安装文档

  1. 关闭防火墙
1
systemctl stop firewalld
  1. 需安装Zookeeper组件具体要求同Zookeeper任务要求,并与Kafka环境适配,启动Zookeeper并截图保存结果:
  • 在master,slave1,slave2分别启动zookeeper
1
zkServer.sh start
  • 查询3台机器的zookeeper启动状态
1
zkServer.sh status
  • zookeeper 配置文件 /opt/software/zookeeper/conf/zoo.cfg ,数据文件夹设置为 /opt/software/zookeeper/zkdatazoo.cfg 需要和 kafka 的 zookeeper.properties 适配
1
vi /opt/module/zookeeper/conf/zoo.cfg

需要查看的配置内容:

1
2
3
4
5
dataDir=/opt/module/zookeeper/zkdata
clientPort=2181
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
  1. 解压kafka安装包到 /opt/module 路径,并修改解压后文件夹名为 kafka ,截图并保存结果:
1
2
tar -xvzf /opt/software/kafka1.0.0.tar.gz -C /opt/module
mv /opt/module/kafka_2.11-1.0.0 /opt/module/kafka
  1. 设置kafka环境变量, 并使环境变量只对当前root用户生效,截图并保存结果:
1
vi /root/.bash_profile

添加以下内容:

1
2
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

刷新生效:

1
source /root/.bash_profile
  1. 修改kafka相应文件, 截图并保存结果:
  • 在 master 上修改文件zookeeper.properties
1
vi /opt/module/kafka/config/zookeeper.properties

配置文件内容:

1
2
3
4
5
clientPort=2181
dataDir=/opt/module/kafka/data
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888

在kafka安装文件夹中建立 data 文件夹,与 dataDir=/opt/module/kafka/data 对应

1
2
mkdir /opt/module/kafka/data
vi /opt/module/kafka/data/myid
  • 在 master 上修改 server.properties
1
vi  /opt/module/kafka/config/server.properties

修改以下内容:

1
2
3
4
5
broker.id=0
listeners=PLAINTEXT://192.168.152.240:9092
advertised.listeners=PLAINTEXT://192.168.152.240:9092
log.dirs=/usr/local/src/kafka/data
zookeeper.connect=192.168.152.240:2181,192.168.152.241:2181,192.168.152.242:2181

文件分发:

1
2
3
4
scp -r /opt/module/kafka slave1:/opt/module
scp -r /opt/module/kafka slave2:/opt/module
scp /root/.bash_profile slave1:/root
scp /root/.bash_profile slave2:/root

在其他节点刷新变量文件生效:

1
source /root/.bash_profile
  • 在 slave1 上修改 myid 文件为 2
1
2
3
cat>/opt/module/kafka/data/myid<<EOF
2
EOF

修改 server.properties 文件为以下内容:

1
2
3
4
broker.id=1
listeners=PLAINTEXT://192.168.152.241:9092
advertised.listeners=PLAINTEXT://192.168.152.241:9092
log.dirs=/usr/local/src/kafka/data zookeeper.connect=192.168.152.240:2181,192.168.152.241:2181,192.168.152.242:2181

在 slave2 上修改 myid 文件为 3

1
2
3
cat>/opt/module/kafka/data/myid<<EOF
3
EOF

修改 server.properties 文件为以下内容:

1
2
3
4
broker.id=1
listeners=PLAINTEXT://192.168.152.242:9092
advertised.listeners=PLAINTEXT://192.168.152.242:9092
log.dirs=/usr/local/src/kafka/data zookeeper.connect=192.168.152.240:2181,192.168.152.241:2181,192.168.152.242:2181
  1. 启动 kafka 并保存命令输出结果, 截图并保存结果:

分别在master, slave1, slave2 上执行

1
2
3
kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties

jps

输出以下内容为正常启动:

1
2
3
2198 QuorumPeerMain
6732 Jps
3758 Kafka
  1. 创建指定topic, 截图并保存结果:
1
kafka-topics.sh --zookeeper master01:2181,slave01:2181,slave02:2181 --create --partitions 3 --replication-factor 3 --topic test

输出以下内容为正常启动:

1
Created topic "test-topic".
  1. 查看所有topic信息, 并截图保存结果:
1
kafka-topics.sh --list --zookeeper master01:2181

输出:

1
test
1
kafka-topics.sh -zookeeper master01:2181,slave01:2181,slave02:2181 --describe --topic test

输出:

1
2
3
4
Topic:test-topic     PartitionCount:3   ReplicationFactor:3      Configs:
      Topic: test-topic    Partition: 0 Leader: 1   Replicas: 1,2,0      Isr: 1,0
      Topic: test-topic    Partition: 1 Leader: 2   Replicas: 2,0,1      Isr: 2,0,1
      Topic: test-topic    Partition: 2 Leader: 0   Replicas: 0,1,2      Isr: 0,1
  1. 启动指定生产者(producer), 并截图保存结果:

注意: 控制台发送者的broker-list的ip需要与server.properties中listeners处的一致, 控制台消费者就能接收到消息

1
kafka-console-producer.sh --broker-list 192.168.152.240:9092 --topic test
  1. 启动消费者(sonsumer), 并截图保存结果:

再开启一个master连接, 输入:

1
kafka-console-consumer.sh --bootstrap-server 192.168.152.240:9092 --topic test

注: kafka旧版本连接服务器的参数为 --zookeeper , 新版为 --bootstrap-server

输出:

1
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
  1. 测试生产者(producer), 并截图保存结果:
1
kafka-console-producer.sh --broker-list 192.168.152.240:9092 --topic test
1
2
3
4
5
6
7
8
9
[root@master01 ~]# kafka-console-producer.sh --broker-list 192.168.152.240:9092 --topic test
>111
>222
>333
>444
>555
>666
>777
>888
  1. 测试消费者(consumer), 并截图保存结果

消费者上显示:

1
2
3
4
5
6
7
8
9
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
111
222
333
444
555
666
777
888

基于命令行方式使用kafka

1.  三台机器上启动zookeeper
zkServer.sh start

2.  三台机器上启动kafka服务

1
kafka-server-start.sh -daemon /usr/local/src/kafka/config/server.properties  

3.  查看进程

1
jps

4.  创建主题

1
2
3
4
5
kafka-topics.sh --create \  
--topic itcasttopic \
--partitions 3 \
--replication-factor 2 \
--zookeeper master01:2181,slave01:2181,slave02:2181

5.  创建生产者,来生产消息。在master01上输入

1
2
3
kafka-console-producer.sh \  
--broker-list master01:9092,slave01:9092,slave02:9092 \
--topic itcasttopic

6.  创建消费者,来消费消息。在slave01上输入

1
2
3
kafka-console-consumer.sh \  
--from-beginning \
--topic itcasttopic

测试消费者(consumer)(kafka2.11-2.0.0命令有所不同)

1
2
3
4
kafka-console-consumer.sh \  
--from-beginning \
--topic test \
--bootstrap-server 192.168.152.245:9092