`

一种基于kafka+storm实现的日志记录方法(二)

阅读更多

引言

 

上一篇分享博文《一种基于kafka+storm实现的日志记录方法》,讲述了一种基于大数据实时运算实现的日志记录方式。在文中只是提出了一种技术实现思路,以及整体架构,并且在我所在的项目中已经进行了实践,感兴趣的朋友,可以进一步完善,比如添加权限等,实现一种新日志平台的搭建。

 

博文发布后,有网友留言希望公开部分源码。今天准备整理下我们已经实现的代码,去掉公司业务部分,做一个简单share,以回应网友要求。本文不再对整体实现流程进行讲解,感兴趣的朋友请直接前往上一遍博文。

 

代码实现主要分两部分:第一部分是java客户端往kafka写日志消息(生产者);第二部分是storm消费kafka日志消息,归类,批量写入hbase。从hbase查询日志部分比较简单,代码就不提供了。

 

由于这周末还要准备一个晋升答辩,本次分享只整理出来第一部分“java客户端往kafka写日志消息”。

 

Java写日志消息到kafka

 

我实现的第一版发送日志消息到kafka是复用的点击流日志上报流程,即用nginx+lua实现的http接口,往kafka写消息,当然也有采用nginx+go语言实现的。这种方式适用于做页面埋点,当用户浏览页面产生点击操作时调用该http接口,往kafka写日志,当时主要是想通过这种方式实现点击热力图、注意力热图等。这种方式实现的http接口性能相当优异,而且在支持高并发、高吞吐量方面表现优异,现在在各大电商网站广泛的运用,用户收集用户的行为数据,这些都是做大数据计算、分析、以及智能推荐的基础。

 

好吧不扯远了,后面有时间再分享下我们做大数据实时计算、以及智能推荐相关实现。既然这种nginx+lua+kafka的方式实现的http接口能支持每天海量的点击流日志上报,那它同样能满足服务器端的日志记录,而且这点日志量对于该http接口来说简直毫无压力。我的第一版实现很简单,直接在java服务端适用httpclient构造http请求,调用该http接口进行“服务器”端的日志上报。而且正如料想的一样,毫无压力。

 

这仅仅是我们的第一次尝试,但每次打印日志都需要调用一个http接口,我还是觉得很别扭,而且http接口还是有一定的网络开销。既然这种方式可行,那就可以放弃http接口,直接在java应用服务器端直连kafka发送日志消息,如果是http接口还有一点网络开销的话(10ms-50ms),这种方式对“应用服务器”来说毫无感知(1-2ms),这也是我想要的效果,毕竟只是打印一条日志。我把这个想法告诉我的同事丹哥”(外号甄子丹),最后把这部分代码实现做成一个jar包,在需要采用这种方式打印日志的系统引入这个jar包,再做一些配置即可。

 

核心代码讲解

 

下面我们来看下该jar包的核心代码LogCollectorClient类:

 

@Component
public class LogCollectorClient {
    private static final Log log = LogFactory.getLog(LogCollectorClient.class);
 
    //kafka生产者(京东对kafka做了一些简单封装,简称JDQ)
    private JDQProducerClient<String, byte[]> producer = null;
 
    private boolean HASAUTH = false;
 
    //每一批日志量,批量上报日志使用
    protected int OFFSET = 500;
 
    //spring 读取properties配置文件
    @Resource
    private Environment env;
 
    //初始化方法
    @PostConstruct
    private void init() {
        try {
 
            //step1:连接kafka权限验证,公司对kafka做的权限封装,可以根据自己公司kafka具体情况调整
            Authentication e = new Authentication(env.getProperty("kafka_key"), env.getProperty("test_token"));//开发、测试环境kafka
 
            //step2 设置kafka生成者相关配置属性
            Properties pros = new Properties();
            pros.setProperty("partitioner.class", env.getProperty("partitioner"));//指定分片策略
            pros.setProperty("producer.type", env.getProperty("producer.type"));
            pros.setProperty("compression.codec", env.getProperty("compression.codec"));
            pros.setProperty("request.required.acks", env.getProperty("request.required.acks"));
 
            //step3 初始化kafka生产者客户端
            this.producer = new JDQProducerClient(e, pros);
        } catch (Exception var4) {
            log.info("kafaka鉴权初始化失败!");
        }
 
    }
 
    /**
     * 上报一条单条日志
     * @param key
     * @param type
     * @param logMap
     * @throws JDQOverSpeedException
     * @throws JDQException
     */
    public void sendLogInfo(String key, String type, Map<String, String> logMap) throws JDQOverSpeedException, JDQException {
        if(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(type) && null != logMap && !logMap.isEmpty()) {
            this.producer.send(new JDQMessage(key + "_" + type, this.assembleJsonStr(key, type, logMap).getBytes()));
        }
 
    }
 
    /**
     * 转换成json格式上报
     * @param key
     * @param type
     * @param logMap
     * @return
     */
    private String assembleJsonStr(String key, String type, Map<String, String> logMap) {
        StringBuffer valueStr = new StringBuffer();
        Iterator logInfo = logMap.entrySet().iterator();
 
        while(logInfo.hasNext()) {
            Map.Entry entry = (Map.Entry)logInfo.next();
            valueStr.append((StringUtils.isNotBlank((String)entry.getKey())?((String)entry.getKey()).replaceAll("&", " "):"") + "=").append(StringUtils.isNotBlank((String)entry.getValue())?((String)entry.getValue()).replaceAll("&", " "):"").append("&");
        }
 
        LogAssembleInfo logInfo1 = new LogAssembleInfo("key=" + key + "&type=" + type + "&" + valueStr.toString(), DateUtil.getTime());
        return JsonUtil.write2JsonStr(logInfo1);
    }
 
    @PreDestroy
    private void destroy() {
        if(null != this.producer) {
            this.producer.close();
        }
    }
}

 

这个类其实很简单,说明如下:

 

1、采用@Component注解,说明只是一个简单的spring 单例 beanspring容器启动时注入到容器中。

 

2@PostConstruct 注解的init方法,bean初始化时,就会初始化一个kafka生产者对象,我们公司kafka团队对kafka做了简单的封装 JDQProducerClient本质上对应的是kafkakafka.javaapi.producer.Producer。如果你使用的原生kafka,生产者的初始化方法如下:

 

public static void main(String[] args) throws Exception {
 
         Properties prop = new Properties();
 
         prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
 
         prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
 
         prop.put("serializer.class", StringEncoder.class.getName());
 
         Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop));
 
         int i = 0;
 
         while(true){
 
                 producer.send(new KeyedMessage<String, String>("test111", "msg:"+i++));
 
                 Thread.sleep(1000);
 
             }
 
    }
 

3、关于kafka初始化的相关配置信息放到一个properties的配置文件中,通过springEnvironment环境上下文对象的getProperty()方法获取与连接kafka所以的配置。

 

4、采用@PreDestroy注解的destroy()方法,这里是应用服务器tomcat停止之前,优雅的自动关闭kafka连接。

 

5、最后来看下日志上报方法sendLogInfo(),在需要上报日志的类中注入LogCollectorClient对象即可,如下:

 

 
@Component
public class TestService {
 
    @Resource
    private LogCollectorClient log;
 
    public void publish(){
        //省略业务代码
 
        //开始上报日志,日志内容放到一个map里
        Map<String,String> param = new HashMap<String, String>();
        param.put("time", DateTimeUtils.getDateTime());
        param.put("logs", "xxx发布活动");
        log.sendLogInfo(pageId, SystemConstant.APP_ID, param);
    }
}
 

 

sendLogInfo日志上报方法需要三个参数:

 

第一个是查询key hbase日志表中rowkey构成部分。比如:这里的pageId,发布页面的id

 

第二个是系统id,用于区分hbase日志表,每个系统对应一个固定的常量。

 

第三个是需要答应的日志内容,考虑到打印的日志可能比较多,这里用一个map存放,也可以改为一个String

 

好了,关于javakafka上报日志的核心类LogCollectorClient讲解完毕。正如前面所说,把LogCollectorClient类打成一个jar包,在需要日志打印的应用系统里引入这个jar包,以及一个kafkaproperties配置文件即可。

 

当然,你可以把kafka需要的配置当做常量写死在jar包中的一个常量类中,这样应用系统只需要一个jar包即可。

 

优化

 

最后你还可以对上述LogCollectorClient类做一些优化:

1、比如加一个线程池,把上报日志改为异步上报,在线程中处理kafka·异常。这样即便kafka·出现问题时,也不会影响正常业务,唯一影响的就是日志会丢失。

 

2、另外如果你的日志量很大,你还可以采用kafka·的批量上报,当日志量达到一定条数后 才调用一次producersender方法。

 

3、也许你已经发现了这里上报的日志内容是json格式,为了更加高效你改为pb格式。

 

最后需要说明的是:理论上这种日志记录方式可以完全代替传统的日志打印到文件的方式,比如Log4j。但是没有必要,个人觉得一些无关紧要的调试日志还是使用Log4j,对于一些敏感日志或者重要的流水日志,采用这种方式。 Log4j打印日志更简单,基于kafka+storm的更加安全、永久存放、日志更集中(一张hbase表中),二者结合使用天衣无缝。

 

关于第一部分“java客户端往kafka写日志消息(生产者)就分享到这里,由于下周还有一个晋升答辩需要准备,预祝自己这次晋升能成功吧。第二部分“storm消费kafka日志消息放到hbase”只能缓几天,才能整理出来啦,忘谅解。

2
1
分享到:
评论
2 楼 moon_walker 2017-07-25  
QuarterLifeForJava 写道
您好,请教2个问题:
1、kafka的事物处理及异常,在java端该怎么获取及处理,能否再详解下
2、关于JSON的转换是否有一些开源封装好的来处理?


第一个问题,关于事务处理,其实我这里不需要事务处理,只是记录一条日志而已。但可以讲下,kafka的处理方式跟其他mq一样,只要保证最终一致性即可,有些mq还提供相关支持。但kafka需要借助表来实现最终一致性:
a、在生产端每次往mysql里插入一条记录,状态即为“新建”,再通过一个worker不停的扫描状态为“新建”的记录,向kafka发送消息,发送成功后,修改状态为“完成”。这样可以保证及时生产端挂掉,下次重启也可以发送消息。
b、在客户端,每次消费消息,先把消息写到表以后,再调用ack方法通知kafka处理已经完成。保证即使客户端挂掉,下次重启也能继续消费。
上述两种方式还可以加入一些监控,通过人工介入处理。

第二个问题,是的,我们使用的jackson,淘宝的fastjson,以及google的pb格式 也不错。
1 楼 QuarterLifeForJava 2017-07-24  
您好,请教2个问题:
1、kafka的事物处理及异常,在java端该怎么获取及处理,能否再详解下
2、关于JSON的转换是否有一些开源封装好的来处理?

相关推荐

    lamp安装配置及flume+Kafka+Storm+HDFS实时系统搭分享

    Apache Kafka是一种高吞吐量的分布式发布订阅消息系统,设计目标是处理流式数据。它允许应用程序实时消费数据,同时提供强大的存储能力,保证数据的可靠性。Kafka与Flume结合,可以实现高效的数据流转和分发。 ...

    log4j+flume+kafka+storm

    在集成Log4j与Flume的过程中,Avro Source是一种常用的方式。Avro Source允许远程客户端通过网络接口发送数据到Flume Agent。在这个场景中,Flume版本为1.5.2。 **步骤一:配置Flume** 首先需要对Flume的配置文件`...

    kafka+storm+influxdb的程序demo

    **CoapStormServer** 这个压缩包内的文件可能是服务器端的实现,可能基于CoAP(Constrained Application Protocol)协议,这是一种针对资源受限设备的轻量级M2M通信协议,常用于物联网应用。CoapStormServer可能作为...

    kafka跟storm收集日志解决方案

    Apache Kafka是一种分布式的、基于发布/订阅模式的消息系统,它能够处理大量的实时数据流。Kafka因其高性能、高吞吐量以及低延迟等特点,在大数据领域有着广泛的应用。Kafka主要应用于构建实时数据管道和流式应用。 ...

    docker-compose部署zk+kafka+storm集群的实现

    总结,这个配置实现了 ZooKeeper、Kafka、Storm 和 InfluxDB 的集群部署,利用 Docker Compose 提供了一种简化部署和管理的方式。通过这种方式,可以快速地搭建和扩展大数据处理环境,同时保持各个组件之间的通信和...

    kafka、storm、flink、apex、spark五种流式大数据系统调研报告

    Kafka基于发布/订阅模型,由生产者、消费者和 brokers 组成。生产者负责将消息发布到主题,brokers 存储这些消息,消费者从 brokers 订阅并消费消息。 1.2.2 消息分发模式 消息以主题(topic)的形式组织,每个主题...

    storm集成kafka插demo.zip

    【标题】"storm集成kafka插demo.zip"指的是一个演示如何将Apache Storm与Apache Kafka集成的实例项目。这个压缩包包含了一个示例,用于展示如何在Storm拓扑中消费和处理Kafka的消息。 【描述】"storm集成kafka插件...

    扩展logback将日志输出到Kafka实例源码

    总之,扩展Logback将日志输出到Kafka是一种常见的日志管理实践,它结合了Logback的灵活性和Kafka的高性能特性,为大数据环境下的日志处理提供了有力支持。通过理解和实现上述技术点,开发者可以构建出高效、可靠的...

    storm-kafka实时趋势分析

    总的来说,"storm-kafka实时趋势分析"是一种强大的实时数据处理解决方案,它结合了Storm的实时计算能力和Kafka的消息中间件特性,可以广泛应用于各种实时数据分析场景,如电商的实时销量分析、社交媒体的情绪分析等...

    分布式实时日志密度数据流聚类算法及其基于Storm的实现.pdf

    在本文中,作者提出了一种名为RL-DSCA(Real-time Log density algorithm)的算法,它结合了经典的数据流聚类框架Clustream和基于密度的聚类算法DBSCAN,实现了多粒度的日志数据流聚类。 Clustream算法由Aggarwal等...

    基于Storm技术的实时数据处理平台研究与实现.pdf

    在系统设计方面,本文提出了一种基于Storm的实时数据处理平台架构,该架构主要由分布式集群服务器、Web服务器、客户端三个部分组成。分布式集群服务器负责实时数据的采集和处理,Web服务器则负责与客户端通信,提供...

    kafka入门必备手册

    7. 提交日志:作为分布式系统的一种实现,Kafka用于记录分布式系统的变动数据。 **Kafka设计原理** Kafka基于ZooKeeper,一个用于维护配置信息、提供分布式协调服务的开源框架。Kafka集群通过ZooKeeper进行管理,...

    基于storm的实时推荐系统论文

    综上所述,本文提出的基于Storm的实时推荐系统,通过结合Kafka的流数据处理能力和Storm的实时计算能力,实现了对用户行为数据的高效处理和实时分析。此外,通过改进的协同过滤算法和矩阵分解技术,进一步提高了推荐...

    jstorm集成kafka插件demo

    Kafka,由LinkedIn开源,现在是Apache软件基金会的顶级项目,是一种分布式流处理平台。它作为消息中间件,提供了高吞吐量的消息生产和消费能力,同时支持数据持久化,使得消息能够被多次消费,非常适合大数据实时...

    Kafka 入门基础篇.pptx

    Kafka 是 LinkedIn 公司开发的一种分布式消息队列系统,支持离线和在线日志处理。它可以实时处理大量数据,满足各种需求场景,如基于 Hadoop 的批处理系统、低延迟的实时系统、Storm/Spark 流式处理引擎、Web/nginx ...

    基于Storm的实时报警服务的设计与实现.pdf

    在本文中,作者提出了一种基于Storm的实时报警服务设计方案。首先,Scribe会持续收集大量日志数据,然后将这些数据传递给Kafka进行临时存储。接着,Storm作为实时处理引擎,从Kafka中消费数据,进行分析过滤,匹配...

    大数据之Kafka

    它以一种高吞吐量、低延迟的方式处理数据,适用于离线和在线的消息消费场景。Kafka最初由LinkedIn公司开发,后来捐赠给了Apache基金会。 #### 二、Kafka的特点 1. **高吞吐量**:Kafka被设计为支持高吞吐量的数据...

    一、Kafka简介.docx

    【Kafka】是一种分布式发布-订阅消息系统,由Apache开发,设计目的是为了处理大规模的数据流。Kafka将消息持久化到磁盘,并在集群中进行复制,以确保高可用性和容错性。它与ZooKeeper协同工作,提供了一个可靠且高...

    storm-kafka-Log-Consumer:这是一个来自kafka集群的风暴日志数据处理项目,最终存储到HBase

    描述中提到,该项目是针对"Storm日志数据处理应用",这意味着它可能涉及对日志数据的清洗、解析、分析等操作。日志数据通常包含应用程序运行时的信息,如错误、警告、性能指标等,通过处理这些数据,可以获取关于...

Global site tag (gtag.js) - Google Analytics