`

一种基于kafka+storm实现的日志记录方法(三)

阅读更多

引言

 

接上一篇《一种基于kafka+storm实现的日志记录方法(二)》,讲述了java服务端向kafka怎么推送日志消息代码实现。本篇继续整理通过storm怎么消费kafka日志消息,并将日志内容存储到hbase。

 

主要过程分为三部分:

1、storm从kafka消费日志消息。

2、对日志消息按照系统标识进行过滤分组。

3、把不同的分组的日志消息,批量写入hbase。

 

初识Storm

 

在讲述这个三个过程之前,先简单了解下storm:storm是一个分布式的实时计算框架,相对其他实时计算框架,具备高容错、低延时等特点,实际上storm可以做到近实时。相对于spark steaming,storm更应该叫做流式计算框架,storm处理的最小单位是每次传入的一条消息(或者说数据),而spark steaming处理的是某个时间段内的一批消息(比如 1秒、3秒,5秒),在低延时方面storm表现更优异。

 

但spark steaming 也有自己的优势,spark steaming可以更好的融入hadoop体系,直接以hdfs作为数据源,并结合spark ML算法包进行计算,最后把计算结果以文件的方式存储到hdfs。

 

在本场景中,是以kafka为数据源,最终把日志消息存储到hbase。选择spark steaming和storm都可以,只是我们最终选择了storm。下面来看下storm的相关术语,以及它们之间的关系:

 

Topology提交到storm集群里执行的一段程序,实际是一个开发好的jar包,包括一个spout,多个bolt,消息数据以tuple的形式在spout、bolt之间传递。

 

Spout是Topology程序入口,可以从不同的“数据源”获取消息,以tuple为单位发射到一个或者多个bolt中。Storm支持从多种不同的“数据源”获取消息,比如:kafka、hbase、hive、redis、mq、mysql等。本场景中的“数据源”为kafka,对应的Spout为KafkaSpout。

 

Bolt:Bolt的输入为tuple(消息),并可以过一定的计算后,生成一个新的tuple,发射到下一个bolt(调用emit方法),或者把消息存储到数据库(比如 hbase、redis等)。每个消息处理成功后,一定要记得调用ack方法--告诉数据源该条消息已经处理完成。

 

Tuple消息对象,也可以称为一条消息。Storm中处理的最小单位。

 

上述内容其实就是我们开发的一段java程序(包含一个main方法),最终会编译打包为一个jar吧。我们需要把这个jar包上传到storm集群,指定执行这个main方法即可。

 

Nimbus、Supervisor分别类似于hadoop的name节点和data节点,Nimbus负责分发任务,Supervisor负责接收任务,并执行任务,Nimbus只有1个,Supervisor有多个。上述上传的jar包会首先被上传到Nimbus,然后被分发到各个Supervisor节点,Supervisor真正执行任务。每个Supervisor相当于一台机器,对应多个worker(多个jvm),每个worker里有多个Executor(线程),每个线程只会运行一个task实例(spout或者bolt)。

 

Zookeeper、ZeroMQ、Netty:Nimbus和Supervisor之间的不会直接交互,而是通过Zookeeper来协调并维持心跳信息,Zookeeper是storm实现分布式的核心组成。在Storm 0.8之前数据传递是通过ZeroMQ实现,即上述流程中spout发射消息到bolt,bolt发射消息到下一个bolt,这些都是通过ZeroMQ。从 storm 0.9开始改用netty实现,(这也是我个人比较喜欢netty的原因)。

 

关于storm就总结这么多吧,更多信息可以浏览storm官网:http://storm.apache.org/ 在Documentation菜单下可以了解各个版本相关信息。下面进入正题,讲述文章开头记录日志的“三个阶段”核心代码实现。

 

1、storm从kafka消费日志消息

 

这个过程其实就是创建spout的过程,我们在提交Topology的main方法中实现,代码如下:

/**
 * Created by gantianxing on 2017/7/29.
 */
public class LogTopology {
    public static void main(String[] args) throws Exception {
 
        //step1 配置zk
        String zks = "localhost:2181";
        BrokerHosts brokerHosts = new ZkHosts(zks);
 
 
        //step2 配置kafka spout
        String topic = "self_log"; //指定kafka topic
        String zkRoot = "/storm/log"; // default zookeeper root configuration for storm
        String id = "LogTopology";
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());//指定消息格式为String
 
        //step3 创建Topology,绑定spout、bolt。
        int spoutNum = 5;//并行度 建议小于等于topic分区数
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), spoutNum);
        builder.setBolt("log-save", new LogMsgBolt(), 2).shuffleGrouping("kafka-reader");//shuffleGrouping随机发射
 
        //step4 提交Topology
        Config conf = new Config();
        String name = LogTopology.class.getSimpleName();
        config.setNumWorkers(10);//指定worker个数
        StormSubmitter.submitTopologyWithProgressBar(name, config, builder.createTopology());
    }
}
 

 

创建Topology大致分为4步:

Step1:配置zookeeper,文章前部分已经讲过storm的分布式是基于zookeeper实现。

Step2:配置kafka spout,通过new KafkaSpout(spoutConf), spoutNum)创建KafkaSpout实例。

Step3:创建Topology,主要是绑定spout,以及多个bolt,实现数据在spout、bolt之间传递。

Step4:提交Topology作业,可以指定一些运行参数,比如通过config.setNumWorkers(10);//指定需要多少个worker执行这个Topology作业(worker个数对应jvm个数)。

 

2、对日志消息按照系统标识进行过滤分组。

 

这步主要是通过Bolt实现,主要过程为:解析日志并把日志内容放入一个map中;再通过strom自带的定时器功能,每隔2分钟把map中的日志内容推送到hbase,并清空hbase。代码实现逻辑如下(删除部分公司业务代码):

 

public class LogMsgBolt extends BaseRichBolt{
 
    private Logger LOG = LoggerFactory.getLogger(LogMsgBolt.class);
    private OutputCollector collector;
    private Map<String,String> dataInfo = new HashMap<>();
 
    public void prepare(Map map, TopologyContext context, OutputCollector collector){
        this.collector = collector;
    }
 
    public void execute(Tuple input){
 
        //定时向hbase 批量写日志
        if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)) {
 
            //批量执行hbase put方法
            HBaseUtil.bacthPut(dataInfo);
            dataInfo.clear();//清空map
        }
        else //解析日志消息 放到map
        {
            String line = input.getString(0);
 
            //转化为json格式
            JSONObject json = JSONObject.toJson(line);
 
            //省略代码:数据格式检查、数据转化为三部分 key、type、loginfo三部分
            String key = "xxxx";//业务key
            String type = "xxxx";//系统id 每个系统对应不同的日志表
            String loginfo = "xxxxx";//日志内容
 
            //构造hbase rowkey;
            String logTime="xxxx";//日志时间
            String rowKey = key+logTime+ UUID.randomUUID();//hbase rowkey
 
            //把日志内容放到map
            dataInfo.put("xxx","xxx");
 
            // 确认:tuple成功处理
            collector.ack(input);
        }
    }
 
    /**
     * 局部定时任务
     * @return
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        HashMap<String, Object> hashMap = new HashMap<String, Object>();
        hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//每隔两分钟 写一次hbase
        return hashMap;
    }
 
}

 

3、把不同的分组的日志消息,批量写入hbase。

 

第三步比较简单,写一个批量put到hbase的方法即可:

public class HBaseUtil {

    private static final Logger logger = LoggerFactory.getLogger(HBaseUtil.class);

    private static Configuration conf;
    private static Connection conn;

    static {
      try {
          if (conf == null) {
              conf = HBaseConfiguration.create();
//                conf.set("hbase.zookeeper.property.clientPort", ConfigUtil.getInstance().getConfigVal("zkport", ConstantProperties.COMMON_PROP));
                conf.set("hbase.zookeeper.quorum", ConfigUtil.getInstance().getConfigVal("zkhost", ConstantProperties.COMMON_PROP));
                conf.set("zookeeper.znode.parent", "/hbase");
          }
      } catch (Exception e) {
          logger.error("HBase Configuration Initialization failure !");
          throw new RuntimeException(e) ;
      }
  }

    /**
     * 获得链接
     * @return
     */
    public static synchronized Connection getConnection() {
        try {
            if(conn == null || conn.isClosed()){
                conn = ConnectionFactory.createConnection(conf);
            }
//         System.out.println("---------- " + conn.hashCode());
        } catch (IOException e) {
            logger.error("HBase 建立链接失败 ", e);
        }
        return conn;

    }

     /**
     * 异步往指定表添加数据
     */
    public void long put(String tablename, Map logInfo) throws Exception {
 
        List<SocPut> puts = xxxx;//省略把map转换为一个List业务代码       
        Connection conn = getConnection();
        final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
            @Override
            public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
                for (int i = 0; i < e.getNumExceptions(); i++) {
                    System.out.println("Failed to sent put " + e.getRow(i) + ".");
                    logger.error("Failed to sent put " + e.getRow(i) + ".");
                }
            }
        };
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename))
                .listener(listener);
        params.writeBufferSize(5 * 1024 * 1024);

        final BufferedMutator mutator = conn.getBufferedMutator(params);
        try {
            mutator.mutate(puts);
            mutator.flush();
        } finally {
            mutator.close();
            closeConnect(conn);
        }
    }

}

最后,通过拼装rowkey到指定bhase日志表提取日志即可。

 

至此整个通过java+kafka+strom+hbase,上报流水日志(敏感日志)流程讲解完毕。

0
0
分享到:
评论

相关推荐

    lamp安装配置及flume+Kafka+Storm+HDFS实时系统搭分享

    Apache Kafka是一种高吞吐量的分布式发布订阅消息系统,设计目标是处理流式数据。它允许应用程序实时消费数据,同时提供强大的存储能力,保证数据的可靠性。Kafka与Flume结合,可以实现高效的数据流转和分发。 ...

    log4j+flume+kafka+storm

    在集成Log4j与Flume的过程中,Avro Source是一种常用的方式。Avro Source允许远程客户端通过网络接口发送数据到Flume Agent。在这个场景中,Flume版本为1.5.2。 **步骤一:配置Flume** 首先需要对Flume的配置文件`...

    kafka+storm+influxdb的程序demo

    **CoapStormServer** 这个压缩包内的文件可能是服务器端的实现,可能基于CoAP(Constrained Application Protocol)协议,这是一种针对资源受限设备的轻量级M2M通信协议,常用于物联网应用。CoapStormServer可能作为...

    kafka跟storm收集日志解决方案

    Apache Kafka是一种分布式的、基于发布/订阅模式的消息系统,它能够处理大量的实时数据流。Kafka因其高性能、高吞吐量以及低延迟等特点,在大数据领域有着广泛的应用。Kafka主要应用于构建实时数据管道和流式应用。 ...

    docker-compose部署zk+kafka+storm集群的实现

    总结,这个配置实现了 ZooKeeper、Kafka、Storm 和 InfluxDB 的集群部署,利用 Docker Compose 提供了一种简化部署和管理的方式。通过这种方式,可以快速地搭建和扩展大数据处理环境,同时保持各个组件之间的通信和...

    kafka、storm、flink、apex、spark五种流式大数据系统调研报告

    Kafka提供了三种不同的消息交付保证: - At-least-once:至少一次,可能会重复。 - At-most-once:最多一次,可能会丢失。 - Exactly-once:恰好一次,是最理想的,但实现起来复杂。 1.3 Kafka性能测试 通过Kafka...

    storm集成kafka插demo.zip

    【标题】"storm集成kafka插demo.zip"指的是一个演示如何将Apache Storm与Apache Kafka集成的实例项目。这个压缩包包含了一个示例,用于展示如何在Storm拓扑中消费和处理Kafka的消息。 【描述】"storm集成kafka插件...

    扩展logback将日志输出到Kafka实例源码

    总之,扩展Logback将日志输出到Kafka是一种常见的日志管理实践,它结合了Logback的灵活性和Kafka的高性能特性,为大数据环境下的日志处理提供了有力支持。通过理解和实现上述技术点,开发者可以构建出高效、可靠的...

    storm-kafka实时趋势分析

    总的来说,"storm-kafka实时趋势分析"是一种强大的实时数据处理解决方案,它结合了Storm的实时计算能力和Kafka的消息中间件特性,可以广泛应用于各种实时数据分析场景,如电商的实时销量分析、社交媒体的情绪分析等...

    分布式实时日志密度数据流聚类算法及其基于Storm的实现.pdf

    在本文中,作者提出了一种名为RL-DSCA(Real-time Log density algorithm)的算法,它结合了经典的数据流聚类框架Clustream和基于密度的聚类算法DBSCAN,实现了多粒度的日志数据流聚类。 Clustream算法由Aggarwal等...

    基于Storm技术的实时数据处理平台研究与实现.pdf

    在系统设计方面,本文提出了一种基于Storm的实时数据处理平台架构,该架构主要由分布式集群服务器、Web服务器、客户端三个部分组成。分布式集群服务器负责实时数据的采集和处理,Web服务器则负责与客户端通信,提供...

    kafka入门必备手册

    7. 提交日志:作为分布式系统的一种实现,Kafka用于记录分布式系统的变动数据。 **Kafka设计原理** Kafka基于ZooKeeper,一个用于维护配置信息、提供分布式协调服务的开源框架。Kafka集群通过ZooKeeper进行管理,...

    基于storm的实时推荐系统论文

    综上所述,本文提出的基于Storm的实时推荐系统,通过结合Kafka的流数据处理能力和Storm的实时计算能力,实现了对用户行为数据的高效处理和实时分析。此外,通过改进的协同过滤算法和矩阵分解技术,进一步提高了推荐...

    jstorm集成kafka插件demo

    Kafka,由LinkedIn开源,现在是Apache软件基金会的顶级项目,是一种分布式流处理平台。它作为消息中间件,提供了高吞吐量的消息生产和消费能力,同时支持数据持久化,使得消息能够被多次消费,非常适合大数据实时...

    Kafka 入门基础篇.pptx

    Kafka 是 LinkedIn 公司开发的一种分布式消息队列系统,支持离线和在线日志处理。它可以实时处理大量数据,满足各种需求场景,如基于 Hadoop 的批处理系统、低延迟的实时系统、Storm/Spark 流式处理引擎、Web/nginx ...

    基于Storm的实时报警服务的设计与实现.pdf

    在本文中,作者提出了一种基于Storm的实时报警服务设计方案。首先,Scribe会持续收集大量日志数据,然后将这些数据传递给Kafka进行临时存储。接着,Storm作为实时处理引擎,从Kafka中消费数据,进行分析过滤,匹配...

    大数据之Kafka

    它以一种高吞吐量、低延迟的方式处理数据,适用于离线和在线的消息消费场景。Kafka最初由LinkedIn公司开发,后来捐赠给了Apache基金会。 #### 二、Kafka的特点 1. **高吞吐量**:Kafka被设计为支持高吞吐量的数据...

    一、Kafka简介.docx

    【Kafka】是一种分布式发布-订阅消息系统,由Apache开发,设计目的是为了处理大规模的数据流。Kafka将消息持久化到磁盘,并在集群中进行复制,以确保高可用性和容错性。它与ZooKeeper协同工作,提供了一个可靠且高...

    storm-kafka-Log-Consumer:这是一个来自kafka集群的风暴日志数据处理项目,最终存储到HBase

    描述中提到,该项目是针对"Storm日志数据处理应用",这意味着它可能涉及对日志数据的清洗、解析、分析等操作。日志数据通常包含应用程序运行时的信息,如错误、警告、性能指标等,通过处理这些数据,可以获取关于...

Global site tag (gtag.js) - Google Analytics