Spark Streaming简介
NetworkWordCount代码
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.spark.examples.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel /** * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. * * Usage: NetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example * `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999` */ object NetworkWordCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: NetworkWordCount <hostname> <port>") System.exit(1) } StreamingExamples.setStreamingLogLevels() // Create the context with a 1 second batch size // 创建SparkConf实例 val sparkConf = new SparkConf().setAppName("NetworkWordCount") ///创建Spark Streaming Context,每隔1秒钟处理一批数据,那么这一秒收集的数据存放在哪,如何将收集的数据推送出去?是生产者主动推出去还是消费者每隔1秒钟来拉取一次数据 val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') // Note that no duplication in storage level only for running locally. // Replication necessary in distributed scenario for fault tolerance. val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) //flatMap是把将每一行使用空格做分解,那么words对应的数据结构是怎么样的? ///words是个集合,每个集合元素依然是个集合,这个集合存放单词 val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() //启动计算作业 ssc.start() //等待结束,什么时候结束作业,即触发什么条件会让作业执行结束 ssc.awaitTermination() } }
运行NetworkWordCount
1. 启动NetCat命令
[hadoop@hadoop ~]$ nc -lk 9999 This is a book
启动后,可以在后面输入文本,比如This is a book
2. 在另外一个终端,Spark提交NetworkWordCount任务
./bin/run-example streaming.NetworkWordCount localhost 9999
启动后,发现每隔一秒钟,打印一行日志,例如
15/01/11 00:10:09 INFO scheduler.JobScheduler: Added jobs for time 1420953009000 ms 15/01/11 00:10:10 INFO scheduler.JobScheduler: Added jobs for time 1420953010000 ms 15/01/11 00:10:11 INFO scheduler.JobScheduler: Added jobs for time 1420953011000 ms 15/01/11 00:10:12 INFO scheduler.JobScheduler: Added jobs for time 1420953012000 ms 15/01/11 00:10:13 INFO scheduler.JobScheduler: Added jobs for time 1420953013000 ms 15/01/11 00:10:14 INFO scheduler.JobScheduler: Added jobs for time 1420953014000 ms 15/01/11 00:10:15 INFO scheduler.JobScheduler: Added jobs for time 1420953015000 ms 15/01/11 00:10:16 INFO scheduler.JobScheduler: Added jobs for time 1420953016000 ms 15/01/11 00:10:17 INFO scheduler.JobScheduler: Added jobs for time 1420953017000 ms 15/01/11 00:10:18 INFO scheduler.JobScheduler: Added jobs for time 1420953018000 ms ///Added jobs是个什么概念?不就是一个Job吗 15/01/11 00:10:19 INFO scheduler.JobScheduler: Added jobs for time 1420953019000 ms 15/01/11 00:10:20 INFO scheduler.JobScheduler: Added jobs for time 1420953020000 ms 15/01/11 00:10:21 INFO scheduler.JobScheduler: Added jobs for time 1420953021000 ms 15/01/11 00:10:22 INFO scheduler.JobScheduler: Added jobs for time 1420953022000 ms 15/01/11 00:10:23 INFO scheduler.JobScheduler: Added jobs for time 1420953023000 ms 15/01/11 00:10:24 INFO scheduler.JobScheduler: Added jobs for time 1420953024000 ms 15/01/11 00:10:25 INFO scheduler.JobScheduler: Added jobs for time 1420953025000 ms 15/01/11 00:10:26 INFO scheduler.JobScheduler: Added jobs for time 1420953026000 ms 15/01/11 00:10:27 INFO scheduler.JobScheduler: Added jobs for time 1420953027000 ms 15/01/11 00:10:28 INFO scheduler.JobScheduler: Added jobs for time 1420953028000 ms
3. 在nc -lk运行的终端,输入文本,发现Spark作业没有将文本输出
不过通过nc -lk输入一行文本后,控制台会显示如下信息
5/01/11 00:29:08 INFO storage.MemoryStore: ensureFreeSpace(11) called with curMem=91198, maxMem=280248975 15/01/11 00:29:08 INFO storage.MemoryStore: Block input-0-1420954147800 stored as bytes in memory (estimated size 11.0 B, free 267.2 MB) 15/01/11 00:29:08 INFO scheduler.JobScheduler: Added jobs for time 1420954148000 ms 15/01/11 00:29:08 INFO storage.BlockManagerInfo: Added input-0-1420954147800 in memory on localhost:57786 (size: 11.0 B, free: 267.2 MB) 15/01/11 00:29:08 INFO storage.BlockManagerMaster: Updated info of block input-0-1420954147800 15/01/11 00:29:08 INFO receiver.BlockGenerator: Pushed block input-0-1420954147800
4. 关闭nc命令的执行,此时,Spark报错,提示9999端口连接不上
///相对于数据源而言,Spark是Receiver,所以Spark Streaming有Receiver这个模块 15/01/11 00:09:36 INFO receiver.ReceiverSupervisorImpl: Stopped receiver 0 15/01/11 00:09:37 INFO scheduler.JobScheduler: Added jobs for time 1420952977000 ms 15/01/11 00:09:38 INFO scheduler.JobScheduler: Added jobs for time 1420952978000 ms ///重提开始Receiver 15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Starting receiver again 15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Starting receiver 15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart 15/01/11 00:09:38 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver 15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Receiver started again 15/01/11 00:09:38 INFO dstream.SocketReceiver: Connecting to localhost:9999 15/01/11 00:09:38 WARN receiver.ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:9999 java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:71) at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:57)
关于接不到消息
下面的代码收不到消息
val sparkConf = new SparkConf().setAppName("SparkStreamingExample").setMaster("local")
而下面的代码则能收到消息
val sparkConf = new SparkConf().setAppName("SparkStreamingExample").setMaster("local[2]")
原因来自于http://spark.apache.org/docs/latest/streaming-programming-guide.html:
-
When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL where n > number of receivers to run (see Spark Properties for information on how to set the master).
-
Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process them.
关于Receiver的线程数
上面提到,有一个线程用于运行Receiver,实际中,可能需要多个线程运行Receiver:
Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to deserialized and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider parallelizing the data receiving. Note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data. Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s). For example, a single Kafka input DStream receiving two topics of data can be split into two Kafka input streams, each receiving only one topic. This would run two receivers on two workers, thus allowing data to be received in parallel, and increasing overall throughput. These multiple DStream can be unioned together to create a single DStream. Then the transformations that was being applied on the single input DStream can applied on the unified stream. This is done as follows
val numStreams = 5 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream = streamingContext.union(kafkaStreams) unifiedStream.print()
相关推荐
`SparkStreaming.zip` 文件可能包含了一个示例项目,演示了如何创建一个 Spark Streaming 应用来从 Kafka 消费数据。代码通常会包含以下步骤: 1. 创建 SparkConf 对象,配置 Spark 应用的属性,如应用程序名称、...
需要注意的是,为了使SparkStreaming能够持续运行并处理数据,我们需要调用`ssc.start()`来启动计算任务,并调用`ssc.awaitTermination()`等待任务结束。在这个例子中,我们向队列中添加了5个RDD,每个包含1到300的...
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
spark streaming demo,应用实例代码,可供参考学习整理
### Spark Streaming原理与实践 #### 一、为什么需要流处理? 传统的批处理框架如MapReduce在处理实时数据流时存在一些局限性,主要是因为每次处理一批数据就需要启动一个新任务,而任务的启动过程(包括输入切分...
- **配置Spark Streaming**:通过`SparkConf`配置Spark Streaming环境,指定应用名称和运行模式。 - **Kafka参数配置**:`kafkaParams`定义了Kafka的相关参数,包括Broker列表、消费者组ID以及自动偏移重置策略等。 ...
Spark Streaming是中国大数据技术领域中广泛使用的实时数据处理框架,它基于Apache Spark的核心设计,提供了对持续数据流的微批处理能力。本项目实战旨在帮助读者深入理解和应用Spark Streaming,通过实际操作来掌握...
-SparkStreaming原理介绍.pdf7.SparkStreaming(下)--SparkStreaming实战.pdf8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf8.SparkMLlib(下)--SparkMLlib实战.pdf9.SparkGraphX介绍及实例.pdf10.分布式内存...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
当你使用Java或Scala编写Spark应用程序时,通常会在`main`方法中创建一个`SparkContext`实例。如果是在Spark Shell中开发,则`SparkContext`会被自动创建,并可以通过预定义的变量`sc`访问。 #### Executor 在Spark...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
为了运行 Spark Streaming 应用程序,首先需要在 IDE(如 IntelliJ IDEA)中设置项目,并添加相应的依赖库。在 `pom.xml` 文件中添加 Spark Streaming 的依赖: ```xml <groupId>org.apache.spark <artifactId>...
【Spark Streaming 概述】 Spark Streaming 是 Apache Spark 项目中的一个模块,专为实时数据流处理设计。它提供了一个高级抽象,使得开发人员能够轻松构建可扩展且容错的流处理应用。Spark Streaming 支持多种数据...
**Kafka、Spark Streaming与HBase的集成** 在大数据处理领域,Kafka作为一个高吞吐量的分布式消息系统,常用于实时数据流处理;Spark Streaming则提供了基于微批处理的实时计算框架,能够高效地处理持续的数据流;...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
Spark的核心特性包括对批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)和机器学习(MLlib)的支持。 在本项目中,我们重点关注的是Spark的算子应用,这些算子是Spark编程模型的基础,允许开发者...