引言
接上一篇《一种基于kafka+storm实现的日志记录方法(二)》,讲述了java服务端向kafka怎么推送日志消息代码实现。本篇继续整理通过storm怎么消费kafka日志消息,并将日志内容存储到hbase。
主要过程分为三部分:
1、storm从kafka消费日志消息。
2、对日志消息按照系统标识进行过滤分组。
3、把不同的分组的日志消息,批量写入hbase。
初识Storm
在讲述这个三个过程之前,先简单了解下storm:storm是一个分布式的实时计算框架,相对其他实时计算框架,具备高容错、低延时等特点,实际上storm可以做到近实时。相对于spark steaming,storm更应该叫做流式计算框架,storm处理的最小单位是每次传入的一条消息(或者说数据),而spark steaming处理的是某个时间段内的一批消息(比如 1秒、3秒,5秒),在低延时方面storm表现更优异。
但spark steaming 也有自己的优势,spark steaming可以更好的融入hadoop体系,直接以hdfs作为数据源,并结合spark ML算法包进行计算,最后把计算结果以文件的方式存储到hdfs。
在本场景中,是以kafka为数据源,最终把日志消息存储到hbase。选择spark steaming和storm都可以,只是我们最终选择了storm。下面来看下storm的相关术语,以及它们之间的关系:
Topology:提交到storm集群里执行的一段程序,实际是一个开发好的jar包,包括一个spout,多个bolt,消息数据以tuple的形式在spout、bolt之间传递。
Spout:是Topology程序入口,可以从不同的“数据源”获取消息,以tuple为单位发射到一个或者多个bolt中。Storm支持从多种不同的“数据源”获取消息,比如:kafka、hbase、hive、redis、mq、mysql等。本场景中的“数据源”为kafka,对应的Spout为KafkaSpout。
Bolt:Bolt的输入为tuple(消息),并可以过一定的计算后,生成一个新的tuple,发射到下一个bolt(调用emit方法),或者把消息存储到数据库(比如 hbase、redis等)。每个消息处理成功后,一定要记得调用ack方法--告诉数据源该条消息已经处理完成。
Tuple:消息对象,也可以称为一条消息。Storm中处理的最小单位。
上述内容其实就是我们开发的一段java程序(包含一个main方法),最终会编译打包为一个jar吧。我们需要把这个jar包上传到storm集群,指定执行这个main方法即可。
Nimbus、Supervisor:分别类似于hadoop的name节点和data节点,Nimbus负责分发任务,Supervisor负责接收任务,并执行任务,Nimbus只有1个,Supervisor有多个。上述上传的jar包会首先被上传到Nimbus,然后被分发到各个Supervisor节点,Supervisor真正执行任务。每个Supervisor相当于一台机器,对应多个worker(多个jvm),每个worker里有多个Executor(线程),每个线程只会运行一个task实例(spout或者bolt)。
Zookeeper、ZeroMQ、Netty:Nimbus和Supervisor之间的不会直接交互,而是通过Zookeeper来协调并维持心跳信息,Zookeeper是storm实现分布式的核心组成。在Storm 0.8之前数据传递是通过ZeroMQ实现,即上述流程中spout发射消息到bolt,bolt发射消息到下一个bolt,这些都是通过ZeroMQ。从 storm 0.9开始改用netty实现,(这也是我个人比较喜欢netty的原因)。
关于storm就总结这么多吧,更多信息可以浏览storm官网:http://storm.apache.org/ 在Documentation菜单下可以了解各个版本相关信息。下面进入正题,讲述文章开头记录日志的“三个阶段”核心代码实现。
1、storm从kafka消费日志消息
这个过程其实就是创建spout的过程,我们在提交Topology的main方法中实现,代码如下:
/** * Created by gantianxing on 2017/7/29. */ public class LogTopology { public static void main(String[] args) throws Exception { //step1 配置zk String zks = "localhost:2181"; BrokerHosts brokerHosts = new ZkHosts(zks); //step2 配置kafka spout String topic = "self_log"; //指定kafka topic String zkRoot = "/storm/log"; // default zookeeper root configuration for storm String id = "LogTopology"; SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());//指定消息格式为String //step3 创建Topology,绑定spout、bolt。 int spoutNum = 5;//并行度 建议小于等于topic分区数 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), spoutNum); builder.setBolt("log-save", new LogMsgBolt(), 2).shuffleGrouping("kafka-reader");//shuffleGrouping随机发射 //step4 提交Topology Config conf = new Config(); String name = LogTopology.class.getSimpleName(); config.setNumWorkers(10);//指定worker个数 StormSubmitter.submitTopologyWithProgressBar(name, config, builder.createTopology()); } }
创建Topology大致分为4步:
Step1:配置zookeeper,文章前部分已经讲过storm的分布式是基于zookeeper实现。
Step2:配置kafka spout,通过new KafkaSpout(spoutConf), spoutNum)创建KafkaSpout实例。
Step3:创建Topology,主要是绑定spout,以及多个bolt,实现数据在spout、bolt之间传递。
Step4:提交Topology作业,可以指定一些运行参数,比如通过config.setNumWorkers(10);//指定需要多少个worker执行这个Topology作业(worker个数对应jvm个数)。
2、对日志消息按照系统标识进行过滤分组。
这步主要是通过Bolt实现,主要过程为:解析日志并把日志内容放入一个map中;再通过strom自带的定时器功能,每隔2分钟把map中的日志内容推送到hbase,并清空hbase。代码实现逻辑如下(删除部分公司业务代码):
public class LogMsgBolt extends BaseRichBolt{ private Logger LOG = LoggerFactory.getLogger(LogMsgBolt.class); private OutputCollector collector; private Map<String,String> dataInfo = new HashMap<>(); public void prepare(Map map, TopologyContext context, OutputCollector collector){ this.collector = collector; } public void execute(Tuple input){ //定时向hbase 批量写日志 if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)) { //批量执行hbase put方法 HBaseUtil.bacthPut(dataInfo); dataInfo.clear();//清空map } else //解析日志消息 放到map { String line = input.getString(0); //转化为json格式 JSONObject json = JSONObject.toJson(line); //省略代码:数据格式检查、数据转化为三部分 key、type、loginfo三部分 String key = "xxxx";//业务key String type = "xxxx";//系统id 每个系统对应不同的日志表 String loginfo = "xxxxx";//日志内容 //构造hbase rowkey; String logTime="xxxx";//日志时间 String rowKey = key+logTime+ UUID.randomUUID();//hbase rowkey //把日志内容放到map dataInfo.put("xxx","xxx"); // 确认:tuple成功处理 collector.ack(input); } } /** * 局部定时任务 * @return */ @Override public Map<String, Object> getComponentConfiguration() { HashMap<String, Object> hashMap = new HashMap<String, Object>(); hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//每隔两分钟 写一次hbase return hashMap; } }
3、把不同的分组的日志消息,批量写入hbase。
第三步比较简单,写一个批量put到hbase的方法即可:
public class HBaseUtil { private static final Logger logger = LoggerFactory.getLogger(HBaseUtil.class); private static Configuration conf; private static Connection conn; static { try { if (conf == null) { conf = HBaseConfiguration.create(); // conf.set("hbase.zookeeper.property.clientPort", ConfigUtil.getInstance().getConfigVal("zkport", ConstantProperties.COMMON_PROP)); conf.set("hbase.zookeeper.quorum", ConfigUtil.getInstance().getConfigVal("zkhost", ConstantProperties.COMMON_PROP)); conf.set("zookeeper.znode.parent", "/hbase"); } } catch (Exception e) { logger.error("HBase Configuration Initialization failure !"); throw new RuntimeException(e) ; } } /** * 获得链接 * @return */ public static synchronized Connection getConnection() { try { if(conn == null || conn.isClosed()){ conn = ConnectionFactory.createConnection(conf); } // System.out.println("---------- " + conn.hashCode()); } catch (IOException e) { logger.error("HBase 建立链接失败 ", e); } return conn; } /** * 异步往指定表添加数据 */ public void long put(String tablename, Map logInfo) throws Exception { List<SocPut> puts = xxxx;//省略把map转换为一个List业务代码 Connection conn = getConnection(); final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { @Override public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) { for (int i = 0; i < e.getNumExceptions(); i++) { System.out.println("Failed to sent put " + e.getRow(i) + "."); logger.error("Failed to sent put " + e.getRow(i) + "."); } } }; BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename)) .listener(listener); params.writeBufferSize(5 * 1024 * 1024); final BufferedMutator mutator = conn.getBufferedMutator(params); try { mutator.mutate(puts); mutator.flush(); } finally { mutator.close(); closeConnect(conn); } } }
最后,通过拼装rowkey到指定bhase日志表提取日志即可。
至此整个通过java+kafka+strom+hbase,上报流水日志(敏感日志)流程讲解完毕。
相关推荐
Apache Kafka是一种高吞吐量的分布式发布订阅消息系统,设计目标是处理流式数据。它允许应用程序实时消费数据,同时提供强大的存储能力,保证数据的可靠性。Kafka与Flume结合,可以实现高效的数据流转和分发。 ...
在集成Log4j与Flume的过程中,Avro Source是一种常用的方式。Avro Source允许远程客户端通过网络接口发送数据到Flume Agent。在这个场景中,Flume版本为1.5.2。 **步骤一:配置Flume** 首先需要对Flume的配置文件`...
**CoapStormServer** 这个压缩包内的文件可能是服务器端的实现,可能基于CoAP(Constrained Application Protocol)协议,这是一种针对资源受限设备的轻量级M2M通信协议,常用于物联网应用。CoapStormServer可能作为...
Apache Kafka是一种分布式的、基于发布/订阅模式的消息系统,它能够处理大量的实时数据流。Kafka因其高性能、高吞吐量以及低延迟等特点,在大数据领域有着广泛的应用。Kafka主要应用于构建实时数据管道和流式应用。 ...
总结,这个配置实现了 ZooKeeper、Kafka、Storm 和 InfluxDB 的集群部署,利用 Docker Compose 提供了一种简化部署和管理的方式。通过这种方式,可以快速地搭建和扩展大数据处理环境,同时保持各个组件之间的通信和...
Kafka提供了三种不同的消息交付保证: - At-least-once:至少一次,可能会重复。 - At-most-once:最多一次,可能会丢失。 - Exactly-once:恰好一次,是最理想的,但实现起来复杂。 1.3 Kafka性能测试 通过Kafka...
【标题】"storm集成kafka插demo.zip"指的是一个演示如何将Apache Storm与Apache Kafka集成的实例项目。这个压缩包包含了一个示例,用于展示如何在Storm拓扑中消费和处理Kafka的消息。 【描述】"storm集成kafka插件...
总之,扩展Logback将日志输出到Kafka是一种常见的日志管理实践,它结合了Logback的灵活性和Kafka的高性能特性,为大数据环境下的日志处理提供了有力支持。通过理解和实现上述技术点,开发者可以构建出高效、可靠的...
总的来说,"storm-kafka实时趋势分析"是一种强大的实时数据处理解决方案,它结合了Storm的实时计算能力和Kafka的消息中间件特性,可以广泛应用于各种实时数据分析场景,如电商的实时销量分析、社交媒体的情绪分析等...
在本文中,作者提出了一种名为RL-DSCA(Real-time Log density algorithm)的算法,它结合了经典的数据流聚类框架Clustream和基于密度的聚类算法DBSCAN,实现了多粒度的日志数据流聚类。 Clustream算法由Aggarwal等...
在系统设计方面,本文提出了一种基于Storm的实时数据处理平台架构,该架构主要由分布式集群服务器、Web服务器、客户端三个部分组成。分布式集群服务器负责实时数据的采集和处理,Web服务器则负责与客户端通信,提供...
7. 提交日志:作为分布式系统的一种实现,Kafka用于记录分布式系统的变动数据。 **Kafka设计原理** Kafka基于ZooKeeper,一个用于维护配置信息、提供分布式协调服务的开源框架。Kafka集群通过ZooKeeper进行管理,...
综上所述,本文提出的基于Storm的实时推荐系统,通过结合Kafka的流数据处理能力和Storm的实时计算能力,实现了对用户行为数据的高效处理和实时分析。此外,通过改进的协同过滤算法和矩阵分解技术,进一步提高了推荐...
Kafka,由LinkedIn开源,现在是Apache软件基金会的顶级项目,是一种分布式流处理平台。它作为消息中间件,提供了高吞吐量的消息生产和消费能力,同时支持数据持久化,使得消息能够被多次消费,非常适合大数据实时...
Kafka 是 LinkedIn 公司开发的一种分布式消息队列系统,支持离线和在线日志处理。它可以实时处理大量数据,满足各种需求场景,如基于 Hadoop 的批处理系统、低延迟的实时系统、Storm/Spark 流式处理引擎、Web/nginx ...
在本文中,作者提出了一种基于Storm的实时报警服务设计方案。首先,Scribe会持续收集大量日志数据,然后将这些数据传递给Kafka进行临时存储。接着,Storm作为实时处理引擎,从Kafka中消费数据,进行分析过滤,匹配...
它以一种高吞吐量、低延迟的方式处理数据,适用于离线和在线的消息消费场景。Kafka最初由LinkedIn公司开发,后来捐赠给了Apache基金会。 #### 二、Kafka的特点 1. **高吞吐量**:Kafka被设计为支持高吞吐量的数据...
【Kafka】是一种分布式发布-订阅消息系统,由Apache开发,设计目的是为了处理大规模的数据流。Kafka将消息持久化到磁盘,并在集群中进行复制,以确保高可用性和容错性。它与ZooKeeper协同工作,提供了一个可靠且高...
描述中提到,该项目是针对"Storm日志数据处理应用",这意味着它可能涉及对日志数据的清洗、解析、分析等操作。日志数据通常包含应用程序运行时的信息,如错误、警告、性能指标等,通过处理这些数据,可以获取关于...