sillycat
Spark 2017 BigData Update(5)Spark Streaming in Java

 

The small streaming example class, WordCountStreamingApp.java:

package com.sillycat.sparkjava.app;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import com.sillycat.sparkjava.base.SparkBaseApp;

public class WordCountStreamingApp extends SparkBaseApp {

    private static final long serialVersionUID = 7401544141510431796L;

    protected String getAppName() {
        return "WordCountStreamingApp";
    }

    public void executeTask(List<String> params) {
        SparkConf conf = this.getSparkConf();
        logger.info("Starting the streaming context");
        // Batch interval: incoming streaming data is divided into 30-second batches
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(30000));
        ssc.checkpoint(this.getAppName());
        logger.info("Prepare the resource for streaming");
        processStream(ssc, "carl");
        logger.info("Streaming is working");
        try {
            ssc.start();
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            logger.error("InterruptedException:", e);
        }
    }

    private void processStream(JavaStreamingContext ssc, String keyword) {

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "WordCountStreamingApp");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", true);
        Collection<String> topics = Arrays.asList("sillycat-topic");

        logger.info("Init the Kafka Clients to fetch lines");
        JavaInputDStream<ConsumerRecord<String, String>> dStream = KafkaUtils.createDirectStream(ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
        dStream.foreachRDD(rdd -> {
            processRows(rdd, keyword);
        });
    }

    private void processRows(JavaRDD<ConsumerRecord<String, String>> rdd, String keyword) {
        JavaRDD<String> rows = rdd.map(record -> record.value());
        JavaRDD<String> lines = rows.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(String s) throws Exception {
                // Keep only non-blank lines that contain the keyword
                return s != null && s.trim().length() > 0 && s.contains(keyword);
            }
        });
        long count = lines.count();
        logger.info("Received " + count + " lines from Kafka containing \"" + keyword + "\" in this batch");
    }

}

Here is how to run all of these tests:

#Run locally#

>java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp

>java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp

#Run the binary locally#

>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /Users/carl/work/sillycat/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp

>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp

#Run the binary on a remote YARN cluster#

Note: Spark 2.x removed the yarn-client master URL; use --master yarn --deploy-mode client instead.

>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn --deploy-mode client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp

>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn --deploy-mode client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp


References:
http://sillycat.iteye.com/blog/2407639

