Kafka configuration files: http://donald-draper.iteye.com/blog/2397000
Quickstart: http://kafka.apache.org/quickstart
In the previous post we took a quick look at Kafka's configuration files. Today we start Kafka in standalone mode, i.e. a single-node setup, and use the command-line tools to create a topic, produce messages, and consume them.
1. Download Kafka
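The release used here is kafka_2.11-0.11.0.1. If you still need the tarball, it can be pulled from the Apache archive; the URL below follows the archive's usual layout for this release, so verify it against the downloads page if it has moved:

# Fetch the Kafka 0.11.0.1 release built for Scala 2.11 (URL assumed from the Apache archive layout)
wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz

With the archive in the home directory, it shows up in ls: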
[donald@Donald_Draper ~]$ ls
Desktop  Documents  Downloads  kafka_2.11-0.11.0.1.tgz  Music  Pictures  Public  Templates  Videos
[donald@Donald_Draper ~]$
Unpack it:
[donald@Donald_Draper ~]$ tar -zxvf kafka_2.11-0.11.0.1.tgz
kafka_2.11-0.11.0.1/
kafka_2.11-0.11.0.1/LICENSE
kafka_2.11-0.11.0.1/NOTICE
kafka_2.11-0.11.0.1/bin/
...
kafka_2.11-0.11.0.1/libs/connect-json-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/connect-file-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/kafka-streams-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/rocksdbjni-5.0.1.jar
kafka_2.11-0.11.0.1/libs/kafka-streams-examples-0.11.0.1.jar
[donald@Donald_Draper ~]$ ls
Desktop  Documents  Downloads  kafka_2.11-0.11.0.1  kafka_2.11-0.11.0.1.tgz  Music  Pictures  Public  Templates  Videos
[donald@Donald_Draper ~]$ cd kafka_2.11-0.11.0.1/
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ ls
bin  config  libs  LICENSE  NOTICE  site-docs
The extracted kafka_2.11-0.11.0.1 directory contains three main folders: bin, config, and libs.
Let's look at what each one holds, starting with bin:
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ cd bin/
[donald@Donald_Draper bin]$ ls
connect-distributed.sh            kafka-delete-records.sh              kafka-simple-consumer-shell.sh
connect-standalone.sh             kafka-mirror-maker.sh                kafka-streams-application-reset.sh
kafka-acls.sh                     kafka-preferred-replica-election.sh  kafka-topics.sh
kafka-broker-api-versions.sh      kafka-producer-perf-test.sh          kafka-verifiable-consumer.sh
kafka-configs.sh                  kafka-reassign-partitions.sh         kafka-verifiable-producer.sh
kafka-console-consumer.sh         kafka-replay-log-producer.sh         windows
kafka-console-producer.sh         kafka-replica-verification.sh        zookeeper-security-migration.sh
kafka-consumer-groups.sh          kafka-run-class.sh                   zookeeper-server-start.sh
kafka-consumer-offset-checker.sh  kafka-server-start.sh                zookeeper-server-stop.sh
kafka-consumer-perf-test.sh       kafka-server-stop.sh                 zookeeper-shell.sh
As you can see, bin consists almost entirely of shell scripts.
Next, the config folder:
[donald@Donald_Draper bin]$ cd ..
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ ls
bin  config  libs  LICENSE  NOTICE  site-docs
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ cd config/
[donald@Donald_Draper config]$ ls
connect-console-sink.properties    connect-file-source.properties  log4j.properties        zookeeper.properties
connect-console-source.properties  connect-log4j.properties        producer.properties
connect-distributed.properties     connect-standalone.properties   server.properties
connect-file-sink.properties       consumer.properties             tools-log4j.properties
These are mainly the producer and consumer configuration files, the log4j configuration, the broker configuration (server.properties), and the configuration for Kafka Connect and the bundled ZooKeeper.
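server.properties is the file that matters most for this walkthrough. To skim only the settings that are actually in effect, a plain grep is enough; a minimal sketch, run from the config directory we are already in:

# Print the active (non-comment, non-blank) lines of the broker configuration
grep -Ev '^[[:space:]]*(#|$)' server.properties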
2. Start the servers
Kafka uses ZooKeeper, so you need to start a ZooKeeper server first. If you don't have one installed, you can use the ZooKeeper instance bundled with Kafka:
[donald@Donald_Draper bin]$ ./zookeeper-server-start.sh ../config/zookeeper.properties &
[2017-10-20 08:56:08,397] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-10-20 08:56:08,418] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,418] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,419] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,419] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2017-10-20 08:56:08,539] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-10-20 08:56:08,539] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2017-10-20 08:56:08,550] INFO Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:host.name=Donald_Draper.server.com (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:java.version=1.8.0_91 (org.apache.zookeeper.server.ZooKeeperServer)
...
[2017-10-20 08:56:08,550] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:os.version=3.10.0-327.22.2.el7.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:user.name=donald (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:user.home=/home/donald (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:user.dir=/home/donald/kafka_2.11-0.11.0.1/bin (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,576] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,576] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,576] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,648] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
Start the Kafka server:
[donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server.properties & [2] 5077 [donald@Donald_Draper bin]$ [2017-10-20 09:00:20,669] INFO KafkaConfig values: advertised.host.name = null advertised.listeners = null advertised.port = null alter.config.policy.class.name = null authorizer.class.name = auto.create.topics.enable = true auto.leader.rebalance.enable = true background.threads = 10 broker.id = 0 broker.id.generation.enable = true broker.rack = null compression.type = producer connections.max.idle.ms = 600000 controlled.shutdown.enable = true controlled.shutdown.max.retries = 3 controlled.shutdown.retry.backoff.ms = 5000 controller.socket.timeout.ms = 30000 create.topic.policy.class.name = null default.replication.factor = 1 delete.records.purgatory.purge.interval.requests = 1 delete.topic.enable = false fetch.purgatory.purge.interval.requests = 1000 group.initial.rebalance.delay.ms = 0 group.max.session.timeout.ms = 300000 group.min.session.timeout.ms = 6000 host.name = inter.broker.listener.name = null inter.broker.protocol.version = 0.11.0-IV2 leader.imbalance.check.interval.seconds = 300 leader.imbalance.per.broker.percentage = 10 listener.security.protocol.map = SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT listeners = null log.cleaner.backoff.ms = 15000 log.cleaner.dedupe.buffer.size = 134217728 log.cleaner.delete.retention.ms = 86400000 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308 log.cleaner.min.cleanable.ratio = 0.5 log.cleaner.min.compaction.lag.ms = 0 log.cleaner.threads = 1 log.cleanup.policy = [delete] log.dir = /tmp/kafka-logs log.dirs = /tmp/kafka-logs log.flush.interval.messages = 9223372036854775807 log.flush.interval.ms = null log.flush.offset.checkpoint.interval.ms = 60000 log.flush.scheduler.interval.ms = 9223372036854775807 log.flush.start.offset.checkpoint.interval.ms = 60000 log.index.interval.bytes = 4096 log.index.size.max.bytes = 10485760 log.message.format.version = 0.11.0-IV2 log.message.timestamp.difference.max.ms = 9223372036854775807 log.message.timestamp.type = CreateTime log.preallocate = false log.retention.bytes = -1 log.retention.check.interval.ms = 300000 log.retention.hours = 168 log.retention.minutes = null log.retention.ms = null log.roll.hours = 168 log.roll.jitter.hours = 0 log.roll.jitter.ms = null log.roll.ms = null log.segment.bytes = 1073741824 log.segment.delete.delay.ms = 60000 max.connections.per.ip = 2147483647 max.connections.per.ip.overrides = message.max.bytes = 1000012 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 min.insync.replicas = 1 num.io.threads = 8 num.network.threads = 3 num.partitions = 1 num.recovery.threads.per.data.dir = 1 num.replica.fetchers = 1 offset.metadata.max.bytes = 4096 offsets.commit.required.acks = -1 offsets.commit.timeout.ms = 5000 offsets.load.buffer.size = 5242880 offsets.retention.check.interval.ms = 600000 offsets.retention.minutes = 1440 offsets.topic.compression.codec = 0 offsets.topic.num.partitions = 50 offsets.topic.replication.factor = 1 offsets.topic.segment.bytes = 104857600 port = 9092 principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder producer.purgatory.purge.interval.requests = 1000 queued.max.requests = 500 quota.consumer.default = 9223372036854775807 quota.producer.default = 9223372036854775807 quota.window.num = 
11 quota.window.size.seconds = 1 replica.fetch.backoff.ms = 1000 replica.fetch.max.bytes = 1048576 replica.fetch.min.bytes = 1 replica.fetch.response.max.bytes = 10485760 replica.fetch.wait.max.ms = 500 replica.high.watermark.checkpoint.interval.ms = 5000 replica.lag.time.max.ms = 10000 replica.socket.receive.buffer.bytes = 65536 replica.socket.timeout.ms = 30000 replication.quota.window.num = 11 replication.quota.window.size.seconds = 1 request.timeout.ms = 30000 reserved.broker.max.id = 1000 sasl.enabled.mechanisms = [GSSAPI] sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.principal.to.local.rules = [DEFAULT] sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism.inter.broker.protocol = GSSAPI security.inter.broker.protocol = PLAINTEXT socket.receive.buffer.bytes = 102400 socket.request.max.bytes = 104857600 socket.send.buffer.bytes = 102400 ssl.cipher.suites = null ssl.client.auth = none ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000 transaction.max.timeout.ms = 900000 transaction.remove.expired.transaction.cleanup.interval.ms = 3600000 transaction.state.log.load.buffer.size = 5242880 transaction.state.log.min.isr = 1 transaction.state.log.num.partitions = 50 transaction.state.log.replication.factor = 1 transaction.state.log.segment.bytes = 104857600 transactional.id.expiration.ms = 604800000 unclean.leader.election.enable = false zookeeper.connect = localhost:2181 zookeeper.connection.timeout.ms = 6000 zookeeper.session.timeout.ms = 6000 zookeeper.set.acl = false zookeeper.sync.time.ms = 2000 (kafka.server.KafkaConfig) [2017-10-20 09:00:20,934] INFO starting (kafka.server.KafkaServer) [2017-10-20 09:00:20,966] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) [2017-10-20 09:00:21,024] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) [2017-10-20 09:00:21,042] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,042] INFO Client environment:host.name=Donald_Draper.server.com (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,042] INFO Client environment:java.version=1.8.0_91 (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,042] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,043] INFO Client environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.91-1.b14.el7_2.x86_64/jre (org.apache.zookeeper.ZooKeeper) ... 
[2017-10-20 09:00:21,043] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,043] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,043] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,043] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,043] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,043] INFO Client environment:os.version=3.10.0-327.22.2.el7.x86_64 (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,043] INFO Client environment:user.name=donald (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,043] INFO Client environment:user.home=/home/donald= (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,043] INFO Client environment:user.dir=/home/donald=/kafka_2.11-0.11.0.1/bin (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,044] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1139b2f3 (org.apache.zookeeper.ZooKeeper) [2017-10-20 09:00:21,065] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient) [2017-10-20 09:00:21,203] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2017-10-20 09:00:21,337] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2017-10-20 09:00:21,339] INFO Accepted socket connection from /127.0.0.1:35852 (org.apache.zookeeper.server.NIOServerCnxnFactory) [2017-10-20 09:00:21,455] INFO Client attempting to establish new session at /127.0.0.1:35852 (org.apache.zookeeper.server.ZooKeeperServer) [2017-10-20 09:00:21,476] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog) [2017-10-20 09:00:21,498] INFO Established session 0x15f374a56ac0000 with negotiated timeout 6000 for client /127.0.0.1:35852 (org.apache.zookeeper.server.ZooKeeperServer) [2017-10-20 09:00:21,501] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x15f374a56ac0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2017-10-20 09:00:21,507] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2017-10-20 09:00:21,601] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x5 zxid:0x3 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-20 09:00:21,669] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0xb zxid:0x7 txntype:-1 reqpath:n/a Error Path:/config Error:KeeperErrorCode = NoNode for /config (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-20 09:00:21,691] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x13 zxid:0xc txntype:-1 reqpath:n/a Error Path:/admin Error:KeeperErrorCode = NoNode for /admin (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-20 09:00:21,902] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x1d zxid:0x12 txntype:-1 reqpath:n/a Error Path:/cluster Error:KeeperErrorCode = NoNode for /cluster 
(org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-20 09:00:21,912] INFO Cluster ID = IGYQ5SHuTHemtryrMBrFWA (kafka.server.KafkaServer) [2017-10-20 09:00:22,044] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2017-10-20 09:00:22,177] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2017-10-20 09:00:22,177] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2017-10-20 09:00:22,555] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2017-10-20 09:00:22,753] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager) [2017-10-20 09:00:22,773] INFO Loading logs. (kafka.log.LogManager) [2017-10-20 09:00:22,785] INFO Logs loading complete in 11 ms. (kafka.log.LogManager) [2017-10-20 09:00:24,089] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [2017-10-20 09:00:24,091] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2017-10-20 09:00:24,173] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor) [2017-10-20 09:00:24,178] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer) [2017-10-20 09:00:24,197] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2017-10-20 09:00:24,198] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2017-10-20 09:00:24,197] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2017-10-20 09:00:24,278] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2017-10-20 09:00:24,295] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2017-10-20 09:00:24,295] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2017-10-20 09:00:24,308] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2017-10-20 09:00:24,332] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator) [2017-10-20 09:00:24,335] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2017-10-20 09:00:24,340] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator) [2017-10-20 09:00:24,350] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:setData cxid:0x29 zxid:0x16 txntype:-1 reqpath:n/a Error Path:/controller_epoch Error:KeeperErrorCode = NoNode for /controller_epoch (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-20 09:00:24,371] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager) [2017-10-20 09:00:24,420] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager) [2017-10-20 09:00:24,472] INFO [Transaction Coordinator 0]: Starting up. (kafka.coordinator.transaction.TransactionCoordinator) [2017-10-20 09:00:24,477] INFO [Transaction Coordinator 0]: Startup complete. 
(kafka.coordinator.transaction.TransactionCoordinator) [2017-10-20 09:00:24,493] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager) [2017-10-20 09:00:24,593] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2017-10-20 09:00:24,638] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:delete cxid:0x45 zxid:0x19 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-20 09:00:24,657] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2017-10-20 09:00:24,665] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x46 zxid:0x1a txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-20 09:00:24,668] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x47 zxid:0x1b txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor) [2017-10-20 09:00:24,679] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2017-10-20 09:00:24,681] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(Donald_Draper.server.com,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils) [2017-10-20 09:00:24,682] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2017-10-20 09:00:24,701] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser) [2017-10-20 09:00:24,701] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser) [2017-10-20 09:00:24,703] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
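Both servers were started in the foreground here and pushed to the background with &. The start scripts also accept a -daemon switch (handled by the kafka-run-class.sh wrapper) that detaches them and writes console output under the logs directory instead of the terminal; a quieter variant of the same startup, assuming the working directory used above:

# Start ZooKeeper and the Kafka broker detached from the terminal
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
./kafka-server-start.sh -daemon ../config/server.properties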
3. Create a topic
Create a topic named test with a single partition and a single replica:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
[2017-10-21 17:21:29,587] INFO Accepted socket connection from /127.0.0.1:53310 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-21 17:21:29,592] INFO Client attempting to establish new session at /127.0.0.1:53310 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-21 17:21:29,598] INFO Established session 0x15f3e3b9f1b0001 with negotiated timeout 30000 for client /127.0.0.1:53310 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-21 17:21:29,762] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0001 type:setData cxid:0x4 zxid:0x27 txntype:-1 reqpath:n/a Error Path:/config/topics/test Error:KeeperErrorCode = NoNode for /config/topics/test (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,773] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0001 type:create cxid:0x6 zxid:0x28 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics (org.apache.zookeeper.server.PrepRequestProcessor)
Created topic "test".
[2017-10-21 17:21:29,795] INFO Processed session termination for sessionid: 0x15f3e3b9f1b0001 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,799] INFO Closed socket connection for client /127.0.0.1:53310 which had sessionid 0x15f3e3b9f1b0001 (org.apache.zookeeper.server.NIOServerCnxn)
[2017-10-21 17:21:29,834] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0000 type:create cxid:0x3f zxid:0x2c txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,839] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0000 type:create cxid:0x40 zxid:0x2d txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,883] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions test-0 (kafka.server.ReplicaFetcherManager)
[2017-10-21 17:21:29,935] INFO Loading producer state from offset 0 for partition test-0 with message format version 2 (kafka.log.Log)
[2017-10-21 17:21:29,944] INFO Completed load of log test-0 with 1 log segments, log start offset 0 and log end offset 0 in 35 ms (kafka.log.Log)
[2017-10-21 17:21:29,948] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 0.11.0-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2017-10-21 17:21:29,950] INFO Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition test-0 (kafka.cluster.Partition)
[2017-10-21 17:21:29,955] INFO Replica loaded for partition test-0 with initial high watermark 0 (kafka.cluster.Replica)
[2017-10-21 17:21:29,958] INFO Partition [test,0] on broker 0: test-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[donald@Donald_Draper bin]$
We can inspect the topic list with the kafka-topics command:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181
[2017-10-21 17:23:13,183] INFO Accepted socket connection from /127.0.0.1:53312 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-21 17:23:13,187] INFO Client attempting to establish new session at /127.0.0.1:53312 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-21 17:23:13,191] INFO Established session 0x15f3e3b9f1b0002 with negotiated timeout 30000 for client /127.0.0.1:53312 (org.apache.zookeeper.server.ZooKeeperServer)
test
[2017-10-21 17:23:13,253] INFO Processed session termination for sessionid: 0x15f3e3b9f1b0002 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:23:13,255] INFO Closed socket connection for client /127.0.0.1:53312 which had sessionid 0x15f3e3b9f1b0002 (org.apache.zookeeper.server.NIOServerCnxn)
[donald@Donald_Draper bin]$
As the output shows, there is now one topic, test.
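Besides --list, the same script supports --describe for per-partition detail (leader, replicas, in-sync replicas); for the single-broker topic above, the output should look roughly like this sketch:

[donald@Donald_Draper bin]$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0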
Alternatively, instead of creating topics by hand, you can configure your brokers to create a topic automatically when a message is published to one that doesn't exist yet.
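The switch behind this is auto.create.topics.enable, which already shows up as true in the KafkaConfig dump above; auto-created topics take their partition count and replication factor from the broker defaults. A sketch of the relevant lines in config/server.properties:

# Create a topic automatically the first time a client publishes to or fetches from it
auto.create.topics.enable=true
# Defaults applied to auto-created topics
num.partitions=1
default.replication.factor=1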
4. Produce messages
Kafka ships with a command-line producer that reads input from a file or from standard input and sends each line to the Kafka cluster as a separate message.
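Because the producer reads standard input, sending a prepared file is just shell redirection; a minimal sketch, where messages.txt is a hypothetical file with one message per line:

# Publish every line of messages.txt as a separate message to the test topic
./kafka-console-producer.sh --broker-list localhost:9092 --topic test < messages.txt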
Run the producer and type a few messages into the console:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>my name is donald.
>hello!
5. Consume messages
Kafka likewise provides a command-line consumer that dumps the stored messages to standard output.
Open a second terminal and start the consumer:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
my name is donald.
hello!
With the two commands running in different terminals, the consumer picks up each message as soon as the producer sends it: type in the producer terminal and the line appears in the consumer terminal.
Keep producing in the producer terminal:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>my name is donald.
>hello!
>what's your name?
>
The consumer terminal then shows:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
my name is donald.
hello!
what's your name?
All of the command-line tools take many options; running a tool with no arguments prints its usage documentation. For example, kafka-topics:
[donald@Donald_Draper bin]$ ./kafka-topics.sh
Create, delete, describe, or change a topic.
--alter
    Alter the number of partitions, replica assignment, and/or configuration for the topic.
--config <String: name=value>
    A topic configuration override for the topic being created or altered. The following is a list of valid configurations: cleanup.policy, compression.type, delete.retention.ms, file.delete.delay.ms, flush.messages, flush.ms, follower.replication.throttled.replicas, index.interval.bytes, leader.replication.throttled.replicas, max.message.bytes, message.format.version, message.timestamp.difference.max.ms, message.timestamp.type, min.cleanable.dirty.ratio, min.compaction.lag.ms, min.insync.replicas, preallocate, retention.bytes, retention.ms, segment.bytes, segment.index.bytes, segment.jitter.ms, segment.ms, unclean.leader.election.enable. See the Kafka documentation for full details on the topic configs.
--create
    Create a new topic.
--delete
    Delete a topic.
--delete-config <String: name>
    A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).
--describe
    List details for the given topics.
--disable-rack-aware
    Disable rack aware replica assignment.
--force
    Suppress console prompts.
--help
    Print usage information.
--if-exists
    If set when altering or deleting topics, the action will only execute if the topic exists.
--if-not-exists
    If set when creating topics, the action will only execute if the topic does not already exist.
--list
    List all available topics.
--partitions <Integer: # of partitions>
    The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected).
--replica-assignment <String: broker_id_for_part1_replica1:broker_id_for_part1_replica2, broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...>
    A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer: replication factor>
    The replication factor for each partition in the topic being created.
--topic <String: topic>
    The topic to be created, altered or described. Can also accept a regular expression, except for the --create option.
--topics-with-overrides
    If set when describing topics, only show topics that have overridden configs.
--unavailable-partitions
    If set when describing topics, only show partitions whose leader is not available.
--under-replicated-partitions
    If set when describing topics, only show under-replicated partitions.
--zookeeper <String: urls>
    REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLs can be given to allow fail-over.
Deleting a topic:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[donald@Donald_Draper bin]$
Listing the topics again:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test
[donald@Donald_Draper bin]$
As the output shows, the test topic wasn't actually deleted; it was only marked for deletion, because the following broker setting was never enabled:
# Switch to enable topic deletion or not, default value is false
# If set to true, topics can be deleted from the command line; otherwise delete requests are ignored.
#delete.topic.enable=true
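As a sketch of actually turning deletion on (assuming the directory layout used throughout this post), uncomment the flag in server.properties, restart the broker, and retry the delete:

# Enable topic deletion in the broker configuration
sed -i 's/^#delete.topic.enable=true/delete.topic.enable=true/' ../config/server.properties
# Restart the broker so the setting takes effect, then re-issue the delete
./kafka-server-stop.sh
./kafka-server-start.sh ../config/server.properties &
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test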
Everything above ran in standalone mode; the next step is a proper Kafka cluster.
Before building the cluster, shut down the standalone Kafka and its ZooKeeper:
[donald@Donald_Draper bin]$ ./kafka-server-stop.sh ../config/server.properties
[donald@Donald_Draper bin]$ ./zookeeper-server-stop.sh ../config/zookeeper.properties
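To confirm both processes are really gone, jps from the JDK is handy; the class names below are what a stock Kafka and bundled ZooKeeper pair registers under, so treat them as an assumption and check your own output:

# Kafka shows up as "Kafka", the bundled ZooKeeper as "QuorumPeerMain"; no output means both stopped
jps | grep -E 'Kafka|QuorumPeerMain'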
The cluster setup itself is covered in the next post.