`
阅读更多

SparkStreaming与kafka整合小项目实践含所有代码带详细注释

 

总流程:自制日志生成器生成含数据日志,使用kafkaAppender直接发送到kafka,SparkStreaming从kafka消费日志,并流式处理将结果发送到kafka另一个topic,Java后台从kafka消费日志分析结果,实现秒级大数据实时分析展示。

 

版本

kafka_2.11-0.11.0.1

spark-2.1.1-bin-hadoop2.7

scala-2.11.11

Jdk-1.8

Spark使用Intelij Idea

其余使用eclipse

 

 

第一步

日志生成器输出日志到kafka

 

重点jar包:

kafka-log4j-appender-0.11.0.1.jar //日志使用

kafka_2.11-0.11.0.1.jar //如果报错就加上吧

kafka-clients-0.11.0.1.jar //如果报错就加上吧

slf4j-api-1.7.25.jar //日志框架也可以用其他的

slf4j-log4j12-1.7.25.jar

 

配置文件内容及注意事项

文件名:log4j.properties

文件内容:

 

log4j.rootLogger=DEBUG,stdout,KAFKA
//appender Console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %5p %x-%t %l  (message:%m)%n
 
## appender KAFKA
log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.topic=log-topic
log4j.appender.KAFKA.brokerList=master:9090
log4j.appender.KAFKA.compressionType=none
log4j.appender.KAFKA.syncSend=true
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %5p %x-%t %l  (message:%m)

 

 

文件名:my.properties

 

#time interval of every times,unit is  ms,default 100ms
timeinterval=1000
#the count of log every times,default 1000
frequency=298
#runningtime unit is  ms,default 60000ms
runtime=6000000

 

 

代码解析:

LogWriterExcutor.java

 

import org.apache.log4j.Logger;
class LogWriterExcutor implements Runnable{
	
	Logger logger = Logger.getLogger(this.getClass().getName());
	private String []message;
	public LogWriterExcutor(String []message){
		this.message = message;
	}	
	@Override
	public void run() {
		// TODO Auto-generated method stub
		for(String e : message)
			logger.info(e);
	}
}

 

 

LogCreater.java

 

 

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.Logger;

class LogCreater extends Constant{
	
	Logger logger = Logger.getLogger(this.getClass().getName());
	
	ExecutorService executor = null;
	private int timeinterval = TIME_INTERVAL;		//间隔多久发送一批日志,单位毫秒
	private int frequency = FREQUENCY;				//每一批发送发送多少条数据,单位条
	private int sumOfChinese = SUM_CHINESE;			//自定义中文字集元素个数
	private int runtime = RUNTIME;					//程序运行总时间
	private long startTime = 0;
	private long endTime = 0;
	private long logCount = 0;						//日志已发条数
	private boolean stop = true;
	
	LogCreater(){
		init();
	}
	
	public void init(){
		Properties properties = new Properties();
		FileInputStream in;
		try {
			in = new FileInputStream("src\\source\\my.properties");
			properties.load(in);
			timeinterval = Integer.parseInt((String)properties.get("timeinterval"));
			frequency =Integer.parseInt((String)properties.get("frequency"));
			runtime =Integer.parseInt((String)properties.get("runtime"));
		} catch (IOException e) {
			logger.error("配置文件读取失败");
			e.printStackTrace();
		}
		executor = Executors.newCachedThreadPool();
		startTime = System.currentTimeMillis();
		printHint();
	}
	
	
	public void startCreate() {
		System.out.println("正在生成日志.....");
		
		if(executor == null){
			logger.error("线程池获取失败,日志生成器执行失败。执行结束");
			return;
		}
		while(stop){
			String []messages = getMessages(frequency);
			create(messages);
			try {
				Thread.sleep(timeinterval);
			} catch (InterruptedException e) {
				logger.error("线程睡眠执行出错");
				e.printStackTrace();
			}
			endTime = System.currentTimeMillis();
			if((endTime-startTime)>runtime)
				stop = false;
		}
		
		System.out.println("共生成 "+logCount+" 条日志。");
	}
	
	private void create(String []messages) {
		executor.execute(new Thread(new LogWriterExcutor(messages)));
		logCount += messages.length;
	}
	
	private String[] getMessages(Integer frequency) {
		Random rand = new Random();
		String []massages = new String[frequency];
		for(int i=0;i<frequency;i++){
			massages[i] = REGRET[rand.nextInt(sumOfChinese)];
		}
		return massages;
	}
	
	private void printHint(){
		System.out.println("每次时间间隔\t"+timeinterval+"ms");
		System.out.println("每次日志数量\t"+frequency+"条/次");
		System.out.println("预计运行时间\t"+runtime/1000+"s");
	}
}

 

 

Constant .java

 

public class Constant {

	/*
	 * 这个文件中存放的全部是常量
	 */
	
	/*
	 * 日志生成器隔多少时间写一批日志,默认值
	 */
	public static Integer TIME_INTERVAL = 100;
	
	/*
	 * 日志生成器每一批次生成多少条日志,默认值
	 */
	public static Integer FREQUENCY = 100;
	
	/*
	 * 运行时间,默认一分钟,默认值
	 */
	public static Integer RUNTIME = 60000;
	
	/*
	 * 298个中文字,来自楚辞《惜誓》
	 */
	public static String[]REGRET = {"一","言","老","调","清","者","舆","昆","合","渊","下","而","同","不","明","与",
			"昏","谏","小","騑","少","我","气","谔","世","或","尚","丝","鸟","逢","瀣","中","是","鸱","就","水","临","制",
			"举","砾","鸾","所","乃","鹄","久","居","陆","之","虎","乎","乐","虑","乔","虖","剖","遗","虚","聚","江","吸",
			"瑟","象","乡","衡","周","息","虯","衰","驰","山","驱","乱","干","年","并","恶","穷","偷","顺","登","白","幽",
			"驾","岁","蚁","节","梅","沆","皆","皇","骋","二","于","隐","源","麒","骖","骛","墟","功","麟","纡","纫","被",
			"身","犬","躯","悲","河","蚴","犹","人","难","裁","仁","狂","黄","集","哉","背","苍","从","风","仑","黑","盖",
			"高","飙","仙","四","盛","惜","飞","回","苟","因","以","拥","苦","独","竭","曲","直","相","建","固","国","攀",
			"异","儃","处","茅","月","夏","霑","休","众","北","圜","生","索","謣","圣","贤","伤","大","在","用","木","天",
			"眩","太","夫","伯","地","朱","失","贵","然","贼","放","愿","流","权","充","故","商","均","先","浊","子","何",
			"余","神","非","止","赤","此","来","车","革","兮","佯","数","女","杳","海","睹","蝼","彼","载","松","使","长",
			"极","羁","如","概","历","玉","涉","冉","枉","羊","王","後","厌","再","美","箕","得","龙","原","龟","审","醢",
			"群","冥","推","循","讬","枭","况","德","容","方","澹","离","去","旁","见","观","係","心","寄","又","反","重",
			"野","藏","量","发","翔","比","俗","志","诚","进","远","川","察","忠","无","濡","矣","凤","日","知","左","自",
			"矫","可","称","翱","深","已","右","至","石","念","时","迻","忽","寿","丹","根","为","尽",};
	
	/*
	 * 中文字个数,用作随机数范围使用
	 */
	public static Integer SUM_CHINESE = 100;
}

 

 

MyUtil.java

import java.util.Random;
public class MyUtil {
	public static int[] getRand(int n,int range){
		Random ran = new Random();
		int []arr = new int[n];
		while(n-->0){
			arr[n] = ran.nextInt(range);
		}
		return arr;
	}
}

 

Demo.java

/*
 * 日志生成器
 */

public class Demo{
	public static void main(String[] args){
		new LogCreater().startCreate();
		System.exit(0);
	}
}

目录结构:就普通java project,


 

第二步

创建kafka topic

安装跳过

配置%KAFKA_HOME%conf/server.properties:

网上教程很多,此处不再赘述

 

启动kafka

kafka-server-start.sh config/server.properties &

 

创建topic:

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic log-topic

 

查看topic:

kafka-topics.sh --describe --zookeeper master:2181 --topic log-topic

 

创建控制台消费者:

kafka-console-consumer.sh --bootstrap-server master:9090 --from-beginning --topic log-topic

 

启动顺序

1.启动kafka Server,2.创建topic,3.查看创建的topic(可选),4.创建控制台消费者,5.启动日志生成器程序。

 

注意事项在启动控制台消费者的终端会将接收的日志打印出来,命令最后面加上 & 符号可将进程调至后台运行。关闭消费者使用Ctrl+c

 

 

第三步

spark消费kafka的日志

重点jar包:

kafka_2.11-0.11.0.1.jar

kafka-clients-0.11.0.1.jar

spark-streaming-kafka_2.11-1.6.3.jar

 

Spark所有自带jar包

Scala的SDK

 

报异常:

如果运行报java.lang.NoClassDefFoundError: org/apache/spark/Logging

这个Logging截止存在于spark-core_2.11-1.5.2中。

2.1.1版本saprk无此class文件,被org.apache.spark.internal.Logging取代。

解决办法

把1.5.2版本里面的这个class提出来单独用java -xvf  new_name.jar class_dir 打包成一个jar包,然后当做常规jar工具包使用

 

过程解析:

Spark创建Receiver从kafka消费日志数据。

 

代码解析:Kafka.scala 

import java.util.Properties
import java.util.logging.{Level, Logger}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
//import com.trigl.spark.util.{DataUtil, LauncherMultipleTextOutputFormat}
import org.apache.spark.Logging
object Kafka extends Logging{

  private var producer: KafkaProducer[String, String] = _
  private var props : Properties = _

  def main(args: Array[String]) {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARNING)
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkConf = new SparkConf().setAppName("LauncherStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    /*
        provider的参数
     */
    val brokerAddress = "master:9090"
    val topic = "pro-topic"
    props = new Properties()
    props.put("bootstrap.servers", brokerAddress)
    props.put("value.serializer", classOf[StringSerializer].getName)
    // Key serializer is required.
    props.put("key.serializer", classOf[StringSerializer].getName)
    // wait for all in-sync replicas to ack sends
    props.put("acks", "all")

	//创建kafka生产者,后面可以直接使用它发送数据  
    producer = new KafkaProducer[String, String](props)
    if(producer == null) {
      println("producer为空")
      ssc.stop()
    }

    /*
    *消费者参数
     */
    val zkQuorum = "master:2181,slave1:2181,slave2:2181"
	//这个group本来是随意创建,但是不能与已存在的重复,否在接收不到数据。每次运行请务必修改,或者做成参数,这个问题我尚未解决,但不影响流程///测试
    val group = "log-group21"		
    val topicMap = Map[String, Int]("log-topic" -> 1)

	//创建kafka消费者,如果不使用窗口将每隔【StreamingContext第二个参数定义时间】创建一个rdd
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    kafkaStream.window(Seconds(12),Seconds(6)).foreachRDD((rdd: RDD[String], time: Time) => {
		//使用窗口每隔6秒钟处理一次前12秒区段的数据,此处6秒钟位置所在参数必须为StreamingContext(),第二个参数的倍数
		//这12秒时间区段的数据全在这一个rdd里面,直接迭代计算wordcount,将最终生成的数据发送到kafka另一个topic
      val re = rdd.flatMap(t => t.reverse.charAt(1).toString).map(m => (m,1L)).reduceByKey(_+_)
      val a = re.collect().toMap
      producer.send(new ProducerRecord[String, String](topic, a.mkString(",")))
    })

/*
    //这个可以用
    kafkaStream.foreachRDD((rdd: RDD[String], time: Time) => {

      //下面这个可以用,直接转发
      //rdd.collect().foreach(t => producer.send(new ProducerRecord[String, String](topic, t)))

      //下面这个可以用,微处理然后发送
      rdd.collect().foreach(t =>{
        println("正在发送: "+t)
        var s = t.reverse.charAt(1).toString		//提取前面夹杂在日志中的一个汉字
        producer.send(new ProducerRecord[String, String](topic, s))
      })

    })
*/
    ssc.start()
        // 等待实时流
    ssc.awaitTermination()
	
	//这条语句建议写上。
    producer.close()	
    println("它发生了")
  }

 

运行命令及注意事项

spark-submit  --master spark://master:7077 --class streaming.Kafka libra.jar

如果缺包可以用--jars或者其他参数加上

特别注意:

每次运行请修改scala消费者的group消费组名,否则会接收不到数据,这个问题我还没解决

 

第四步

spark生成处理结果发送给kafka

jar包:

与第三步一样

 

创建新的topic:

创建命令请看第二步,新的topic请配置到spark的Producer中

,创建控制台消费者

 

第五步

Java后台消费kafka日志

重点ar包:

kafka-clients-0.11.0.1.jar

kafka_2.11-0.11.0.1.jar

slf4j-api-1.7.25.jar

slf4j-log4j12-1.7.25.jar

log4j-1.2.17.jar

 

普通Java工程

代码解析:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class Consumer{

	//0.11.0.0版本后使用KafkaConsumer,,版本0.11.0.0之前使用ConsumerConnector
    private final KafkaConsumer<Integer, String> consumer;
    private String topic;

    public Consumer(String topic) {
        Properties props = new Properties();
		//KafkaProperties是自定义接口文件,用于存放静态参数
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
		
		//这里消费组名貌似也有不能重复的嫌疑,每次运行建议修改一下
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-group101");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

    public void doWork() {
	
		//设置topic
        consumer.subscribe(Collections.singletonList(topic));
        ConsumerRecords<Integer, String> records = null;
		
		//循环消费数据,每次请求都会把还没消费过的数据全部请求回来
        while(true) {
			//这里7秒是每次请求数据的最大等待时间,因为前面spark设置的6秒处理一次,这里用6秒,kafka中转可能延迟
        	records = consumer.poll(7000);
        	System.out.println("===========================");
        	System.out.println("接收数据条数:"+records.count());
        	  for (ConsumerRecord<Integer, String> record : records) {
                  System.out.println(record.value()+"=="+ record.offset());
              }
        	  System.out.println("===========================");
        }
    }
}

 

 

  • 大小: 16 KB
分享到:
评论

相关推荐

    SparkStreaming和kafka的整合.pdf

    根据提供的文件信息,本文将详细解析“Spark Streaming与Kafka的整合”这一主题,并结合代码片段探讨其在实际场景中的应用。尽管标签中提到“数学建模”,但从标题和描述来看,这部分内容与数学建模无关,因此我们将...

    Spark-Streaming整合Kafka.md

    Spark_Streaming整合Kafka.md

    kafka+spark streaming开发文档

    kafka+Spark Streaming开发文档 本文档主要讲解了使用Kafka和Spark Streaming进行实时数据处理的开发文档,涵盖了Kafka集群的搭建、Spark Streaming的配置和开发等内容。 一、Kafka集群搭建 首先,需要安装Kafka...

    springBoot整合kafka和elasticSearch,实现批量拉取日志以及批量更新到es里

    本项目将详细讲解如何利用SpringBoot整合Kafka和Elasticsearch,实现日志的批量拉取和更新。 首先,我们需要在SpringBoot项目中引入相应的依赖。对于Kafka,我们需要添加`spring-kafka`依赖,它提供了与Kafka交互的...

    kafka kafka与sparkStreaming kafka与Scala

    **Kafka与Spark Streaming的整合** Kafka和Spark Streaming的结合是实时数据处理的常见架构。Spark Streaming可以从Kafka的Topic中直接读取数据,进行实时处理,然后将结果写回Kafka或者其他存储系统。这种组合提供...

    基于Spark+Kafka+Flume实现的电影推荐系统.zip

    总的来说,这个项目展示了如何利用大数据技术来构建一个实时的电影推荐系统,通过整合Spark、Kafka和Flume,实现了数据的高效采集、处理和推荐,对于理解和应用大数据技术,以及理解推荐系统的工作原理具有很高的...

    基于spark+kafka+zk整合的实时项目+文档说明

    2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、作业、项目初期立项演示等。...

    Spark Streaming 流式处理整合Kafka.rar

    在示例代码中 `kafkaParams` 封装了 Kafka 消费者的属性,这些属性和 Spark Streaming 无关,是 Kafka 原生 API 中就有定义的。其中服务器地址、键序列化器和值序列化器是必选的,其他配置是可选的。 Spark ...

    毕业设计:基于Spark+Kafka+Hive的智能货运系统设计与实现.zip

    本毕业设计旨在构建一个利用大数据处理技术的智能货运系统,通过整合Apache Spark、Kafka和Hive等工具,实现数据的实时处理、流式传输以及存储分析,从而优化货运流程,提高运营效率。 Spark是Apache开源项目中的...

    新闻、健身实时数据 基于spark+kafka+flume+echarts可视化+hadoop

    该项目是关于实时数据处理和可视化的综合应用,利用了大数据技术栈中的多个组件,包括Spark、Kafka、Flume、Echarts以及Hadoop。以下是这些技术在该项目中的具体作用和相关知识点: 1. **Spark**: Apache Spark是一...

    sparkstreaming-kafka开发的pom文件

    &lt;scala.version&gt;2.10.5 &lt;spark.version&gt;1.6.2&lt;/spark.version&gt; &lt;jackson.version&gt;2.4.3 &lt;hbase.version&gt;1.2.0 的pom文件

    阿里云emr spark kafka redis MongoDB例子demo

    在这个"阿里云EMR spark kafka redis MongoDB例子demo"中,我们看到了如何整合这些技术,构建一个实时数据处理系统,从Kafka获取数据,利用Spark Streaming进行实时分析,然后将结果存储到Redis和MongoDB,实现了...

    flume+kafka+sparkstreaming

    通过flume监控文件,让kafka消费flume数据,再将sparkstreaming连接kafka作为消费者进行数据处理,文档整理实现

    利用JAVA语言基于Spark实现的电影推荐系统,整合Spring,Spark,Kafka,Flume,MySQL.zip

    这项任务通常要求学生运用所学专业知识,通过独立研究和创新,完成一个实际问题的解决方案或者开展一项有价值的项目。 首先,毕业设计的选择通常由学生根据个人兴趣、专业方向以及实际需求来确定。学生需要在导师的...

    基于spark streaming和kafka,hbase的日志统计分析系统.zip

    这个项目不仅适用于毕业设计,也是对大数据实时处理技术的实际应用训练,对于理解Spark Streaming、Kafka和HBase的工作原理,以及如何在实际场景中整合使用这些技术,具有很高的学习价值。通过该项目,开发者可以...

    spring-kafka 整合官方文档

    spring-kafka是Spring框架对Apache Kafka消息系统进行整合的一个项目,它简化了Kafka的使用,使其更容易与Spring应用程序集成。Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流应用程序。它具备...

Global site tag (gtag.js) - Google Analytics