实时流计算的场景归纳起来多半是:
业务系统根据实时的操作,不断生成事件(消息/调用),然后引起一系列的处理分析,这个过程是分散在多台计算机上并行完成的,看上去就像事件连续不断的流经多个计算节点处理,形成一个实时流计算系统。
市场上流计算产品有很多,主要是通过消息中枢结合工人模式实现,大致过程如下:
1、开发者实现好流程输入输出节点逻辑,上传job到任务生产者
2、任务生产者将任务发送到zookeeper,然后监控任务状态
3、任务消费者从zookeeper上获取任务
4、任务消费者启动多个工人进程,每个进程又启动多个线程执行任务
5、工人之间通过zeroMQ交互
我们看看如何做一个简单的流计算系统,做法跟上面有些不同:
1、首先不过多依赖zookeerper,任务的分配最好直接给到工人,并能直接监控工人完成状态,这样效率会更高。
2、工人之间直接通讯,不依赖zeroMQ转发。
3、并行管理扁平化,多进程下再分多线程意义不大,增加管理成本,实际上一台机器8个进程,每个进程再开8个线程,总体跟8-10个进程或者线程的效果差不多(数量视机器性能不同)。
4、做成一个流计算系统,而不是平台。
这里我们借助fourinone提供的api和框架去实现,第一次使用可以参考
分布式计算上手demo指南,开发包下载地址
http://code.google.com/p/fourinone/
大致思路:用工头去做任务生产和分配,用工人去做任务执行,为了达到流的效果,需要在工人里面调用工头的方式,将多个工人节点串起来,形成一个计算拓扑图。
下面程序演示了连续多个消息先发到一个工人节点A处理,然后再发到两个工人节点B并行处理的流计算过程,并且获取到最后处理结果打印输出(如果不需要获取结果可以直接返回)。
StreamCtorA:工头A实现,它获取到线上工人A,然后将消息发给它处理,并轮循等待结果。工头A的main函数模拟了多个消息的连续调用。
StreamWorkerA:工人A实现,它接收到工头A的消息进行处理,然后创建一个工头B,通过工头B将结果同时发给两个工人B处理,然后将结果返回工头A。
StreamCtorB:工头B实现,它获取到线上两个工人B,调用doTaskBatch等待两个工人处理完成,然后返回结果给工人A。
StreamWorkerB:工人B实现,它接收到任务消息后模拟处理后返回结果。
运行步骤(在本地模拟):
1、启动ParkServerDemo(它的IP端口已经在配置文件指定)
java -cp fourinone.jar; ParkServerDemo
2、启动工人A
java -cp fourinone.jar; StreamWorkerA localhost 2008
3、启动两个工人B
java -cp fourinone.jar; StreamWorkerB localhost 2009
java -cp fourinone.jar; StreamWorkerB localhost 2010
4、启动工头A
java -cp fourinone.jar; StreamCtorA
多机部署说明:StreamCtorA可以单独部署一台机器,StreamWorkerA和StreamCtorB部署一台机器,两个StreamWorkerB可以部署两台机器。
总结:计算平台和计算系统的区别
如果我们只有几台机器,但是每天有人开发不同的流处理应用要在这几台机器上运行,我们需要一个计算平台来管理好job,让开发者按照规范配置好流程和运行时节点申请,打包成job上传,然后平台根据每个job配置动态分配资源依次执行每个job内容。
如果我们的几台机器只为一个流处理业务服务,比如实时营销,我们需要一个流计算系统,按照业务流程部署好计算节点即可,不需要运行多个job和动态分配资源,按照计算平台的方式做只会增加复杂性,开发者也不清楚每台机器上到底运行了什么逻辑。
如果你想实现一个计算平台,可以参考动态部署和进程管理功能(开发包内有指南)
//完整源码
// ParkServerDemo
import com.fourinone.BeanContext;
public class ParkServerDemo
{
public static void main(String[] args)
{
BeanContext.startPark();
}
}
//StreamCtorA
import com.fourinone.Contractor;
import com.fourinone.WareHouse;
import com.fourinone.WorkerLocal;
import java.util.ArrayList;
public class StreamCtorA extends Contractor
{
public WareHouse giveTask(WareHouse inhouse)
{
WorkerLocal[] wks = getWaitingWorkers("StreamWorkerA");
System.out.println("wks.length:"+wks.length);
WareHouse result = wks[0].doTask(inhouse);
while(true){
if(result.getStatus()!=WareHouse.NOTREADY)
{
break;
}
}
return result;
}
public static void main(String[] args)
{
StreamCtorA sc = new StreamCtorA();
for(int i=0;i<10;i++){
WareHouse msg = new WareHouse();
msg.put("msg","hello"+i);
WareHouse wh = sc.giveTask(msg);
System.out.println(wh);
}
sc.exit();
}
}
//StreamWorkerA
import com.fourinone.MigrantWorker;
import com.fourinone.WareHouse;
public class StreamWorkerA extends MigrantWorker
{
public WareHouse doTask(WareHouse inhouse)
{
System.out.println(inhouse);
//do something
StreamCtorB sc = new StreamCtorB();
WareHouse msg = new WareHouse();
msg.put("msg",inhouse.getString("msg")+",from StreamWorkerA");
WareHouse wh = sc.giveTask(msg);
sc.exit();
return wh;
}
public static void main(String[] args)
{
StreamWorkerA wd = new StreamWorkerA();
wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerA");
}
}
//StreamCtorB
import com.fourinone.Contractor;
import com.fourinone.WareHouse;
import com.fourinone.WorkerLocal;
import java.util.ArrayList;
public class StreamCtorB extends Contractor
{
public WareHouse giveTask(WareHouse inhouse)
{
WorkerLocal[] wks = getWaitingWorkers("StreamWorkerB");
System.out.println("wks.length:"+wks.length);
WareHouse[] hmarr = doTaskBatch(wks, inhouse);
WareHouse result = new WareHouse();
result.put("B1",hmarr[0]);
result.put("B2",hmarr[1]);
return result;
}
}
//StreamWorkerB
import com.fourinone.MigrantWorker;
import com.fourinone.WareHouse;
public class StreamWorkerB extends MigrantWorker
{
public WareHouse doTask(WareHouse inhouse)
{
System.out.println(inhouse);
//do something
inhouse.put("msg",inhouse.getString("msg")+",from StreamWorkerB");
return inhouse;
}
public static void main(String[] args)
{
StreamWorkerB wd = new StreamWorkerB();
wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerB");
}
}
分享到:
相关推荐
"物联网大数据处理中实时流计算系统的实践" 以下是从给定文件信息中生成的相关知识点: 1. Spark的大数据混合计算模型: Spark为大数据实时计算工作提供了一个优良的数据储存计算引擎,其在实际数据应用过程中,可...
在设计流式实时分布式计算系统时,还需要考虑到数据的低延迟和高可靠性,以及系统对海量数据流的吞吐能力。系统设计者需要在保证系统稳定性和性能的同时,为系统引入容错机制、负载均衡、状态管理等高级特性。此外,...
总的来说,天罡是一个强大的实时流计算框架,它结合了高性能、低延迟、易用性和可扩展性的优点,为开发者提供了一站式的实时数据处理解决方案。通过深入学习和实践,我们可以充分利用天罡来构建满足各种实时业务需求...
计算平台事业部 付空在2018云栖大会·上海峰会中做了题为《基于流计算构建实时大数据处理系统》的分享,就为什么要用流计算、为什么要用阿里云流计算、如何用流计算等方面的内容做了深入的分析。
本文中,作者构建了一个基于 Storm 的铁道调度监控流计算实时处理系统,通过 Metamorphosis 分布式消息系统接收数据源,再利用 HBase 分布式数据库存储处理后的监控信息,从而实现数据的高效并行处理。 实验结果...
在图计算中,数据以图形结构表示,每个节点代表一个实体,边表示实体之间的关系。算法如PageRank、最短路径算法等,用于评估节点的重要性或寻找路径。与流计算相比,图计算更侧重于发现数据间的关系,而非实时处理...
一个基于Flink-SQL的实时流计算Web平台通常包括以下几个组成部分: 1. 数据接入层:负责收集和导入实时数据,可采用多种数据源接入方式。 2. 数据处理层:利用Flink SQL进行实时流处理,实现数据清洗、转换和分析。 ...
"基于Spark流计算框架的银行实时存贷款规模系统设计与实现" 本文主要介绍了基于Spark流计算框架设计和实现的银行实时存贷款规模系统,该系统能够实时地计算和展示存贷款规模变化情况,满足银行对资产负债管理的需求...
本项目“基于Flink的实时流计算Web平台”便是对这一技术的实践应用,旨在为计算机专业学生提供一个课程设计或毕业设计的参考实例。 一、Apache Flink核心概念与特点 1. 流处理与批处理一体化:Flink支持同时处理...
这一资源不仅为本科课程设计、毕业设计提供了丰富的实践内容,同时也为深度学习算法学习者提供了一个深入了解车牌识别技术的平台。 此系统通过捕获实时视频流,运用OpenCV的图像处理和计算机视觉技术,对视频中的...
Storm设计的核心目标是提供一个简单易用的API,使得开发者能够可靠地处理源源不断的流数据,实现实时计算。其支持的语言主要是Clojure和Java,但对于非JVM语言,可以通过stdin/stdout与Storm交互,利用JSON格式协议...
王峰作为阿里巴巴集团的高级技术专家,在2013中国大数据技术大会上介绍了阿里搜索事业部在实时流计算技术方面的实践,其中重点讲解了一淘全网商品搜索系统架构以及iStream计算模型的构建和应用。iStream计算模型是由...
流计算架构设计是指流计算系统的设计和实现,包括流计算引擎、数据来源、数据处理、数据存储等方面的设计。流计算架构设计需要考虑到系统的可扩展性、可靠性、安全性等方面的要求,确保流计算系统的稳定运行和高效性...
实时计算(流计算)技术存在一些关键挑战,例如数据处理速度、数据规模、数据质量、系统可靠性等问题。为了解决这些挑战,需要选择合适的实时计算(流计算)技术和架构,设计合适的数据处理流程,确保数据的安全和...
在这个项目中,我们看到一个基于SpringBoot、Drools规则引擎、Flink流计算以及MongoDB数据库构建的系统。接下来,我们将详细探讨这些技术在金融风控中的应用及其重要性。 **SpringBoot** 是一个由Spring框架衍生出...
实时流媒体监控系统是现代科技发展中的一个重要领域,它结合了计算机视觉、网络通信和多媒体处理等多种技术。本文将深入探讨该系统的设计与实现,主要关注利用framebuff、OpenCV以及socket技术在C++环境中的应用。 ...
标题“PG多维存储、SQL流计算”涉及的是PostgreSQL数据库系统中的两个高级特性:多维存储和SQL流计算。PostgreSQL是一个开放源代码的对象关系型数据库系统,以其稳定性、扩展性著称。在这篇文章中,我们将讨论...