`

一种基于kafka+storm实现的日志记录方法

阅读更多

背景:

 

在我们系统中,每逢大促经常会遇到活动页面莫名被篡改的情况,有些操作是人为故意修改,比如大促时把某些坑位的商品全部换为某个商家的商品,这里就存在恶意修改页面的问题。每次遇到这种问题,内控合规部 需要找我们提取操作日志,追溯所有篡改记录,查找元凶。

 

由于活动页面装修可以通过授权给多人装修,我们mysql表里的操作日志只有最后一次修改的状态记录,没有操作流水日志。无法从技术上排除每个被授权人的嫌疑。

 

当然你可以会觉得解决这个问题很简单,可以首先通过传统的打印日志的方法。但这个种方式有如下缺陷:

1、日志散落在各台应用服务器,要收集齐所有相关日志非常麻烦。

2、由于服务器硬盘资源有限,日志超过一定量或多少天就会被删除,如超过7天就会被删除。

3、我们公司不允许在日志中打印用户的账户等敏感信息。这点是最致命的。

 

也许你还会想到直接把日志写入的nosql数据库的方式,比如 Mongodb、ES、hbase等nosql数据库。但直接把日志写入到数据库的方式存在很大的性能问题,每打印一条日志都需要消耗几十到几百毫秒不等。严重影响正常业务的性能。

 

采用kafka+storm异步记录日志方式

 

借鉴点击流日志上报流程,只是上报客户端从用户的浏览器变成了我们的系统服务器,上报的内容从浏览数据变成了用户操作日志。使用到的技术:java+kafka+storm+hbase。具体流程如下:




1、前期准备工作:预先为每个需要接入该“日志上报工具”的业务系统分配一个系统id,如”sys_A”。在我们的实现中,有一个“业务接入”注册功能:

 

2、把用java语言实现的向kafka集群指定的“日志topic”发送消息的功能,封装成一个通用的jar包,在每个需要接入该“日志上报工具”的业务系统中引入该jar包。

 

3、在需要打印用户操作日志的地方,收集系统id、用户信息(账号,ip等)、查询key、用户操作、日志描述、操作时间等信息 组装成一个map。调用通用jar包中的日志打印方法SelfLog.log(map)。

 

4、在通用jar包SelfLog.log(Map map)方法中,会检查日志参数是否正确(比如,是否包含关键的 “系统id”和“查询key”,因为查询日志时需要根据这两个参数做定位)。检查通过后,把map转换成json字符串发送到指定kafka集群的指定topic中。

向kafka发送消息是个很快的过程,性能消耗几乎可以忽略不计,对正常业务几乎没有影响。同时利用kafka的高吞吐量,每天支持亿级的日志量没有问题。

 

5、通过storm实时消费kafka的指定“日志topic”,对日志进行解析,并根据“系统id”进行分组,每隔指定时间(比如1分钟)把收集到的日志信息,批量写入hbase的指定表中。这里每隔指定时间向hbase批量写入,主要是为了减少与hbase的交互次数,提升性能。这里指定的间隔时间,就是日志查询的延迟时间,可以做到近乎实时的日志查询。

日志被写入hbase日志表以后,就可以通过编写一个查询hbase的web页面进行日志查询。通过录入相关查询key,一次就可以提取出所有相关的日志信息:




最终效果

 

1、实现高效的流水日志记录(几乎没有性能消耗)。方便问题追溯。

2、可以打印敏感信息,这些信息最终是存在到hbase表,而不是写到日志文件。

 

3、相比传统的日志打印方式,日志存储周期更长,可以做到永久存在。日志不再散落在各个应用服务器,查询日志更方便直观。

 

为了提高日志的查询效率,我们把“查询key”做为hbase日志表rowkey的一部分,所有每次需要打印日志时,需要认真定义这个“查询key”,比如操作某个活动,可以把活动id作为“查询key”。

 

最后,通过创建更多的“topic”,在storm消费的地方再进行更细分处理,再加上一些权限验证,完全可以把这个工具平台化,提供一个类似“统一日志管理”的日志平台。

3
0
分享到:
评论
11 楼 moon_walker 2018-01-19  
果粒儿我是 写道
“切面方法里只能接收到方法的入参,也就是你这里的UserDTO,可以把入参的内容统一写入消息队列。但切面里取到的入参是Object类型,需要一次类型转换。” 如果只是 切面get入参的数据,可以不用放到消息队列or redis, 再定义一个注解修饰UserDTO你想要什么属性就好了。 然后切面get到这个属性就ok


没有明白你的这段的意思。感觉你就是对delete的场景不好处理,为了简单起见还是建议在方法名上做些规范。比如把你的delete(int userId)方法改为:
deleteUserById(int userId)

在切面方法里可以取到 方法名和入参,可以拼装你需要的日志 delete User表中id为xxx的记录。 在把当前用户信息放入ThreadLocal,在切面方法里取到当前用户,就可以实现:
who delete User表中id为xxx
不知道你明白我说的意思没,感觉可以解决你的问题。
10 楼 果粒儿我是 2018-01-19  
“切面方法里只能接收到方法的入参,也就是你这里的UserDTO,可以把入参的内容统一写入消息队列。但切面里取到的入参是Object类型,需要一次类型转换。” 如果只是 切面get入参的数据,可以不用放到消息队列or redis, 再定义一个注解修饰UserDTO你想要什么属性就好了。 然后切面get到这个属性就ok
9 楼 果粒儿我是 2018-01-19  
moon_walker 写道
创建基类也有侵入性,感觉可以把方法名规范化。比如selectXXX,delectXXX,在切面中可以获取到方法名是delete开头的,就可以认为是删除操作,新增和查询类似。 这也就是所谓的约定大于配置的编程方式吧。


创建基类的方式的确有侵入性,而且违背了aop初衷貌似..
我想要做的是为微服务多个服务提供一个包,专门写日志,使用者只需要在自己的应用里加入注解就够了。但是这种只适合“通用”日志,不适合我的场景

通用日志是,例如异常日志,错误日志等等。就是所有日志的格式固定。我的日志场景是想提取出来用户都操作了什么,然后展示给用户看。举例子来说就是删除。前端传入删除用户id = 5.
  @Auditable(actionType = ActionType.DELETE, subject = Subject.User)
    @RequestMapping("/delete")
    @ResponseBody
    public void
    delete(int userId){
       String userNameDeleted = userService.getUserById(userId);// 需要记录日志 “DELETE USER 张三“,
        userInfoService.delete(userId);
    }
需要记录日志 “DELETE USER 张三“,  但是这个张三切面是无法捕获到的,因为它不是注解标记的方法的入参或者出参,他更需要传递给切面。所以貌似切面无法满足这种动态一点的日志需求。
8 楼 moon_walker 2018-01-19  
创建基类也有侵入性,感觉可以把方法名规范化。比如selectXXX,delectXXX,在切面中可以获取到方法名是delete开头的,就可以认为是删除操作,新增和查询类似。 这也就是所谓的约定大于配置的编程方式吧。
7 楼 moon_walker 2018-01-19  
果粒儿我是 写道
最近也有遇到类似场景,这个场景实际上是一个关于如何写应用系统的审计日志的问题。使用MQ+存储工具貌似是目前发现比较通用的方法哈。

关于如何收集日志我有一点疑问想请教哈,我看文中是通过在应用业务程序中加入SelfLog.log(map),这种写法其实对业务代码侵入性很大,因为写业务代码的人员只需要关注写业务就好了。 我想到一个办法是使用切面+自定义注解实现这个写审计日志功能。例如当我在系统新增一个用户,我可以实现如下代码:

    @Auditable(actionType = ActionType.ADD, subject = Subject.User)
    @RequestMapping("/signup")
    @ResponseBody
    public void
    signUp(UserDTO userDTO){
        UserInfo userInfo = new UserInfo();
        userInfo.setName(userDTO.getName());
        userInfo.setPassword("11");
        userInfo.setUid(1);
        userInfo.setState(1);
        userInfoService.insert(userInfo);

    }
通过在切面中可以获取操作类型是新增,操作主体是用户,等等信息。但是有一个问题,如果我想删除用户,前端传递删除的用户id=5, 因为切面是一种统一操作,他是没法获取用户名称,因此也没法实现写日志“删除用户张三”。

请问如何能实现向切面传递动态信息呢?



你这个想法不错啊,我知道的切面方法里只能接收到方法的入参,也就是你这里的UserDTO,可以把入参的内容统一写入消息队列。但切面里取到的入参是Object类型,需要一次类型转换。如果入参的类型很多 这时切面就不通用了,最好在定义一个基类,让入参类都继承这个基类,按照这个方法理论上可行。我也想按照你的想法改进下,谢谢
6 楼 果粒儿我是 2018-01-18  
最近也有遇到类似场景,这个场景实际上是一个关于如何写应用系统的审计日志的问题。使用MQ+存储工具貌似是目前发现比较通用的方法哈。

关于如何收集日志我有一点疑问想请教哈,我看文中是通过在应用业务程序中加入SelfLog.log(map),这种写法其实对业务代码侵入性很大,因为写业务代码的人员只需要关注写业务就好了。 我想到一个办法是使用切面+自定义注解实现这个写审计日志功能。例如当我在系统新增一个用户,我可以实现如下代码:

    @Auditable(actionType = ActionType.ADD, subject = Subject.User)
    @RequestMapping("/signup")
    @ResponseBody
    public void
    signUp(UserDTO userDTO){
        UserInfo userInfo = new UserInfo();
        userInfo.setName(userDTO.getName());
        userInfo.setPassword("11");
        userInfo.setUid(1);
        userInfo.setState(1);
        userInfoService.insert(userInfo);

    }
通过在切面中可以获取操作类型是新增,操作主体是用户,等等信息。但是有一个问题,如果我想删除用户,前端传递删除的用户id=5, 因为切面是一种统一操作,他是没法获取用户名称,因此也没法实现写日志“删除用户张三”。

请问如何能实现向切面传递动态信息呢?
5 楼 moon_walker 2017-07-21  
QuarterLifeForJava 写道
可否讲解下Java如和生产消息到Kafka,两边的配置及核心代码,以前storm和kafa的配置和核心代码,还有HBase,如果能再将就下配置、编写代码中的坑那就更好了

没问题,周末先把java如何生产消息到kafka整理出来
4 楼 QuarterLifeForJava 2017-07-21  
可否讲解下Java如和生产消息到Kafka,两边的配置及核心代码,以前storm和kafa的配置和核心代码,还有HBase,如果能再将就下配置、编写代码中的坑那就更好了
3 楼 xierui 2017-07-21  
很不错,学习了
2 楼 moon_walker 2017-07-20  
sg6303 写道
有源码可以提供学习吗??

kafka和storm都是用公司的资源,代码含有公司业务,需要剥离才能公开。其实实现起来也不难,主要就是编写一个java往kafka写消息的生成者,再使用storm消费这些消息。storm的主要职责就是对消息归类,以及批量存储到hbase。
如果大家需求强烈,我整理下,可以公开部分代码。
1 楼 sg6303 2017-07-20  
有源码可以提供学习吗??

相关推荐

    lamp安装配置及flume+Kafka+Storm+HDFS实时系统搭分享

    Apache Kafka是一种高吞吐量的分布式发布订阅消息系统,设计目标是处理流式数据。它允许应用程序实时消费数据,同时提供强大的存储能力,保证数据的可靠性。Kafka与Flume结合,可以实现高效的数据流转和分发。 ...

    log4j+flume+kafka+storm

    在集成Log4j与Flume的过程中,Avro Source是一种常用的方式。Avro Source允许远程客户端通过网络接口发送数据到Flume Agent。在这个场景中,Flume版本为1.5.2。 **步骤一:配置Flume** 首先需要对Flume的配置文件`...

    kafka+storm+influxdb的程序demo

    **CoapStormServer** 这个压缩包内的文件可能是服务器端的实现,可能基于CoAP(Constrained Application Protocol)协议,这是一种针对资源受限设备的轻量级M2M通信协议,常用于物联网应用。CoapStormServer可能作为...

    kafka跟storm收集日志解决方案

    Apache Kafka是一种分布式的、基于发布/订阅模式的消息系统,它能够处理大量的实时数据流。Kafka因其高性能、高吞吐量以及低延迟等特点,在大数据领域有着广泛的应用。Kafka主要应用于构建实时数据管道和流式应用。 ...

    docker-compose部署zk+kafka+storm集群的实现

    总结,这个配置实现了 ZooKeeper、Kafka、Storm 和 InfluxDB 的集群部署,利用 Docker Compose 提供了一种简化部署和管理的方式。通过这种方式,可以快速地搭建和扩展大数据处理环境,同时保持各个组件之间的通信和...

    kafka、storm、flink、apex、spark五种流式大数据系统调研报告

    Kafka基于发布/订阅模型,由生产者、消费者和 brokers 组成。生产者负责将消息发布到主题,brokers 存储这些消息,消费者从 brokers 订阅并消费消息。 1.2.2 消息分发模式 消息以主题(topic)的形式组织,每个主题...

    storm集成kafka插demo.zip

    【压缩包子文件的文件名称列表】: "storm-kafka"这个文件名可能是指一个目录,包含了实现Storm和Kafka集成的Java代码、配置文件以及可能的依赖库。通常,这样的项目会有一个主类(如`StormKafkaDemo.java`),其中...

    扩展logback将日志输出到Kafka实例源码

    总之,扩展Logback将日志输出到Kafka是一种常见的日志管理实践,它结合了Logback的灵活性和Kafka的高性能特性,为大数据环境下的日志处理提供了有力支持。通过理解和实现上述技术点,开发者可以构建出高效、可靠的...

    storm-kafka实时趋势分析

    总的来说,"storm-kafka实时趋势分析"是一种强大的实时数据处理解决方案,它结合了Storm的实时计算能力和Kafka的消息中间件特性,可以广泛应用于各种实时数据分析场景,如电商的实时销量分析、社交媒体的情绪分析等...

    分布式实时日志密度数据流聚类算法及其基于Storm的实现.pdf

    在本文中,作者提出了一种名为RL-DSCA(Real-time Log density algorithm)的算法,它结合了经典的数据流聚类框架Clustream和基于密度的聚类算法DBSCAN,实现了多粒度的日志数据流聚类。 Clustream算法由Aggarwal等...

    基于Storm技术的实时数据处理平台研究与实现.pdf

    在系统设计方面,本文提出了一种基于Storm的实时数据处理平台架构,该架构主要由分布式集群服务器、Web服务器、客户端三个部分组成。分布式集群服务器负责实时数据的采集和处理,Web服务器则负责与客户端通信,提供...

    kafka入门必备手册

    7. 提交日志:作为分布式系统的一种实现,Kafka用于记录分布式系统的变动数据。 **Kafka设计原理** Kafka基于ZooKeeper,一个用于维护配置信息、提供分布式协调服务的开源框架。Kafka集群通过ZooKeeper进行管理,...

    基于storm的实时推荐系统论文

    综上所述,本文提出的基于Storm的实时推荐系统,通过结合Kafka的流数据处理能力和Storm的实时计算能力,实现了对用户行为数据的高效处理和实时分析。此外,通过改进的协同过滤算法和矩阵分解技术,进一步提高了推荐...

    jstorm集成kafka插件demo

    Kafka,由LinkedIn开源,现在是Apache软件基金会的顶级项目,是一种分布式流处理平台。它作为消息中间件,提供了高吞吐量的消息生产和消费能力,同时支持数据持久化,使得消息能够被多次消费,非常适合大数据实时...

    Kafka 入门基础篇.pptx

    Kafka 是 LinkedIn 公司开发的一种分布式消息队列系统,支持离线和在线日志处理。它可以实时处理大量数据,满足各种需求场景,如基于 Hadoop 的批处理系统、低延迟的实时系统、Storm/Spark 流式处理引擎、Web/nginx ...

    storm-kafka-Log-Consumer:这是一个来自kafka集群的风暴日志数据处理项目,最终存储到HBase

    描述中提到,该项目是针对"Storm日志数据处理应用",这意味着它可能涉及对日志数据的清洗、解析、分析等操作。日志数据通常包含应用程序运行时的信息,如错误、警告、性能指标等,通过处理这些数据,可以获取关于...

    基于Storm的实时报警服务的设计与实现.pdf

    在本文中,作者提出了一种基于Storm的实时报警服务设计方案。首先,Scribe会持续收集大量日志数据,然后将这些数据传递给Kafka进行临时存储。接着,Storm作为实时处理引擎,从Kafka中消费数据,进行分析过滤,匹配...

    一、Kafka简介.docx

    【Kafka】是一种分布式发布-订阅消息系统,由Apache开发,设计目的是为了处理大规模的数据流。Kafka将消息持久化到磁盘,并在集群中进行复制,以确保高可用性和容错性。它与ZooKeeper协同工作,提供了一个可靠且高...

    大数据之Kafka

    它以一种高吞吐量、低延迟的方式处理数据,适用于离线和在线的消息消费场景。Kafka最初由LinkedIn公司开发,后来捐赠给了Apache基金会。 #### 二、Kafka的特点 1. **高吞吐量**:Kafka被设计为支持高吞吐量的数据...

Global site tag (gtag.js) - Google Analytics