`
Donald_Draper
  • 浏览: 984265 次
社区版块
存档分类
最新评论

Kafka Standy模式、创建主题,生产消费消息

阅读更多
Kafka配置文件:http://donald-draper.iteye.com/blog/2397000
Quickstart:http://kafka.apache.org/quickstart
上面我们简单看了一下kafka的相关配置,今天我们启动standy模式的kafka,即单机版,并使用相关命令创建topic,
生产消息和消费消息。
1.下载kafka
[donald@Donald_Draper ~]$ ls
Desktop  Documents  Downloads  kafka_2.11-0.11.0.1.tgz  Music  Pictures  Public  Templates  Videos
[donald@Donald_Draper ~]$

解压:
[donald@Donald_Draper ~]$ tar -zxvf kafka_2.11-0.11.0.1.tgz 
kafka_2.11-0.11.0.1/
kafka_2.11-0.11.0.1/LICENSE
kafka_2.11-0.11.0.1/NOTICE
kafka_2.11-0.11.0.1/bin/
...
kafka_2.11-0.11.0.1/libs/connect-json-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/connect-file-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/kafka-streams-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/rocksdbjni-5.0.1.jar
kafka_2.11-0.11.0.1/libs/kafka-streams-examples-0.11.0.1.jar
[donald@Donald_Draper ~]$ ls
Desktop  Documents  Downloads  kafka_2.11-0.11.0.1  kafka_2.11-0.11.0.1.tgz  Music  Pictures  Public  Templates  Videos
[donald@Donald_Draper ~]$ 
[donald@Donald_Draper ~]$ cd kafka_2.11-0.11.0.1/
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ ls
bin  config  libs  LICENSE  NOTICE  site-docs


从上面来看解压后的文件夹 kafka_2.11-0.11.0.1内有bin,config,libs主要文件夹,
下面分别来看这几个文件夹下的文件:
先来看bin目录下:
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ cd bin/
[donald@Donald_Draper bin]$ ls
connect-distributed.sh            kafka-delete-records.sh              kafka-simple-consumer-shell.sh
connect-standalone.sh             kafka-mirror-maker.sh                kafka-streams-application-reset.sh
kafka-acls.sh                     kafka-preferred-replica-election.sh  kafka-topics.sh
kafka-broker-api-versions.sh      kafka-producer-perf-test.sh          kafka-verifiable-consumer.sh
kafka-configs.sh                  kafka-reassign-partitions.sh         kafka-verifiable-producer.sh
kafka-console-consumer.sh         kafka-replay-log-producer.sh         windows
kafka-console-producer.sh         kafka-replica-verification.sh        zookeeper-security-migration.sh
kafka-consumer-groups.sh          kafka-run-class.sh                   zookeeper-server-start.sh
kafka-consumer-offset-checker.sh  kafka-server-start.sh                zookeeper-server-stop.sh
kafka-consumer-perf-test.sh       kafka-server-stop.sh                 zookeeper-shell.sh

从上面来看bin主要是sh脚本,
再来看config文件夹:
[donald@Donald_Draper bin]$ cd ..
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ ls
bin  config  libs  LICENSE  NOTICE  site-docs
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ cd config/
[donald@Donald_Draper config]$ ls
connect-console-sink.properties    connect-file-source.properties  log4j.properties        zookeeper.properties
connect-console-source.properties  connect-log4j.properties        producer.properties
connect-distributed.properties     connect-standalone.properties   server.properties
connect-file-sink.properties       consumer.properties             tools-log4j.properties

主要生产者消息者配置文件,日志配置文件,broker配置文件,connect和内置zookeeper配置文件;
2.启动服务器

由于kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果没有Zookeeper,可以使用kafka内部自带的Zookeeper。
[donald@Donald_Draper bin]$ ./zookeeper-server-start.sh ../config/zookeeper.properties &
[2017-10-20 08:56:08,397] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-10-20 08:56:08,418] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,418] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,419] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,419] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2017-10-20 08:56:08,539] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-10-20 08:56:08,539] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2017-10-20 08:56:08,550] INFO Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:host.name=Donald_Draper.server.com (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:java.version=1.8.0_91 (org.apache.zookeeper.server.ZooKeeperServer)
...

 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:os.version=3.10.0-327.22.2.el7.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:user.name=donald (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:user.home=/home/donald= (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:user.dir=/home/donald=/kafka_2.11-0.11.0.1/bin (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,576] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,576] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,576] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,648] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)



启动kafka服务器:


[donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server.properties  &
[2] 5077
[donald@Donald_Draper bin]$ [2017-10-20 09:00:20,669] INFO KafkaConfig values: 
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        authorizer.class.name = 
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id = 0
        broker.id.generation.enable = true
        broker.rack = null
        compression.type = producer
        connections.max.idle.ms = 600000
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.socket.timeout.ms = 30000
        create.topic.policy.class.name = null
        default.replication.factor = 1
        delete.records.purgatory.purge.interval.requests = 1
        delete.topic.enable = false
        fetch.purgatory.purge.interval.requests = 1000
        group.initial.rebalance.delay.ms = 0
        group.max.session.timeout.ms = 300000
        group.min.session.timeout.ms = 6000
        host.name = 
        inter.broker.listener.name = null
        inter.broker.protocol.version = 0.11.0-IV2
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listener.security.protocol.map = SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT
        listeners = null
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /tmp/kafka-logs
        log.flush.interval.messages = 9223372036854775807
        log.flush.interval.ms = null
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.flush.start.offset.checkpoint.interval.ms = 60000
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.format.version = 0.11.0-IV2
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 168
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides = 
        message.max.bytes = 1000012
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 3
        num.partitions = 1
        num.recovery.threads.per.data.dir = 1
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 1440
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 1
        offsets.topic.segment.bytes = 104857600
        port = 9092
        principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
        producer.purgatory.purge.interval.requests = 1000
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 10000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism.inter.broker.protocol = GSSAPI
        security.inter.broker.protocol = PLAINTEXT
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        ssl.cipher.suites = null
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
        transaction.max.timeout.ms = 900000
        transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
        transaction.state.log.load.buffer.size = 5242880
        transaction.state.log.min.isr = 1
        transaction.state.log.num.partitions = 50
        transaction.state.log.replication.factor = 1
        transaction.state.log.segment.bytes = 104857600
        transactional.id.expiration.ms = 604800000
        unclean.leader.election.enable = false
        zookeeper.connect = localhost:2181
        zookeeper.connection.timeout.ms = 6000
        zookeeper.session.timeout.ms = 6000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2017-10-20 09:00:20,934] INFO starting (kafka.server.KafkaServer)
[2017-10-20 09:00:20,966] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2017-10-20 09:00:21,024] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2017-10-20 09:00:21,042] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,042] INFO Client environment:host.name=Donald_Draper.server.com (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,042] INFO Client environment:java.version=1.8.0_91 (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,042] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.91-1.b14.el7_2.x86_64/jre (org.apache.zookeeper.ZooKeeper)

...

[2017-10-20 09:00:21,043] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:os.version=3.10.0-327.22.2.el7.x86_64 (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:user.name=donald (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:user.home=/home/donald= (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:user.dir=/home/donald=/kafka_2.11-0.11.0.1/bin (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,044] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1139b2f3 (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,065] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2017-10-20 09:00:21,203] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2017-10-20 09:00:21,337] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2017-10-20 09:00:21,339] INFO Accepted socket connection from /127.0.0.1:35852 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-20 09:00:21,455] INFO Client attempting to establish new session at /127.0.0.1:35852 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 09:00:21,476] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
[2017-10-20 09:00:21,498] INFO Established session 0x15f374a56ac0000 with negotiated timeout 6000 for client /127.0.0.1:35852 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 09:00:21,501] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x15f374a56ac0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2017-10-20 09:00:21,507] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2017-10-20 09:00:21,601] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x5 zxid:0x3 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:21,669] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0xb zxid:0x7 txntype:-1 reqpath:n/a Error Path:/config Error:KeeperErrorCode = NoNode for /config (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:21,691] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x13 zxid:0xc txntype:-1 reqpath:n/a Error Path:/admin Error:KeeperErrorCode = NoNode for /admin (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:21,902] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x1d zxid:0x12 txntype:-1 reqpath:n/a Error Path:/cluster Error:KeeperErrorCode = NoNode for /cluster (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:21,912] INFO Cluster ID = IGYQ5SHuTHemtryrMBrFWA (kafka.server.KafkaServer)
[2017-10-20 09:00:22,044] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-10-20 09:00:22,177] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-10-20 09:00:22,177] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-10-20 09:00:22,555] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-10-20 09:00:22,753] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2017-10-20 09:00:22,773] INFO Loading logs. (kafka.log.LogManager)
[2017-10-20 09:00:22,785] INFO Logs loading complete in 11 ms. (kafka.log.LogManager)
[2017-10-20 09:00:24,089] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2017-10-20 09:00:24,091] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2017-10-20 09:00:24,173] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2017-10-20 09:00:24,178] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer)
[2017-10-20 09:00:24,197] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,198] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,197] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,278] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,295] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,295] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,308] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-10-20 09:00:24,332] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2017-10-20 09:00:24,335] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-10-20 09:00:24,340] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2017-10-20 09:00:24,350] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:setData cxid:0x29 zxid:0x16 txntype:-1 reqpath:n/a Error Path:/controller_epoch Error:KeeperErrorCode = NoNode for /controller_epoch (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:24,371] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-10-20 09:00:24,420] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2017-10-20 09:00:24,472] INFO [Transaction Coordinator 0]: Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2017-10-20 09:00:24,477] INFO [Transaction Coordinator 0]: Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2017-10-20 09:00:24,493] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2017-10-20 09:00:24,593] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2017-10-20 09:00:24,638] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:delete cxid:0x45 zxid:0x19 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:24,657] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-10-20 09:00:24,665] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x46 zxid:0x1a txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:24,668] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x47 zxid:0x1b txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:24,679] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-10-20 09:00:24,681] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(Donald_Draper.server.com,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2017-10-20 09:00:24,682] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-10-20 09:00:24,701] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-20 09:00:24,701] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-20 09:00:24,703] INFO [Kafka Server 0], started (kafka.server.KafkaServer)


3.创建一个主题


创建一个只有一个分区和备份的test Topic,

[donald@Donald_Draper bin]$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
[2017-10-21 17:21:29,587] INFO Accepted socket connection from /127.0.0.1:53310 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-21 17:21:29,592] INFO Client attempting to establish new session at /127.0.0.1:53310 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-21 17:21:29,598] INFO Established session 0x15f3e3b9f1b0001 with negotiated timeout 30000 for client /127.0.0.1:53310 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-21 17:21:29,762] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0001 type:setData cxid:0x4 zxid:0x27 txntype:-1 reqpath:n/a Error Path:/config/topics/test Error:KeeperErrorCode = NoNode for /config/topics/test (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,773] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0001 type:create cxid:0x6 zxid:0x28 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics (org.apache.zookeeper.server.PrepRequestProcessor)
Created topic "test".
[2017-10-21 17:21:29,795] INFO Processed session termination for sessionid: 0x15f3e3b9f1b0001 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,799] INFO Closed socket connection for client /127.0.0.1:53310 which had sessionid 0x15f3e3b9f1b0001 (org.apache.zookeeper.server.NIOServerCnxn)
[2017-10-21 17:21:29,834] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0000 type:create cxid:0x3f zxid:0x2c txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,839] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0000 type:create cxid:0x40 zxid:0x2d txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,883] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions test-0 (kafka.server.ReplicaFetcherManager)
[2017-10-21 17:21:29,935] INFO Loading producer state from offset 0 for partition test-0 with message format version 2 (kafka.log.Log)
[2017-10-21 17:21:29,944] INFO Completed load of log test-0 with 1 log segments, log start offset 0 and log end offset 0 in 35 ms (kafka.log.Log)
[2017-10-21 17:21:29,948] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 0.11.0-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2017-10-21 17:21:29,950] INFO Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition test-0 (kafka.cluster.Partition)
[2017-10-21 17:21:29,955] INFO Replica loaded for partition test-0 with initial high watermark 0 (kafka.cluster.Replica)
[2017-10-21 17:21:29,958] INFO Partition [test,0] on broker 0: test-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[donald@Donald_Draper bin]$ 

我们可以通过kafka-topics命令查看topic信息,具体如下:

[donald@Donald_Draper bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181
[2017-10-21 17:23:13,183] INFO Accepted socket connection from /127.0.0.1:53312 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-21 17:23:13,187] INFO Client attempting to establish new session at /127.0.0.1:53312 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-21 17:23:13,191] INFO Established session 0x15f3e3b9f1b0002 with negotiated timeout 30000 for client /127.0.0.1:53312 (org.apache.zookeeper.server.ZooKeeperServer)
test
[2017-10-21 17:23:13,253] INFO Processed session termination for sessionid: 0x15f3e3b9f1b0002 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:23:13,255] INFO Closed socket connection for client /127.0.0.1:53312 which had sessionid 0x15f3e3b9f1b0002 (org.apache.zookeeper.server.NIOServerCnxn)
[donald@Donald_Draper bin]$ 

从上面来看,有一个test主题。

另外,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时自动创建topic。

4.生产消息

Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
运行producer(生产者),然后在控制台输入几条消息到服务器。

[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>my name is donald.
>hello!


5.消费消息

Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。
重新启动一个终端,用于消费者消费消息:

[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
my name is donald.
hello!



如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。
我们在生产者终端生产消息,在消息者终端消费消息。

在生产者终端继续生产如下消息:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>my name is donald.
>hello!
>what's your name?
>

消息者终端显示如下:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
my name is donald.
hello!
what's your name?



所有的命令行工具有很多的选项,可以运行无参数命令,查看命令文档来了解更多的功能。

比如kafka-topics命令:
[donald@Donald_Draper bin]$ ./kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description                            
------                                   -----------                            
--alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
--config <String: name=value>            A topic configuration override for the 
                                           topic being created or altered.The   
                                           following is a list of valid         
                                           configurations:                      
                                                cleanup.policy                        
                                                compression.type                      
                                                delete.retention.ms                   
                                                file.delete.delay.ms                  
                                                flush.messages                        
                                                flush.ms                              
                                                follower.replication.throttled.       
                                           replicas                             
                                                index.interval.bytes                  
                                                leader.replication.throttled.replicas 
                                                max.message.bytes                     
                                                message.format.version                
                                                message.timestamp.difference.max.ms   
                                                message.timestamp.type                
                                                min.cleanable.dirty.ratio             
                                                min.compaction.lag.ms                 
                                                min.insync.replicas                   
                                                preallocate                           
                                                retention.bytes                       
                                                retention.ms                          
                                                segment.bytes                         
                                                segment.index.bytes                   
                                                segment.jitter.ms                     
                                                segment.ms                            
                                                unclean.leader.election.enable        
                                         See the Kafka documentation for full   
                                           details on the topic configs.        
--create                                 Create a new topic.                    
--delete                                 Delete a topic                         
--delete-config <String: name>           A topic configuration override to be   
                                           removed for an existing topic (see   
                                           the list of configurations under the 
                                           --config option).                    
--describe                               List details for the given topics.     
--disable-rack-aware                     Disable rack aware replica assignment  
--force                                  Suppress console prompts               
--help                                   Print usage information.               
--if-exists                              if set when altering or deleting       
                                           topics, the action will only execute 
                                           if the topic exists                  
--if-not-exists                          if set when creating topics, the       
                                           action will only execute if the      
                                           topic does not already exist         
--list                                   List all available topics.             
--partitions <Integer: # of partitions>  The number of partitions for the topic 
                                           being created or altered (WARNING:   
                                           If partitions are increased for a    
                                           topic that has a key, the partition  
                                           logic or ordering of the messages    
                                           will be affected                     
--replica-assignment <String:            A list of manual partition-to-broker   
  broker_id_for_part1_replica1 :           assignments for the topic being      
  broker_id_for_part1_replica2 ,           created or altered.                  
  broker_id_for_part2_replica1 :                                                
  broker_id_for_part2_replica2 , ...>                                           
--replication-factor <Integer:           The replication factor for each        
  replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be create, alter or       
                                           describe. Can also accept a regular  
                                           expression except for --create option
--topics-with-overrides                  if set when describing topics, only    
                                           show topics that have overridden     
                                           configs                              
--unavailable-partitions                 if set when describing topics, only    
                                           show partitions whose leader is not  
                                           available                            
--under-replicated-partitions            if set when describing topics, only    
                                           show under replicated partitions     
--zookeeper <String: urls>               REQUIRED: The connection string for    
                                           the zookeeper connection in the form 
                                           host:port. Multiple URLS can be      
                                           given to allow fail-over

删除topic命令:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
Topic test is marked for deletion.4.生产发送消息
Note: This will have no impact if delete.topic.enable is not set to true.
[donald@Donald_Draper bin]$ 

再次浏览topic:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test
[donald@Donald_Draper bin]$ 

从上面的输出来看,我们并没有删除test topic主要是因为我们没有开启broker的如下配置:
# Switch to enable topic deletion or not, default value is false
是否可以删除topic,如果为true,我们可以在命令行删除topic,否则,不能。
#delete.topic.enable=true



上面的操作我们启动的使standy模式,下面我们来,搭建一个kafka集群。
在搭建集群之前,先关闭原先的standy kafka:
[donald@Donald_Draper bin]$ ./kafka-server-stop.sh ../config/server.properties 
[donald@Donald_Draper bin]$ ./zookeeper-server-stop.sh  ../config/zookeeper.properties 

kafka集群,我们放在下一篇来讲。
0
0
分享到:
评论

相关推荐

    springboot 基于spring-kafka动态创建kafka消费者

    本文将详细探讨如何在Spring Boot项目中基于Spring Kafka动态创建Kafka消费者。 首先,了解Kafka基本概念:Kafka是一种分布式流处理平台,用于构建实时数据管道和流应用程序。它提供了高吞吐量、低延迟的消息传递,...

    springboot集成kafka实战项目,kafka生产者、消费者、创建topic,指定消费分区

    在本项目中,我们将深入探讨如何使用Spring Boot与Kafka进行集成,实现一个实战项目,包括Kafka的生产者、消费者以及如何创建Topic,并且特别关注指定消费分区这一高级特性。Kafka是一款高吞吐量的分布式消息系统,...

    nodejs kafka-node 消费消息,生产消息(csdn)————程序.pdf

    创建生产者对象来发送消息到Kafka主题: ```javascript let Producer = kafka.Producer; let producer = new Producer(client); ``` ### 6. 发送消息 一旦生产者准备好,就可以发送消息了。以下代码创建一个包含...

    kafka模拟生产者消费者(集群模式)实例

    `kafka-demo`这个压缩包可能包含了示例代码,用于演示如何在集群模式下创建Kafka生产者和消费者。解压后,你可以看到相关的Java或Python源码,这些代码提供了如何连接到Kafka集群、发布和接收消息的示例。通过学习和...

    kettle kafka 消息生产插件

    标题中的“kettle kafka 消息生产插件”指的是Pentaho Data Integration(通常称为Kettle或PDI)中的一款插件,它允许用户通过Kettle工作流将数据发布到Apache Kafka分布式消息系统。Kafka是一种高效、可扩展且容错...

    kafka demo ,两种线程消费方式

    Kafka是一个发布/订阅模型的消息队列,它包含生产者(Producer)、消费者(Consumer)和主题(Topic)。生产者负责发布消息到主题,而消费者则订阅这些主题并消费消息。消费者通过消费者组(Consumer Group)进行...

    kafka-java-demo 基于java的kafka生产消费者示例

    Kafka消费者则用于订阅和消费Kafka主题中的消息。在Java中,消费者需要设置group id、订阅主题、以及offset管理策略等。一旦配置完成,可以使用`poll()`方法从Kafka服务器拉取新消息。 【Kafka Topic】 在Kafka中...

    springboot整合kafka实现生产者和消费者

    这个简单的例子展示了如何在Spring Boot中集成Kafka,创建生产者和消费者,以及如何通过REST API发送消息。实际应用中,你可以根据需求调整配置,例如设置分区策略、错误处理、消息序列化等。此外,你还可以利用...

    kafka生产和消费示例

    在Kafka中,生产者是负责发布消息到主题(Topic)的组件,而消费者则负责从主题订阅并消费这些消息。每个主题可以被分成多个分区(Partition),每个分区具有唯一的顺序,并且可以分布在集群的不同节点上,以提高可...

    使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据

    5. **Kafka消费者**:在消费端,我们可以创建一个Kafka消费者,通过`@KafkaListener`注解监听特定主题。为了实现批量消费,可以使用`poll()`方法来获取一批记录,然后一次性处理。这种方式可以优化处理性能,尤其在...

    Kafka创建Topic,还有生产者消费者按照FileBeat格式通信

    需求: 能力申请提交成功后,自动根据标识ID创建对应Kafka的Topic。 设计思路: 1、在Java代码中调用ZooKeeper的工具类,创建Topic。...5、完整代码见附件,包含消费者和生产者的java直接调用获取。

    kettle整合kafka生产者消费者插件

    kettle7.1版本整合kafka,kafka插件包含生产者、消费者。直接在kettle安装目录plugins下创建steps目录,并解压下载文件到kettle/plugins/steps目录。具体可查看我博文。

    kafka实例 消费者生产者

    在本文中,我们将深入探讨Apache Kafka,一个分布式流处理平台,以及如何在其中创建消费者和生产者实例。Kafka是一个高度可扩展、高吞吐量的消息队列系统,广泛用于实时数据流处理和大数据分析。 首先,让我们了解...

    kafka生产消费demo

    总结,这个“kafka生产消费demo”涵盖了Kafka 0.10.2版本的基本使用,包括生产者和消费者的创建、消息的发布与消费,以及可能的多线程处理。了解这些核心概念和API对于理解和构建基于Kafka的数据处理系统至关重要。

    kafka生产者消费者Demo

    生产者是负责创建和发布消息到主题(Topic)的应用程序,而消费者则订阅这些主题并处理发布的消息。在这个Demo中,我们可能会看到如何创建这两个角色的实例,并进行通信。 生产者API允许开发者向Kafka服务器发送...

    kafka生产及消费示例代码,下到就是赚到

    5. **消息模型**:Kafka支持发布/订阅模式,生产者发送消息到主题,消费者订阅主题来接收消息。示例代码可能展示了如何定义和使用这种模型。 6. **Offset管理**:消费者组中的每个消费者都会维护一个offset,表示其...

    kafka生产者消费者实例

    例如,可以创建一个简单的生产者,将日志消息写入名为"log_events"的主题。 对于消费者,我们同样编写代码订阅这个"log_events"主题。消费者代码会连接到 Kafka 集群,加入到特定的消费者组,然后开始监听并处理...

    分布式消息系统Kafka项目-生产者消费者代码实现(基于5台虚拟机完全分布式)

    3. **消费者实现**:Kafka消费者通过消费组(consumer groups)来订阅主题,并处理消息。消费者通过调用poll方法轮询新消息,同时需要管理offset(消息位置)以保持消费进度。在完全分布式环境中,消费者组内的成员...

    kafka客户端生产者消费者kafka可视化工具(可生产和消费消息)

    Kafka消费者则负责从主题中读取并处理这些消息。消费者可以并行地从多个分区读取消息,提高了处理大量数据的效率。此外,消费者还可以自动处理偏移量,以便在出现问题时能够重新开始消费。 这个工具的优点在于它们...

Global site tag (gtag.js) - Google Analytics