- 浏览: 2551943 次
- 性别:
- 来自: 成都
文章分类
最新评论
-
nation:
你好,在部署Mesos+Spark的运行环境时,出现一个现象, ...
Spark(4)Deal with Mesos -
sillycat:
AMAZON Relatedhttps://www.godad ...
AMAZON API Gateway(2)Client Side SSL with NGINX -
sillycat:
sudo usermod -aG docker ec2-use ...
Docker and VirtualBox(1)Set up Shared Disk for Virtual Box -
sillycat:
Every Half an Hour30 * * * * /u ...
Build Home NAS(3)Data Redundancy -
sillycat:
3 List the Cron Job I Have>c ...
Build Home NAS(3)Data Redundancy
Kafka(2)Install ubuntu and Try more JAVA client
1. Try to setup this on windows.
download and install this file
http://scalasbt.artifactoryonline.com/scalasbt/sbt-native-packages/org/scala-sbt/sbt-launcher/0.11.3/sbt.msi
Unzip the kafka to working directory:
D:\tool\kafka-0.7.0
>sbt update
>sbt package
sbt is installed on windows, but still, it is hard to install kafka on windows
2. Try to setup on ubuntu12.04
>wget http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
>tar zxvf kafka-0.7.0-incubating-src.tar.gz
>mv kafka-0.7.0-incubating-src /opt/tools/kafka-0.7.0
>cd /opt/tools/kafka-0.7.0
>./sbt update
>./sbt package
start the server
>bin/zookeeper-server-start.sh config/zookeeper.properties
>bin/kafka-server-start.sh config/server.properties
3. Fix the Java Client Problem
Error Message:
[2012-06-11 17:55:00,109] WARN Exception causing close of session 0x137daf68ab70001 due to java.io.IOException: Connection reset by peer (org.apache.zookeeper.server.NIOServerCnxn)
[2012-06-11 17:55:00,110] INFO Closed socket connection for client /192.168.56.1:62003 which had sessionid 0x137daf68ab70001 (org.apache.zookeeper.server.NIOServerCnxn)
Solution:
server.properties
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
# may not be what you want.
hostname=x.x.x.x
#zk.connect=localhost:2181
zk.connect=x.x.x.x:2181
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.sessiontimeout.ms=60000
zookeeper.properties
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=8000
We need to use real ip address here in configuration.
The Java Client sample codes are under this directory: D:\book\distributed\kafka-0.7.0-incubating-src\examples\src\main\java\kafka\examples
The class will be as follow:
package com.sillycat.magicneptune.example;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;
public class TestProducerMain {
public static void main(String[] args) {
Properties props2 = new Properties();
props2.put("zk.connect", "192.168.56.101:2181");
props2.put("serializer.class", "kafka.serializer.StringEncoder");
// This is added by myself for changing the default timeout 6000.
props2.put("zk.connectiontimeout.ms", "15000");
ProducerConfig config = new ProducerConfig(props2);
Producer<String, String> producer = new Producer<String, String>(config);
// The message is sent to a randomly selected partition registered in ZK
ProducerData<String, String> data = new ProducerData<String, String>(
"test", "test-message,it is ok now.adsfasdf1111222");
producer.send(data);
producer.close();
}
}
package com.sillycat.magicneptune.example;
import java.net.InetAddress;
import java.net.UnknownHostException;
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
public class TestConsumerMain {
public static void main(String[] args) {
try {
System.out.println(InetAddress.getLocalHost().getHostAddress());
} catch (UnknownHostException e) {
e.printStackTrace();
}
SimpleConsumer consumer = new SimpleConsumer("192.168.56.101", 9092, 10000,
1024000);
long offset = 0;
while (true) {
// create a fetch request for topic test , partition 0, current
// offset, and fetch size of 1MB
FetchRequest fetchRequest = new FetchRequest("test", 0, offset,
1000000);
// get the message set from the consumer and print them out
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
for (MessageAndOffset msg : messages) {
System.out.println(ExampleUtils.getMessage(msg.message()) + "offset=" + offset);
// advance the offset after consuming each message
offset = msg.offset();
}
}
//consumer.close();
}
}
package com.sillycat.magicneptune.example;
import java.nio.ByteBuffer;
import kafka.message.Message;
public class ExampleUtils
{
public static String getMessage(Message message)
{
ByteBuffer buffer = message.payload();
byte [] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new String(bytes);
}
}
references:
http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/
https://github.com/harrah/xsbt/wiki/Getting-Started-Setup
http://incubator.apache.org/kafka/faq.html
http://incubator.apache.org/kafka/quickstart.html
http://blog.sina.com.cn/s/blog_3fe961ae01011o4z.html
http://incubator.apache.org/kafka/faq.html
1. Try to setup this on windows.
download and install this file
http://scalasbt.artifactoryonline.com/scalasbt/sbt-native-packages/org/scala-sbt/sbt-launcher/0.11.3/sbt.msi
Unzip the kafka to working directory:
D:\tool\kafka-0.7.0
>sbt update
>sbt package
sbt is installed on windows, but still, it is hard to install kafka on windows
2. Try to setup on ubuntu12.04
>wget http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
>tar zxvf kafka-0.7.0-incubating-src.tar.gz
>mv kafka-0.7.0-incubating-src /opt/tools/kafka-0.7.0
>cd /opt/tools/kafka-0.7.0
>./sbt update
>./sbt package
start the server
>bin/zookeeper-server-start.sh config/zookeeper.properties
>bin/kafka-server-start.sh config/server.properties
3. Fix the Java Client Problem
Error Message:
[2012-06-11 17:55:00,109] WARN Exception causing close of session 0x137daf68ab70001 due to java.io.IOException: Connection reset by peer (org.apache.zookeeper.server.NIOServerCnxn)
[2012-06-11 17:55:00,110] INFO Closed socket connection for client /192.168.56.1:62003 which had sessionid 0x137daf68ab70001 (org.apache.zookeeper.server.NIOServerCnxn)
Solution:
server.properties
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
# may not be what you want.
hostname=x.x.x.x
#zk.connect=localhost:2181
zk.connect=x.x.x.x:2181
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.sessiontimeout.ms=60000
zookeeper.properties
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=8000
We need to use real ip address here in configuration.
The Java Client sample codes are under this directory: D:\book\distributed\kafka-0.7.0-incubating-src\examples\src\main\java\kafka\examples
The class will be as follow:
package com.sillycat.magicneptune.example;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;
public class TestProducerMain {
public static void main(String[] args) {
Properties props2 = new Properties();
props2.put("zk.connect", "192.168.56.101:2181");
props2.put("serializer.class", "kafka.serializer.StringEncoder");
// This is added by myself for changing the default timeout 6000.
props2.put("zk.connectiontimeout.ms", "15000");
ProducerConfig config = new ProducerConfig(props2);
Producer<String, String> producer = new Producer<String, String>(config);
// The message is sent to a randomly selected partition registered in ZK
ProducerData<String, String> data = new ProducerData<String, String>(
"test", "test-message,it is ok now.adsfasdf1111222");
producer.send(data);
producer.close();
}
}
package com.sillycat.magicneptune.example;
import java.net.InetAddress;
import java.net.UnknownHostException;
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
public class TestConsumerMain {
public static void main(String[] args) {
try {
System.out.println(InetAddress.getLocalHost().getHostAddress());
} catch (UnknownHostException e) {
e.printStackTrace();
}
SimpleConsumer consumer = new SimpleConsumer("192.168.56.101", 9092, 10000,
1024000);
long offset = 0;
while (true) {
// create a fetch request for topic test , partition 0, current
// offset, and fetch size of 1MB
FetchRequest fetchRequest = new FetchRequest("test", 0, offset,
1000000);
// get the message set from the consumer and print them out
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
for (MessageAndOffset msg : messages) {
System.out.println(ExampleUtils.getMessage(msg.message()) + "offset=" + offset);
// advance the offset after consuming each message
offset = msg.offset();
}
}
//consumer.close();
}
}
package com.sillycat.magicneptune.example;
import java.nio.ByteBuffer;
import kafka.message.Message;
public class ExampleUtils
{
public static String getMessage(Message message)
{
ByteBuffer buffer = message.payload();
byte [] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new String(bytes);
}
}
references:
http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/
https://github.com/harrah/xsbt/wiki/Getting-Started-Setup
http://incubator.apache.org/kafka/faq.html
http://incubator.apache.org/kafka/quickstart.html
http://blog.sina.com.cn/s/blog_3fe961ae01011o4z.html
http://incubator.apache.org/kafka/faq.html
发表评论
-
Update Site will come soon
2021-06-02 04:10 1678I am still keep notes my tech n ... -
Hadoop Docker 2019 Version 3.2.1
2019-12-10 07:39 294Hadoop Docker 2019 Version 3.2. ... -
Nginx and Proxy 2019(1)Nginx Enable Lua and Parse JSON
2019-12-03 04:17 449Nginx and Proxy 2019(1)Nginx En ... -
Data Solution 2019(13)Docker Zeppelin Notebook and Memory Configuration
2019-11-09 07:15 294Data Solution 2019(13)Docker Ze ... -
Data Solution 2019(10)Spark Cluster Solution with Zeppelin
2019-10-29 08:37 248Data Solution 2019(10)Spark Clu ... -
AMAZON Kinesis Firehose 2019(1)Firehose Buffer to S3
2019-10-01 10:15 322AMAZON Kinesis Firehose 2019(1) ... -
Rancher and k8s 2019(3)Clean Installation on CentOS7
2019-09-19 23:25 313Rancher and k8s 2019(3)Clean In ... -
Pacemaker 2019(1)Introduction and Installation on CentOS7
2019-09-11 05:48 343Pacemaker 2019(1)Introduction a ... -
Crontab-UI installation and Introduction
2019-08-30 05:54 455Crontab-UI installation and Int ... -
Spiderkeeper 2019(1)Installation and Introduction
2019-08-29 06:49 510Spiderkeeper 2019(1)Installatio ... -
Supervisor 2019(2)Ubuntu and Multiple Services
2019-08-19 10:53 370Supervisor 2019(2)Ubuntu and Mu ... -
Supervisor 2019(1)CentOS 7
2019-08-19 09:33 331Supervisor 2019(1)CentOS 7 Ins ... -
Redis Cluster 2019(3)Redis Cluster on CentOS
2019-08-17 04:07 373Redis Cluster 2019(3)Redis Clus ... -
Amazon Lambda and Version Limit
2019-08-02 01:42 438Amazon Lambda and Version Limit ... -
MySQL HA Solution 2019(1)Master Slave on MySQL 5.7
2019-07-27 22:26 530MySQL HA Solution 2019(1)Master ... -
RabbitMQ Cluster 2019(2)Cluster HA and Proxy
2019-07-11 12:41 464RabbitMQ Cluster 2019(2)Cluster ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:35 323Running Zeppelin with Nginx Aut ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:34 324Running Zeppelin with Nginx Aut ... -
ElasticSearch(3)Version Upgrade and Cluster
2019-05-20 05:00 329ElasticSearch(3)Version Upgrade ... -
Jetty Server and Cookie Domain Name
2019-04-28 23:59 404Jetty Server and Cookie Domain ...
相关推荐
在Java中使用Kafka,我们需要依赖Apache Kafka的Java客户端库。这个库提供了Producer和Consumer接口,使得我们能够方便地编写生产消息和消费消息的Java应用。在"Kafka-java-demo"中,你将看到如何使用这些接口来实现...
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....
本篇文章将深入探讨如何在Java环境中使用IDEA,通过Maven构建工具来实现Kafka的生产者和消费者。 首先,我们需要设置项目环境。使用IntelliJ IDEA创建一个新的Java Maven项目,然后在pom.xml文件中添加Kafka相关的...
在Java开发环境中,Kafka作为一个分布式流处理平台,被广泛用于构建实时数据管道和流应用。这个"Kafka的Java依赖包"包含了所有你需要在Java项目中与Kafka进行本地交互所需的jar包。这些jar包提供了完整的API,使得...
在本文中,我们将深入探讨基于Java的Kafka生产者与消费者的实现,这主要围绕着"Kafka-java-demo"项目展开。Kafka是一个分布式流处理平台,由LinkedIn开发并开源,现在是Apache软件基金会的一部分。它被广泛用于实时...
kafka_linux_自动安装脚本(基于环境_node_+_java)_kafka-install-package
Java开发Kafka客户端是构建基于Apache Kafka的应用程序的关键步骤,Kafka-clients库提供了与Kafka服务器进行交互的API,支持生产者和消费者的实现。在Java中使用Kafka-clients,你需要包含相应的jar包,这些包包含了...
Kafka主要用Java编写,因此在Java环境中与Kafka交互时,通常需要引入相关的jar包。"kafka的java的jar包"就是用于Java应用程序与Kafka进行通信的一系列库文件集合。 Kafka主要提供了消息队列的功能,能够高效地处理...
在Java开发环境中,Apache Kafka是一个不可或缺的分布式流处理平台,常用于构建实时数据管道和流应用。Kafka的Java客户端库使得Java开发者能够方便地与Kafka集群进行交互,包括生产消息、消费消息以及管理主题等操作...
消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 <groupId>org.elasticsearch <artifactId>elastic...
Kafka 需要 Java 环境来运行,因此需要安装 Java。可以从 Oracle 官方网站下载 Java 安装包,然后按照安装向导进行安装。 2. 安装 Python 环境 Kafka 的一些工具使用 Python脚本,因此需要安装 Python 环境。这里...
在本文中,我们将深入探讨如何使用Java进行Kafka开发,主要基于提供的" kafka-study "压缩包中的学习代码。Kafka是由Apache开发的分布式流处理平台,它被广泛用于构建实时数据管道和流应用程序。Java是Kafka客户端库...
在Java中与Kafka交互通常需要使用到`kafka-clients`和`zookeeper-client`相关的jar包。 标题中的"Kafka集群Java开发jar包"指的是使用Java进行Kafka集群的开发工作,这通常涉及到创建生产者、消费者以及管理Kafka...
kafka_2.13-3.5.0.tgz 是Apache Kafka的一个特定版本,针对Scala 2.13构建。Apache Kafka是一个开源的流处理平台,主要用于构建实时流数据管道和应用程序。它设计用于水平扩展性、高吞吐量和容错性,已被广泛应用于...
在Java中实现Flink订阅Kerberos认证的Kafka消息是一项关键任务,特别是在处理安全敏感的数据流时。本文将深入探讨这一主题,介绍如何利用Apache Flink与Kafka的集成,以及如何通过Kerberos进行身份验证。 首先,...
kafka-schema-registry-client-3.2.0.jar包,亲测可用,在aliyun仓库内找不到,可以下载此jar包来进行手动安装
Java操作Kafka主要涉及到两个核心角色:生产者和消费者,它们是Kafka生态系统中的基本组件。Kafka是一个分布式流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。它被广泛用于构建实时数据管道和流应用,能够...
how to install and build Kafka 0.8.x using different versions of Scala. Chapter 2, Setting Up a Kafka Cluster, describes the steps required to set up a single- or multi-broker Kafka cluster and shares...
2. **Java API**: Kafka提供了Java客户端API,使得开发者能够轻松地在Java应用中集成Kafka。这些API包括Producer API(生产者)和Consumer API(消费者),用于发送和接收消息。 3. **Maven**: Maven是一个项目管理...
在Java中调用Apache Kafka API是一项关键任务,特别是在构建分布式数据处理系统时。Apache Kafka是一个高性能、可扩展的消息队列(MQ),它提供实时的数据流处理能力。在Java应用程序中集成Kafka API可以让开发者...