`
sillycat
  • 浏览: 2551943 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Kafka(2)Install ubuntu and Try more JAVA client

 
阅读更多
Kafka(2)Install ubuntu and Try more JAVA client

1. Try to setup this on windows.
download and install this file
http://scalasbt.artifactoryonline.com/scalasbt/sbt-native-packages/org/scala-sbt/sbt-launcher/0.11.3/sbt.msi

Unzip the kafka to working directory:
D:\tool\kafka-0.7.0
>sbt update
>sbt package

sbt is installed on windows, but still, it is hard to install kafka on windows

2. Try to setup on ubuntu12.04
>wget http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
>tar zxvf kafka-0.7.0-incubating-src.tar.gz
>mv kafka-0.7.0-incubating-src /opt/tools/kafka-0.7.0
>cd /opt/tools/kafka-0.7.0
>./sbt update
>./sbt package

start the server
>bin/zookeeper-server-start.sh config/zookeeper.properties
>bin/kafka-server-start.sh config/server.properties


3. Fix the Java Client Problem
Error Message:
[2012-06-11 17:55:00,109] WARN Exception causing close of session 0x137daf68ab70001 due to java.io.IOException: Connection reset by peer (org.apache.zookeeper.server.NIOServerCnxn)
[2012-06-11 17:55:00,110] INFO Closed socket connection for client /192.168.56.1:62003 which had sessionid 0x137daf68ab70001 (org.apache.zookeeper.server.NIOServerCnxn)

Solution:
server.properties
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
# may not be what you want.
hostname=x.x.x.x

#zk.connect=localhost:2181
zk.connect=x.x.x.x:2181

# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.sessiontimeout.ms=60000

zookeeper.properties
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=8000

We need to use real ip address here in configuration.

The Java Client sample codes are under this directory: D:\book\distributed\kafka-0.7.0-incubating-src\examples\src\main\java\kafka\examples

The class will be as follow:
package com.sillycat.magicneptune.example;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class TestProducerMain {

public static void main(String[] args) {
Properties props2 = new Properties();
props2.put("zk.connect", "192.168.56.101:2181");
props2.put("serializer.class", "kafka.serializer.StringEncoder");
// This is added by myself for changing the default timeout 6000.
props2.put("zk.connectiontimeout.ms", "15000");
ProducerConfig config = new ProducerConfig(props2);
Producer<String, String> producer = new Producer<String, String>(config);

// The message is sent to a randomly selected partition registered in ZK
ProducerData<String, String> data = new ProducerData<String, String>(
"test", "test-message,it is ok now.adsfasdf1111222");
producer.send(data);
producer.close();
}
}

package com.sillycat.magicneptune.example;

import java.net.InetAddress;
import java.net.UnknownHostException;

import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

public class TestConsumerMain {

public static void main(String[] args) {


try {
System.out.println(InetAddress.getLocalHost().getHostAddress());
} catch (UnknownHostException e) {
e.printStackTrace();
}
SimpleConsumer consumer = new SimpleConsumer("192.168.56.101", 9092, 10000,
1024000);

long offset = 0;
while (true) {
// create a fetch request for topic   test  , partition 0, current
// offset, and fetch size of 1MB
FetchRequest fetchRequest = new FetchRequest("test", 0, offset,
1000000);

// get the message set from the consumer and print them out
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
for (MessageAndOffset msg : messages) {
System.out.println(ExampleUtils.getMessage(msg.message()) + "offset=" + offset);
// advance the offset after consuming each message
offset = msg.offset();
}
}
//consumer.close();
}

}


package com.sillycat.magicneptune.example;

import java.nio.ByteBuffer;

import kafka.message.Message;

public class ExampleUtils
{
  public static String getMessage(Message message)
  {
    ByteBuffer buffer = message.payload();
    byte [] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes);
  }
}


references:
http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/
https://github.com/harrah/xsbt/wiki/Getting-Started-Setup
http://incubator.apache.org/kafka/faq.html
http://incubator.apache.org/kafka/quickstart.html
http://blog.sina.com.cn/s/blog_3fe961ae01011o4z.html
http://incubator.apache.org/kafka/faq.html

分享到:
评论

相关推荐

    kafka-java-demo 基于java的kafka生产消费者示例

    在Java中使用Kafka,我们需要依赖Apache Kafka的Java客户端库。这个库提供了Producer和Consumer接口,使得我们能够方便地编写生产消息和消费消息的Java应用。在"Kafka-java-demo"中,你将看到如何使用这些接口来实现...

    kafka-schema-registry-client-6.2.2.jar

    mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....

    Kafka中生产者和消费者java实现

    本篇文章将深入探讨如何在Java环境中使用IDEA,通过Maven构建工具来实现Kafka的生产者和消费者。 首先,我们需要设置项目环境。使用IntelliJ IDEA创建一个新的Java Maven项目,然后在pom.xml文件中添加Kafka相关的...

    kafka的java依赖包

    在Java开发环境中,Kafka作为一个分布式流处理平台,被广泛用于构建实时数据管道和流应用。这个"Kafka的Java依赖包"包含了所有你需要在Java项目中与Kafka进行本地交互所需的jar包。这些jar包提供了完整的API,使得...

    kafka-java-demo 基于java的kafka生产消费者例子

    在本文中,我们将深入探讨基于Java的Kafka生产者与消费者的实现,这主要围绕着"Kafka-java-demo"项目展开。Kafka是一个分布式流处理平台,由LinkedIn开发并开源,现在是Apache软件基金会的一部分。它被广泛用于实时...

    kafka_linux_自动安装脚本(基于环境_node_+_java)_kafka-install-package.zip

    kafka_linux_自动安装脚本(基于环境_node_+_java)_kafka-install-package

    java开发kafka-clients所需要的所有jar包以及源码

    Java开发Kafka客户端是构建基于Apache Kafka的应用程序的关键步骤,Kafka-clients库提供了与Kafka服务器进行交互的API,支持生产者和消费者的实现。在Java中使用Kafka-clients,你需要包含相应的jar包,这些包包含了...

    kafka的java的jar包

    Kafka主要用Java编写,因此在Java环境中与Kafka交互时,通常需要引入相关的jar包。"kafka的java的jar包"就是用于Java应用程序与Kafka进行通信的一系列库文件集合。 Kafka主要提供了消息队列的功能,能够高效地处理...

    kafka java 下载的jar

    在Java开发环境中,Apache Kafka是一个不可或缺的分布式流处理平台,常用于构建实时数据管道和流应用。Kafka的Java客户端库使得Java开发者能够方便地与Kafka集群进行交互,包括生产消息、消费消息以及管理主题等操作...

    java语言kafka数据批量导入到Elasticsearch实例

    消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 &lt;groupId&gt;org.elasticsearch &lt;artifactId&gt;elastic...

    kafka环境搭建

    Kafka 需要 Java 环境来运行,因此需要安装 Java。可以从 Oracle 官方网站下载 Java 安装包,然后按照安装向导进行安装。 2. 安装 Python 环境 Kafka 的一些工具使用 Python脚本,因此需要安装 Python 环境。这里...

    kafka学习代码(java开发kafka)

    在本文中,我们将深入探讨如何使用Java进行Kafka开发,主要基于提供的" kafka-study "压缩包中的学习代码。Kafka是由Apache开发的分布式流处理平台,它被广泛用于构建实时数据管道和流应用程序。Java是Kafka客户端库...

    kafka集群Java开发jar包

    在Java中与Kafka交互通常需要使用到`kafka-clients`和`zookeeper-client`相关的jar包。 标题中的"Kafka集群Java开发jar包"指的是使用Java进行Kafka集群的开发工作,这通常涉及到创建生产者、消费者以及管理Kafka...

    Kafka最新Ubuntu安装包

    kafka_2.13-3.5.0.tgz 是Apache Kafka的一个特定版本,针对Scala 2.13构建。Apache Kafka是一个开源的流处理平台,主要用于构建实时流数据管道和应用程序。它设计用于水平扩展性、高吞吐量和容错性,已被广泛应用于...

    java实现flink订阅Kerberos认证的Kafka消息示例源码

    在Java中实现Flink订阅Kerberos认证的Kafka消息是一项关键任务,特别是在处理安全敏感的数据流时。本文将深入探讨这一主题,介绍如何利用Apache Flink与Kafka的集成,以及如何通过Kerberos进行身份验证。 首先,...

    kafka-schema-registry-client-3.2.0.jar

    kafka-schema-registry-client-3.2.0.jar包,亲测可用,在aliyun仓库内找不到,可以下载此jar包来进行手动安装

    java操作kafka

    Java操作Kafka主要涉及到两个核心角色:生产者和消费者,它们是Kafka生态系统中的基本组件。Kafka是一个分布式流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。它被广泛用于构建实时数据管道和流应用,能够...

    Packt.Learning.Apache.Kafka.2nd.Edition

    how to install and build Kafka 0.8.x using different versions of Scala. Chapter 2, Setting Up a Kafka Cluster, describes the steps required to set up a single- or multi-broker Kafka cluster and shares...

    kafka java maven例子

    2. **Java API**: Kafka提供了Java客户端API,使得开发者能够轻松地在Java应用中集成Kafka。这些API包括Producer API(生产者)和Consumer API(消费者),用于发送和接收消息。 3. **Maven**: Maven是一个项目管理...

    2、java调用kafka api

    在Java中调用Apache Kafka API是一项关键任务,特别是在构建分布式数据处理系统时。Apache Kafka是一个高性能、可扩展的消息队列(MQ),它提供实时的数据流处理能力。在Java应用程序中集成Kafka API可以让开发者...

Global site tag (gtag.js) - Google Analytics