眼下大数据领域最热门的词汇之一便是流计算了,其中最耀眼的项目无疑是来自Spark社区的Spark Streaming项目,其从一诞生就受到广泛关注并迅速发展,目前已有追赶并超越Storm的架势。
对于流计算而言,毫无疑问最核心的特点是它的低时延能力,这主要是来自对数据不落磁盘就进行计算的内部机制,但这也带来了数据可靠性的问题,即有节点失效或者网络异常时,如何在节点间进行合适的协商来进行重传。更进一步的,若发生计划外的数据重传,怎么能保证没有产生重复的数据,所有数据都是精确一次的(Exact Once)?如果不解决这些问题,大数据的流计算将无法满足大多数企业级可靠性要求而流于徒有虚名。
本文将重点分析Spark Streaming是如何设计可靠性机制并实现数据一致性的。
Driver HA
由于流计算系统是长期运行、数据不断流入的,因此其Spark守护进程(Driver)的可靠性是至关重要的,它决定了Streaming程序能否一直正确地运行下去。
图一 Driver数据持久化
Driver实现HA的解决方案就是将元数据持久化,以便重启后的状态恢复。如图一所示,Driver持久化的元数据包括:
- Block元数据(图一中的绿色箭头):Receiver从网络上接收到的数据,组装成Block后产生的Block元数据;
- Checkpoint数据(图一中的橙色箭头):包括配置项、DStream操作、未完成的Batch状态、和生成的RDD数据等;
图二 Driver故障恢复
Driver失败重启后:
- 恢复计算(图二中的橙色箭头):使用Checkpoint数据重启driver,重新构造上下文并重启接收器。
- 恢复元数据块(图二中的绿色箭头):恢复Block元数据。
- 恢复未完成的作业(图二中的红色箭头):使用恢复出来的元数据,再次产生RDD和对应的job,然后提交到Spark集群执行。
通过如上的数据备份和恢复机制,Driver实现了故障后重启、依然能恢复Streaming任务而不丢失数据,因此提供了系统级的数据高可靠。
可靠的上下游IO系统
流计算主要通过网络socket通信来实现与外部IO系统的数据交互。由于网络通信的不可靠特点,发送端与接收端需要通过一定的协议来保证数据包的接收确认、和失败重发机制。
不是所有的IO系统都支持重发,这至少需要实现数据流的持久化,同时还要实现高吞吐和低时延。在Spark Streaming官方支持的data source里面,能同时满足这些要求的只有Kafka,因此在最近的Spark Streaming release里面,也是把Kafka当成推荐的外部数据系统。
除了把Kafka当成输入数据源(inbound data source)之外,通常也将其作为输出数据源(outbound data source)。所有的实时系统都通过Kafka这个MQ来做数据的订阅和分发,从而实现流数据生产者和消费者的解耦。
一个典型的企业大数据中心数据流向视图如下所示:
图三 企业大数据中心数据流向视图
除了从源头保证数据可重发之外,Kafka更是流数据Exact Once语义的重要保障。Kafka提供了一套低级API,使得client可以访问topic数据流的同时也能访问其元数据。Spark Streaming的每个接收任务可以从指定的Kafka topic、partition和offset去获取数据流,各个任务的数据边界很清晰,任务失败后可以重新去接收这部分数据而不会产生“重叠的”数据,因而保证了流数据“有且仅处理一次”。
可靠的接收器
在Spark 1.3版本之前,Spark Streaming是通过启动专用的Receiver任务来完成从Kafka集群的数据流拉取。
Receiver任务启动后,会使用Kafka的高级API来创建topicMessageStreams对象,并逐条读取数据流缓存,每个batchInerval时刻到来时由JobGenerator提交生成一个spark计算任务。
由于Receiver任务存在宕机风险,因此Spark提供了一个高级的可靠接收器-ReliableKafkaReceiver类型来实现可靠的数据收取,它利用了Spark 1.2提供的WAL(Write Ahead Log)功能,把接收到的每一批数据持久化到磁盘后,更新topic-partition的offset信息,再去接收下一批Kafka数据。万一Receiver失败,重启后还能从WAL里面恢复出已接收的数据,从而避免了Receiver节点宕机造成的数据丢失(以下代码删除了细枝末节的逻辑):
class ReliableKafkaReceiver{ private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null override def onStart(): Unit = { // Initialize the topic-partition / offset hash map. topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long] // Initialize the block generator for storing Kafka message. blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf) messageHandlerThreadPool = Utils.newDaemonFixedThreadPool( topics.values.sum, "KafkaMessageHandler") blockGenerator.start() val topicMessageStreams = consumerConnector.createMessageStreams( topics, keyDecoder, valueDecoder) topicMessageStreams.values.foreach { streams => streams.foreach { stream => messageHandlerThreadPool.submit(new MessageHandler(stream)) } } }
启用WAL后虽然Receiver的数据可靠性风险降低了,但却由于磁盘持久化带来的开销,系统整体吞吐率会有明显的下降。因此,在最新发布的Spark 1.3版本里,Spark Streaming增加了使用Direct API的方式来实现Kafka数据源的访问。
引入了Direct API后,Spark Streaming不再启动常驻的Receiver接收任务,而是直接分配给每个Batch及RDD最新的topic partition offset。job启动运行后Executor使用Kafka的simple consumer API去获取那一段offset的数据。
这样做的好处不仅避免了Receiver宕机带来的数据可靠性风险,同时也由于避免使用ZooKeeper做offset跟踪,而实现了数据的精确一次性(以下代码删除了细枝末节的逻辑):
class DirectKafkaInputDStream{ protected val kc = new KafkaCluster(kafkaParams) protected var currentOffsets = fromOffsets override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = { val untilOffsets = clamp(latestLeaderOffsets(maxRetries)) val rdd = KafkaRDD[K, V, U, T, R]( context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd) }
预写日志 Write Ahead Log
Spark 1.2开始提供了预写日志能力,用于Receiver数据及Driver元数据的持久化和故障恢复。WAL之所以能提供持久化能力,是因为它利用了可靠的HDFS做数据存储。
Spark Streaming预写日志机制的核心API包括:
- 管理WAL文件的WriteAheadLogManager
- 读/写WAL的WriteAheadLogWriter和WriteAheadLogReader
- 基于WAL的RDD:WriteAheadLogBackedBlockRDD
- 基于WAL的Partition:WriteAheadLogBackedBlockRDDPartition
以上核心API在数据接收和恢复阶段的交互示意图如图四所示。
图四 基于WAL的数据接收和恢复示意图
从WriteAheadLogWriter的源码里可以清楚地看到,每次写入一块数据buffer到HDFS后都会调用flush方法去强制刷入磁盘,然后才去取下一块数据。因此receiver接收的数据是可以保证持久化到磁盘了,因而做到了较好的数据可靠性。
private[streaming] class WriteAheadLogWriter{ private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf) def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized { data.rewind() // Rewind to ensure all data in the buffer is retrieved val lengthToWrite = data.remaining() val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite) stream.writeInt(lengthToWrite) if (data.hasArray) { stream.write(data.array()) } else { while (data.hasRemaining) { val array = new Array[Byte](data.remaining) data.get(array) stream.write(array) } } flush() nextOffset = stream.getPos() segment }
结束语
得益于Kafka这类可靠的data source、以及自身的checkpoint/WAL等机制,Spark Streaming的数据可靠性得到了很好的保证,数据能保证“至少一次”(at least once)被处理。但由于其outbound端的一致性实现还未完善,因此Exact once语义仍然不能端到端保证。Spark Streaming社区已经在跟进这个特性的实现(SPARK-4122),预计很快将合入trunk发布。
作者简介:叶琪,华为软件公司Universe产品部高级架构师,专注于大数据底层分布式存储和计算基础设施,是华为软件公司Hadoop发行版的主要架构师,目前兴趣点在流计算与Spark。
相关推荐
摘要:眼下大数据领域最热门的词汇之一便是流计算了,而其中最耀眼的无疑是来自Spark社区的SparkStreaming项目。对于流计算而言,最核心的特点毫无疑问就是它对低时的需求,但这也带来了相关的数据可靠性问题。由于...
- **容错和数据保证**:Spark Streaming 提供了更强大的状态管理和数据一致性保证。在处理故障恢复时,Spark Streaming 可以确保每条记录被准确处理一次,而 Storm 则只能保证每条记录至少被处理一次。 #### 2. ...
需要注意的是,为了保证数据的一致性和可靠性,可以设置适当的检查点策略。 在电商系统中,这样的架构可以支持大规模的图片数据处理,例如用于商品推荐、用户行为分析或者反欺诈检测。例如,通过分析用户浏览和上传...
Flume的灵活性和可配置性使其能够适应多种日志源,确保数据的完整性和一致性。 4. **后端开发与Qt**: 虽然这个项目主要关注的是日志处理系统,但标签中提到了"后端 qt"。这可能意味着项目中包含了一些后端界面...
Spark Streaming 是 Apache Spark 的一个模块,它允许实时处理连续的数据流。Kafka 是一个流行的分布式消息系统,常用于数据管道和流处理。Spark Streaming 与 Kafka 的集成是大数据实时处理中的常见应用场景,它能...
3. **元数据管理**:Delta Lake的元数据存储在Apache ZooKeeper中,确保了元数据的一致性和高可用性,方便管理和查询。 4. **数据验证**:Delta Lake提供了强大的数据校验功能,如检查约束,确保数据质量。 Apache...
HBase的强一致性和高并发读写能力,保证了数据的准确性和系统性能。 5. Java编程语言:在整个系统的设计和实现过程中,Java被选为开发语言,因为它拥有丰富的库和工具,尤其在大数据处理领域。Java的跨平台性也使得...
它允许Spark应用在与Kafka交互时使用SASL(Simple Authentication and Security Layer)进行安全身份验证,增强了系统的安全性和可靠性。 4. **配置与集成步骤** 在Spark Streaming应用中集成Kafka,你需要在Spark...
- Delta Lake引入了事务日志机制,使得它能够支持ACID(原子性、一致性、隔离性、持久性)事务,从而保证数据操作的安全性和一致性。 - Delta架构结合了Apache Spark Structured Streaming和Delta Lake的优势,通过...
Delta Lake的出现解决了数据湖中常见的不一致性和数据丢失问题,确保了数据的ACID(原子性、一致性、隔离性和持久性)特性。2019年,Delta Lake加入了Linux基金会,这标志着其社区支持和稳定性得到了进一步加强。 ...
《Apress.Pro.Spark.Streaming.The.Zen.of.Real-Time.A》这本书主要聚焦于Apache Spark Streaming这一实时数据处理框架,深入探讨了如何利用Spark Streaming构建高效、可靠的实时数据处理系统。Spark Streaming是...
- **Delta Lake**:Databricks提供的数据湖解决方案,提供ACID事务、版本控制和高可用性,确保数据的可靠性和一致性。 - **Workspaces**:团队协作平台,允许团队成员共享代码、文档和数据,提高工作效率。 通过...
- 容错性:在分布式环境中,Structured Streaming设计为能够容忍系统故障,确保数据处理的连续性和一致性。 3. **工作负载复杂性**: - 结合流处理与交互式查询:用户可以同时进行实时分析和交互式查询,满足不同...
2. 数据复制:能够实时复制数据,提供数据的一致性和可靠性。 3. 分析和报表:能够实时地进行数据分析和报表生成,提供了数据的实时insight。 挑战 Elastic Streaming 的挑战主要包括: 1. Spark Streaming 的微...
1. Spark Streaming的可靠性:他提到了提高Spark Streaming可靠性的研究工作,这对于处理实时数据流至关重要。例如,通过改进Shuffle机制和调度器,以及引入YARN客户模式等方式,以确保数据的快速准确传递和任务的...
其核心特性包括强一致性、水平扩展性和高可用性,使得HBase成为处理PB级别数据的理想选择。在企业级数据处理平台上,HBase常用于存储结构化和半结构化的海量数据,例如日志、用户行为数据等。 Spark,则是Apache...
4. **数据重传**:如果某个节点出现故障导致数据丢失,Spark Streaming可以通过数据重传来恢复丢失的数据块,确保数据处理的一致性和完整性。 5. **Exactly Once语义**:Spark Streaming通过上述机制实现了Exactly ...
2. 可靠的流式处理(Reliable Streaming Processing):可靠的流式处理是指在流式处理过程中保证数据的一致性和可靠性的技术。它通常通过使用检查点、事务和 buffer 等机制来实现。 3. Apache Storm:Apache Storm ...
它能够以高吞吐量和低延迟的方式处理数据流,同时保持 Spark 的易用性和弹性。在这个项目中,Spark Streaming 作为数据处理的核心,负责从 Kafka 接收消息,进行处理,然后将处理结果发送到 HBase。 Apache Kafka ...
MySQL则是传统的关系型数据库,具有ACID(原子性、一致性、隔离性和持久性)特性,适合处理事务性工作负载。在数据仓库中,MySQL通常作为数据源,存储原始业务数据,确保数据的完整性和一致性。 Spark作为一个通用...