`
jybbh
  • 浏览: 16288 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

spark 连kafka_2.10-0.10.0.0

阅读更多
包:
chill_2.11-0.8.0.jar
commons-collections-3.2.2.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-lang3-3.5.jar
commons-logging-1.1.3.jar
guava-14.0.1.jar
hadoop-auth-2.6.4.jar
hadoop-common-2.6.4.jar
jackson-annotations-2.6.5.jar
jackson-core-2.6.5.jar
jackson-databind-2.6.5.jar
jackson-module-paranamer-2.6.5.jar
jackson-module-scala_2.11-2.6.5.jar
javax.servlet-api-3.1.0.jar
javax.ws.rs-api-2.0.1.jar
jersey-container-servlet-core-2.22.2.jar
jersey-server-2.22.2.jar
json4s-ast_2.11-3.2.11.jar
kafka-clients-0.10.2.0.jar
kryo-shaded-3.0.3.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
metrics-core-3.1.2.jar
metrics-json-3.1.2.jar
netty-all-4.0.42.Final.jar
paranamer-2.3.jar
scala-library-2.11.8.jar
scala-library.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
spark-core_2.11-2.1.0.jar
spark-launcher_2.11-2.1.0.jar
spark-network-common_2.11-2.1.0.jar
spark-network-shuffle_2.11-2.1.0.jar
spark-streaming-kafka-0-10_2.10-2.1.0.jar
spark-streaming_2.11-2.1.0.jar
spark-unsafe_2.11-2.1.0.jar
xbean-asm5-shaded-4.4.jar

PS:下面这个写法支持JDK1.7,即非lambda表达式写法
-----------------------------------------------------------
import java.util.*;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;

import org.apache.spark.streaming.kafka010.LocationStrategies;


public class PrintDirectMsgDirect {
  public static void main(String[] args) {
    try {
      SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]");
      final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

      String brokers = "localhost:9092";
      String topics = "test";
      Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
      Map<String, Object> kafkaP = new HashMap<>();
      kafkaP.put("metadata.broker.list", brokers);
      kafkaP.put("bootstrap.servers", brokers);
      kafkaP.put("group.id", "group1");
      kafkaP.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      kafkaP.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      kafkaP.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      JavaInputDStream<ConsumerRecord<Object, Object>> lines = KafkaUtils.createDirectStream(
              jssc,
              LocationStrategies.PreferConsistent(),
              ConsumerStrategies.Subscribe(topicsSet, kafkaP)
      );

      lines.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<Object, Object>>>(){
        @Override
        public void call(JavaRDD<ConsumerRecord<Object, Object>> consumerRecordJavaRDD) throws Exception {
          consumerRecordJavaRDD.foreach(new VoidFunction<ConsumerRecord<Object, Object>>(){
            @Override
            public void call(ConsumerRecord<Object, Object> objectObjectConsumerRecord) throws Exception {
              System.out.println(">>>>>>>>>>objectObjectConsumerRecord:"+objectObjectConsumerRecord.value()+"]");
            }
          });
        }
      });
//      lines.foreachRDD(rdd -> {
//        rdd.foreach(x -> {
//          System.out.println(">>>>>>>>>>>>>>>>x:" + x + "]");
//          System.out.println(">>>>>>>>>>>>>>>>x.value:" + x.value() + "]");
//        });
//      });
      jssc.start();
      jssc.awaitTermination();
      jssc.close();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
分享到:
评论

相关推荐

    kafka_2.11-0.10.1.0.tgz

    kafka_2.11-0.10.1.0.tgzKafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素...

    kafka_2.11-0.10.0.1-API文档-中文版.zip

    赠送jar包:kafka_2.11-0.10.0.1.jar; 赠送原API文档:kafka_2.11-0.10.0.1-javadoc.jar; 赠送源代码:kafka_2.11-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka_2.11-0.10.0.1.pom; 包含翻译后的API文档...

    kafka-2.12-0.10.2.0文件安装包

    7. **生产与消费消息**:使用kafka-console-producer.sh和kafka-console-consumer.sh脚本进行消息的生产和消费测试。 需要注意的是,由于Kafka最初设计时更多考虑的是Linux环境,因此在Windows上部署可能会遇到一些...

    kafka-clients-0.10.0.1-API文档-中文版.zip

    赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.0.1.pom; 包含...

    kafka_2.10-0.8.2.1.tgz

    4. 监控与管理:通过Kafka提供的监控工具,如`kafka-console-consumer.sh`和`kafka-run-class.sh`,监控主题和消费者状态。 六、大数据应用场景 1. 数据管道:Kafka作为实时数据流处理的基础设施,用于收集、存储和...

    kafka_2.10-0.10.0.1.tgz

    标题中的"kafka_2.10-0.10.0.1.tgz"标识的是Apache Kafka的一个特定版本,这是Apache Kafka的二进制发行版,适用于Java 2.10平台,版本号为0.10.0.1。Kafka是一个分布式流处理平台,由LinkedIn开发并贡献给了Apache...

    kafka-2.10-0.10.2.1.zip

    在本提供的资源“kafka-2.10-0.10.2.1.zip”中,包含的是Windows版的Kafka 0.10.2.1版本。这个版本适用于基于Java 2.10的开发环境,它提供了消息队列和实时数据流处理的能力,广泛应用于大数据处理、日志收集、网站...

    kafka_2.10-0.10.0.0.tgz

    "kafka_2.10-0.10.0.0.tgz"是针对Java开发者的Kafka版本,特别注明可在JDK1.7环境下运行,这使得它对那些尚未升级到更高版本JDK的系统具有广泛的适用性。本文将深入探讨Kafka的核心特性、0.10.0.0版本的新功能以及...

    kafka资源下载kafka_2.11-2.0.0.tgz

    - 使用Kafka自带的监控工具,如`kafka-topics.sh`、`kafka-consumer-groups.sh`等。 - 集成第三方监控系统,如Prometheus和Grafana,实现可视化监控。 总之,Kafka作为一种成熟的消息队列解决方案,在大数据处理...

    kafka_2.10-0.10.2.1.tgz

    标题中的"kafka_2.10-0.10.2.1.tgz"是一个Apache Kafka的特定版本压缩包,适用于Scala 2.10,并且版本号为0.10.2.1。Kafka是一款分布式流处理平台,由LinkedIn开发并捐赠给Apache软件基金会。它被广泛用于构建实时...

    kafka_2.10-0.10.1.0.tgz

    Kafka提供了`kafka-topics.sh`、`kafka-consumer-groups.sh`等管理工具,用于创建、查询和管理主题、消费者组。JMX接口也可用于远程监控Kafka集群的性能指标。 7. **安全性** 0.10.1.0版本开始支持SSL加密和SASL...

    kafka_2.10-0.10.2.2

    标题中的"kafka_2.10-0.10.2.2"指的是Apache Kafka的一个特定版本,这是分布式流处理平台Kafka的一个发行版。Kafka最初由LinkedIn开发,后来成为Apache软件基金会的顶级项目。它被设计为高吞吐量、低延迟的消息发布...

    kafka_2.12-2.4.1.zip

    - 使用`./kafka-console-producer.sh --broker-list localhost:9092 --topic test`和`./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning`进行消息的生产和消费,操作...

    Kafka_2.11-0.10.0.0

    《Apache Kafka 2.11-0.10.0.0:分布式流处理平台的基石》 Apache Kafka是一款开源的分布式流处理平台,由LinkedIn开发并在2011年贡献给了Apache软件基金会。Kafka的主要设计目标是提供高吞吐量、低延迟的消息传递...

    kafka_2.11-0.10.2.0.tar.gz

    kafka_2.11-0.10.2.0 版解压使用 kafka kafka kafka kafka kafka

    kafka-clients-0.9.0.0-API文档-中英对照版.zip

    赠送jar包:kafka-clients-0.9.0.0.jar; 赠送原API文档:kafka-clients-0.9.0.0-javadoc.jar; 赠送源代码:kafka-clients-0.9.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.9.0.0.pom; 包含翻译后...

    kafka_2.11-2.0.0.tgz

    - 使用bin/kafka-topics.sh脚本创建主题,指定主题名、分区数量和副本数量。 4. **生产者与消费者编程** - **生产者示例**:使用Kafka的Java API,编写生产者代码,通过new Producer()创建实例,然后使用send()...

    kafka2.11-0.11.0.0

    《Kafka 2.11-0.11.0.0:Linux环境下的部署与配置详解》 Kafka是一款高性能、分布式的消息中间件,常用于大数据实时处理和流计算中。在这里,我们关注的是其特定版本——Kafka 2.11-0.11.0.0。这个版本兼容Scala ...

    kafka_2.11-2.2.2.tgz

    Kafka提供了丰富的命令行工具,如`kafka-topics.sh`、`kafka-consumer-groups.sh`等,用于查看和管理Topics、Partitions、Consumer Groups等。 十一、注意事项 1. Kafka默认使用9092端口,确保没有其他服务占用。 ...

    spark-streaming-kafka_2.10-1.6.0.jar

    KafkaUtils所依赖的jar包,导入文件中KafkaUtils报错,需要导入spark-streaming-kafka_2.10-1.6.0.jar包

Global site tag (gtag.js) - Google Analytics