- 浏览: 2193156 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
前面文章介绍了不少有关Spark Streaming的offset的管理以及如何优雅的关闭Spark Streaming的流程序。
到目前为止还有几个问题:
(1)有关spark streaming集成kafka时,如果kafka新增分区, 那么spark streaming程序能不能动态识别到而不用重启?
(2)如果需要重启,那么在自己管理offset时,如何才能识别到新增的分区?
(3)spark streaming优雅关闭的策略还有那些?
首先第一个问题,如果kafka要新增分区,对于正在运行的实时流程序能否动态识别到?
经过测试,是不能识别的,我推测使用createDirectStream创建流对象一旦创建就是不可变的,也就是说创建实例那一刻的分区数量,会一直使用直到流程序结束,就算中间kafka的分区数量扩展了,流程序也是不能识别到的。所以在扩展kafka分区前,一定要先把流程序给停掉,然后扩展完成后需要再次重启流程序。
然后看第二个问题,如果是我们自己管理offset时,一定要考虑到kafka扩展分区的情况,每次启动程序前都得检测下目前保存的偏移量里面的kafka的分区个数是否小于kafka实际元数据里面实际的分区个数,正常没扩展分区的情况下两个值应该是相等的,如果值不一致,就说明是kafka分区得到扩展了,所以我们的程序需要能够兼容这种情况。
核心代码如下:
//这个topic在zk里面最新的分区数量 val lastest_partitions= ZkUtils.getPartitionsForTopics(zkClient,Seq(topic)).get(topic).get var offsets = offsetsRangesStr.split(",")//按逗号split成数组 .map(s => s.split(":"))//按冒号拆分每个分区和偏移量 .map { case Array(partitionStr, offsetStr) => (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }//加工成最终的格式 .toMap//返回一个Map //说明有分区扩展了 if(offsets.size<lastest_partitions.size){ //得到旧的所有分区序号 val old_partitions=offsets.keys.map(p=>p.partition).toArray //通过做差集得出来多的分区数量数组 val add_partitions=lastest_partitions.diff(old_partitions) if(add_partitions.size>0){ log.warn("发现kafka新增分区:"+add_partitions.mkString(",")) add_partitions.foreach(partitionId=>{ offsets += (TopicAndPartition(topic,partitionId)->0) log.warn("新增分区id:"+partitionId+"添加完毕....") }) } }else{ log.warn("没有发现新增的kafka分区:"+lastest_partitions.mkString(",")) }
上面的代码在每次启动程序时,都会检查当前我们自己管理的offset的分区数量与zk元数据里面实际的分区数量,如果不一致就会把新增的分区id给加到TopicAndPartition里面并放入到Map对象里面,这样在启动前就会传入到createDirectStream对象中,就能兼容新增的分区了。
最后一个问题,前面的文章谈到过我们可以有两种方式来更加优雅的停止流程序,分别是通过http暴露服务,和通过HDFS做消息中转来定时扫描mark文件是否存在来触发关闭服务。
下面我们先来看下通过http暴露服务的核心代码:
/**** * 负责启动守护的jetty服务 * @param port 对外暴露的端口号 * @param ssc Stream上下文 */ def daemonHttpServer(port:Int,ssc: StreamingContext)={ val server=new Server(port) val context = new ContextHandler(); context.setContextPath( "/close" ); context.setHandler( new CloseStreamHandler(ssc) ) server.setHandler(context) server.start() } /*** 负责接受http请求来优雅的关闭流 * @param ssc Stream上下文 */ class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler { override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={ log.warn("开始关闭......") ssc.stop(true,true)//优雅的关闭 response.setContentType("text/html; charset=utf-8"); response.setStatus(HttpServletResponse.SC_OK); val out = response.getWriter(); out.println("close success"); baseRequest.setHandled(true); log.warn("关闭成功.....") } }然后在来看下另一种方式扫描HDFS文件的方式:
/*** * 通过一个消息文件来定时触发是否需要关闭流程序 * @param ssc StreamingContext */ def stopByMarkFile(ssc:StreamingContext):Unit= { val intervalMills = 10 * 1000 // 每隔10秒扫描一次消息是否存在 var isStop = false val hdfs_file_path = "/spark/streaming/stop" //判断消息文件是否存在,如果存在就 while (!isStop) { isStop = ssc.awaitTerminationOrTimeout(intervalMills) if (!isStop && isExistsMarkFile(hdfs_file_path)) { log.warn("2秒后开始关闭sparstreaming程序.....") Thread.sleep(2000) ssc.stop(true, true) } } } /*** * 判断是否存在mark file * @param hdfs_file_path mark文件的路径 * @return */ def isExistsMarkFile(hdfs_file_path:String):Boolean={ val conf = new Configuration() val path=new Path(hdfs_file_path) val fs =path.getFileSystem(conf); fs.exists(path) }
上面是两种方式的核心代码,最后提下触发停止流程序:
第一种需要在启动服务的机器上,执行下面封装的脚本:
## tx.log是提交spark任务后的输出log重定向的log ## &> tx.log & #!/bin/bash driver=`cat tx.log | grep ApplicationMaster | grep -Po '\d+.\d+.\d+.\d+'` echo $driver curl http://$driver:port/close/ echo "stop finish"
第二种方式,找到一个拥有HDFS客户端机器,向HDFS上写入指定的文件:
#生成文件后,10秒后程序就会自动停止 hadoop fs -touch /spark/streaming/stop #下次启动前,需要清空这个文件,否则程序启动后就会停止 hadoop fs -rm -r /spark/streaming/stop
所有代码,已经同步更新到我的github上,有兴趣的朋友可以参考这个链接:
https://github.com/qindongliang/streaming-offset-to-zk
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
发表评论
-
Scala里面的排序函数的使用
2018-01-09 20:20 2650排序方法在实际的应用场景中非常常见,Scala里面有三种排序 ... -
在Scala里面如何使用元组
2018-01-08 22:05 922元组在Scala语言中是一 ... -
Spark如何读取一些大数据集到本地机器上
2018-01-04 21:07 1689最近在使用spark处理分 ... -
使用Spark SQL的临时表解决一个小问题
2017-12-28 18:27 2459最近在使用spark处理一个业务场景时,遇到一个小问题,我在 ... -
Spark任务两个小问题笔记
2017-12-26 19:52 1658今天在用spark处理数据 ... -
Spark中foreachPartition和mapPartitions的区别
2017-12-25 21:19 3332spark的运算操作有两种类型:分别是Transformat ... -
kafka版本不一致导致的一个小问题(二)
2017-12-04 21:37 8578背景介绍: 我们公司的实时流项目现在用的spark stre ... -
谈谈如何优雅的关闭正在运行中的Spark Streaming的流程序
2017-11-30 19:20 2308前面的文章,已经简 ... -
如何管理Spark Streaming消费Kafka的偏移量(三)
2017-11-28 23:41 5224前面的文章已经介绍了在spark streaming集成kaf ... -
理解Spark的运行机制
2017-11-23 21:52 1255Spark生态系统目前已经非常成熟了,有很多类型的任务都可以使 ... -
如何管理Spark Streaming消费Kafka的偏移量(二)
2017-11-16 19:30 4743上篇文章,讨论了在spar ... -
如何管理Spark Streaming消费Kafka的偏移量(一)
2017-11-14 20:42 4098最近工作有点忙,所以更新文章频率低了点,在这里给大家说声抱 ... -
在scala中使用spark sql解决特定需求(2)
2017-07-21 16:00 2292接着上篇文章,本篇来 ... -
在scala中使用spark sql解决特定需求
2017-07-20 19:53 1040spark sql一个强大之处就 ... -
Spark如何在一个SparkContext中提交多个任务
2017-07-04 19:09 6776在使用spark处理数据的时候,大多数都是提交一个job执行, ... -
如何使用scala+spark读写hbase?
2017-06-12 19:48 3456最近工作有点忙,所以文章更新频率低了点,希望大家可以谅解,好 ... -
使用ES-Hadoop插件结合spark向es插入数据
2017-05-05 17:19 5070上篇文章简单介绍了E ... -
spark sql 快速体验调试
2017-04-13 16:40 1068spark sql提供了更快的查询性能,如何能够更快的体验,开 ... -
spark on yarn 如何集成elasticsearch
2017-04-11 16:16 1566随着spark越来越流行,我们的很多组件都有可能和spark集 ... -
如何使用Spark的local模式远程读取Hadoop集群数据
2017-03-31 11:49 3019我们在windows开发机上使用spark的local模式读取 ...
相关推荐
一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、kafka偏移量管理,web后台管理,web api启动、停止spark streaming,宕机告警、自动重启等等功能支持,用户只需要关心业务代码,无需关注繁琐的...
Apache Spark Streaming是Apache Spark用于处理实时流数据的一个组件。它允许用户使用Spark的高度抽象概念处理实时数据流,并且可以轻松地与存储解决方案、批处理数据和机器学习算法集成。Spark Streaming提供了一种...
Spark Streaming预研报告覆盖了Apache Spark Streaming的主要方面,包括其简介、架构、编程模型以及性能调优。以下是基于文档提供内容的详细知识点: 1. Spark Streaming简介与渊源 Spark Streaming是Spark生态中...
kafka+Spark Streaming开发文档 本文档主要讲解了使用Kafka和Spark Streaming进行实时数据处理的开发文档,涵盖了Kafka集群的搭建、Spark Streaming的配置和开发等内容。 一、Kafka集群搭建 首先,需要安装Kafka...
sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失
常见的性能瓶颈包括straggler(处理较慢的节点)和bottleneck(系统瓶颈),对于这些情况,Spark Streaming提供了一些优化手段,如通过预计算模型(pre-computed model)来减少重复计算,并且可以通过并行化操作和...
在大数据处理领域,Flume 和 Spark Streaming 是两个重要的工具,它们分别用于数据收集与实时流处理。本压缩包中的 jar 包是为了解决 Flume 与 Spark Streaming 的集成问题,确保数据能够从 Flume 无缝流转到 Spark ...
这些论文可能会详细讨论如何配置和优化Spark Streaming以处理大数据量的实时输入。 在自然语言处理(NLP)方面,论文可能涉及如何利用Spark Streaming与Apache Spark的MLlib库结合,进行大规模文本分析。例如,它们...
Spark Streaming 入门案例 Spark Streaming 是一种构建在 Spark 上的实时计算框架,用来处理大规模流式数据。它将从数据源(如 Kafka、Flume、Twitter、ZeroMQ、HDFS 和 TCP 套接字)获得的连续数据流,离散化成一...
(1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...
流处理系统如Apache Spark Streaming和Apache Storm,都致力于提供高吞吐量、低延迟的数据处理能力。尽管它们的目的是类似的,但各自的设计哲学、运行模型、容错机制等方面存在着显著差异。以下将详细介绍Spark ...
### Spark Streaming概述 #### 一、Spark Streaming定义与特点 **Spark Streaming** 是Apache Spark生态中的一个重要组件,它主要用于处理实时数据流。该模块构建在基础Spark API之上,旨在实现可扩展、高吞吐量...
Spark Streaming 是 Apache Spark 的一个模块,它允许开发者处理实时数据流。这个强大的工具提供了一种弹性、容错性好且易于编程的模型,用于构建实时流处理应用。在这个"Spark Streaming 示例"中,我们将深入探讨...
**Spark Streaming:实时大数据处理** Spark Streaming是Apache Spark框架的一部分,专为实时数据处理而设计。...结合其容错性和性能优化,Spark Streaming在各种实时场景中都展现出强大的应用潜力。
Spark Streaming 是Apache Spark中的一个重要组件,专门设计用来处理实时数据流的计算框架。作为Spark核心API的一个扩展,它延续了Spark的易用性和高效性,能够将实时数据流处理与批量数据处理无缝集成在一起。利用...
1.理解Spark Streaming的工作流程。 2.理解Spark Streaming的工作原理。 3.学会使用Spark Streaming处理流式数据。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)...
Spark Streaming是中国大数据技术领域中广泛使用的实时数据处理框架,它基于Apache Spark的核心设计,提供了对持续数据流的微批处理能力。本项目实战旨在帮助读者深入理解和应用Spark Streaming,通过实际操作来掌握...
根据提供的文件信息,本文将详细解析“Spark Streaming与Kafka的整合”这一主题,并结合代码片段探讨其在实际场景中的应用。尽管标签中提到“数学建模”,但从标题和描述来看,这部分内容与数学建模无关,因此我们将...
1.Spark Streaming整合Flume需要的安装包. 2. Spark Streaming拉取Flume数据的flume配置文件.conf 3. Flume向Spark Streaming推数据的flume配置文件.conf
### Spark Streaming 基础概念及应用 #### Spark Streaming 概述 Spark Streaming 是 Apache Spark 生态系统中的一个重要模块,它提供了对实时流数据进行高效处理的能力。与传统的批处理不同,Spark Streaming ...