- 浏览: 984267 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Kafka目录结构:http://donald-draper.iteye.com/blog/2396760
Kafka配置文件:http://donald-draper.iteye.com/blog/2397000
Kafka Standy模式、创建主题,生产消费消息:http://donald-draper.iteye.com/blog/2397170
上一篇文章,我们启动kafka的standy模式,并使用相关命令创建topic,生产消息和消费消息,我们只是运行一个broker,,没太多的意思。今天我们搭建一个kafka集群。对于Kafka,一个broker仅仅只是规模为1的集群,现在我们扩展至三个节点。我们在单机,只修改broker的监听端口,id及日志目录,其实多机版集群道理是一样的,只不过ip地址不一样。
首先为每个broker创建一个配置文件:
现在已经有三个broker配置文件,现在来修改broker1和broker2配置文件,即server1.properties与server2.properties
broker.id是集群中每个节点的唯一且永久的名称,我们修改端口和日志分区是因为我们现在在同一台机器上运行,我们要防止broker在同一端口上注册和覆盖对方的数据。需要注意的是日志目录由于我们测试用的是临时目录/tmp,在生产环境中最好换个目录,因为当机器宕机时,日志文件可能丢失,这个在hadoop集群中会出现,我不知道在kafka集群中会不会出现,个人认为应该会出现,最好换个目录。同时,我们并没有使用Zookeeper集群,Zookeeper是standy模式,其实在生产环境中Zookeeper应该是高可用的,必须是集群。我们就不使用Zookeeper集群了,这个不是重点,我们在可以下下面这篇文章中找到Zookeeper集群的搭建:
Hadoop2.7.1高可用环境搭建:http://donald-draper.iteye.com/blog/2302217
主要修改Zookeeper的配置文件zoo.cfg
并创建zookeeper数据文件夹zdata,并在zdata中创建myid文件,在三台机上内容分别为上面server后的id
然后再修改broker server配置文件的一下选项
现在来看启动集群,先启动zookeeper,再启动3个broker
现在集群已经启动了,我们在创建一个复制因子为3的主题:
浏览主题列表
从输出来看我们的my-replicated-topic主题创建成功,test是我们standy模式的测试主题。
好了,现在我们已经有了一个集群了,我们怎么知道每个集群在做什么呢?运行命令“describe topics”
关键在这两行:
下面来解释一下上面的输出,第一行是所有分区的摘要,每一个线提供一个分区信息,
因为我们只有一个分区,所有只有一条行介绍分区的信息。
1."leader":该节点负责所有指定分区的读和写,每个节点的领导都是随机选择的。
2."replicas":备份的节点,无论该节点是否是leader或者目前是否还活着,只是显示。
3."isr":备份节点的集合,也就是活着的节点集合。
从上面可看出,leader为broker2。
我们再次运行上面的这个命令,看看一开始我们创建的test主题的相关信息:
关键在这两行:
从上面可看出,由于上面我们运行的是standy模式,,所以test主题没有Replicas所以是0,同时isr为0,leader也是broker0。
现在我们来启动一个两个终端一个生产消息,一个消费消息:
生产者:
消费者:
在生产者端,生产消息:
消费者中断输出为:
现在我们来,测试集群的容错,kill掉leader,Broker2作为当前的leader,也就是kill掉Broker2。
在windows上:
再次查看my-replicated-topic主题
从输出来看,备份节点broker0成为新的leader,而broker2已经不在同步备份集合里了。
重新启动一个消息者终端2:
从输出来看消息并没有丢失。
重新生产消息:
消费者终端输出为:
消息者终端1:
消息者终端2
从生产者和消息者1的输出来看,它们已经知道broker2已经不可用。
到目前为止kafka集群搭建及测试完毕,关闭集群:
下一篇我们来讲kafka connect。
Kafka配置文件:http://donald-draper.iteye.com/blog/2397000
Kafka Standy模式、创建主题,生产消费消息:http://donald-draper.iteye.com/blog/2397170
上一篇文章,我们启动kafka的standy模式,并使用相关命令创建topic,生产消息和消费消息,我们只是运行一个broker,,没太多的意思。今天我们搭建一个kafka集群。对于Kafka,一个broker仅仅只是规模为1的集群,现在我们扩展至三个节点。我们在单机,只修改broker的监听端口,id及日志目录,其实多机版集群道理是一样的,只不过ip地址不一样。
首先为每个broker创建一个配置文件:
Last login: Sun Oct 22 21:43:13 2017 [donald@Donald_Draper ~]$ ls Desktop Documents Downloads kafka_2.11-0.11.0.1 kafka_2.11-0.11.0.1.tgz Music Pictures Public Templates Videos [donald@Donald_Draper ~]$ cd kafka_2.11-0.11.0.1/ [donald@Donald_Draper kafka_2.11-0.11.0.1]$ ls bin config libs LICENSE logs NOTICE site-docs [donald@Donald_Draper kafka_2.11-0.11.0.1]$ cd config/ [donald@Donald_Draper config]$ ls connect-console-sink.properties connect-file-sink.properties connect-standalone.properties producer.properties zookeeper.properties connect-console-source.properties connect-file-source.properties consumer.properties server.properties connect-distributed.properties connect-log4j.properties log4j.properties tools-log4j.properties [donald@Donald_Draper config]$ cp server.properties server1.properties [donald@Donald_Draper config]$ cp server.properties server2.properties [donald@Donald_Draper config]$ ls connect-console-sink.properties connect-file-sink.properties connect-standalone.properties producer.properties server.properties connect-console-source.properties connect-file-source.properties consumer.properties server1.properties tools-log4j.properties connect-distributed.properties connect-log4j.properties log4j.properties server2.properties zookeeper.properties
现在已经有三个broker配置文件,现在来修改broker1和broker2配置文件,即server1.properties与server2.properties
[donald@Donald_Draper config]$ vim server1.properties # The id of the broker. This must be set to a unique integer for each broker. broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs1 [donald@Donald_Draper config]$ vim server2.properties # The id of the broker. This must be set to a unique integer for each broker. broker.id=2 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs2 [donald@Donald_Draper config]$ vim server.properties # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 listeners=PLAINTEXT://:9092 log.dirs=/tmp/kafka-logs [donald@Donald_Draper config]$ ^C
broker.id是集群中每个节点的唯一且永久的名称,我们修改端口和日志分区是因为我们现在在同一台机器上运行,我们要防止broker在同一端口上注册和覆盖对方的数据。需要注意的是日志目录由于我们测试用的是临时目录/tmp,在生产环境中最好换个目录,因为当机器宕机时,日志文件可能丢失,这个在hadoop集群中会出现,我不知道在kafka集群中会不会出现,个人认为应该会出现,最好换个目录。同时,我们并没有使用Zookeeper集群,Zookeeper是standy模式,其实在生产环境中Zookeeper应该是高可用的,必须是集群。我们就不使用Zookeeper集群了,这个不是重点,我们在可以下下面这篇文章中找到Zookeeper集群的搭建:
Hadoop2.7.1高可用环境搭建:http://donald-draper.iteye.com/blog/2302217
主要修改Zookeeper的配置文件zoo.cfg
server.1=192.168.126.126:2888:3888 server.2=192.168.126.127:2888:3888 server.3=192.168.126.128:2888:3888
并创建zookeeper数据文件夹zdata,并在zdata中创建myid文件,在三台机上内容分别为上面server后的id
然后再修改broker server配置文件的一下选项
############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=192.168.126.126:2181,192.168.126.127:2181,192.168.126.128:2181
现在来看启动集群,先启动zookeeper,再启动3个broker
[donald@Donald_Draper bin]$ ./zookeeper-server-start.sh ../config/zookeeper.properties & [1] 5334 [donald@Donald_Draper bin]$ ... [2017-10-22 22:25:12,828] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer) [2017-10-22 22:25:12,828] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer) [2017-10-22 22:25:12,879] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory) [donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server.properties & [1] 5334 [donald@Donald_Draper bin]$ ... [2017-10-22 22:27:04,624] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(Donald_Draper.server.com,9093,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils) [2017-10-22 22:27:04,625] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2017-10-22 22:27:05,047] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser) [2017-10-22 22:27:05,047] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser) [2017-10-22 22:27:05,048] INFO [Kafka Server 0], started (kafka.server.KafkaServer) [donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server1.properties & [1] 5900 [donald@Donald_Draper bin]$ ... [2017-10-22 22:27:04,624] INFO Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(Donald_Draper.server.com,9093,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils) [2017-10-22 22:27:04,625] WARN No meta.properties file under dir /tmp/kafka-logs1/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2017-10-22 22:27:05,047] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser) [2017-10-22 22:27:05,047] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser) [2017-10-22 22:27:05,048] INFO [Kafka Server 1], started (kafka.server.KafkaServer) [donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server2.properties & [1] 6611 [donald@Donald_Draper bin]$ ... [2017-10-22 22:31:18,442] INFO Creating /brokers/ids/2 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2017-10-22 22:31:18,496] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2017-10-22 22:31:18,497] INFO Registered broker 2 at path /brokers/ids/2 with addresses: EndPoint(Donald_Draper.server.com,9094,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils) [2017-10-22 22:31:18,550] WARN No meta.properties file under dir /tmp/kafka-logs2/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2017-10-22 22:31:19,288] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser) [2017-10-22 22:31:19,288] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser) [2017-10-22 22:31:19,288] INFO [Kafka Server 2], started (kafka.server.KafkaServer) [donald@Donald_Draper bin]$ netstat -ntlp (Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.) Active Internet connections (only servers) Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name tcp 0 0 192.168.122.1:53 0.0.0.0:* LISTEN - tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN - tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN - tcp 0 0 0.0.0.0:25 0.0.0.0:* LISTEN - tcp6 0 0 :::3306 :::* LISTEN - tcp6 0 0 :::22 :::* LISTEN - tcp6 0 0 :::36055 :::* LISTEN 5597/java tcp6 0 0 ::1:631 :::* LISTEN - tcp6 0 0 :::45464 :::* LISTEN 5334/java tcp6 0 0 :::45979 :::* LISTEN 5900/java tcp6 0 0 :::34402 :::* LISTEN 6611/java tcp6 0 0 :::9092 :::* LISTEN 5597/java tcp6 0 0 :::9093 :::* LISTEN 5900/java tcp6 0 0 :::2181 :::* LISTEN 5334/java tcp6 0 0 :::9094 :::* LISTEN 6611/java [donald@Donald_Draper bin]$
现在集群已经启动了,我们在创建一个复制因子为3的主题:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic [2017-10-22 22:36:23,175] INFO Accepted socket connection from /127.0.0.1:41964 (org.apache.zookeeper.server.NIOServerCnxnFactory) [2017-10-22 22:36:23,214] INFO Client attempting to establish new session at /127.0.0.1:41964 (org.apache.zookeeper.server.ZooKeeperServer) [2017-10-22 22:36:23,236] INFO Established session 0x15f44792df10005 with negotiated timeout 30000 for client /127.0.0.1:41964 (org.apache.zookeeper.server.ZooKeeperServer) [2017-10-22 22:36:23,754] INFO Got user-level KeeperException when processing sessionid:0x15f44792df10005 type:setData cxid:0x6 zxid:0x122 txntype:-1 reqpath:n/a Error Path:/config/topics/my-replicated-topic Error:KeeperErrorCode = NoNode for /config/topics/my-replicated-topic (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-22 22:36:23,764] INFO Got user-level KeeperException when processing sessionid:0x15f44792df10005 type:create cxid:0x8 zxid:0x123 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics (org.apache.zookeeper.server.PrepRequestProcessor) Created topic "my-replicated-topic". [2017-10-22 22:36:23,834] INFO Processed session termination for sessionid: 0x15f44792df10005 (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-22 22:36:23,838] INFO Closed socket connection for client /127.0.0.1:41964 which had sessionid 0x15f44792df10005 (org.apache.zookeeper.server.NIOServerCnxn) [2017-10-22 22:36:23,887] INFO Got user-level KeeperException when processing sessionid:0x15f44792df10000 type:create cxid:0x186 zxid:0x127 txntype:-1 reqpath:n/a Error Path:/brokers/topics/my-replicated-topic/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic/partitions/0 (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-22 22:36:23,889] INFO Got user-level KeeperException when processing sessionid:0x15f44792df10000 type:create cxid:0x187 zxid:0x128 txntype:-1 reqpath:n/a Error Path:/brokers/topics/my-replicated-topic/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic/partitions (org.apache.zookeeper.server.PrepRequestProcessor)
浏览主题列表
[donald@Donald_Draper bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181 [2017-10-22 22:36:34,017] INFO Accepted socket connection from /127.0.0.1:41970 (org.apache.zookeeper.server.NIOServerCnxnFactory) [2017-10-22 22:36:34,021] INFO Client attempting to establish new session at /127.0.0.1:41970 (org.apache.zookeeper.server.ZooKeeperServer) [2017-10-22 22:36:34,024] INFO Established session 0x15f44792df10006 with negotiated timeout 30000 for client /127.0.0.1:41970 (org.apache.zookeeper.server.ZooKeeperServer) __consumer_offsets my-replicated-topic test [2017-10-22 22:36:34,065] INFO Processed session termination for sessionid: 0x15f44792df10006 (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-22 22:36:34,067] INFO Closed socket connection for client /127.0.0.1:41970 which had sessionid 0x15f44792df10006 (org.apache.zookeeper.server.NIOServerCnxn) [donald@Donald_Draper bin]$
从输出来看我们的my-replicated-topic主题创建成功,test是我们standy模式的测试主题。
好了,现在我们已经有了一个集群了,我们怎么知道每个集群在做什么呢?运行命令“describe topics”
[donald@Donald_Draper bin]$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic [2017-10-22 22:39:34,232] INFO Accepted socket connection from /127.0.0.1:41972 (org.apache.zookeeper.server.NIOServerCnxnFactory) [2017-10-22 22:39:34,233] INFO Client attempting to establish new session at /127.0.0.1:41972 (org.apache.zookeeper.server.ZooKeeperServer) [2017-10-22 22:39:34,237] INFO Established session 0x15f44792df10007 with negotiated timeout 30000 for client /127.0.0.1:41972 (org.apache.zookeeper.server.ZooKeeperServer) Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 [2017-10-22 22:39:35,852] INFO Processed session termination for sessionid: 0x15f44792df10007 (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-22 22:39:35,860] INFO Closed socket connection for client /127.0.0.1:41972 which had sessionid 0x15f44792df10007 (org.apache.zookeeper.server.NIOServerCnxn) [donald@Donald_Draper bin]$
关键在这两行:
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
下面来解释一下上面的输出,第一行是所有分区的摘要,每一个线提供一个分区信息,
因为我们只有一个分区,所有只有一条行介绍分区的信息。
1."leader":该节点负责所有指定分区的读和写,每个节点的领导都是随机选择的。
2."replicas":备份的节点,无论该节点是否是leader或者目前是否还活着,只是显示。
3."isr":备份节点的集合,也就是活着的节点集合。
从上面可看出,leader为broker2。
我们再次运行上面的这个命令,看看一开始我们创建的test主题的相关信息:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test [2017-10-22 22:46:54,560] INFO Accepted socket connection from /127.0.0.1:41974 (org.apache.zookeeper.server.NIOServerCnxnFactory) [2017-10-22 22:46:54,579] INFO Client attempting to establish new session at /127.0.0.1:41974 (org.apache.zookeeper.server.ZooKeeperServer) [2017-10-22 22:46:54,619] INFO Established session 0x15f44792df10008 with negotiated timeout 30000 for client /127.0.0.1:41974 (org.apache.zookeeper.server.ZooKeeperServer) Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 [2017-10-22 22:46:55,069] INFO Processed session termination for sessionid: 0x15f44792df10008 (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-22 22:46:55,071] INFO Closed socket connection for client /127.0.0.1:41974 which had sessionid 0x15f44792df10008 (org.apache.zookeeper.server.NIOServerCnxn) [donald@Donald_Draper bin]$
关键在这两行:
Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
从上面可看出,由于上面我们运行的是standy模式,,所以test主题没有Replicas所以是0,同时isr为0,leader也是broker0。
现在我们来启动一个两个终端一个生产消息,一个消费消息:
生产者:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic >
消费者:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
在生产者端,生产消息:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic >hello everyone! >how about ? >
消费者中断输出为:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic hello everyone! how about ?
现在我们来,测试集群的容错,kill掉leader,Broker2作为当前的leader,也就是kill掉Broker2。
[donald@Donald_Draper bin]$ ps aux | grep server2.properties donald 6611 3.3 15.9 3676844 297348 pts/3 Sl 22:31 0:49 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/home/donald=/kafka_2.11-0.11.0.1/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/home/donald=/kafka_2.11-0.11.0.1/bin/../logs -Dlog4j.configuration=file:./../config/log4j.properties -cp :/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/aopalliance-repackaged-2.5.0-b05.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/argparse4j-0.7.0.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/commons-lang3-3.5.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/connect-api-0.11.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/connect-file-0.11.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/connect-json-0.11.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/connect-runtime-0.11.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/connect-transforms-0.11.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/guava-20.0.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/hk2-api-2.5.0-b05.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/hk2-locator-2.5.0-b05.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/hk2-utils-2.5.0-b05.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jackson-annotations-2.8.5.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jackson-core-2.8.5.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jackson-databind-2.8.5.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jackson-jaxrs-base-2.8.5.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jackson-jaxrs-json-provider-2.8.5.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jackson-module-jaxb-annotations-2.8.5.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/javassist-3.21.0-GA.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/javax.annotation-api-1.2.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/javax.inject-1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/javax.inject-2.5.0-b05.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/javax.servlet-api-3.1.0.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/javax.ws.rs-api-2.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jersey-client-2.24.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jersey-common-2.24.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jersey-container-servlet-2.24.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jersey-container-servlet-core-2.24.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jersey-guava-2.24.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jersey-media-jaxb-2.24.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jersey-server-2.24.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jetty-http-9.2.15.v20160210.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jetty-io-9.2.15.v20160210.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jetty-security-9.2.15.v20160210.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jetty-server-9.2.15.v20160210.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jetty-servlet-9.2.15.v20160210.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jetty-servlets-9.2.15.v20160210.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jetty-util-9.2.15.v20160210.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/jopt-simple-5.0.3.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/kafka_2.11-0.11.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/kafka_2.11-0.11.0.1-sources.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/kafka_2.11-0.11.0.1-test-sources.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/kafka-clients-0.11.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/kafka-log4j-appender-0.11.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/kafka-streams-0.11.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/kafka-streams-examples-0.11.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/kafka-tools-0.11.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/log4j-1.2.17.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/lz4-1.3.0.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/maven-artifact-3.5.0.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/metrics-core-2.2.0.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/osgi-resource-locator-1.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/plexus-utils-3.0.24.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/reflections-0.9.11.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/rocksdbjni-5.0.1.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/scala-library-2.11.11.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/scala-parser-combinators_2.11-1.0.4.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/slf4j-api-1.7.25.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/slf4j-log4j12-1.7.25.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/snappy-java-1.1.2.6.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/validation-api-1.1.0.Final.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/zkclient-0.10.jar:/home/donald=/kafka_2.11-0.11.0.1/bin/../libs/zookeeper-3.4.10.jar kafka.Kafka ../config/server2.properties donald 9171 0.0 0.0 112784 988 pts/0 S+ 22:55 0:00 grep --color=auto server2.properties [donald@Donald_Draper bin]$ kill -9 6611
在windows上:
> wmic process get processid,caption,commandline | find "java.exe" | find "server2.properties" java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.11-0.11.0.1.jar" kafka.Kafka config\server2.properties 6611 > taskkill /pid 6611 /f
再次查看my-replicated-topic主题
[donald@Donald_Draper bin]$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic [2017-10-22 22:59:55,370] INFO Accepted socket connection from /127.0.0.1:42250 (org.apache.zookeeper.server.NIOServerCnxnFactory) [2017-10-22 22:59:55,375] INFO Client attempting to establish new session at /127.0.0.1:42250 (org.apache.zookeeper.server.ZooKeeperServer) [2017-10-22 22:59:55,378] INFO Established session 0x15f44792df10009 with negotiated timeout 30000 for client /127.0.0.1:42250 (org.apache.zookeeper.server.ZooKeeperServer) Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1 [2017-10-22 22:59:55,630] INFO Processed session termination for sessionid: 0x15f44792df10009 (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-22 22:59:55,643] INFO Closed socket connection for client /127.0.0.1:42250 which had sessionid 0x15f44792df10009 (org.apache.zookeeper.server.NIOServerCnxn) [donald@Donald_Draper bin]$
从输出来看,备份节点broker0成为新的leader,而broker2已经不在同步备份集合里了。
重新启动一个消息者终端2:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic hello everyone! how about ?
从输出来看消息并没有丢失。
重新生产消息:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic >hello everyone! >how about ? >[2017-10-22 22:58:33,385] WARN Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) >who is living ? >
消费者终端输出为:
消息者终端1:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic hello everyone! how about ? ... [2017-10-22 22:58:39,207] WARN Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) who is living ?
消息者终端2
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic hello everyone! how about ? who is living ?
从生产者和消息者1的输出来看,它们已经知道broker2已经不可用。
到目前为止kafka集群搭建及测试完毕,关闭集群:
[donald@Donald_Draper bin]$ ./kafka-server-stop.sh ../config/server.properties [donald@Donald_Draper bin]$ ./kafka-server-stop.sh ../config/server1.properties [donald@Donald_Draper bin]$ ./kafka-server-stop.sh ../config/server2.properties [donald@Donald_Draper bin]$ ./zookeeper-server-stop.sh ../config/zookeeper.properties [donald@Donald_Draper bin]$ netstat -ntlp (Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.) Active Internet connections (only servers) Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name tcp 0 0 192.168.122.1:53 0.0.0.0:* LISTEN - tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN - tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN - tcp 0 0 0.0.0.0:25 0.0.0.0:* LISTEN - tcp6 0 0 :::3306 :::* LISTEN - tcp6 0 0 :::35219 :::* LISTEN 9858/java tcp6 0 0 :::35286 :::* LISTEN 8893/java tcp6 0 0 :::22 :::* LISTEN - tcp6 0 0 ::1:631 :::* LISTEN - [donald@Donald_Draper bin]$
下一篇我们来讲kafka connect。
相关推荐
docker容器中搭建kafka集群环境,kafka集群配置注意事项与优化
本篇文档将详细介绍如何在Linux环境中搭建Kafka集群,同时结合Hadoop和Spark的分布式安装,构建一个完整的数据处理平台。 首先,搭建Kafka集群的基础条件包括: 1. 至少一台Linux服务器,但为了高可用性,推荐多台...
Kafka集群搭建和使用过程涉及多个技术要点和配置项,包括SASL安全机制、ACL权限设置、Kafka基础概念以及安装配置步骤等。下面将详细介绍这些知识点。 首先,SASL(Simple Authentication and Security Layer)是为C...
本文不讲kafka集群原理,只谈部署步骤。 默认读者已对kafka有最基本的认知,纯粹作为部署笔记,方便回忆。 另外本文是基于Windows部署的,Linux的步骤是基本相同的(只是启动脚本位置不同)。 kafka集群类型: ...
Kafka 集群搭建与使用 Kafka 是一种高吞吐量的分布式发布订阅消息系统,使用 Scala 编写。Kafka 拥有作为一个消息系统应该具备的功能,但是确有着独特的设计。Kafka 集群的搭建和使用是基于 Kafka 的设计理念和架构...
搭建Kafka集群涉及到对虚拟机的安装配置、JDK环境的搭建、Zookeeper的安装配置等关键步骤。下面详细介绍各个知识点。 首先,虚拟机的安装是搭建Kafka集群的基础。文中提到了使用VMWare来安装三台虚拟机,并分配了...
Java环境配置是kafka集群搭建的必要组件。需要解压安装Java安装包,重命名移动到新目录下,并配置环境变量。 三、ZooKeeper安装 ZooKeeper是kafka集群的必要组件,负责负载和容灾备份。需要解压安装ZooKeeper...
【Kafka集群搭建及测试】 Kafka是一种分布式流处理平台,常用于实时数据处理和大数据管道。本文档将详细介绍如何在三台Ubuntu 16虚拟机上搭建Kafka集群,并进行基本的测试,确保其正常运行。 **1. 准备工作** 在...
本教程将详细介绍如何搭建一个Kafka集群。 首先,我们需要理解Kafka集群的基础构成。一个Kafka集群通常包括多个服务器,称为Brokers,它们负责存储和传输消息。每个Broker都有自己的分区(Partitions),这些分区是...
本篇将详细解析如何在Linux环境下配置一个3节点的Kafka集群,特别关注`server.properties`配置文件中的`zookeeper.connect`设置。 首先,我们需要理解Kafka集群的基础架构。Kafka集群由多个Brokers(即服务器)组成...
### Kafka分布式集群搭建详解 #### 一、概述 Kafka是一种高性能、分布式的消息发布与订阅系统,被广泛应用于日志收集、流处理、消息传递等多个领域。为了提高系统的可用性与扩展性,通常会采用分布式集群的方式...
【Kafka集群搭建】Kafka是一款高吞吐量的分布式发布订阅消息系统,广泛应用于大数据实时处理和流计算领域。搭建Kafka集群是构建可靠、高效的数据传输平台的关键步骤。根据提供的信息,Kafka集群搭建有两种方式:在多...
【标题】"kafka+zookeeper高可用集群搭建shell使用脚本"所涉及的知识点主要集中在构建高可用的Kafka和Zookeeper集群上,以及如何利用Shell脚本来自动化这个过程。Kafka是一个分布式流处理平台,而Zookeeper是Apache...
### Kafka集群搭建 1. **安装依赖**: 首先,你需要在服务器上安装Java运行环境(JRE)和ZooKeeper,因为Kafka依赖ZooKeeper进行集群管理。 2. **下载Kafka**: 从Apache Kafka官方网站下载最新稳定版本的Kafka,...
Kafka集群搭建** Kafka集群提供高可用性和容错性。搭建步骤与单机类似,但需考虑以下几个关键点: 1. **多台机器**:至少需要两台机器来创建集群,每台机器上都需要部署Kafka。 2. **配置更改**:在`server....
内容涵盖Kafka集群的核心组件讲解、集群架构设计、分布式集群搭建与伪集群配置,帮助读者快速上手Kafka环境部署。 通过实战案例,深入解析Java Consumer与Producer的高级用法,包括手动提交Offset、数据回溯、...
**Kafka集群安装部署全量指南** Apache Kafka是一款开源流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。它设计为一个高吞吐量、分布式的消息队列系统,用于处理实时数据流。Kafka通常与ZooKeeper一起使用...
kafka分布式集群多服务器和单机部署,需安装zookeeper环境,