在hadoop中,存在对应的counter计数器用于记录hadoop map/reduce job任务执行过程中自定义的一些计数器,其中hadoop任务中已经内置了一些计数器,例如CPU时间,GC时间等。
Storm中也存在类似counter的功能,metrics,详细介绍可以参考下面的文档:
Storm exposes a metrics interface to report summary statistics across the full topology. It's used internally to track the numbers you see in the Nimbus UI console: counts of executes and acks; average process latency per bolt; worker heap usage; and so forth.
所有的Metric都需要实现IMetric接口,该接口中只有一个方法用来表示取出现有的counter值,并将其清零。
public interface IMetric { public Object getValueAndReset(); }
以提供的实例CountMetric, MultiCountMetric和ReduceMetric为例,类图方式展示如下:
在CountMetric中,只是记录了一个long值,在每次incr和incrBy时进行递增记录;
MultiCountMetric中,其中内置了Map<String, CountMetric>用来记录多个CountMetric,scope方法用来以key的方式圈定范围,在每次getValueAndSet时,都会直接清掉Map中所有的CountMetric;
ReducedMetric是比较特殊的一个,因为它不仅仅记录了一个维度,还可以以reduce的方式来计算一段时间来的平均值,比如Storm中提供的实现MeanReducer,在ReducedMetric中的实现,其中_accumlator作为累积数据(从字面意思上理解),类型为Object,可以用任何类型来表示:
public class ReducedMetric implements IMetric { private final IReducer _reducer; private Object _accumulator; public ReducedMetric(IReducer reducer) { _reducer = reducer; _accumulator = _reducer.init(); } public void update(Object value) { _accumulator = _reducer.reduce(_accumulator, value); } public Object getValueAndReset() { Object ret = _reducer.extractResult(_accumulator); _accumulator = _reducer.init(); return ret; } }
MeanReducer中,就记录两个维度 count和总和,通过这两个维度,我们可以轻易计算出一段时间内的平均值。
class MeanReducerState { public int count = 0; public double sum = 0.0; } public class MeanReducer implements IReducer<MeanReducerState> { public MeanReducerState init() { return new MeanReducerState(); } public MeanReducerState reduce(MeanReducerState acc, Object input) { acc.count++; if(input instanceof Double) { acc.sum += (Double)input; } else if(input instanceof Long) { acc.sum += ((Long)input).doubleValue(); } else if(input instanceof Integer) { acc.sum += ((Integer)input).doubleValue(); } else { throw new RuntimeException( "MeanReducer::reduce called with unsupported input type `" + input.getClass() + "`. Supported types are Double, Long, Integer."); } return acc; } public Object extractResult(MeanReducerState acc) { if(acc.count > 0) { return acc.sum / (double) acc.count; } else { return null; } } }
所有的Metrics都需要在Spout/Bolt初始化之前记录,对应Spout.open(), Bolt.prepare方法,注册时需要指定指标的名称,对应实例,以及间隔时间(以秒为单位)。
context.registerMetric("execute_count", countMetric, 5); context.registerMetric("word_count", wordCountMetric, 60); context.registerMetric("word_length", wordLengthMeanMetric, 60);
IMetricsConsumer
注册metrics后,只是在定时进行记录metrics,但metrics该如何显示,这就取决于IMetricsConsumer,在Config中可以手动进行注册自定义的metricsConsumer,也可以直接使用storm中提供的记录日志的LoggingMetricsConsumer,该consumer会以日志的形式记录统计指标,下面是对其介绍:
Listens for all metrics, dumps them to log To use, add this to your topology's configuration: ```java conf.registerMetricsConsumer(org.apache.storm.metrics.LoggingMetricsConsumer.class, 1); ``` Or edit the storm.yaml config file: ```yaml topology.metrics.consumer.register: - class: "org.apache.storm.metrics.LoggingMetricsConsumer" parallelism.hint: 1
这表示,在config中可以通过手动注册的方式将LoggingMetricsConsumer注册上去,第二个参数为并行度:
config.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);
此时Config对象(类似HashMap)会将topology.metrics.consumer.register属性注册,记录其class, parallelism.hint并行度,以及argument参数。
在Config中注册后,通过内置的特殊Bolt:MetricConsumerBolt来执行handleDataPoints方法,其中handleDataPoints赋给的两个参数taskInfo, dataPoints如下所示,给定了source task的一些状态,以及传输过来的汇总数据:
在应用后,就可以在storm的日志目录下查看到metrics日志文件:
/usr/local/apache-storm-1.0.1/logs/workers-artifacts/FirstTopo-46-1468485056/6703 -rw-rw-r-- 1 java java 55K 7月 14 18:47 gc.log.0 -rw-rw-r-- 1 java java 28K 7月 14 18:47 worker.log -rw-rw-r-- 1 java java 0 7月 14 16:31 worker.log.err -rw-rw-r-- 1 java java 1.2M 7月 14 18:47 worker.log.metrics -rw-rw-r-- 1 java java 0 7月 14 16:31 worker.log.out -rw-rw-r-- 1 java java 5 7月 14 16:31 worker.pid -rw-rw-r-- 1 java java 120 7月 14 16:31 worker.yaml
在worker.log.metrics中就可以查看到所有metrics的相关信息,注意不仅仅包含我们自定义的bolt类型,一些system类型的信息也会在上面显示出来:
2016-07-14 16:31:40,700 31721 1468485098 192.168.1.127:6702 6:bolt execute_count 5 2016-07-14 16:31:45,702 36723 1468485103 192.168.1.127:6702 6:bolt execute_count 5 2016-07-14 16:31:50,702 41723 1468485108 192.168.1.127:6702 6:bolt execute_count 5 2016-07-14 16:32:10,705 61726 1468485128 192.168.1.127:6702 6:bolt execute_count 5 2016-07-14 16:32:15,708 66729 1468485133 192.168.1.127:6702 6:bolt execute_count 5 2016-07-14 16:32:25,699 76720 1468485143 192.168.1.127:6702 6:bolt __ack-count {spout:default=60} 2016-07-14 16:32:25,701 76722 1468485143 192.168.1.127:6702 6:bolt __sendqueue {sojourn_time_ms=0.0, write_pos=10, read_pos=10, arrival_rate_secs=0.10267994660642776, overflow=0, capacity=1024, population=0} 2016-07-14 16:32:25,701 76722 1468485143 192.168.1.127:6702 6:bolt word_count {happy=18, angry=19, excited=14} 2016-07-14 16:32:25,702 76723 1468485143 192.168.1.127:6702 6:bolt __receive {sojourn_time_ms=817.6666666666666, write_pos=62, read_pos=61, arrival_rate_secs=1.222992254382389, overflow=0, capacity=1024, population=1}
相关推荐
在运行Storm集群时,还需要理解关于资源隔离的概念,比如使用CGroup来限制资源使用,保证系统稳定性。Metrics收集则用于监控Storm集群的性能指标。Tick Tuple机制则是一种定时机制,允许用户在固定间隔内向Bolts发送...
这四个系统的比较测试,如Hibench,可以帮助理解它们在不同场景下的性能和适用性,以便于选择最适合特定业务需求的流处理技术。 综上所述,选择流式大数据系统需综合考虑性能、实时性、容错性、易用性和社区支持等...
通过这些分析,开发者可以深入理解Storm的工作原理,有助于进一步优化和定制系统以适应特定的业务需求。 需要注意的是,文档内容中提到的一些细节,如配置文件内容、Nimbus和Supervisor的启动细节等,在提供的片段...
- Storm 提供了 Metrics API,可以收集各种性能指标。了解如何设置和使用监控可以帮助优化拓扑性能。 - 当处理实时数据时,性能调优是一个持续的过程,包括资源分配、拓扑重构等。 7. **项目实践**: - 在 ...
1. **流式计算优化**:陈玉兆对流式计算系统有深入的理解,他熟悉Storm、SparkStreaming和Flink等主流流处理框架的内核。这意味着他具备设计和实施高效实时数据处理解决方案的能力。 2. **平台生态建设**:他参与...
1. 数据仪表盘:Facebook使用Tableau或内部开发的数据可视化工具,将实时分析结果以图表形式展示,帮助决策者迅速理解业务状况并做出决策。 2. 实时报告:通过自定义的报告系统,团队可以获取实时的业务指标,如...
阿里巴巴采用了各种数据源的监控探针,如日志、指标(Metrics)、调用链(Traces)等,以实时收集各类系统的运行信息。这些探针需要高效且低侵入地集成到各个服务中,确保数据的全面性和准确性。 其次,数据处理和...
在实际工作中,大数据架构师还应关注监控工具如Ambari Metrics、Zookeeper,确保系统的稳定运行;数据可视化工具如Superset,用于生成直观的报表和分析结果;以及NiFi Registry、Hue等工具,帮助进行数据管理和用户...
其中,实时消息处理通过Reliable Multicast实现,流式处理使用了Storm框架,而数据落地则存储到HBase和HDFS。 6. 日志收集与代理(Logging Agents): 客户端使用 Logging Agent 收集日志信息,并通过 Collector 层...
在分布式计算领域,JStorm是一个强大的实时计算系统,由阿里巴巴开源,基于Apache Storm,专为大数据实时处理而设计。它以其高可用性、高性能和低延迟著称,广泛应用于互联网行业的实时数据分析。本文将深入探讨...