`
qindongliang1922
  • 浏览: 2190577 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117712
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:126125
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:60061
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71441
社区版块
存档分类
最新评论

如何收集SparkSteaming运行日志实时进入kafka中

阅读更多




用过sparkstreaming的人都知道,当使用sparkstreaming on yarn模式的时候,如果我们想查看系统运行的log,是没法直接看的,就算能看也只是一部分。

这里的log分:

(1)spark本身运行的log

(2)代码里面业务产生的log


spark on yarn模式,如果你的hadoop集群有100台,那么意味着你的sparkstreaming的log有可能会随机分布在100台中,你想查看log必须登录上每台机器上,一个个查看,如果通过Hadoop的8088页面查看,你也得打开可能几十个页面才能看到所有的log,那么问题来了?

能不能将这个job运行所有的log统一收集到某一个目录里面呢? 如果收集到一起的话排查log就非常方便了。

答案是很遗憾,在sparkstreaming里面没法做到,因为sparkstreaming程序永远不停机,就算你开启hadoop的log聚合也没用,只有当sparkstreaming程序停掉,hadoop的log聚合才能把所有的log收集到一个目录里面,所以其他的非sparkstreaming程序,比如MR,Spark 运行完后,如果开启log聚合,hadoop会负责把运行在各个节点上的log给统一收集到HDFS上,这样的话我们查看log就非常方便了。


现在的问题是sparkstreaming不能停机,那么还能集中收集log到指定的地方吗?答案是可以的,我们使用log4j收集日志然后异步发送至kafka里面,最后再通过logstash收集kafka里面的日志进入es即可,这样一条龙服务打通之后,出现任何异常都可以非常快和方便的在es中排查问题,效率大大提升。至于使用logstash从kafka收集到es里面,不是本文的重点,有兴趣的参考散仙前面的文章:http://qindongliang.iteye.com/blog/2278642。



下面会介绍下如何使用:

streaming项目中的log4j使用的是apache log4j
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>



sparkstreaming项目可以单独提交某个job的log4j文件,这样就能定制每个job的log输出格式,如果提交的时候不提交log4j文件,那么默认用的是spark安装目录下面的log4j文件。
看下我们log4j文件的内容:
log4j.rootLogger=WARN,console,kafka

#log4j.logger.com.demo.kafka=DEBUG,kafka
# appender kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=kp_diag_log
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=192.168.201.6:9092,192.168.201.7:9092,192.168.201.8:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
#log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
log4j.appender.kafka.layout.ConversionPattern=[%d] [%p] [%t] %m%n

# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n
#log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n



最后看下提交脚本:


jars=`echo /home/spark/x_spark_job/streaming_lib/*jar | sed 's/ /,/g'`

echo $jars

#nohup /opt/bigdata/spark/bin/spark-submit  --class  com.bigdata.xuele.streaming.SparkStreamingKmd  --master yarn    --deploy-mode cluster --executor-cores 3  --driver-memory 4g   --executor-memory 4g  --num-executors 10  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=logback.xml"   --jars  $jars    kpdiag-stream-1.0.0-SNAPSHOT.jar  &> streaming.log  &


nohup /opt/bigdata/spark/bin/spark-submit    --class  com.bigdata.xuele.streaming.SparkStreamingKmd  --master yarn  --deploy-mode cluster \
 --files "/home/spark/x_spark_job/log4j.properties" \
 --executor-cores 3   --driver-memory 3g   --executor-memory 3g  --num-executors 12    --jars  $jars  \
 --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties"   \
 --driver-class-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \
 --driver-library-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar  \
 --conf spark.executor.extraClassPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar   \
 --conf spark.executor.extraLibraryPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar  \
 kpdiag-stream-1.0.0-SNAPSHOT.jar &> kp.log &




注意上面提交脚本中,/opt/bigdata/jars/spark/这个路径引用的jar包,必须在每台hadoop机器上都要存在,sparkstreaming运行过程中,会从本地加载jar包,此外log4j.properties文件以及参数里面--jars 后面的依赖jar 可以在提交机器上放一份即可,不需要每台机器上都存放。


提交任务后,在kafka的节点上执行消费者命令就能看到对应的log输出:
执行命令:
kafka-console-consumer --zookeeper 192.168.201.5:2181 --topic kp_diag_log


收集到的log内容如下:

[2017-01-21 16:37:03,154] [WARN] [Driver] Support for Java 7 is deprecated as of Spark 2.0.0

[2017-01-21 16:37:19,714] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,740] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,842] [WARN] [Executor task launch worker-0] 题目id:b07e88feff464659ab5a351bf1e68ee0在redis不存在




至此,我们的log就统一收集成功了,后续我们可以把log从kafka导入到es中,就可以任意分析和查询了。

这里需要注意一点,sparkstreaming运行时候,系统本身也有大量的log,如果把这个系统log也收集到kafka里面本身的量是非常大的,而且好多信息不重要,其实
我们只需要关注业务重点log即可,主要是WARN+ERROR级别的,调试的时候可以把info级别打开,代码里重点关注的log都放在warn级别,异常什么的放在ERROR即可
这样排查问题时候也容易而且了避免了大量log的产生从应用本身性能的影响。


有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。


0
0
分享到:
评论

相关推荐

    大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK......

    大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK.......

    本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统,大数据处理技术

    本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...

    企业大数据处理:Spark、Druid、Flume与Kafka应用实践(超清完整版).pdf

    例如,书中可能会介绍如何利用Spark进行大规模数据的并行处理,如何使用Druid实现实时数据查询和分析,以及如何借助Flume和Kafka搭建可靠高效的数据流管道等。通过这些实战案例的学习,读者不仅可以掌握核心技术,还...

    spark-token-provider-kafka-0-10_2.12-3.0.0.jar

    spark3.0.0版本对接kafka数据源需要的jar包,最新的版本导致maven的阿里云仓库不能直接下载下来,所以需要手动导入jar包进行操作,有需要的朋友可以免费下载

    使用Flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】

    flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】

    基于Spark的实时日志分析及异常检测系统 Flume + Kafka + Hbase + Spark-Streaming

    在本系统中,Flume 负责从各种数据源(如服务器、应用程序或网络设备)收集实时日志数据,将其汇聚到一个中心位置,确保数据传输的高可用性和可靠性。 2. **Apache Kafka**:Kafka 是一个高吞吐量的分布式发布订阅...

    扩展logback将日志输出到Kafka实例源码

    通过将日志输出到Kafka,可以方便地将这些日志与其他系统集成,如ELK(Elasticsearch、Logstash、Kibana)堆栈,实现日志的集中收集、分析和检索。 总之,扩展Logback将日志输出到Kafka是一种常见的日志管理实践,...

    kafka+spark streaming开发文档

    本文档提供了使用Kafka和Spark Streaming进行实时数据处理的详细开发指南,涵盖了Kafka集群搭建、Spark Streaming配置、Kafka和Spark Streaming的集成、主题创建和消息发送、查看主题状态等内容,旨在帮助开发者快速...

    kafka-sparkstreaming-cassandra, 用于 Kafka Spark流的Docker 容器.zip

    kafka-sparkstreaming-cassandra, 用于 Kafka Spark流的Docker 容器 用于 Kafka Spark流的Docker 容器这里Dockerfile为实验 Kafka 。Spark流( PySpark ) 和Cassandra设置了完整的流环境。 安装Kafka 0.10.2.1用于 ...

    使用kafka,spark,hbase开发日志分析系统

    * Spark:使用spark stream功能,实时分析消息系统中的数据,完成计算分析工作。 * Hbase:做为后端存储,存储spark计算结构,供其他系统进行调用 ## 环境部署 ### 软件版本 * hadoop 版本 : Hadoop相关...

    基于Flume+kafka+spark大型电商网站日志分析系统(离线+实时).zip

    在这个系统中,Spark 被用来对从 Kafka 中读取的日志数据进行离线和实时分析。对于离线分析,Spark SQL 或 DataFrames 可以用于结构化数据的处理,而 Spark Streaming 则用于处理实时数据流,它可以以微批处理的方式...

    spark-streaming-kafka-0-10_2.12-2.4.0.jar

    spakr streaming的kafka依赖

    kafka_Java_Log4j

    Kafka的特点包括高吞吐量、持久化存储、分区和复制,这使得它在日志收集、实时数据处理和流数据应用中非常受欢迎。 其次,我们来看`Log4j`。Log4j是Apache的一个开源项目,它为Java应用程序提供了一个强大的日志...

    spark与kafka集成

    Apache Spark与Apache Kafka的集成是大数据处理领域中的一个重要话题,特别是在实时流处理中。Kafka是一个高可用、高性能的消息中间件,它支持发布/订阅模式,可以作为数据管道,将数据从生产者传递到消费者。Spark...

    logback日志记录写入kafka

    本主题将详细介绍如何利用Logback和SLF4J来将日志记录到Kafka队列中,以及支持日志解析和过滤等扩展功能。 首先,我们需要理解SLF4J的工作原理。SLF4J提供了一组API,允许我们在应用程序中插入日志语句,而具体的...

    java基于spark streaming和kafka,hbase的日志统计分析系统.rar

    总结来说,这个"java基于spark streaming和kafka,hbase的日志统计分析系统"是一个集成的解决方案,通过Java编程,利用Kafka收集和分发实时日志,Spark Streaming进行实时处理,最后将结果存储在HBase中,实现了高效...

    kafka+flume 实时采集oracle数据到hive中.docx

    Flume是一个分布式、可靠、高吞吐量的日志收集系统,能够实时地从Kafka中提取数据,并将其写入到HDFS中。为了实现这一点,需要先安装Flume,版本号为flume-1.9.0-bin.tar.gz。然后,需要配置Flume的配置文件flume....

    扩展logback将日志输出到Kafka实例扩展源码

    至于压缩包中的"storm-chapter04"文件,由于信息不足,无法直接确定其与日志输出到Kafka的具体关系,但根据名称猜测可能与Apache Storm(一个分布式实时计算系统)的第四章内容有关,可能会涉及到实时日志处理的场景...

    Kafka模拟网页浏览实时统计

    传统的批处理方式无法满足这种实时性要求,而Kafka正好能够解决这个问题,它可以快速地收集、存储和处理大量数据流。 **Kafka API使用** 1. **生产者API**:在Java应用中,我们可以通过KafkaProducer类来发送消息。...

Global site tag (gtag.js) - Google Analytics