Google一下发现 Structured Streaming + Kafka集成,记录Offset的文章挺少的,我自己摸索一下,写了一个DEMO。Github地址
1. 准备
配置起始和结束的offset值(默认)
Schema信息
读取后的数据的Schema是固定的,包含的列如下:
ColumnType说明
keybinary信息的key
valuebinary信息的value(我们自己的数据)
topicstring主题
partitionint分区
offsetlong偏移值
timestamplong时间戳
timestampTypeint类型
example:
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
1
2
3
4
5
6
7
8
9
10
11
批处理还是流式处理都必须为Kafka Source设置如下选项:
选项值意义
assignjson值{“topicA”:[0,1],“topicB”:[2,4]}指定消费的TopicPartition,Kafka Source只能指定"assign",“subscribe”,"subscribePattern"选项中的一个
subscribe一个以逗号隔开的topic列表订阅的topic列表,Kafka Source只能指定"assign",“subscribe”,"subscribePattern"选项中的一个
subscribePatternJava正则表达式订阅的topic列表的正则式,Kafka Source只能指定"assign",“subscribe”,"subscribePattern"选项中的一个
kafka.bootstrap.servers以逗号隔开的host:port列表Kafka的"bootstrap.servers"配置
startingOffsets 与 endingOffsets 说明:
选项值默认值支持的查询类型意义
startingOffsets“earliest”,“lates”(仅streaming支持);或者json 字符"""{“topicA”:{“0”:23,“1”:-1},“TopicB”:{“0”:-2}}"""对于流式处理来说是"latest",对于批处理来说是"earliest"streaming和batch查询开始的位置可以是"earliest"(从最早的位置开始),“latest”(从最新的位置开始),或者通过一个json为每个TopicPartition指定开始的offset。通过Json指定的话,json中-2可以用于表示earliest,-1可以用于表示latest。注意:对于批处理而言,latest值不允许使用的。
endingOffsetslatest or json string{“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}}latestbatch一个批查询的结束位置,可以是"latest",即最近的offset,或者通过json来为每个TopicPartition指定一个结束位置,在json中,-1表示latest,而-2是不允许使用的
2. 主要代码
/**
* StructuredStreaming
* 记录kafka上一次的Offset,从之前的Offset继续消费
*/
object StructuredStreamingOffset {
val LOGGER: Logger = LogManager.getLogger("StructuredStreamingOffset")
//topic
val SUBSCRIBE = "log"
case class readLogs(context: String, offset: String)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[*]")
.appName("StructuredStreamingOffset")
.getOrCreate()
//开始 offset
var startOffset = -1
//init
val redisSingle: RedisSingle = new RedisSingle()
redisSingle.init(Constants.IP, Constants.PORT)
//get redis
if (redisSingle.exists(Constants.REDIDS_KEY) && redisSingle.getTime(Constants.REDIDS_KEY) != -1) {
startOffset = redisSingle.get(Constants.REDIDS_KEY).toInt
}
//sink
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", SUBSCRIBE)
.option("startingOffsets", "{\"" + SUBSCRIBE + "\":{\"0\":" + startOffset + "}}")
.load()
import spark.implicits._
//row 包含: key、value 、topic、 partition、offset、timestamp、timestampType
val lines = df.selectExpr("CAST(value AS STRING)", "CAST(offset AS LONG)").as[(String, Long)]
val content = lines.map(x => readLogs(x._1, x._2.toString))
val count = content.toDF("context", "offset")
//sink foreach 记录offset
val query = count
.writeStream
.foreach(new RedisWriteKafkaOffset)
.outputMode("update")
.trigger(Trigger.ProcessingTime("5 seconds"))
.format("console")
.start()
query.awaitTermination()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
Spark Structured streaming API支持的输出源有:Console、Memory、File和Foreach, 这里用到Foreach,用Redis存储value
class RedisWriteKafkaOffset extends ForeachWriter[Row] {
var redisSingle: RedisSingle = _
override def open(partitionId: Long, version: Long): Boolean = {
redisSingle = new RedisSingle()
redisSingle.init(Constants.IP, Constants.PORT)
true
}
override def process(value: Row): Unit = {
val offset = value.getAs[String]("offset")
redisSingle.set(Constants.REDIDS_KEY, offset)
}
override def close(errorOrNull: Throwable): Unit = {
redisSingle.getJedis().close()
redisSingle.getPool().close()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
3. 总结
Structured Streaming 是一个基于 Spark SQL 引擎的、可扩展的且支持容错的流处理引擎。你可以像表达静态数据上的批处理计算一样表达流计算。Spark SQL 引擎将随着流式数据的持续到达而持续运行,并不断更新结果。性能上,结构上比Spark Streaming的有一定优势。
点赞
————————————————
版权声明:本文为CSDN博主「vitahao」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/vitahao/article/details/88994480
相关推荐
Flume+Kafka+StructuredStreaming+Mysql分布式采集与微批处理
在项目开发过程中,"code_resource_010"可能是包含代码资源的文件,其中可能包括了实现这些组件集成和交互的源代码,例如Flume的配置文件、Spark的Streaming或Structured Streaming作业代码、Kafka生产者和消费者的...
《Delta架构:Delta Lake与Apache SparkStructured Streaming的融合》 Delta Lake是近年来在大数据处理领域备受瞩目的开源项目,它提供了可靠的数据湖解决方案,而Apache Spark的Structured Streaming则是流处理...
pyspark充当kafka消费者时报错:pyspark.sql.utils.AnalysisException: ... Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide". 错误所缺少的jar包
Apache Spark的Structured Streaming是针对流处理的一个强大且高级的API,它将流处理与Spark SQL引擎相结合,提供了快速、可扩展且容错的处理能力。Structured Streaming的核心理念是让开发者能够像处理静态数据集...
structured streaming 简介 1. Spark Streaming 不足 2. Structured Streaming 介绍 3. Structured Streaming 核心设计 Structured Streaming 编程模型 ...
Structured Streaming提供了Source和Sink的概念,它支持多种数据源作为输入(如Kafka、Flume、socket等)和输出(如Kafka、文件系统、console等)。它也支持状态存储,这意味着可以维护和更新流处理中的状态信息。 ...
### Spark 2018 欧洲峰会中关于Structured Streaming中的Stateful Stream Processing 在Spark 2018欧洲峰会中,有一场引人注目的演讲深入探讨了Structured Streaming框架下的状态流处理(stateful stream processing...
Structured Streaming was the 2nd (and the latest) major streaming effort in Spark. Its design decouples the frontend (user-facing APIs) and backend (execution), and allows us to change the execution ...
### 使用Structured Streaming实现简易、可扩展且容错的流处理 在大数据处理领域,特别是针对实时数据流的应用场景,如何构建高效、可靠且易于管理的流处理系统一直是个挑战。Structured Streaming作为Apache Spark...
标题中的“The Delta Architecture Delta Lake + Apache Spark Structured Streaming.pdf”指明了文件的焦点在于介绍Delta架构,并特别强调了Delta Lake与Apache Spark Structured Streaming的结合使用。Delta ...
3. **复用Spark SQL执行引擎**:Structured Streaming充分利用了Spark SQL执行引擎的优势,包括执行计划优化、代码生成(Codegen)、内存管理等方面。这些优化措施使得Structured Streaming在处理流数据时具有高性能...
A Deep Dive into Stateful Stream Processing in Structured Streaming A Deep Dive into Stateful Stream Processing in Structured Streaming
Apache Spark的Structured Streaming是其对流处理的一种革命性框架,旨在提供一种类似SQL的、声明式的编程模型,使得数据流处理更加简洁、高效且易于理解。在这个深入的指南中,我们将探讨Structured Streaming的...
3,综合运用HttpClient+Jsoup+Kafka+SparkStreaming+StructuredStreaming+SpringBoot+Echarts等多种实用技术 适用人群 1、对大数据感兴趣的在校生及应届毕业生。 2、对目前职业有进一步提升要求,希望从事大数据...
需要的配置只有一个sql文件 代码整体的结构参考开源项目 waterdrop 代码中SQL文件解析的部分参考开源项目flinkStreamSQL 1.实现socket输入 console输出 配置: CREATE TABLE SocketTable( word String, ...
《藏经阁-From Spark Streaming to Structured Streaming》是一份关于大数据处理技术的文档,主要对比分析了Apache Spark Streaming和Google Dataflow,以及引出了新兴的Structured Streaming技术。以下是这份文档...
在Spark 2.3.x版本中,Structured Streaming是其核心的流处理框架,它提供了一种高级、声明式的数据处理模型,使得开发人员能够以类似批处理的方式编写实时数据处理应用。本项目实时分析旨在深入理解和实践这一强大...
机器学习(Machine Learning, ML)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识...