`
m635674608
  • 浏览: 5027501 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark Streaming 自定义接收器

 
阅读更多
Spark Streaming可以从任意数据源接受流数据,而不仅仅是那些内置支持的数据源(如Flume、kafka等)。这就要求开发人员实现一个接收器(recevier),用于接收来自有关数据源的数据。本篇手册以一个自定义的接收器(recevier)实现和其在spark streaming中的应为为主线进行讲解。需要注意的是,自定义接收器可以用Scala或者Java实现。
 
实现自定义Receiver
 
自定义接收器必须继承自抽象类Receiver,实现两个方法
onStart():接收数据。
onStop():停止接收数据。
 
onStart()方法和onStop()方法不能无限期的阻塞。通常,onStart()方法将会启动负责接收数据的线程,而onStop()方法负责确保这些接收数据的线程是停止的。接收收据的线程可以使用isStopped()方法来检查它们是否应该停止接收数据。
 
 
一旦接收到数据,通过调用store(data)方法(Receiver类中提供的方法)数据被存储在spark中。store()方法有很多种,如允许一次只接收一条数据(record-at-a-time)或者全部object/序列化字节的collection。注意,store()类型影响了接收器(receiver)的可靠性和容错性语义。这是稍后讨论的更详细的。
 
在接收线程中的任何异常都应该被捕获并处理,以避免接收器的无声故障。restart(<exception>)将会通过异步调用onStop()方法,延迟一段时间后调用onStop()方法重启接收器(receiver)。stop(<exception>)将会调用onStop()方法终止接收器。同时,reporterror(<error>)向驱动程序报告错误信息而不停止或者重启接收器,这些错误信息可以在UI上或者日志中看见。
 
 
以下是接收一个套接字上的文本流的自定义接收器。它以文本流中的“\”分隔线分割,并将它们储存在spark中。如果接收线程有任何连接错误或接收错误,则接收器将重新启动。
 
[java] view plain copy
 
  1. public class JavaCustomReceiver extends Receiver<String> {  
  2.   
  3.   String host = null;  
  4.   int port = -1;  
  5.   
  6.   public JavaCustomReceiver(String host_ , int port_) {  
  7.     super(StorageLevel.MEMORY_AND_DISK_2());  
  8.     host = host_;  
  9.     port = port_;  
  10.   }  
  11.   
  12.   public void onStart() {  
  13.     // Start the thread that receives data over a connection  
  14.     new Thread()  {  
  15.       @Override public void run() {  
  16.         receive();  
  17.       }  
  18.     }.start();  
  19.   }  
  20.   
  21.   public void onStop() {  
  22.     // There is nothing much to do as the thread calling receive()  
  23.     // is designed to stop by itself if isStopped() returns false  
  24.   }  
  25.   
  26.   /** Create a socket connection and receive data until receiver is stopped */  
  27.   private void receive() {  
  28.     Socket socket = null;  
  29.     String userInput = null;  
  30.   
  31.     try {  
  32.       // connect to the server  
  33.       socket = new Socket(host, port);  
  34.   
  35.       BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));  
  36.   
  37.       // Until stopped or connection broken continue reading  
  38.       while (!isStopped() && (userInput = reader.readLine()) != null) {  
  39.         System.out.println("Received data '" + userInput + "'");  
  40.         store(userInput);  
  41.       }  
  42.       reader.close();  
  43.       socket.close();  
  44.   
  45.       // Restart in an attempt to connect again when server is active again  
  46.       restart("Trying to connect again");  
  47.     } catch(ConnectException ce) {  
  48.       // restart if could not connect to server  
  49.       restart("Could not connect", ce);  
  50.     } catch(Throwable t) {  
  51.       // restart if there is any other error  
  52.       restart("Error receiving data", t);  
  53.     }  
  54.   }  
  55. }  
 

在spark streaming中使用自定义的接收器
 
自定义接收器可用于在Spark Streaming应用中,通过使用streamingContext.receiverStream(自定义接收器的实例)。如下所示,创建一个输入Dstream
 
[java] view plain copy
 
  1. // Assuming ssc is the JavaStreamingContext  
  2. JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));  
  3. JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... });  
  4. ...  
 
接收器的可靠性
 
正如在spark streaming编程指南中讨论的那样,基于接收器的可靠性和容错语义,有两种类型的接收器:
 
1 可靠的接收器:对于可靠的消息来源,允许发送的数据被确认,一个可靠的接收器正确地确认数据被接收器接收同时被可靠地存储在spark中。通常,实现可靠的接收器需仔细考量消息确认的语义。
 
 
2 不可靠的接收器:不可靠的接收器不向数据源发送确认信息。它可用于不支持确认机制的数据源,或者那些可靠的数据源但是我们不需要其使用复杂的确认机制。
 
为了实现可靠的接收器,必须要使用store(multiple-records) 取存储数据。这种类型的store()一种阻塞调用,只有在所有给定的记录被储存在spark里之后才返回。如果接收器的配置存储级别使用复制(默认启用),则复制完成后这个调用返回。因此,它确保了数据被可靠的存储,和接收器可以现在正确地确认消息;在复制数据的中间过程中接收器失败了,这将确保没有数据丢失(缓冲数据没有被确认,数据源将会重新发送)。
 
不可靠的接收器没有实现这种逻辑,它可以简单地从数据源接收记录并使用store(single-record)将它们插入(one-at-a-time )。它没有store(multiple-records)那样的可靠保证,其优点如下:
 
  • 系统考虑到将数据转化为适当大小的块;
  • 系统考虑到控制接收速率,如果速率限制已指定;
  • 不可靠接收器比可靠接收器更加容易实现;
原文地址:https://spark.apache.org/docs/latest/streaming-custom-receivers.html

http://blog.csdn.net/ouyang111222/article/details/50414621

分享到:
评论

相关推荐

    Flume push数据到SparkStreaming

    标题中的“Flume push数据到SparkStreaming”是指在大数据处理领域中,使用Apache Flume将实时数据流推送到Apache Spark Streaming进行进一步的实时分析和处理的过程。这两个组件都是Apache Hadoop生态系统的重要...

    metrics-spark-receiver:用于metrics-spark-reporter的Apache Spark Streaming接收器

    用于 repo [metrics-spark-reporter] ( ) 的 Apache Spark Streaming 的 java 自定义接收器。 配置 为了在您的 Spark 应用程序中接收一些指标,您需要添加依赖项: &lt;groupId&gt;fr.ippon.spark.metrics ...

    spark-sqs-sink:用于Apache Spark的自定义接收器提供程序的示例,该示例将数据帧的内容发送到AWS SQS

    Spark-SQS-Sink 是一个专门为Apache Spark设计的自定义接收器提供程序,其目的是将Spark数据帧中的数据发送到Amazon Simple Queue Service (SQS)。 Amazon SQS 是一种完全托管的消息队列服务,允许应用程序之间异步...

    25:Spark2.3.x Streaming大数据项目实时分析.rar

    一个Spark Streaming应用可以同时处理多个输入源,并将结果发送到多个输出目标,如HDFS、HBase、文件系统或自定义接收器。 8. **整合Kafka**: Kafka作为流行的消息中间件,经常被用作Spark Streaming的数据源。...

    基于Spark Streaming的明安图射电频谱日像仪实时数据处理.pdf

    本文提出了一种基于Spark Streaming的实时数据处理方法,并设计了自定义的接收器,将多个图形处理器节点集成到分布式集群中,以期提高数据处理的速度和效率。 对于实时数据流处理来说,分布式计算是核心,其将大量...

    藏经阁-LEARNINGS USING SPARK STREAMIN.pdf

    Spark Streaming 和自定义的 Elastic Search 数据加载器也用于加载数据到 Elastic Search 中,以便进行实时分析。 技术栈 ----- 在 Walmart 搜索中,我们使用了以下技术栈: * Spark:Streaming,DataFrames * ...

    积分java源码-kafka-spark-consumer:用于SparkStreaming的高性能Kafka连接器。支持多主题获取、Kafk

    这个消费者已经实现了一个自定义可靠接收器,它使用 Kafka Consumer API 从 Kafka 获取消息并将每个接收到的块存储在 Spark BlockManager 中。 该逻辑将自动检测主题的分区数量,并根据配置的接收器数量生成尽可能多...

    spark-streaming-gnip:一个Apache Spark实用程序,用于实时从Gnip的PowerTrack中提取推文

    这段代码实现了一个自定义接收器,该接收器使用Spark流技术从Gnip获取推文并将其“存储”在Spark BlockManager中。 逻辑将在Spark Streaming上下文中指定的时间间隔内自动存储来自Gnip的推文。 请参阅Scala代码示例...

    metaQ向spark传数据

    因此,我们需要自己编写定制的接收器或者利用现有的第三方库来实现MetaQ与Spark Streaming的对接。 一种常见的方法是通过Apache Kafka作为桥梁,因为MetaQ和Spark都支持与Kafka的集成。你可以将MetaQ中的消息发布到...

    flume kafak实验报告.docx

    同时,编写相应的 Flume 源、通道和接收器配置,实现数据从源头流向 Spark Streaming。 4. **Flume 与 Spark streaming 整合代码**:在代码层面,定义一个自定义的 Spark Sink,负责将接收到的 Flume 数据转换为 ...

    spark1.3.1源码下载

    - `streaming/`:Spark Streaming的实现,包括DStream(Discrete Stream)和相关的接收器代码。 - `mllib/`:MLlib机器学习库的源码,包括算法实现和模型持久化。 - `graphx/`:GraphX的源码,用于处理图数据的API和...

    kafka-spark-consumer

    该使用者已经实现了一个自定义可靠接收器,该接收器使用Kafka使用者API从Kafka中获取消息,并将每个接收到的块存储在Spark BlockManager中。 该逻辑将自动检测主题的分区数,并根据配置的接收器数量产生尽可能多的...

    基于spark的电商用户行为分析系统-源码

    Spark Streaming能够接收这些数据流,并通过DStream(Discretized Stream)抽象来处理,实现低延迟的数据处理和分析。 接着,数据预处理是分析过程中的关键步骤。这包括数据清洗(去除异常值、空值填充)、数据转化...

    spark-2.3.4-bin-hadoop2.7.tgz

    - **Spark Streaming改进**:提升了处理延迟,增加了新的源和接收器,增强了容错机制。 - **MLlib更新**:添加了新的机器学习算法,改进了现有算法的性能,以及对模型解释性的支持。 - **图计算优化**:GraphX的性能...

    基于Java实现Spark2x新闻网大数据实时分析可视化系统项目【100012794】

    开发部分涉及编写Java代码实现业务逻辑,使用Spark的DataFrame和Spark SQL处理数据,以及创建自定义Spark Streaming接收器处理实时流数据。 最后,web可视化交互设计是项目的重要组成部分。可能使用JavaScript库如...

    ElasticSearch+Spark 构建高相关性搜索服务,千人千面推荐系统

    3. **实时数据更新**: Spark Streaming接收实时数据流,更新Elasticsearch中的索引,保持数据新鲜度。 4. **复杂查询**: 利用Elasticsearch的高级查询功能,实现用户输入的多样化查询需求。 5. **推荐生成**: Spark...

    ElasticSearch集成.rar

    当Spark Streaming接收到实时数据流后,可以通过`Spark Sink`将处理后的数据实时写入Elasticsearch,以供后续的查询和分析。这种集成方式通常涉及到创建DataFrame或Dataset,然后使用`saveAsNewAPIHadoopFile`或者`...

    Spark编程指南简体中文版

    - **传递函数到Spark**:解释了如何将自定义函数传递给Spark进行并行处理。 - **使用键值对**:重点介绍了如何利用键值对结构优化数据处理流程。 - **Transformations**:详细阐述了数据转换的方法,包括map、filter...

    spark面试题整理.pdf

    例如,错误排查技巧、Spark内核的工作原理、分布式执行计划的机制、延迟计算概念、Catalyst优化器的作用等。 #### 三、高级问题(41-60) - 这部分进一步探讨了Spark在机器学习、图处理等领域的高级应用,如实现...

Global site tag (gtag.js) - Google Analytics