SparkStreaming与kafka整合小项目实践含所有代码带详细注释
总流程:自制日志生成器生成含数据日志,使用kafkaAppender直接发送到kafka,SparkStreaming从kafka消费日志,并流式处理将结果发送到kafka另一个topic,Java后台从kafka消费日志分析结果,实现秒级大数据实时分析展示。
版本
kafka_2.11-0.11.0.1
spark-2.1.1-bin-hadoop2.7
scala-2.11.11
Jdk-1.8
Spark使用Intelij Idea
其余使用eclipse
第一步
日志生成器输出日志到kafka
重点jar包:
kafka-log4j-appender-0.11.0.1.jar //日志使用
kafka_2.11-0.11.0.1.jar //如果报错就加上吧
kafka-clients-0.11.0.1.jar //如果报错就加上吧
slf4j-api-1.7.25.jar //日志框架也可以用其他的
slf4j-log4j12-1.7.25.jar
配置文件内容及注意事项
文件名:log4j.properties
文件内容:
log4j.rootLogger=DEBUG,stdout,KAFKA //appender Console log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %5p %x-%t %l (message:%m)%n ## appender KAFKA log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender log4j.appender.KAFKA.topic=log-topic log4j.appender.KAFKA.brokerList=master:9090 log4j.appender.KAFKA.compressionType=none log4j.appender.KAFKA.syncSend=true log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout log4j.appender.KAFKA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %5p %x-%t %l (message:%m)
文件名:my.properties
#time interval of every times,unit is ms,default 100ms timeinterval=1000 #the count of log every times,default 1000 frequency=298 #runningtime unit is ms,default 60000ms runtime=6000000
代码解析:
LogWriterExcutor.java
import org.apache.log4j.Logger; class LogWriterExcutor implements Runnable{ Logger logger = Logger.getLogger(this.getClass().getName()); private String []message; public LogWriterExcutor(String []message){ this.message = message; } @Override public void run() { // TODO Auto-generated method stub for(String e : message) logger.info(e); } }
LogCreater.java
import java.io.FileInputStream; import java.io.IOException; import java.util.Properties; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.log4j.Logger; class LogCreater extends Constant{ Logger logger = Logger.getLogger(this.getClass().getName()); ExecutorService executor = null; private int timeinterval = TIME_INTERVAL; //间隔多久发送一批日志,单位毫秒 private int frequency = FREQUENCY; //每一批发送发送多少条数据,单位条 private int sumOfChinese = SUM_CHINESE; //自定义中文字集元素个数 private int runtime = RUNTIME; //程序运行总时间 private long startTime = 0; private long endTime = 0; private long logCount = 0; //日志已发条数 private boolean stop = true; LogCreater(){ init(); } public void init(){ Properties properties = new Properties(); FileInputStream in; try { in = new FileInputStream("src\\source\\my.properties"); properties.load(in); timeinterval = Integer.parseInt((String)properties.get("timeinterval")); frequency =Integer.parseInt((String)properties.get("frequency")); runtime =Integer.parseInt((String)properties.get("runtime")); } catch (IOException e) { logger.error("配置文件读取失败"); e.printStackTrace(); } executor = Executors.newCachedThreadPool(); startTime = System.currentTimeMillis(); printHint(); } public void startCreate() { System.out.println("正在生成日志....."); if(executor == null){ logger.error("线程池获取失败,日志生成器执行失败。执行结束"); return; } while(stop){ String []messages = getMessages(frequency); create(messages); try { Thread.sleep(timeinterval); } catch (InterruptedException e) { logger.error("线程睡眠执行出错"); e.printStackTrace(); } endTime = System.currentTimeMillis(); if((endTime-startTime)>runtime) stop = false; } System.out.println("共生成 "+logCount+" 条日志。"); } private void create(String []messages) { executor.execute(new Thread(new LogWriterExcutor(messages))); logCount += messages.length; } private String[] getMessages(Integer frequency) { Random rand = new Random(); String []massages = new String[frequency]; for(int i=0;i<frequency;i++){ massages[i] = REGRET[rand.nextInt(sumOfChinese)]; } return massages; } private void printHint(){ System.out.println("每次时间间隔\t"+timeinterval+"ms"); System.out.println("每次日志数量\t"+frequency+"条/次"); System.out.println("预计运行时间\t"+runtime/1000+"s"); } }
Constant .java
public class Constant { /* * 这个文件中存放的全部是常量 */ /* * 日志生成器隔多少时间写一批日志,默认值 */ public static Integer TIME_INTERVAL = 100; /* * 日志生成器每一批次生成多少条日志,默认值 */ public static Integer FREQUENCY = 100; /* * 运行时间,默认一分钟,默认值 */ public static Integer RUNTIME = 60000; /* * 298个中文字,来自楚辞《惜誓》 */ public static String[]REGRET = {"一","言","老","调","清","者","舆","昆","合","渊","下","而","同","不","明","与", "昏","谏","小","騑","少","我","气","谔","世","或","尚","丝","鸟","逢","瀣","中","是","鸱","就","水","临","制", "举","砾","鸾","所","乃","鹄","久","居","陆","之","虎","乎","乐","虑","乔","虖","剖","遗","虚","聚","江","吸", "瑟","象","乡","衡","周","息","虯","衰","驰","山","驱","乱","干","年","并","恶","穷","偷","顺","登","白","幽", "驾","岁","蚁","节","梅","沆","皆","皇","骋","二","于","隐","源","麒","骖","骛","墟","功","麟","纡","纫","被", "身","犬","躯","悲","河","蚴","犹","人","难","裁","仁","狂","黄","集","哉","背","苍","从","风","仑","黑","盖", "高","飙","仙","四","盛","惜","飞","回","苟","因","以","拥","苦","独","竭","曲","直","相","建","固","国","攀", "异","儃","处","茅","月","夏","霑","休","众","北","圜","生","索","謣","圣","贤","伤","大","在","用","木","天", "眩","太","夫","伯","地","朱","失","贵","然","贼","放","愿","流","权","充","故","商","均","先","浊","子","何", "余","神","非","止","赤","此","来","车","革","兮","佯","数","女","杳","海","睹","蝼","彼","载","松","使","长", "极","羁","如","概","历","玉","涉","冉","枉","羊","王","後","厌","再","美","箕","得","龙","原","龟","审","醢", "群","冥","推","循","讬","枭","况","德","容","方","澹","离","去","旁","见","观","係","心","寄","又","反","重", "野","藏","量","发","翔","比","俗","志","诚","进","远","川","察","忠","无","濡","矣","凤","日","知","左","自", "矫","可","称","翱","深","已","右","至","石","念","时","迻","忽","寿","丹","根","为","尽",}; /* * 中文字个数,用作随机数范围使用 */ public static Integer SUM_CHINESE = 100; }
MyUtil.java
import java.util.Random; public class MyUtil { public static int[] getRand(int n,int range){ Random ran = new Random(); int []arr = new int[n]; while(n-->0){ arr[n] = ran.nextInt(range); } return arr; } }
Demo.java
/* * 日志生成器 */ public class Demo{ public static void main(String[] args){ new LogCreater().startCreate(); System.exit(0); } }
目录结构:就普通java project,
第二步
创建kafka topic
安装跳过
配置%KAFKA_HOME%conf/server.properties:
网上教程很多,此处不再赘述
启动kafka
kafka-server-start.sh config/server.properties &
创建topic:
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic log-topic
查看topic:
kafka-topics.sh --describe --zookeeper master:2181 --topic log-topic
创建控制台消费者:
kafka-console-consumer.sh --bootstrap-server master:9090 --from-beginning --topic log-topic
启动顺序:
1.启动kafka Server,2.创建topic,3.查看创建的topic(可选),4.创建控制台消费者,5.启动日志生成器程序。
注意事项:在启动控制台消费者的终端会将接收的日志打印出来,命令最后面加上 & 符号可将进程调至后台运行。关闭消费者使用Ctrl+c
第三步
spark消费kafka的日志
重点jar包:
kafka_2.11-0.11.0.1.jar
kafka-clients-0.11.0.1.jar
spark-streaming-kafka_2.11-1.6.3.jar
Spark所有自带jar包
Scala的SDK
报异常:
如果运行报java.lang.NoClassDefFoundError: org/apache/spark/Logging
这个Logging截止存在于spark-core_2.11-1.5.2中。
2.1.1版本saprk无此class文件,被org.apache.spark.internal.Logging取代。
解决办法
把1.5.2版本里面的这个class提出来单独用java -xvf new_name.jar class_dir 打包成一个jar包,然后当做常规jar工具包使用
过程解析:
Spark创建Receiver从kafka消费日志数据。
代码解析:Kafka.scala
import java.util.Properties import java.util.logging.{Level, Logger} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializer import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext, Time} //import com.trigl.spark.util.{DataUtil, LauncherMultipleTextOutputFormat} import org.apache.spark.Logging object Kafka extends Logging{ private var producer: KafkaProducer[String, String] = _ private var props : Properties = _ def main(args: Array[String]) { Logger.getLogger("org.apache.spark").setLevel(Level.WARNING) System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val sparkConf = new SparkConf().setAppName("LauncherStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3)) /* provider的参数 */ val brokerAddress = "master:9090" val topic = "pro-topic" props = new Properties() props.put("bootstrap.servers", brokerAddress) props.put("value.serializer", classOf[StringSerializer].getName) // Key serializer is required. props.put("key.serializer", classOf[StringSerializer].getName) // wait for all in-sync replicas to ack sends props.put("acks", "all") //创建kafka生产者,后面可以直接使用它发送数据 producer = new KafkaProducer[String, String](props) if(producer == null) { println("producer为空") ssc.stop() } /* *消费者参数 */ val zkQuorum = "master:2181,slave1:2181,slave2:2181" //这个group本来是随意创建,但是不能与已存在的重复,否在接收不到数据。每次运行请务必修改,或者做成参数,这个问题我尚未解决,但不影响流程///测试 val group = "log-group21" val topicMap = Map[String, Int]("log-topic" -> 1) //创建kafka消费者,如果不使用窗口将每隔【StreamingContext第二个参数定义时间】创建一个rdd val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2) kafkaStream.window(Seconds(12),Seconds(6)).foreachRDD((rdd: RDD[String], time: Time) => { //使用窗口每隔6秒钟处理一次前12秒区段的数据,此处6秒钟位置所在参数必须为StreamingContext(),第二个参数的倍数 //这12秒时间区段的数据全在这一个rdd里面,直接迭代计算wordcount,将最终生成的数据发送到kafka另一个topic val re = rdd.flatMap(t => t.reverse.charAt(1).toString).map(m => (m,1L)).reduceByKey(_+_) val a = re.collect().toMap producer.send(new ProducerRecord[String, String](topic, a.mkString(","))) }) /* //这个可以用 kafkaStream.foreachRDD((rdd: RDD[String], time: Time) => { //下面这个可以用,直接转发 //rdd.collect().foreach(t => producer.send(new ProducerRecord[String, String](topic, t))) //下面这个可以用,微处理然后发送 rdd.collect().foreach(t =>{ println("正在发送: "+t) var s = t.reverse.charAt(1).toString //提取前面夹杂在日志中的一个汉字 producer.send(new ProducerRecord[String, String](topic, s)) }) }) */ ssc.start() // 等待实时流 ssc.awaitTermination() //这条语句建议写上。 producer.close() println("它发生了") }
运行命令及注意事项
spark-submit --master spark://master:7077 --class streaming.Kafka libra.jar
如果缺包可以用--jars或者其他参数加上
特别注意:
每次运行请修改scala消费者的group消费组名,否则会接收不到数据,这个问题我还没解决
第四步
spark生成处理结果发送给kafka
jar包:
与第三步一样
创建新的topic:
创建命令请看第二步,新的topic请配置到spark的Producer中
,创建控制台消费者
第五步
Java后台消费kafka日志
重点ar包:
kafka-clients-0.11.0.1.jar
kafka_2.11-0.11.0.1.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
log4j-1.2.17.jar
普通Java工程
代码解析:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; public class Consumer{ //0.11.0.0版本后使用KafkaConsumer,,版本0.11.0.0之前使用ConsumerConnector private final KafkaConsumer<Integer, String> consumer; private String topic; public Consumer(String topic) { Properties props = new Properties(); //KafkaProperties是自定义接口文件,用于存放静态参数 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); //这里消费组名貌似也有不能重复的嫌疑,每次运行建议修改一下 props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-group101"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); this.topic = topic; } public void doWork() { //设置topic consumer.subscribe(Collections.singletonList(topic)); ConsumerRecords<Integer, String> records = null; //循环消费数据,每次请求都会把还没消费过的数据全部请求回来 while(true) { //这里7秒是每次请求数据的最大等待时间,因为前面spark设置的6秒处理一次,这里用6秒,kafka中转可能延迟 records = consumer.poll(7000); System.out.println("==========================="); System.out.println("接收数据条数:"+records.count()); for (ConsumerRecord<Integer, String> record : records) { System.out.println(record.value()+"=="+ record.offset()); } System.out.println("==========================="); } } }
相关推荐
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
MMC整流器技术解析:基于Matlab的双闭环控制策略与环流抑制性能研究,Matlab下的MMC整流器技术文档:18个子模块,双闭环控制稳定直流电压,环流抑制与最近电平逼近调制,优化桥臂电流波形,高效并网运行。,MMC整流器(Matlab),技术文档 1.MMC工作在整流侧,子模块个数N=18,直流侧电压Udc=25.2kV,交流侧电压6.6kV 2.控制器采用双闭环控制,外环控制直流电压,采用PI调节器,电流内环采用PI+前馈解耦; 3.环流抑制采用PI控制,能够抑制环流二倍频分量; 4.采用最近电平逼近调制(NLM), 5.均压排序:电容电压排序采用冒泡排序,判断桥臂电流方向确定投入切除; 结果: 1.输出的直流电压能够稳定在25.2kV; 2.有功功率,无功功率稳态时波形稳定,有功功率为3.2MW,无功稳定在0Var; 3.网侧电压电流波形均为对称的三相电压和三相电流波形,网侧电流THD=1.47%<2%,符合并网要求; 4.环流抑制后桥臂电流的波形得到改善,桥臂电流THD由9.57%降至1.93%,环流波形也可以看到得到抑制; 5.电容电压能够稳定变化 ,工作点关键词:MMC
Boost二级升压光伏并网结构的Simulink建模与MPPT最大功率点追踪:基于功率反馈的扰动观察法调整电压方向研究,Boost二级升压光伏并网结构的Simulink建模与MPPT最大功率点追踪:基于功率反馈的扰动观察法调整电压方向研究,Boost二级升压光伏并网结构,Simulink建模,MPPT最大功率点追踪,扰动观察法采用功率反馈方式,若ΔP>0,说明电压调整的方向正确,可以继续按原方向进行“干扰”;若ΔP<0,说明电压调整的方向错误,需要对“干扰”的方向进行改变。 ,Boost升压;光伏并网结构;Simulink建模;MPPT最大功率点追踪;扰动观察法;功率反馈;电压调整方向。,光伏并网结构中Boost升压MPPT控制策略的Simulink建模与功率反馈扰动观察法
STM32F103C8T6 USB寄存器开发详解(12)-键盘设备
科技活动人员数专指直接从事科技活动以及专门从事科技活动管理和为科技活动提供直接服务的人员数量
Matlab Simulink仿真探究Flyback反激式开关电源性能表现与优化策略,Matlab Simulink仿真探究Flyback反激式开关电源的工作机制,Matlab Simulimk仿真,Flyback反激式开关电源仿真 ,Matlab; Simulink仿真; Flyback反激式; 开关电源仿真,Matlab Simulink在Flyback反激式开关电源仿真中的应用
基于Comsol的埋地电缆电磁加热计算模型:深度解析温度场与电磁场分布学习资料与服务,COMSOL埋地电缆电磁加热计算模型:温度场与电磁场分布的解析与学习资源,comsol 埋地电缆电磁加热计算模型,可以得到埋地电缆温度场及电磁场分布,提供学习资料和服务, ,comsol;埋地电缆电磁加热计算模型;温度场分布;电磁场分布;学习资料;服务,Comsol埋地电缆电磁加热模型:温度场与电磁场分布学习资料及服务
1、文件内容:ibus-table-chinese-yong-1.4.6-3.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/ibus-table-chinese-yong-1.4.6-3.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、更多资源/技术支持:公众号禅静编程坊
基于51单片机protues仿真的汽车智能灯光控制系统设计(仿真图、源代码) 一、设计项目 根据本次设计的要求,设计出一款基于51单片机的自动切换远近光灯的设计。 技术条件与说明: 1. 设计硬件部分,中央处理器采用了STC89C51RC单片机; 2. 使用两个灯珠代表远近光灯,感光部分采用了光敏电阻,因为光敏电阻输出的是电压模拟信号,单片机不能直接处理模拟信号,所以经过ADC0832进行转化成数字信号; 3. 显示部分采用了LCD1602液晶,还增加按键部分电路,可以选择手自动切换远近光灯; 4. 用超声模块进行检测距离;
altermanager的企业微信告警服务
MyAgent测试版本在线下载
Comsol技术:可调BIC应用的二氧化钒VO2材料探索,Comsol模拟二氧化钒VO2的可调BIC特性研究,Comsol二氧化钒VO2可调BIC。 ,Comsol; 二氧化钒VO2; 可调BIC,Comsol二氧化钒VO2材料:可调BIC技术的关键应用
C++学生成绩管理系统源码
基于Matlab与Cplex的激励型需求响应模式:负荷转移与电价响应的差异化目标函数解析,基于Matlab与CPLEX的激励型需求响应负荷转移策略探索,激励型需求响应 matlab +cplex 激励型需求响应采用激励型需求响应方式对负荷进行转移,和电价响应模式不同,具体的目标函数如下 ,激励型需求响应; matlab + cplex; 负荷转移; 目标函数。,Matlab与Cplex结合的激励型需求响应模型及其负荷转移策略
scratch介绍(scratch说明).zip
内容概要:本文全面介绍了深度学习模型的概念、工作机制和发展历程,详细探讨了神经网络的构建和训练过程,包括反向传播算法和梯度下降方法。文中还列举了深度学习在图像识别、自然语言处理、医疗和金融等多个领域的应用实例,并讨论了当前面临的挑战,如数据依赖、计算资源需求、可解释性和对抗攻击等问题。最后,文章展望了未来的发展趋势,如与量子计算和区块链的融合,以及在更多领域的应用前景。 适合人群:对该领域有兴趣的技术人员、研究人员和学者,尤其适合那些希望深入了解深度学习原理和技术细节的读者。 使用场景及目标:①理解深度学习模型的基本原理和结构;②了解深度学习模型的具体应用案例;③掌握应对当前技术挑战的方向。 阅读建议:文章内容详尽丰富,读者应在阅读过程中注意理解各个关键技术的概念和原理,尤其是神经网络的构成及训练过程。同时也建议对比不同模型的特点及其在具体应用中的表现。
该文档提供了一个关于供应链管理系统开发的详细指南,重点介绍了项目安排、技术实现和框架搭建的相关内容。 文档分为以下几个关键部分: 项目安排:主要步骤包括搭建框架(1天),基础数据模块和权限管理(4天),以及应收应付和销售管理(5天)。 供应链概念:供应链系统的核心流程是通过采购商品放入仓库,并在销售时从仓库提取商品,涉及三个主要订单:采购订单、销售订单和调拨订单。 大数据的应用:介绍了数据挖掘、ETL(数据抽取)和BI(商业智能)在供应链管理中的应用。 技术实现:讲述了DAO(数据访问对象)的重用、服务层的重用、以及前端JS的继承机制、jQuery插件开发等技术细节。 系统框架搭建:包括Maven环境的配置、Web工程的创建、持久化类和映射文件的编写,以及Spring配置文件的实现。 DAO的需求和功能:供应链管理系统的各个模块都涉及分页查询、条件查询、删除、增加、修改操作等需求。 泛型的应用:通过示例说明了在Java语言中如何使用泛型来实现模块化和可扩展性。 文档非常技术导向,适合开发人员参考,用于构建供应链管理系统的架构和功能模块。
这份长达104页的手册由清华大学新闻与传播学院新媒体研究中心元宇宙文化实验室的余梦珑博士后及其团队精心编撰,内容详尽,覆盖了从基础概念、技术原理到实战案例的全方位指导。它不仅适合初学者快速了解DeepSeek的基本操作,也为有经验的用户提供了高级技巧和优化策略。
主题说明: 1、将mxtheme目录放置根目录 | 将mxpro目录放置template文件夹中 2、苹果cms后台-系统-网站参数配置-网站模板-选择mxpro 模板目录填写html 3、网站模板选择好之后一定要先访问前台,然后再进入后台设置 4、主题后台地址: MXTU MAX图图主题,/admin.php/admin/mxpro/mxproset admin.php改成你登录后台的xxx.php 5、首页幻灯片设置视频推荐9,自行后台设置 6、追剧周表在视频数据中,节目周期添加周一至周日自行添加,格式:一,二,三,四,五,六,日
运行GUI版本,可二开