kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性:
- 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
- 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
- 支持通过kafka服务器和消费机集群来分区消息。
- 支持Hadoop并行数据加载。
卡夫卡的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。
kafka是显式分布式架构,producer、broker(Kafka)和consumer都可以有多个。Kafka的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。几个基本概念:
(1)message(消息)是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。如果consumer订阅了这个主题,那么新发布的消息就会广播给这些consumer。
(2)Kafka是显式分布式的,多个producer、consumer和broker可以运行在一个大的集群上,作为一个逻辑整体对外提供服务。对于consumer,多个consumer可以组成一个group,这个message只能传输给某个group中的某一个consumer.
1、kafka安装
1
|
http://kafka.apache.org/documentation.html#quickstart
wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
[root@itr-mastertest01 local]# tar -zxvf software/kafka_2.10-0.8.2-beta.tgz
#TCP TEST
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
# vim config/server.properties
brokerid:这个每个server(broker)必须唯一,写数字
host.name:这个也是唯一的,写ip或者hostname
[root@itr-mastertest01 config]# egrep -v "^$|#" server.properties
broker.id=1
port=9092
host.name=itr-mastertest01 #所在节点的ip地址,如果不想配置全为localhost即可
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=65536
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181
zookeeper.connection.timeout.ms=2000
[root@itr-mastertest01 config]# egrep -v "^$|#" zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
[root@itr-mastertest01 local]# scp -rq kafka_2.10-0.8.2-beta itr-mastertest02:/usr/local/
|
2、启动kafka
1
|
#启动之前zookeeper必须启动!zookeeper请参考[zookeeper cluster deploy](http://www.itweet.cn/2015/07/12/zk-cluster-deploy/)
[root@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &
--建议下面方式启动否则关闭终端kafka就挂了
# nohup kafka/bin/kafka-server-start.sh kafka/config/server.properties > kafka/kafka-logs/kafka-server.log &
[root@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &
[root@itr-nodetest01 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &
[root@itr-nodetest02 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids
null
cZxid = 0x1d00000939
ctime = Tue May 12 23:52:57 CST 2015
mZxid = 0x1d00000939
mtime = Tue May 12 23:52:57 CST 2015
pZxid = 0x1d00000ca7
cversion = 4
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
|
3、创建一个topic
1
|
[root@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --create --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --replication-factor 3 --partitions 1 --topic test
|
4、命令行查看topic
1
|
[root@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --list --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181
test
|
5、发送一些消息
1
|
[root@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-console-producer.sh --broker-list itr-mastertest01:9092 --topic test
This is a message
This is another message
[zk: localhost:2181(CONNECTED) 7] get /brokers/topics/test/partitions/3/state
{"controller_epoch":1,"leader":3,"version":1,"leader_epoch":0,"isr":[3,1]}
[zk: localhost:2181(CONNECTED) 12] get /brokers/ids/3
{"jmx_port":-1,"timestamp":"1431447378690","host":"itr-nodetest01","version":1,"port":9092}
|
6、开始消费信息
1
|
[root@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test --from-beginning
This is a message
This is another message
[root@itr-nodetest01 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test --from-beginning
This is a message
This is another message
kafaka to slave msg...
[root@itr-nodetest02 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test --from-beginning
This is a message
This is another message
kafaka to slave msg.
|
8、查看集群topic详细信息
1
|
[root@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --describe --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test
Topic:test PartitionCount:4 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 4 Replicas: 4,2 Isr: 4,2
Topic: test Partition: 1 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: test Partition: 2 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: test Partition: 3 Leader: 3 Replicas: 3,1 Isr: 3,1
消息被写到目录:
[root@itr-mastertest02 bin]# ls /tmp/kafka-logs/test-2
00000000000000000000.index 00000000000000000000.log
|
9、删除topic
1
|
$ kafka-topics.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
配置server.config , delete.topic.enable=true即可删除
|
10、kafka的webui
-
Kafka监控工具一
1
|
https://github.com/quantifind/KafkaOffsetMonitor
wget https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar
Running It
This is a small webapp, you can run it locally or on a server, as long as you have access to the ZooKeeper nodes controlling kafka.
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk zk-server1,zk-server2 \
--port 8080 \
--refresh 10.seconds \
--retain 2.days
The arguments are:
- zk the ZooKeeper hosts
- port on what port will the app be available
- refresh how often should the app refresh and store a point in the DB
- retain how long should points be kept in the DB
- dbName where to store the history (default 'offsetapp')
[root@itr-mastertest01 local]# mkdir kafka-offset-console
[root@itr-mastertest01 local]# cd kafka-offset-console/
[root@itr-mastertest01 kafka-offset-console]# cat mobile_start_en.sh
#!/bin/bash
cd /usr/local/kafka-offset-console
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp ./KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk itr-mastertest01,itr-mastertest02,itr-nodetest01/config/mobile/kafka-offset-console --port 9999 --refresh 10.seconds --retain 7.days 1>./stdout.log 2>./stderr.log &
[root@itr-mastertest01 kafka-offset-console]# chmod +x mobile_start_en.sh
[root@itr-mastertest01 kafka-offset-console]# sh mobile_start_en.sh
[root@itr-mastertest01 kafka-offset-console]# tail -f stdout.log
serving resources from: jar:file:/usr/local/kafka-offset-console/KafkaOffsetMonitor-assembly-0.2.1.jar!/offsetapp
|
-
Kafka监控工具二
1
|
https://github.com/yahoo/kafka-manager
[root@itr-mastertest01 kafka-manager]# echo $JAVA_HOME
/usr/local/jdk1.7.0_45
[root@itr-mastertest01 sbt]# sbt -version
Getting org.scala-sbt sbt 0.13.8 ...
https://github.com/yahoo/kafka-manager
[root@itr-mastertest01 hsu]# yum install git -y
[root@itr-mastertest01 hsu]# git clone https://github.com/yahoo/kafka-manager.git
[root@itr-mastertest01 hsu]# cd kafka-manager/
[root@itr-mastertest01 kafka-manager]# sbt clean dist #这里生产zip包
Configuration
$ unzip kafka-manager-1.0-SNAPSHOT.zip
在kafka-manager安装包的conf目录下面的application.conf文件中进行配置
kafka-manager.zkhosts="itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181"
Starting the service
$ bin/kafka-manager -Dconfig.file=./conf/application.conf -Dhttp.port=8081 > manager-ui.log &
|
这个工具需要自己编译,也可以直接找我获取编译包!
- Kafka监控工具三
1
|
https://github.com/shunfei/DCMonitor
druid:
https://github.com/alibaba/druid/wiki/%E9%85%8D%E7%BD%AE_StatViewServlet%E9%85%8D%E7%BD%AE
|
11、kafka和MQ区别
1
|
https://github.com/alibaba/RocketMQ/wiki/rmq_vs_kafka
http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
|
- 性能对比
• Kafka单机写入TPS约在百万条/秒,消息大小10个字节
• RocketMQ单机写入TPS单实例约7万条/秒,单机部署3个Broker,可以跑到最高12万条/秒,消息大小10个字节
总结:Kafka的TPS跑到单机百万,主要是由于Producer端将多个小消息合并,批量发向Broker。
12、项目分析
1
|
数据源:
Oracle订单表
写代码
发送数据给kafka
接收:
[root@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic order --from-beginning
998801026 11118 731 44 0 2015-05-17 00:36:08
46300226 68178 556 14 0 2015-05-17 00:36:08
367575834 42812 820 21 0 2015-05-17 00:36:08
97829386 96289 583 67 1 2015-05-17 00:36:08
#需要拷贝kafka的包到storm相应的lib目录下
#zk创建节点
[zk: localhost:2181(CONNECTED) 11] create /order 1
Created /order
[zk: localhost:2181(CONNECTED) 12] create /order/id 1
Created /order/id
[zk: localhost:2181(CONNECTED) 13] ls /order
[id]
#创建topic
kafka-topics.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --delete --topic order
bin/kafka-topics.sh --create --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --replication-factor 3 --partitions 4 --topic order
bin/kafka-topics.sh --describe --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic order
#打包测试1
storm-0.9.2/bin/storm jar storm-code-1.0-SNAPSHOT-jar-with-dependencies.jar com.kafka_storm.topology.CounterTopology
#打包测试2
$ storm/bin/storm jar storm-test_fat.jar com.kafka_storm.topology.OrderTopology
#优化数据结构保证数据不丢失!通过存储到第三方内存系统中如redis/memcached
#zk分布式锁保证多线程处理数据,数据一致性,使用第三方封装zkcli包,保证每次只有一个线程去操作mysql,而不是多个线程导致数据混乱!保证同一条数据不会被多个线程同时处理![这样就可以topology开启多线程处理数据!]
http://repo1.maven.org/maven2/com/netflix/curator/
#集群模式测试,多线程
# storm/bin/storm jar storm-test_fat.jar com.kafka_storm.topology.OrderTopology OrderTopology > order.log
|
参考:
原创文章,转载请注明: 转载自sparkjvm的博客
分享到:
相关推荐
- `bin`目录:包含了启动和管理Kafka服务的脚本,如`kafka-server-start.sh`、`kafka-console-producer.sh`和`kafka-console-consumer.sh`。 - `config`目录:包含默认配置文件,如`server.properties`(配置Kafka ...
To download the Kafka UI Tool for your operating system, use the links below. All versions of Kafka Tool come with a bundled JRE with the exception of the Linux version. For Linux, you must have Java ...
Use Spark SQL, DataFrames, and Datasets to process data using traditional SQL queries Work with different machine learning concepts and libraries using Spark’s MLlib packages Who This Book Is For ...
producers and some advanced level Java producers that use message partitioning. Chapter 5, Writing Consumers, provides detailed information about how to write basic consumers and some advanced level ...
Our goal is to give you an understanding not just of what Apache Kafka is, but also how to use it as a part of your broader technical infrastructure. In the end, we will walk you through ...
To download the Kafka UI Tool for your operating system, use the links below. All versions of Kafka Tool come with a bundled JRE with the exception of the Linux version. For Linux, you must have Java ...
Kafka Tool is free for personal use only. Any non-personal use, including commercial, educational and non-profit work is not permitted without purchasing a license. Non-personal use is allowed for ...
这包括设置`security.kerberos.login.use-ticket-cache`为`true`,以及指定`security.kerberos.login.contexts`,例如设置为`Kafka`。 - 对于Kafka,配置`server.properties`中的`authorizer.class.name`为`...
Apache Kafka 是一个分布式流处理平台,常用于构建实时数据管道和流应用。在这个"apache-kafka-1.0.0 java Demo"中,我们将探讨如何使用Java API与Kafka进行交互,实现基本的消息生产与消费功能。这个Demo虽然没有...
We looked back at our experience writing Kafka, running Kafka in production, and helping many companies use Kafka to build software architectures and manage their data pipelines and we asked ...
Kafka Streams in Action teaches you everything you... By the end of the book, you'll be ready to use Kafka Streams in your projects to reap the benefits of the insight your data holds quickly and easily.
Kafka应用场景(Use Cases)包括构建实时数据管道、收集日志、构建流应用程序等。快速入门(Quick Start)部分则会提供简单的步骤来启动和运行Kafka,让使用者快速了解和使用Kafka。 软件生态(Ecosystem)部分讲述...
#### 1.2 Use Cases Kafka 主要应用于两个场景: - **实时数据管道**:构建可确保数据在不同系统或应用间稳定传输的管道。 - **实时流应用程序**:创建能够对数据流进行转换或响应的应用程序。 #### 1.3 Quick ...
to use Kafka efficiently to handle high data volumes with ease. This book first takes you through understanding the type messaging system and then provides a thorough introduction to Apache Kafka and ...
2. **使用场景 (Use Cases)**:Kafka常用于日志收集、用户行为追踪、流式处理、消息系统、网站活动统计、运营指标收集、实时分析等。它也常作为微服务架构中的消息中间件,提供服务间的解耦和异步通信。 3. **快速...
Kafka, running Kafka in production, and helping many companies use Kafka to build software architectures and manage their data pipelines and we asked ourselves, “What are the most useful things we ...
To download the Kafka UI Tool for your operating system, use the links below. All versions of Kafka Tool come with a bundled JRE with the exception of the Linux version. For Linux, you must have Java ...