版本:
kafka:2.11
spark:2.0.2
测试过程:
1、开发spark streaming程序,读取kafka队列数据,并进行处理;
2、启动spark、zookeeper及kafka;
3、启动log4j输出到kafka的程序,先用kafka receive console程序验证其正确性;
4、启动spark streaming程序,观察执行效果,启动命令如下:
spark-submit --class com.itown.bigdata.kafka.KafkaReader /usr/hadoop/jar/sparkApp-0.0.1-SNAPSHOT-jar-with-dependencies.jar
开发过程:
1、java类:
注意:
package com.itown.bigdata.kafka;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import com.google.common.collect.Lists;
public class KafkaReader {
static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) {
// 每个话题的分片数
int numThreads = 2;
SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount")
.setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
new Duration(10000));
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("test", "test2");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics,
kafkaParams));
JavaDStream<String> words = stream
.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
public Iterator<String> call(
ConsumerRecord<String, String> t) throws Exception {
System.out.println(">>>" + t.value());
return Lists.newArrayList(SPACE.split(t.value()))
.iterator();
}
});
// 对其中的单词进行统计
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
// 打印结果
wordCounts.print();
try {
jssc.start();
jssc.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2、maven pom.xml
注意:
1)spark-streaming-kafka的引用部分
2)打包同时将依赖包也打了进来
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itown.bigdata</groupId>
<artifactId>sparkApp</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>sparkApp</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId> maven-assembly-plugin </artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>central</id>
<name>central</name>
<url>http://central.maven.org/maven2/</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
</project>
相关推荐
10. **测试与调优**:完成代码编写后,需要对系统进行测试,验证其功能正确性、性能效率以及稳定性。此外,根据实际运行情况,可能还需要对Spark和HBase的配置进行调优。 这个项目为学习和实践大数据实时处理提供了...
`spark-streaming-kafka-0-10`是Spark与Kafka的连接器,允许Spark直接读取Kafka主题的数据,进行实时分析。测试代码可能包含了如何配置Spark Streaming去读取Kafka数据,并执行计算的代码片段。 六、测试代码解析 ...
2. 编写SparkStreaming代码,定义输入源(如Kafka、Flume等)、数据处理逻辑以及输出结果。 3. 配置Spark运行环境,如设置Master URL、内存分配等。 4. 使用Maven命令打包应用,生成可执行的JAR文件。 5. 在Spark...
《基于Flume、Kafka、Spark Streaming和HBase的流式处理系统设计与实现》 在大数据处理领域,实时流处理已经成为不可或缺的一部分。本项目旨在构建一个高效、可靠的实时数据处理系统,通过集成Apache Flume、Kafka...
8. **实战经验**:项目提供了一个实际环境来测试和优化Spark Streaming与HDFS的集成,对于学习者来说,是提升实际操作能力的好机会。 以上就是基于给定标题和描述可以提炼出的主要知识点,每个点都值得深入学习和...
**Spark Streaming与Kafka的整合** 主要通过`KafkaDirectStream`或`KafkaReceiver`实现。Spark Streaming可以从Kafka的特定主题消费数据,使用AvroSchemaRegistry解析Avro消息,然后执行DStream操作。 在`kafka-...
Spark Streaming 是 Spark 用于实时数据处理的模块,它能从各种数据源(如 Kafka、Flume、Twitter 等)接收流数据,然后以微批次的形式处理这些数据。这种设计允许 Spark Streaming 拥有与批处理相似的 API,同时又...
5. 测试与部署模块:包括单元测试和集成测试,以及部署脚本。 总的来说,这个基于Spark的电影点评系统展示了如何利用大数据技术处理用户行为数据,实现高效的数据分析和个性化推荐。通过深入学习Spark的相关知识,...
Spark可以与Kafka集成,通过Spark Streaming模块从Kafka消费数据流进行实时处理。这种方式使得数据能够实时地从Kafka主题中读取,经过Spark的计算逻辑后,结果可以直接写回Kafka或其他存储系统。这种架构为实时大...
2. **Flume 与 Spark Streaming 集成**:在 DEA(可能是开发环境或服务器)上,通过 Maven 管理项目,将 `spark-streaming-flume-sink_2.11-2.4.2.jar` 添加到 Flume 安装目录的 `lib` 文件夹。由于文件权限问题,...
集成Hadoop 2.7意味着Spark能够与Hadoop生态系统中的其他组件(如HDFS、YARN)无缝协作,处理存储在Hadoop集群上的大量数据。 Spark的核心设计是基于内存计算,通过在内存中缓存数据,减少磁盘I/O,从而显著提高了...
在名为 `sparkstreaming-master` 的压缩包文件中,可能包含了 Spark Streaming 应用程序的源代码。通常,这样的项目会包含以下部分: 1. `pom.xml`:Maven 项目配置文件,定义了项目依赖,包括 Spark 和其他库。 2....
Kafka消费生产代码工程可能展示了如何使用Spark Streaming与Kafka集成,实现实时数据处理。 8. Flume:Apache Flume是一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的系统。在Spark集群中,Flume可以...
压缩包子文件"LogStreaming-master"可能包含了整个项目的所有源代码,包括配置文件、Spark作业、Flume agents、Kafka配置以及可能的测试脚本等。通过这个项目,你可以学习如何设置和集成这些组件,实现从数据收集到...
3. **Spark Streaming改进**:增加了对Kafka Direct Stream的支持,简化了与Kafka的集成。 4. **MLlib更新**:引入了更多机器学习模型和算法,如宽线性模型、深度学习框架DLlib等。 5. **安全性增强**:支持Kerberos...
- **实时分析**: 结合Spark Streaming或Flink等流处理框架,实时处理Kafka中的数据流,进行实时分析和决策。 - **微服务通信**: 在微服务架构中,Kafka可以作为一个消息中间件,允许服务之间异步通信,提高系统的可...
总结来说,这段代码展示了如何使用Apache Spark Streaming与Kafka集成,以及如何处理Kafka中的数据流,特别是考虑到偏移量的管理和手动控制消费起点。同时,也揭示了在没有历史偏移量的情况下,系统将如何处理数据流...
2. **Kafka集成**: Apache Kafka作为高吞吐量的分布式消息系统,在数据流处理中扮演着关键角色。它被用于捕获和暂存数据,确保数据的可靠传输。阿里云Spark Streaming服务与Kafka的结合,使得数据可以从Kafka主题...
至于大数据应用,Kafka作为消息系统,常与Hadoop、Spark等大数据框架集成,实现数据的实时处理和分析。例如,可以使用Kafka Connect将数据流式传输到HDFS或Elasticsearch,或者结合Spark Streaming实现实时流处理。 ...