- 浏览: 2183839 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
用过sparkstreaming的人都知道,当使用sparkstreaming on yarn模式的时候,如果我们想查看系统运行的log,是没法直接看的,就算能看也只是一部分。
这里的log分:
(1)spark本身运行的log
(2)代码里面业务产生的log
spark on yarn模式,如果你的hadoop集群有100台,那么意味着你的sparkstreaming的log有可能会随机分布在100台中,你想查看log必须登录上每台机器上,一个个查看,如果通过Hadoop的8088页面查看,你也得打开可能几十个页面才能看到所有的log,那么问题来了?
能不能将这个job运行所有的log统一收集到某一个目录里面呢? 如果收集到一起的话排查log就非常方便了。
答案是很遗憾,在sparkstreaming里面没法做到,因为sparkstreaming程序永远不停机,就算你开启hadoop的log聚合也没用,只有当sparkstreaming程序停掉,hadoop的log聚合才能把所有的log收集到一个目录里面,所以其他的非sparkstreaming程序,比如MR,Spark 运行完后,如果开启log聚合,hadoop会负责把运行在各个节点上的log给统一收集到HDFS上,这样的话我们查看log就非常方便了。
现在的问题是sparkstreaming不能停机,那么还能集中收集log到指定的地方吗?答案是可以的,我们使用log4j收集日志然后异步发送至kafka里面,最后再通过logstash收集kafka里面的日志进入es即可,这样一条龙服务打通之后,出现任何异常都可以非常快和方便的在es中排查问题,效率大大提升。至于使用logstash从kafka收集到es里面,不是本文的重点,有兴趣的参考散仙前面的文章:http://qindongliang.iteye.com/blog/2278642。
下面会介绍下如何使用:
streaming项目中的log4j使用的是apache log4j
<dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
sparkstreaming项目可以单独提交某个job的log4j文件,这样就能定制每个job的log输出格式,如果提交的时候不提交log4j文件,那么默认用的是spark安装目录下面的log4j文件。
看下我们log4j文件的内容:
log4j.rootLogger=WARN,console,kafka #log4j.logger.com.demo.kafka=DEBUG,kafka # appender kafka log4j.appender.kafka=kafka.producer.KafkaLog4jAppender log4j.appender.kafka.topic=kp_diag_log # multiple brokers are separated by comma ",". log4j.appender.kafka.brokerList=192.168.201.6:9092,192.168.201.7:9092,192.168.201.8:9092 log4j.appender.kafka.compressionType=none log4j.appender.kafka.syncSend=false log4j.appender.kafka.layout=org.apache.log4j.PatternLayout #log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n log4j.appender.kafka.layout.ConversionPattern=[%d] [%p] [%t] %m%n # appender console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n #log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
最后看下提交脚本:
jars=`echo /home/spark/x_spark_job/streaming_lib/*jar | sed 's/ /,/g'` echo $jars #nohup /opt/bigdata/spark/bin/spark-submit --class com.bigdata.xuele.streaming.SparkStreamingKmd --master yarn --deploy-mode cluster --executor-cores 3 --driver-memory 4g --executor-memory 4g --num-executors 10 --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=logback.xml" --jars $jars kpdiag-stream-1.0.0-SNAPSHOT.jar &> streaming.log & nohup /opt/bigdata/spark/bin/spark-submit --class com.bigdata.xuele.streaming.SparkStreamingKmd --master yarn --deploy-mode cluster \ --files "/home/spark/x_spark_job/log4j.properties" \ --executor-cores 3 --driver-memory 3g --executor-memory 3g --num-executors 12 --jars $jars \ --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \ --driver-class-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --driver-library-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --conf spark.executor.extraClassPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --conf spark.executor.extraLibraryPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ kpdiag-stream-1.0.0-SNAPSHOT.jar &> kp.log &
注意上面提交脚本中,/opt/bigdata/jars/spark/这个路径引用的jar包,必须在每台hadoop机器上都要存在,sparkstreaming运行过程中,会从本地加载jar包,此外log4j.properties文件以及参数里面--jars 后面的依赖jar 可以在提交机器上放一份即可,不需要每台机器上都存放。
提交任务后,在kafka的节点上执行消费者命令就能看到对应的log输出:
执行命令:
kafka-console-consumer --zookeeper 192.168.201.5:2181 --topic kp_diag_log
收集到的log内容如下:
[2017-01-21 16:37:03,154] [WARN] [Driver] Support for Java 7 is deprecated as of Spark 2.0.0 [2017-01-21 16:37:19,714] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,740] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,842] [WARN] [Executor task launch worker-0] 题目id:b07e88feff464659ab5a351bf1e68ee0在redis不存在
至此,我们的log就统一收集成功了,后续我们可以把log从kafka导入到es中,就可以任意分析和查询了。
这里需要注意一点,sparkstreaming运行时候,系统本身也有大量的log,如果把这个系统log也收集到kafka里面本身的量是非常大的,而且好多信息不重要,其实
我们只需要关注业务重点log即可,主要是WARN+ERROR级别的,调试的时候可以把info级别打开,代码里重点关注的log都放在warn级别,异常什么的放在ERROR即可
这样排查问题时候也容易而且了避免了大量log的产生从应用本身性能的影响。
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
发表评论
-
Scala里面的排序函数的使用
2018-01-09 20:20 2645排序方法在实际的应用场景中非常常见,Scala里面有三种排序 ... -
在Scala里面如何使用元组
2018-01-08 22:05 916元组在Scala语言中是一 ... -
Spark如何读取一些大数据集到本地机器上
2018-01-04 21:07 1680最近在使用spark处理分 ... -
使用Spark SQL的临时表解决一个小问题
2017-12-28 18:27 2453最近在使用spark处理一个业务场景时,遇到一个小问题,我在 ... -
Spark任务两个小问题笔记
2017-12-26 19:52 1649今天在用spark处理数据 ... -
Spark中foreachPartition和mapPartitions的区别
2017-12-25 21:19 3324spark的运算操作有两种类型:分别是Transformat ... -
Spark Streaming优雅的关闭策略优化
2017-12-07 19:26 4149前面文章介绍了不少有关Spark Streaming的off ... -
kafka版本不一致导致的一个小问题(二)
2017-12-04 21:37 8565背景介绍: 我们公司的实时流项目现在用的spark stre ... -
谈谈如何优雅的关闭正在运行中的Spark Streaming的流程序
2017-11-30 19:20 2302前面的文章,已经简 ... -
如何管理Spark Streaming消费Kafka的偏移量(三)
2017-11-28 23:41 5212前面的文章已经介绍了在spark streaming集成kaf ... -
理解Spark的运行机制
2017-11-23 21:52 1246Spark生态系统目前已经非常成熟了,有很多类型的任务都可以使 ... -
如何管理Spark Streaming消费Kafka的偏移量(二)
2017-11-16 19:30 4733上篇文章,讨论了在spar ... -
如何管理Spark Streaming消费Kafka的偏移量(一)
2017-11-14 20:42 4079最近工作有点忙,所以更新文章频率低了点,在这里给大家说声抱 ... -
在scala中使用spark sql解决特定需求(2)
2017-07-21 16:00 2282接着上篇文章,本篇来 ... -
在scala中使用spark sql解决特定需求
2017-07-20 19:53 1030spark sql一个强大之处就 ... -
Spark如何在一个SparkContext中提交多个任务
2017-07-04 19:09 6756在使用spark处理数据的时候,大多数都是提交一个job执行, ... -
如何使用scala+spark读写hbase?
2017-06-12 19:48 3446最近工作有点忙,所以文章更新频率低了点,希望大家可以谅解,好 ... -
使用ES-Hadoop插件结合spark向es插入数据
2017-05-05 17:19 5059上篇文章简单介绍了E ... -
spark sql 快速体验调试
2017-04-13 16:40 1057spark sql提供了更快的查询性能,如何能够更快的体验,开 ... -
spark on yarn 如何集成elasticsearch
2017-04-11 16:16 1556随着spark越来越流行,我们的很多组件都有可能和spark集 ...
相关推荐
大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK.......
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
例如,书中可能会介绍如何利用Spark进行大规模数据的并行处理,如何使用Druid实现实时数据查询和分析,以及如何借助Flume和Kafka搭建可靠高效的数据流管道等。通过这些实战案例的学习,读者不仅可以掌握核心技术,还...
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
spark3.0.0版本对接kafka数据源需要的jar包,最新的版本导致maven的阿里云仓库不能直接下载下来,所以需要手动导入jar包进行操作,有需要的朋友可以免费下载
本文档提供了使用Kafka和Spark Streaming进行实时数据处理的详细开发指南,涵盖了Kafka集群搭建、Spark Streaming配置、Kafka和Spark Streaming的集成、主题创建和消息发送、查看主题状态等内容,旨在帮助开发者快速...
kafka-sparkstreaming-cassandra, 用于 Kafka Spark流的Docker 容器 用于 Kafka Spark流的Docker 容器这里Dockerfile为实验 Kafka 。Spark流( PySpark ) 和Cassandra设置了完整的流环境。 安装Kafka 0.10.2.1用于 ...
在本系统中,Flume 负责从各种数据源(如服务器、应用程序或网络设备)收集实时日志数据,将其汇聚到一个中心位置,确保数据传输的高可用性和可靠性。 2. **Apache Kafka**:Kafka 是一个高吞吐量的分布式发布订阅...
通过将日志输出到Kafka,可以方便地将这些日志与其他系统集成,如ELK(Elasticsearch、Logstash、Kibana)堆栈,实现日志的集中收集、分析和检索。 总之,扩展Logback将日志输出到Kafka是一种常见的日志管理实践,...
* Spark:使用spark stream功能,实时分析消息系统中的数据,完成计算分析工作。 * Hbase:做为后端存储,存储spark计算结构,供其他系统进行调用 ## 环境部署 ### 软件版本 * hadoop 版本 : Hadoop相关...
在这个系统中,Spark 被用来对从 Kafka 中读取的日志数据进行离线和实时分析。对于离线分析,Spark SQL 或 DataFrames 可以用于结构化数据的处理,而 Spark Streaming 则用于处理实时数据流,它可以以微批处理的方式...
Kafka的特点包括高吞吐量、持久化存储、分区和复制,这使得它在日志收集、实时数据处理和流数据应用中非常受欢迎。 其次,我们来看`Log4j`。Log4j是Apache的一个开源项目,它为Java应用程序提供了一个强大的日志...
Apache Spark与Apache Kafka的集成是大数据处理领域中的一个重要话题,特别是在实时流处理中。Kafka是一个高可用、高性能的消息中间件,它支持发布/订阅模式,可以作为数据管道,将数据从生产者传递到消费者。Spark...
本主题将详细介绍如何利用Logback和SLF4J来将日志记录到Kafka队列中,以及支持日志解析和过滤等扩展功能。 首先,我们需要理解SLF4J的工作原理。SLF4J提供了一组API,允许我们在应用程序中插入日志语句,而具体的...
总结来说,这个"java基于spark streaming和kafka,hbase的日志统计分析系统"是一个集成的解决方案,通过Java编程,利用Kafka收集和分发实时日志,Spark Streaming进行实时处理,最后将结果存储在HBase中,实现了高效...
Flume是一个分布式、可靠、高吞吐量的日志收集系统,能够实时地从Kafka中提取数据,并将其写入到HDFS中。为了实现这一点,需要先安装Flume,版本号为flume-1.9.0-bin.tar.gz。然后,需要配置Flume的配置文件flume....
spakr streaming的kafka依赖
至于压缩包中的"storm-chapter04"文件,由于信息不足,无法直接确定其与日志输出到Kafka的具体关系,但根据名称猜测可能与Apache Storm(一个分布式实时计算系统)的第四章内容有关,可能会涉及到实时日志处理的场景...
传统的批处理方式无法满足这种实时性要求,而Kafka正好能够解决这个问题,它可以快速地收集、存储和处理大量数据流。 **Kafka API使用** 1. **生产者API**:在Java应用中,我们可以通过KafkaProducer类来发送消息。...