`
sillycat
  • 浏览: 2560378 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Kafka(3)Latest Zookeeper and Kafka With Scala

 
阅读更多

Kafka(3)Latest Zookeeper and Kafka With Scala 

1. Install the latest Zookeeper
http://zookeeper.apache.org/doc/r3.4.5/

Download and get the latest file zookeeper-3.4.5.tar.gz
Soft link the directory
>sudo ln -s /Users/carl/tool/zookeeper-3.4.5 /opt/zookeeper-3.4.5
>sudo ln -s /opt/zookeeper-3.4.5 /opt/zookeeper

Add this to the system path
>vi ~/.profile
export PATH=/opt/zookeeper/bin:$PATH
>. ~/.profile

Put the default configuration file
>cp conf/zoo_sample.cfg conf/zoo.cfg

And start the server like this
>zkServer.sh start zoo.cfg

Use JPS to check if the server is running
>jps
1957
10014 QuorumPeerMain
2260
10050 Jps

Connecting with client
>zkCli.sh -server localhost:2181
zookeeper>help
zookeeper>quit

Stop the server
>zkServer.sh stop

There are status, restart, upgrade, start, stop...

Configure to Cluster as follow
zoo1.cfg
dataDir=/tmp/zookeeper/zoo1 
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2181 

zoo2.cfg
dataDir=/tmp/zookeeper/zoo2  
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2182

zoo3.cfg
dataDir=/tmp/zookeeper/zoo3  
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2183 

>vi /tmp/zookeeper/zoo1/myid
1
>vi /tmp/zookeeper/zoo2/myid
2
>vi /tmp/zookeeper/zoo3/myid
3

Start 3 nodes
>zkServer.sh start zoo1.cfg
>zkServer.sh start zoo2.cfg
>zkServer.sh start zoo3.cfg

>jps

Or use client to connect
>zkCli.sh -server localhost:2181

>zkServer.sh stop zoo1.cfg
>zkServer.sh stop zoo2.cfg
>zkServer.sh stop zoo3.cfg

2. Working with Latest kafka
Download the latest source package from here https://dist.apache.org/repos/dist/release/kafka/kafka-0.8.0-beta1-src.tgz

>mv kafka-0.8.0-beta1-src kafka-0.8.0-beta1
>cd /Users/carl/tool/kafka-0.8.0-beta1
>./sbt clean
>./sbt update
>./sbt package
>./sbt assembly-package-dependency

>sudo ln -s /Users/carl/tool/kafka-0.8.0-beta1 /opt/kafka-0.8.0-beta1
>sudo ln -s /opt/kafka-0.8.0-beta1 /opt/kafka

>sudo vi ~/.profile
export PATH=/opt/kafka/bin:$PATH 
>. ~/.profile

>kafka-server-start.sh config/server.properties

Start the zookeeper first.

Today is Feb-11-2014, the latest version is this one http://apache.osuosl.org/kafka/0.8.0/kafka-0.8.0-src.tgz

3. Find the Jar and Prepare the Scala Client 
>./sbt release-zip 
>./sbt publish-local
Find the dependencies from here
>cat /opt/kafka/core/build.sbt

Add these 2 lines
        "com.101tec"          %   "zkclient"                  % "0.3",
        "org.apache.kafka"    %   "kafka_2.8.0"               % "0.8.0-beta1"

Try to fetch the latest codes
>git clone https://github.com/apache/kafka.git
>git clone https://github.com/kevinwright/kafka.git kafka_2.10

Or try this command directly on the official codes
>./sbt "++2.9.2 package" 
>./sbt "++2.9.2 publish-local"

It is not working for 
>./sbt "++2.10.0 package"
So I change the build.sbt
>vi core/build.sbt
case "2.10.0"=> "org.scalatest" %  "scalatest_2.10" % "1.9.1" % "test"

Have other error. So try with the kafka_2.10 and switch to branch 0.8
>./sbt "++2.10.0 package"
>./sbt +package
 
Go back to the official git URL and try this command
>./sbt "++2.9.2 publish-local"

"org.apache.kafka"    %   "kafka_2.9.2"               % "0.8.0-beta1", 

Error Message
java.lang.ClassNotFoundException: scala.reflect.ClassManifest 

Solution:
I do not know how to fix this right now.

I will leave this problem for the future. Just record some codes I have wrote.

package com.sillycat.superduty.jobs.producer

import java.util.Properties

import kafka.javaapi.producer.Producer
import kafka.producer.{ KeyedMessage, ProducerConfig }

object NewTaskKafka extends App {
  val props2: Properties = new Properties()
  props2.put("zk.connect", "localhost:2181")
  props2.put("metadata.broker.list", "localhost:9092");
  props2.put("serializer.class", "kafka.serializer.StringEncoder")
  props2.put("zk.connectiontimeout.ms", "15000")

  val config: ProducerConfig = new ProducerConfig(props2)

  val producer: Producer[String, String] = new Producer[String, String](config)

  val data = new KeyedMessage[String, String]("campaign", "test-message, it is ok")
  producer.send(data)
  producer.close
}

package com.sillycat.superduty.jobs.consumer

import kafka.api.{ FetchRequestBuilder, FetchRequest }
import kafka.javaapi.consumer.SimpleConsumer
import kafka.javaapi.FetchResponse
import kafka.javaapi.message.ByteBufferMessageSet
import scala.collection.JavaConversions._
import java.nio.ByteBuffer
import com.typesafe.scalalogging.slf4j.Logging

object WorkerKafka extends App with Logging {
  val simpleConsumer: SimpleConsumer = new SimpleConsumer("localhost", 9092, 10000, 1024000, "worker")

  val req: FetchRequest = new FetchRequestBuilder()
    .clientId("worker")
    .addFetch("test", 0, 0L, 100)
    .build()

  while (true) {
    val fetchResponse: FetchResponse = simpleConsumer.fetch(req)
    val messages: ByteBufferMessageSet = fetchResponse.messageSet("test", 0)

    messages foreach { msg =>
      val buffer: ByteBuffer = msg.message.payload
      val messages = new Array[Byte](buffer.remaining())
      val bytes = ByteBuffer.wrap(messages)
      logger.debug("message=" + bytes.toString)
    }
  }

}

Solution:
Same codes. try to update the package as follow.
"org.apache.kafka"          %   "kafka_2.10"            % "0.8.0" intransitive(),
"com.yammer.metrics"        %   "metrics-core"          % "2.2.0",

Since I am using the latest version of kafka, I need to configure the auto create topic or manually create topic.
I prefer manually.
Create topic
>bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test

List topic
>bin/kafka-list-topic.sh --zookeeper localhost:2181

Send and Consumer the message
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

That is cool.
Later, I will try the scala client and multiple nodes.

References:
Kafka 1~2
http://sillycat.iteye.com/blog/1563312
http://sillycat.iteye.com/blog/1563314

http://kafka.apache.org/

kafka cluster
http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/
http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/
http://kafka.apache.org/08/quickstart.html

zookeeper
http://sillycat.iteye.com/blog/1556108
http://sillycat.iteye.com/blog/1556141
http://rdc.taobao.com/team/jm/archives/665
http://blog.javachen.com/hadoop/2013/08/23/publish-proerties-using-zookeeper/
http://rdc.taobao.com/team/jm/archives/tag/zookeeper
https://github.com/alibaba/taokeeper

分享到:
评论

相关推荐

    kafka2.4.0+zookeeper+kafka-connect集成环境包

    标题中的“kafka2.4.0+zookeeper+kafka-connect集成环境包”指的是一个包含了Apache Kafka 2.4.0版本、ZooKeeper以及Kafka Connect的完整集成环境。这个安装包是为了方便用户一次性安装和配置这三个关键组件,用于...

    Kafka压缩包和zookeeper压缩包

    **Kafka与Zookeeper简介** Kafka是一款高吞吐量的分布式消息系统,由LinkedIn开发并贡献给Apache软件基金会。它的设计目标是成为一个实时、可扩展、持久化且具有容错性的消息中间件,用于处理大规模的数据流。Kafka...

    kafka+zookeeper

    《Kafka与Zookeeper在消息中间件中的应用与整合》 Kafka和Zookeeper是两个在分布式系统中广泛应用的关键组件,特别是在消息中间件领域。Kafka是一个高效、可扩展、持久化的发布/订阅消息系统,而Zookeeper则是一个...

    kafka-zookeeper.7z

    标题中的“kafka-zookeeper.7z”是一个包含Kafka和Zookeeper安装文件的压缩包。Kafka是一款高吞吐量的分布式消息系统,而Zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的必备组件,提供命名...

    kafka_2.12-2.2.2+zookeeper-3.4.13

    3. **Kafka与Zookeeper的协作**: - **集群发现**:Kafka的生产者和消费者都需要知道哪些服务器是可用的,Zookeeper提供这个信息。 - **偏移量管理**:消费者组内的每个消费者保存其消费到的主题分区的偏移量,...

    kafka以及依赖安装包jdk+zookeeper+kafka

    `kafka_2.12-2.1.1.tgz`是Kafka的一个版本,它基于Scala语言(这里2.12指的是Scala版本号)。Kafka的核心特性包括发布/订阅消息模型、持久化存储、高吞吐量和低延迟。安装Kafka涉及解压文件,配置`server.properties...

    Linux下的Kafka+Zookeeper使用以及配置

    在实际应用中,你可能需要编写自定义的生产者和消费者程序,使用Kafka的Java或Scala API。这涉及到更深入的知识,如KafkaProducer和KafkaConsumer类的使用,以及如何处理错误和异常。 总的来说,掌握Linux下Kafka和...

    zookeeper3.4.12+hbase1.4.4+sqoop1.4.7+kafka2.10

    在构建大数据处理环境时,Hadoop集群是核心基础,而`zookeeper3.4.12+hbase1.4.4+sqoop1.4.7+kafka2.10`这一组合则提供了集群中不可或缺的组件。让我们逐一探讨这些组件的功能、作用以及它们之间的协同工作。 **...

    徐老师大数据培训Hadoop+HBase+ZooKeeper+Spark+Kafka+Scala+Ambari

    根据提供的标题、描述、标签及部分内容链接,我们可以推断出这是一个关于大数据技术栈的培训课程,涉及的技术包括Hadoop、HBase、Zookeeper、Spark、Kafka、Scala以及Ambari。下面将针对这些技术进行详细的介绍和...

    zookeeper、kafka

    `kafka_2.10-0.10.2.0.tgz`这个文件是Kafka的一个版本,适用于Scala 2.10和Kafka 0.10.2.0。在部署Kafka时,我们需要设置环境变量,创建 Zookeeper 配置,然后启动Kafka服务器。 Zookeeper与Kafka的结合使用是常见...

    spark环境安装(Hadoop HA+Hbase+phoneix+kafka+flume+zookeeper+spark+scala)

    本项目旨在搭建一套完整的Spark集群环境,包括Hadoop HA(高可用)、HBase、Phoenix、Kafka、Flume、Zookeeper以及Scala等多个组件的集成。这样的环境适用于大规模的数据处理与分析任务,能够有效地支持实时数据流...

    大数据处理利器:Spark+ZooKeeper+Kafka Scala源码示例

    项目核心采用Spark进行批处理与流处理,整合了ZooKeeper和Kafka以增强分布式计算和数据流管理能力。文件类型多样,包括175个class文件,109个crc校验文件,82个Parquet数据文件,以及67个Scala源码文件等。 项目...

    Hadoop+Hive+Spark+Kafka+Zookeeper+Flume+Sqoop+Azkaban+Scala

    基于 Zookeeper 搭建 Hadoop 高可用集群 二、Hive 简介及核心概念 Linux 环境下 Hive 的安装部署 CLI 和 Beeline 命令行的基本使用 常用 DDL 操作 分区表和分桶表 视图和索引 常用 DML 操作 数据查询详解 三、Spark ...

    kafka开发及安装工具包

    3. **配置Kafka**: 修改`config/server.properties`文件,设置Kafka服务器的相关参数,如broker.id、zookeeper.connect等。 4. **启动Kafka**: 使用`bin/kafka-server-start.sh config/server.properties`命令启动...

    kafka_2.12-2.5.1&&apache-zookeeper-3.5.8

    这个版本号表示Kafka是针对JDK 1.8及更高版本(因为2.12对应的是Java 8的版本号),并且其核心库是用Scala 2.12编译的。2.5.1是Kafka的稳定版本,包含了各种性能优化和新特性。 “apache-zookeeper-3.5.8”则是...

    kafka_2.12-2.5.0 s scala-2-12.11.tg

    3. Akka整合:Scala社区的Akka框架可以无缝集成Kafka,提供强大的并发处理和Actor模型,非常适合构建分布式系统。 四、Kafka实战——创建和消费主题 1. 创建主题:使用Kafka的命令行工具`kafka-topics.sh`创建主题...

    kakfa,kafka集群安装部署全量安装包

    2. **Kafka**: 提供的`kafka_2.11-1.1.1.tgz`是Kafka的二进制包,适用于Scala 2.11的环境。这个版本的Kafka支持消息的持久化和高可用性,是企业级应用的常见选择。 安装步骤如下: **1. 安装ZooKeeper** - 解压`...

    zookeeper-3.5.8-kafka_2.11-2.4.1.rar

    3. 客户端连接管理:ZooKeeper帮助客户端发现Kafka集群的地址,实现服务的动态发现。 总的来说,这个压缩包可能包含了安装和配置ZooKeeper和Kafka的必要文件,对于需要构建分布式环境或进行大数据处理的用户来说,...

    windows下kafka_2.12-2.9.0.rar(含单机伪分布式配置)

    2.12-2.9.0是Kafka的一个特定发行版,其中2.12代表它支持Scala 2.12编译器,而2.9.0则是Kafka的版本号。 描述中的“含单机伪分布式配置”意味着这个压缩包包含了使Kafka在单台机器上模拟分布式环境的配置文件。在...

    kafka安装包-2.13-3.6.2

    Scala是一种多范式的编程语言,Kafka的API是用Scala编写的,因此这个数字表示的是与哪个版本的Scala API兼容。在这个例子中,是Scala 2.13。 - `3.6.2` 是Kafka本身的版本号。每个版本都会包含新功能、性能优化、bug...

Global site tag (gtag.js) - Google Analytics