Kafka(3)Latest Zookeeper and Kafka With Scala
1. Install the latest Zookeeper
http://zookeeper.apache.org/doc/r3.4.5/
Download the latest release, zookeeper-3.4.5.tar.gz
Soft link the directory
>sudo ln -s /Users/carl/tool/zookeeper-3.4.5 /opt/zookeeper-3.4.5
>sudo ln -s /opt/zookeeper-3.4.5 /opt/zookeeper
Add this to the system path
>vi ~/.profile
export PATH=/opt/zookeeper/bin:$PATH
>. ~/.profile
Copy the sample file to create the default configuration
>cp conf/zoo_sample.cfg conf/zoo.cfg
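For reference, the sample file in 3.4.x ships with roughly these settings (values may vary slightly between releases):
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181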
And start the server like this
>zkServer.sh start zoo.cfg
Use jps to check if the server is running
>jps
1957
10014 QuorumPeerMain
2260
10050 Jps
Connect with the client
>zkCli.sh -server localhost:2181
zookeeper>help
zookeeper>quit
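Inside the client you can poke around to verify the server works, for example (the /mytest path here is just an arbitrary test znode):
zookeeper>ls /
zookeeper>create /mytest hello
zookeeper>get /mytest
zookeeper>delete /mytest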
Stop the server
>zkServer.sh stop
The zkServer.sh script also supports status, restart, upgrade, start, stop, and so on.
Configure a cluster as follows
zoo1.cfg
dataDir=/tmp/zookeeper/zoo1
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2181
zoo2.cfg
dataDir=/tmp/zookeeper/zoo2
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2182
zoo3.cfg
dataDir=/tmp/zookeeper/zoo3
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2183
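Note that each of these files also needs the base settings from the standalone zoo.cfg; a complete zoo1.cfg would look roughly like this:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper/zoo1
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890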
Create a myid file under each node's dataDir
>vi /tmp/zookeeper/zoo1/myid
1
>vi /tmp/zookeeper/zoo2/myid
2
>vi /tmp/zookeeper/zoo3/myid
3
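Or, assuming the dataDir directories already exist, just write the files with echo:
>echo 1 > /tmp/zookeeper/zoo1/myid
>echo 2 > /tmp/zookeeper/zoo2/myid
>echo 3 > /tmp/zookeeper/zoo3/myid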
Start 3 nodes
>zkServer.sh start zoo1.cfg
>zkServer.sh start zoo2.cfg
>zkServer.sh start zoo3.cfg
>jps
Or use the client to connect to one of them
>zkCli.sh -server localhost:2181
Stop the 3 nodes
>zkServer.sh stop zoo1.cfg
>zkServer.sh stop zoo2.cfg
>zkServer.sh stop zoo3.cfg
2. Working with the Latest Kafka
Download the latest source package from here https://dist.apache.org/repos/dist/release/kafka/kafka-0.8.0-beta1-src.tgz
>mv kafka-0.8.0-beta1-src kafka-0.8.0-beta1
>cd /Users/carl/tool/kafka-0.8.0-beta1
>./sbt clean
>./sbt update
>./sbt package
>./sbt assembly-package-dependency
>sudo ln -s /Users/carl/tool/kafka-0.8.0-beta1 /opt/kafka-0.8.0-beta1
>sudo ln -s /opt/kafka-0.8.0-beta1 /opt/kafka
>sudo vi ~/.profile
export PATH=/opt/kafka/bin:$PATH
>. ~/.profile
Start Zookeeper first, then start the Kafka server
>kafka-server-start.sh config/server.properties
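If you do not want to reuse the standalone Zookeeper from step 1, the Kafka distribution also bundles a Zookeeper start script:
>zookeeper-server-start.sh config/zookeeper.properties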
As of Feb-11-2014, the latest version is http://apache.osuosl.org/kafka/0.8.0/kafka-0.8.0-src.tgz
3. Find the Jar and Prepare the Scala Client
>./sbt release-zip
>./sbt publish-local
Find the dependencies here
>cat /opt/kafka/core/build.sbt
Add these 2 lines to the client project's dependencies
"com.101tec" % "zkclient" % "0.3",
"org.apache.kafka" % "kafka_2.8.0" % "0.8.0-beta1"
Fetch the latest code
>git clone https://github.com/apache/kafka.git
>git clone https://github.com/kevinwright/kafka.git kafka_2.10
Or try these commands directly on the official code
>./sbt "++2.9.2 package"
>./sbt "++2.9.2 publish-local"
It does not work for
>./sbt "++2.10.0 package"
So I changed core/build.sbt
>vi core/build.sbt
case "2.10.0" => "org.scalatest" % "scalatest_2.10" % "1.9.1" % "test"
That still fails with other errors, so try the kafka_2.10 fork and switch to branch 0.8
>./sbt "++2.10.0 package"
>./sbt +package
Go back to the official git repository and try this command
>./sbt "++2.9.2 publish-local"
Then depend on the locally published artifact
"org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1",
Error Message
java.lang.ClassNotFoundException: scala.reflect.ClassManifest
Solution:
I do not know how to fix this right now.
I will leave this problem for the future. Just record some code I have written.
package com.sillycat.superduty.jobs.producer

import java.util.Properties

import kafka.javaapi.producer.Producer
import kafka.producer.{ KeyedMessage, ProducerConfig }

object NewTaskKafka extends App {
  val props2: Properties = new Properties()
  // In Kafka 0.8 the producer discovers brokers via metadata.broker.list;
  // zk.connect is only kept here from the old 0.7-style configuration.
  props2.put("zk.connect", "localhost:2181")
  props2.put("metadata.broker.list", "localhost:9092")
  props2.put("serializer.class", "kafka.serializer.StringEncoder")
  props2.put("zk.connectiontimeout.ms", "15000")

  val config: ProducerConfig = new ProducerConfig(props2)
  val producer: Producer[String, String] = new Producer[String, String](config)

  // Send one message to the "campaign" topic and shut down.
  val data = new KeyedMessage[String, String]("campaign", "test-message, it is ok")
  producer.send(data)
  producer.close()
}
package com.sillycat.superduty.jobs.consumer

import java.nio.ByteBuffer

import kafka.api.{ FetchRequest, FetchRequestBuilder }
import kafka.javaapi.FetchResponse
import kafka.javaapi.consumer.SimpleConsumer
import kafka.javaapi.message.ByteBufferMessageSet

import scala.collection.JavaConversions._

import com.typesafe.scalalogging.slf4j.Logging

object WorkerKafka extends App with Logging {
  val simpleConsumer: SimpleConsumer = new SimpleConsumer("localhost", 9092, 10000, 1024000, "worker")

  // Fetch from partition 0 of the "test" topic, always starting at offset 0;
  // a real consumer would advance the offset after each successful fetch.
  val req: FetchRequest = new FetchRequestBuilder()
    .clientId("worker")
    .addFetch("test", 0, 0L, 100)
    .build()

  while (true) {
    val fetchResponse: FetchResponse = simpleConsumer.fetch(req)
    val messages: ByteBufferMessageSet = fetchResponse.messageSet("test", 0)
    messages foreach { msg =>
      // Copy the payload bytes out of the buffer before decoding them.
      val buffer: ByteBuffer = msg.message.payload
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      logger.debug("message=" + new String(bytes, "UTF-8"))
    }
  }
}
Solution:
Same code; just update the dependencies as follows.
"org.apache.kafka" % "kafka_2.10" % "0.8.0" intransitive(),
"com.yammer.metrics" % "metrics-core" % "2.2.0",
Since I am using the latest version of Kafka, I need to either enable automatic topic creation or create topics manually.
I prefer to create them manually.
Create topic
>bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
List topic
>bin/kafka-list-topic.sh --zookeeper localhost:2181
Send and consume messages
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
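Type a few lines into the producer terminal; the consumer terminal should print each one back, confirming the broker and topic work end to end.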
That is cool.
Later, I will try the Scala client and multiple nodes.
References:
Kafka 1~2
http://sillycat.iteye.com/blog/1563312
http://sillycat.iteye.com/blog/1563314
http://kafka.apache.org/
Kafka cluster
http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/
http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/
http://kafka.apache.org/08/quickstart.html
Zookeeper
http://sillycat.iteye.com/blog/1556108
http://sillycat.iteye.com/blog/1556141
http://rdc.taobao.com/team/jm/archives/665
http://blog.javachen.com/hadoop/2013/08/23/publish-proerties-using-zookeeper/
http://rdc.taobao.com/team/jm/archives/tag/zookeeper
https://github.com/alibaba/taokeeper