`
dalan_123
  • 浏览: 87409 次
  • 性别: Icon_minigender_1
  • 来自: 郑州
社区版块
存档分类
最新评论

JStorm之Supervisor简介

 
阅读更多

一、简介
Supervisor是JStorm中的工作节点,类似于MR的TT,subscribe zookeeper的任务调度结果数据,根据任务调度情况启动/停止工作进程Worker。同时Supervisor需要定期向zookeeper写入活跃端口信息以便Nimbus监控。Supervisor不执行具体处理工作,所有的计算任务都交Worker完成。从整个架构上看,Supervisor处在整个JStorm三级管理架构的中间环节,辅助管理任务调度和资源管理工作。
二、架构

1.Supervisor


 

Supervisor单节点架构如上图所示,初始化时启动进程Supervisor,根据Nimbus分配的任务情况触发启动/停用Worker JVM进程,其中每个Worker进程启动一个或多个Task线程,其中Task须同属单个Topology。从整个Supervisor节点来看运行多个JVM进程,包括一个Supervisor进程和一个或多个Worker进程。
不同角色状态通过不同的方式维护。其中Task通过hb直接将包括时间信息和当前Task的统计信息写到zookeeper;Worker定期将包括Topology id,端口,Task id集合及当前时间写入本地;Supervisor定期将包括时间及节点资源(端口集合)写到zookeeper,同时从zookeeper读取任务调度结果,根据结果启动/停用Worker进程。
2.Worker


 

在Worker JVM进程内部,除了相互独立的Task线程外,Task线程会共享数据收发和节点之间连接管理等Worker进程内的公共资源,如图所示。其中:
VirtualPort:数据接收线程;
KeyoTupleSerialize:Tuple数据序列化;
TransferQueue:数据发送管道;
DrainerRunnable:数据发送线程;
RefreshConnections:节点之间连接管理线程。
三、实现与代码剖析

1.Supervisor

在jstorm-0.7.1中,Supervisor daemon实现在jstorm-server/src/main/java目录下com.alipay.dw.jstorm.daemon.supervisor包里。Supervisor.java是Supervisor daemon的入口,Supervisor进程主要做以下几件事情。
初始化

1、清理本地临时目录下数据$jstorm-local-dir/supervisor/tmp;
2、创建zk操作实例;
3、本地新建状态文件,$jstorm-local-dir/supervisor/localstate;
4、生成supervisor-id并写入localstate,其中key=”supervisor-id”;如果supervisor重启,先检查supervisor-id是否已经存在,若存在直接读取即可;
5、初始化并启动Heartbeat线程;
6、初始化并启动SyncProcessEvent线程;
7、初始化并启动SyncProcessEvent线程;
8、注册主进程退出数据清理Hook in SupervisorManger。
@SuppressWarnings("rawtypes")public SupervisorManger mkSupervisor(Map conf, MQContext sharedContext)throwsException{
    LOG.info("Starting Supervisor with conf "+ conf);
    active =new AtomicBoolean(true);/*
     * Step 1: cleanup all files in /storm-local-dir/supervisor/tmp
     */String path = StormConfig.supervisorTmpDir(conf);
    FileUtils.cleanDirectory(newFile(path));/*
     * Step 2: create ZK operation instance
     * StromClusterState
     */
    StormClusterState stormClusterState = Cluster
            .mk_storm_cluster_state(conf);/*
     * Step 3, create LocalStat
     * LocalStat is one KV database
     * 4.1 create LocalState instance
     * 4.2 get supervisorId, if no supervisorId, create one
     */
    LocalState localState = StormConfig.supervisorState(conf);String supervisorId =(String) localState.get(Common.LS_ID);if(supervisorId ==null){
        supervisorId = UUID.randomUUID().toString();
        localState.put(Common.LS_ID, supervisorId);}Vector threads =newVector();// Step 5 create HeartBeat// every supervisor.heartbeat.frequency.secs, write SupervisorInfo to ZKString myHostName = NetWorkUtils.hostname();int startTimeStamp = TimeUtils.current_time_secs();
    Heartbeat hb =new Heartbeat(conf, stormClusterState, supervisorId,
            myHostName, startTimeStamp, active);
    hb.update();
    AsyncLoopThread heartbeat =new AsyncLoopThread(hb, false, null,
            Thread.MIN_PRIORITY, true);
    threads.add(heartbeat);// Step 6 create and start sync Supervisor thread// every supervisor.monitor.frequency.secs second run SyncSupervisor
    EventManager processEventManager =new EventManagerImp(false);
    ConcurrentHashMap workerThreadPids =new ConcurrentHashMap();//读取$jstorm-local-dir/supervior/localstate中key=local-assignments的value值,根据该值执行workers的kill/start
    SyncProcessEvent syncProcessEvent =new SyncProcessEvent(supervisorId,
            conf, localState, workerThreadPids, sharedContext);
    EventManager syncSupEventManager =new EventManagerImp(false);//通过比较$zkroot/assignments/{topologyid}全量数据和本地STORM-LOCAL-DIR/supervisor/stormdist/{topologyid}://1.从nimbus下载有任务分配到本节点的topology的jar和配置数据//2.从本地删除已经失效的topology的jar和配置数据
    SyncSupervisorEvent syncSupervisorEvent =new SyncSupervisorEvent(
            supervisorId, conf, processEventManager, syncSupEventManager,
            stormClusterState, localState, syncProcessEvent);int syncFrequence =(Integer) conf
            .get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS);
    EventManagerPusher syncSupervisorPusher =new EventManagerPusher(
            syncSupEventManager, syncSupervisorEvent, active, syncFrequence);
    AsyncLoopThread syncSupervisorThread =new AsyncLoopThread(
            syncSupervisorPusher);
    threads.add(syncSupervisorThread);
    LOG.info("Starting supervisor with id "+ supervisorId +" at host "+ myHostName);// SupervisorManger which can shutdown all supervisor and workersreturnnew SupervisorManger(conf, supervisorId, active, threads,
            syncSupEventManager, processEventManager, stormClusterState,
            workerThreadPids);}
Heartbeat线程

1、默认间隔60s向zookeeper汇报supervisor信息,汇报内容打包成SupervisorInfo,包括hostname,workerports,current time和during time等信息;
@SuppressWarnings("unchecked")publicvoid update(){
    SupervisorInfo sInfo =new SupervisorInfo(
            TimeUtils.current_time_secs(), myHostName,
            (List) conf.get(Config.SUPERVISOR_SLOTS_PORTS),
            (int)(TimeUtils.current_time_secs()- startTime));try{
        stormClusterState.supervisor_heartbeat(supervisorId, sInfo);}catch(Exception e){
        LOG.error("Failed to update SupervisorInfo to ZK", e);

    }}
SyncProcessEvent线程

1、定期从本地文件$jstorm-local-dir/supervisor/localstate中读取key=”local-assignments”数据;该数据会由SyncSupervisorEvent线程定期写入;
2、读取本地$jstorm-local-dir /worker/ids/heartbeat中Worker状态数据;
3、对比local-assignments及worker的状态数据,执行操作start/kill worker进程;其中Worker和Supervisor属于不同JVM进程,Supervisor通过Shell命令启动Worker:
nohup java –server
-Djava.library.path="$JAVA.LIBRARY.PATH"
-Dlogfile.name="$topologyid-worker-$port.log"
-Dlog4j.configuration=jstorm.log4j.properties
-Djstorm.home="$JSTORM_HOME"
-cp $JAVA_CLASSSPATH:$JSTORM_CLASSPATH
com.alipay.dw.jstorm.daemon.worker.Worker
topologyid supervisorid port workerid
SyncProcessEvent线程执行流程如下:

@SuppressWarnings("unchecked")
@Override
publicvoid run(){
    LOG.debug("Syncing processes");try{/**
         * Step 1: get assigned tasks from localstat Map
         *///1.从本地文件$jstorm-local-dir/supervisor/localstate里读取key=“local-assignments”数据Map localAssignments =null;try{
            localAssignments =(Map) localState
                    .get(Common.LS_LOCAL_ASSIGNMENTS);}catch(IOException e){
            LOG.error("Failed to get LOCAL_ASSIGNMENTS from LocalState", e);throw e;}if(localAssignments ==null){
            localAssignments =newHashMap();}
        LOG.debug("Assigned tasks: "+ localAssignments);/**
         * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat
         * Map
         *///2.根据localAssignments与workers的hb比对结果得到workers的状态Map localWorkerStats =null;try{
            localWorkerStats = getLocalWorkerStats(conf, localState,
                    localAssignments);}catch(IOException e){
            LOG.error("Failed to get Local worker stats");throw e;}
        LOG.debug("Allocated: "+ localWorkerStats);/**
         * Step 3: kill Invalid Workers and remove killed worker from
         * localWorkerStats
         *///3.根据workers的状态值启动/停用相关workerSet keepPorts = killUselessWorkers(localWorkerStats);// start new workers
        startNewWorkers(keepPorts, localAssignments);}catch(Exception e){
        LOG.error("Failed Sync Process", e);// throw e}}
SyncSupervisorEvent线程

1、从$zk-root/assignments/{topologyid}下载所有任务调度结果,并筛选出分配到当前supervisor的任务集合,验证单个端口仅分配了单个Topology的任务通过后,将上述任务集合写入本地文件$jstorm-local-dir/supervisor/localstate,以便SyncProcessEvent读取及后续操作;
2、对比任务分配结果与已经存在的Topology,从Nimbus下载新分配过来的Topology,同时删除过期Topology。
SyncSupervisorEvent线程执行流程如下:

@Override
publicvoid run(){
    LOG.debug("Synchronizing supervisor");try{
        RunnableCallback syncCallback =new EventManagerZkPusher(this,
                syncSupEventManager);/**
         * Step 1: get all assignments
         * and register /ZK-dir/assignment and every assignment watch
         *
         *///1.从zk获取分配完成的任务集assignments:(topologyid -> Assignment)//$zkroot/assignments/{topologyid}Map assignments = Cluster.get_all_assignment(
                stormClusterState, syncCallback);
        LOG.debug("Get all assignments "+ assignments);/**
         * Step 2: get topologyIds list from
         * STORM-LOCAL-DIR/supervisor/stormdist/
         *///2.本地已经下载的topology集合$jstorm-local-dir/supervisor/stormdist/{topologyid}List downloadedTopologyIds = StormConfig
                .get_supervisor_toplogy_list(conf);
        LOG.debug("Downloaded storm ids: "+ downloadedTopologyIds);/**
         * Step 3: get  from ZK local node's
         * assignment
         *///3.从assignments里筛选出分配到当前supervisor的任务集合Map localAssignment = getLocalAssign(
                stormClusterState, supervisorId, assignments);/**
         * Step 4: writer local assignment to LocalState
         *///4.将步骤3得到的结果写本地文件$jstorm-local-dir/supervisor/localstatetry{
            LOG.debug("Writing local assignment "+ localAssignment);
            localState.put(Common.LS_LOCAL_ASSIGNMENTS, localAssignment);}catch(IOException e){
            LOG.error("put LS_LOCAL_ASSIGNMENTS "+ localAssignment
                    +" of localState failed");throw e;}// Step 5: download code from ZK//5.下载新分配任务的TopologyMap topologyCodes = getTopologyCodeLocations(assignments);
        downloadTopology(topologyCodes, downloadedTopologyIds);/**
         * Step 6: remove any downloaded useless topology
         */6.删除过期任务的Topology
        removeUselessTopology(topologyCodes, downloadedTopologyIds);/**
         * Step 7: push syncProcesses Event
         */
        processEventManager.add(syncProcesses);}catch(Exception e){
        LOG.error("Failed to Sync Supervisor", e);// throw new RuntimeException(e);}}
2.Worker

在jstorm-0.7.1里,Worker daemon实现在jstorm-server/src/main/java目录下com.alipay.dw.jstorm.daemon.worker包。其中Worker.java是Worker daemon的入口。Worker进程的生命周期:
1、初始化Tuple序列化功能和数据发送管道;
2、创建分配到当前Worker的Tasks;
3、初始化并启动接收Tuple dispatcher;
4、初始化并启动用于维护Worker间连接线程RefreshConnections,包括创建/维护/销毁节点之间的连接等功能;
5、初始化并启动心跳线程WorkerHeartbeatRunable,更新本地目录:$jstorm_local_dir/worker/{workerid}/heartbeats/{workerid};
6、初始化并启动发送Tuple线程DrainerRunable;
7、注册主线程退出现场数据清理Hook。
Worker Daemon初始化流程如下:

public WorkerShutdown execute()throwsException{//1. Tuple序列化+发送管道LinkedBlockingQueue
    WorkerTransfer workerTransfer = getSendingTransfer();// shutdown task callbacks//2. 初始化task线程List shutdowntasks = createTasks(workerTransfer);
    workerData.setShutdownTasks(shutdowntasks);//3. WorkerVirtualPort:tuple接收dispatcher// create virtual port object// when worker receives tupls, dispatch targetTask according to task_id// conf, supervisorId, topologyId, port, mqContext, taskids
    WorkerVirtualPort virtual_port =new WorkerVirtualPort(workerData);
    Shutdownable virtual_port_shutdown = virtual_port.launch();//3. RefreshConnections:维护节点间的连接:创建新连接|维护已建立连接|销毁无用连接// refresh connection
    RefreshConnections refreshConn = makeRefreshConnections();
    AsyncLoopThread refreshconn =new AsyncLoopThread(refreshConn);// refresh ZK active status
    RefreshActive refreshZkActive =new RefreshActive(workerData);
    AsyncLoopThread refreshzk =new AsyncLoopThread(refreshZkActive);//4. WorkerHeartbeatRunable:心跳线程// 每次心跳更新本地目录数据 $LOCAL_PATH/workers/{worker-id}/Heartbeats/{worker-id}// refresh hearbeat to Local dir
    RunnableCallback heartbeat_fn =new WorkerHeartbeatRunable(workerData);
    AsyncLoopThread hb =new AsyncLoopThread(heartbeat_fn, false, null,
            Thread.NORM_PRIORITY, true);//5. DrainerRunable:发送tuple线程// transferQueue, nodeportSocket, taskNodeport
    DrainerRunable drainer =new DrainerRunable(workerData);
    AsyncLoopThread dr =new AsyncLoopThread(drainer, false, null,
            Thread.MAX_PRIORITY, true);
    AsyncLoopThread[] threads ={ refreshconn, refreshzk, hb, dr };//6. 注册主线程退出数据清理hookreturnnew WorkerShutdown(workerData, shutdowntasks,
            virtual_port_shutdown, threads);}
3.Task

根据任务在Topology中不同节点角色,Task相应也会分成SpoutTask和BoltTask,二者除Task心跳及公共数据初始化等相同以外,各自有独立处理逻辑。核心实现在SpoutExecutors.java/BoltExecutors.java。
SpoutExecutors主要做两件事情:
1、作为DAG起点,负责发送原始Tuple数据;
2、如果Topology定义了Acker,SpoutExecutors会启动接收ack线程,根据接收到的ack决定是否重发Tuple;
BoltExecutor相比SpoutExecutor功能会稍微复杂:
1、接收从上游发送过来的Tuple,并根据Topology中定义的处理逻辑进行处理;
2、如果该Bolt存在下游,需要向下游发送新生成的Tuple;
3、如果Topology中定义了Acker,Bolt需要将经过简单计算的ack返回给根Spout。

 

  • 大小: 16.8 KB
  • 大小: 16.1 KB
分享到:
评论

相关推荐

    jstorm集成kafka插件demo

    对于Kafka,你需要设置Zookeeper地址,对于JStorm,你需要配置nimbus和supervisor节点。 2. **创建Kafka Topic**:在Kafka中,数据是以Topic的形式存在,我们需要先创建一个Topic,用于JStorm从中读取数据。 3. **...

    大数据技术分享 JStorm介绍 JStorm-分布式实时计算引擎 共40页.pptx

    4. **数据流稳定性**:JStorm改进了任务重平衡机制,避免了添加或关闭supervisor时触发不必要的rebalance,增强了任务执行的稳定性。 5. **资源隔离**:JStorm在任务调度时考虑了CPU、Memory、Disk和Net四个维度,...

    jstorm example

    **JStorm简介** JStorm是阿里巴巴开源的一款分布式实时计算系统,它是基于Apache Storm的一个高性能、高可用、热扩展的实时处理框架。JStorm的核心设计理念是简单、高效和稳定,能够处理大规模的数据流处理任务,...

    jstorm 阿里巴巴官方文档 pdf

    系统由Supervisor、Nimbus、Zookeeper、Worker、Task等组件构成,其中Nimbus负责任务调度,Supervisor管理节点上的工作进程,Zookeeper提供分布式协调,Worker运行实际的任务,而Task则是计算的最小单元。...

    jstorm-2.2.1

    《JStorm 2.2.1:分布式流处理框架详解》 JStorm是阿里巴巴开源的一款高性能、高可靠的分布式实时计算系统,它基于Apache Storm并针对大规模数据处理进行了优化。JStorm 2.2.1是该框架的一个稳定版本,提供了许多...

    jstorm课程

    2. **JStorm简介**:JStorm是Storm的Java实现,继承了Storm的特性,同时优化了与Java生态系统的集成,使得Java开发者更方便地进行实时计算应用开发。 3. **拓扑结构**:Storm中的核心概念是拓扑结构,由spout(数据...

    alibaba-jstorm

    阿里巴巴JStorm是一款由阿里巴巴开源的分布式实时计算系统,它基于Apache Storm,但在性能、稳定性、易用性等多个方面进行了优化,是大数据处理领域的重要工具之一。JStorm的设计理念是“简单、稳定、低延迟”,旨在...

    jstorm文档

    - 启动JStorm集群,包括nimbus、supervisor节点。 - 使用JStorm UI监控集群状态。 **4. 创建并提交Topology** 编写Topology代码,创建Spout和Bolt,定义Stream Grouping,然后使用`LocalCluster`进行本地测试,...

    JStorm 资源隔离测试(cgroup)1

    在JStorm的资源隔离测试中,`supervisor.enable.cgroup` 配置项被设置为 `true`,这表明系统将启用cgroups来限制和监控各个worker(工作进程)的资源使用。测试主要关注非root权限下supervisor的启动、cgroup的挂载...

    JStorm 调度测试1

    【JStorm 调度测试】是针对JStorm框架的一项重要测试内容,目的是确保在分布式环境中,任务(task)能够高效、均衡地被调度到不同的工作节点(worker)和监督节点(supervisor)上,从而优化拓扑性能。JStorm是一个...

    JStorm介绍

    JStorm介绍ppt,JStorm技术已经经历过双十一的考验,和大家分享一下

    Jstorm集群安装文档

    bin/storm start supervisor ``` - 查看JStorm的运行状态。 **3.11 启动Tomcat** - 在每台Tomcat服务器上启动Tomcat服务。 #### 三、总结 本文档详细介绍了如何在多台虚拟机上搭建JStorm集群的过程,包括软件...

    JStorm introduce

    阿里巴巴之所以选择开发并维护JStorm,主要基于以下几个原因: 1. **开源社区发展缓慢**:Apache Storm的社区发展速度无法满足阿里巴巴日益增长的业务需求。 2. **提升运维效率**:通过自定义开发,阿里巴巴可以...

    Storm流计算项目实战 JStorm介绍文档 共43页.pptx

    ### JStorm流计算项目实战简介 #### 一、现状与背景 随着大数据技术的发展,实时数据处理成为企业和组织关注的重点之一。Apache Storm作为一款开源的分布式实时计算系统,在这一领域有着广泛的应用。然而,随着...

    JStorm:JStorm原始码学习-源码包

    JStorm原始码学习:主要包含Storm重新启动,Nimbus启动,Supervisor启动,Executor创建和启动 风暴编程模型 Nimbus:负责资源分配和任务调度。 主管:负责接受nimbus分配的任务,启动和停止属于自己管理的worker...

    Monitor_JStorm

    JStorm的监控功能是其强大之处之一,它提供了全面、实时的拓扑运行状态监控,包括任务执行状态、拓扑性能、错误信息等。监控系统通过Web UI展示数据,使得运维人员可以直观地了解每个组件的工作情况,及时发现并解决...

    jstrom-2.1.0.zip

    JStorm的核心架构基于Apache Storm,采用了主从结构,由Supervisor、Nimbus、Zookeeper和Worker等组件构成。Nimbus作为中心调度器,负责任务分配和监控,而Supervisor则在各个节点上管理Worker进程,执行实际的计算...

    Storm 源码分析

    #### 一、Storm简介与应用场景 Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时计算领域的Hadoop,...

    Storm开发入门.pptx

    JStorm 阿里巴巴 高并发的计算任务 数据流之间相互无依赖 Nimbus: 资源调度角色 Supervisor: 接受nimubs 任务安排,启动任务 Worker: 进程 Executor: 执行线程 Task: 执行逻辑单元

    大数据分析架构师顶级培训课程 Storm基础理论与案例 共57页.pptx

    在中国,阿里巴巴基于Storm的核心部分使用Java重写,推出了JStorm,优化了性能。 - **特点**: - 开源且免费使用; - 支持多种编程语言; - 高性能、低延迟; - 易于扩展; - 容错能力强; - 消息传递可靠。 ...

Global site tag (gtag.js) - Google Analytics