`
飞鱼德蒙
  • 浏览: 13017 次
  • 性别: Icon_minigender_1
  • 来自: 广州
文章分类
社区版块
存档分类
最新评论

基于Hadoop生态SparkStreaming的大数据实时流处理平台的搭建

阅读更多

 

 随着公司业务发展,对大数据的获取和实时处理的要求就会越来越高,日志处理、用户行为分析、场景业务分析等等,传统的写日志方式根本满足不了业务的实时处理需求,所以本人准备开始着手改造原系统中的数据处理方式,重新搭建一个实时流处理平台,主要是基于hadoop生态,利用Kafka作为中转,SparkStreaming框架实时获取数据并清洗,将结果多维度的存储进HBase数据库。

整个平台大致的框架如下:

操作系统:Centos7

用到的框架:

1. Flume1.8.0

2. Hadoop2.9.0

3. kafka2.11-1.0.0

4.Spark2.2.1

5.HBase1.2.6

6. ZooKeeper3.4.11

7. maven3.5.2

整体的开发环境是基于JDK1.8以上以及Scala,所以得提前把java和Scala的环境给准备好,接下来就开始着手搭建基础平台:

一、配置开发环境

下载并解压JDK1.8,、下载并解压Scala,配置profile文件:

vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.8.0_144
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export SCALA_HOME=/usr/local/scala-2.11.12
export PATH=$PATH:$SCALA_HOME/bin
source /etc/profile

二、配置zookeeper、maven环境

下载并解压zookeeper以及maven并配置profile文件

wget http://mirrors.hust.edu.cn/apache/maven/maven-3/3.5.2/binaries/apache-maven-3.5.2-bin.tar.gz
tar -zxvf apache-maven-3.5.2-bin.tar.gz -C /usr/local
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
tar -zxvf zookeeper-3.4.11.tar.gz -C /usr/local
vim /etc/profile
export MAVEN_HOME=/usr/local/apache-maven-3.5.2
export PATH=$PATH:$MAVEN_HOME/bin
source /etc/profile

zookeeper的配置文件配置一下:

cp /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.11/conf/zoo.cfg

然后配置一下zoo.cfg里面的相关配置,指定一下dataDir目录等等

启动zookeeper:

/usr/local/zookeeper-3.4.11/bin/zkServer.sh start

如果不报错,jps看一下是否启动成功

三、安装配置Hadoop

Hadoop的安装配置在之前文章中有说过(传送门),为了下面的步骤方便理解,这里只做一个单机版的简单配置说明:

下载hadoop解压并配置环境:

wget http://mirrors.hust.edu.cn/apache/hadoop/common/hadoop-2.9.0/hadoop-2.9.0.tar.gz
tar -zxvf hadoop-2.9.0.tar.gz -C /usr/local
vim /etc/profile
export HADOOP_HOME=/usr/local/hadoop-2.9.0
export PATH=$PATH:$HADOOP_HOME/bin
source /etc/profile

配置hadoop 进入/usr/local/hadoop-2.9.0/etc/hadoop目录

cd /usr/local/hadoop-2.9.0/etc/hadoop

首先配置hadoop-env.sh、yarn-env.sh,修改JAVA_HOME到指定的JDK安装目录/usr/local/java/jdk1.8.0_144

创建hadoop的工作目录

mkdir /opt/data/hadoop

编辑core-site.xml、hdfs-site.xml、yarn-site.xml等相关配置文件,具体配置不再阐述请看前面的文章,配置完成之后记得执行hadoop namenode -format,否则hdfs启动会报错,启动完成后不出问题浏览器访问50070端口会看到hadoop的页面。

 

想学习大数据或者想学习大数据的朋友,我整理了一套大数据的学习视频免费分享给大家,从入门到实战都有,大家可以加微信:Lxiao_28获取,还可以入微信群交流!(备注领取资料,真实有效)。

 

 

四、安装配置kafka

还是一样,先下载kafka,然后配置:

wget http://mirrors.hust.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -zxvf kafka_2.11-1.0.0.tgz -C /usr/local
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka_2.11-1.0.0
export PATH=$KAFKA_HOME/bin:$PATH
source /etc/profile

进入kafka的config目录,配置server.properties,指定log.dirs和zookeeper.connect参数;配置zookeeper.properties文件中zookeeper的dataDir,配置完成后启动kafka

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

可以用jps查看有没有kafka进程,然后测试一下kafka是否能够正常收发消息,开两个终端,一个用来做producer发消息一个用来做consumer收消息,首先,先创建一个topic

kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic testTopic
kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic

如果不出一下会看到如下输出:

Topic:testTopic	PartitionCount:1	ReplicationFactor:1	Configs:
Topic: testTopic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

然后在第一个终端中输入命令:

kafka-console-producer.sh –broker-list localhost:9092 –topic testTopic

在第二个终端中输入命令:

kafka-console-consumer.sh –zookeeper 127.0.0.1:2181 –topic testTopic

如果启动都正常,那么这两个终端将进入阻塞监听状态,在第一个终端中输入任何消息第二个终端都将会接收到。

五、安装配置HBase

下载并解压HBase:

wget http://mirrors.hust.edu.cn/apache/hbase/1.2.6/hbase-1.2.6-bin.tar.gz
tar -zxvf hbase-1.2.6-bin.tar.gz -C /usr/local/
vim /etc/profile
export HBASE_HOME=/usr/local/hbase-1.2.6
export PATH=$PATH:$HBASE_HOME/bin
source /etc/profile

修改hbase下的配置文件,首先修改hbase-env.sh,主要修改JAVA_HOME以及相关参数,这里要说明一下HBASE_MANAGES_ZK这个参数,因为采用了自己的zookeeper,所以这里设置为false,否则hbase会自己启动一个zookeeper

cd /usr/local/hbase-1.2.6/conf
vim hbase-env.sh
export JAVA_HOME=/usr/local/java/jdk1.8.0_144/
HBASE_CLASSPATH=/usr/local/hbase-1.2.6/conf
export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m"
export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m"
export HBASE_PID_DIR=/opt/data/hbase
export HBASE_MANAGES_ZK=false

然后修改hbase-site.xml,我们设置hbase的文件放在hdfs中,所以要设置hdfs地址,其中tsk1是我安装hadoop的机器的hostname,hbase.zookeeper.quorum参数是安装zookeeper的地址,这里的各种地址最好用机器名

vim hbase-site.xml
<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://tsk1:9000/hbase</value>
    </property>
    <property>
        <name>hbase.master</name>
        <value>tsk1:60000</value>
    </property>
    <property>
        <name>hbase.master.port</name>
        <value>60000</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>192.168.70.135</value>
    </property>
    <property>
        <name>zookeeper.znode.parent</name>
        <value>/hbase</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/opt/data/zookeeper</value>
    </property>
    <property>
		<name>hbase.master.info.bindAddress</name>
        <value>tsk1</value>
    </property>
</configuration>

配置完成后启动hbase,输入命令:

start-hbase.sh

完成后查看日志没有报错的话测试一下hbase,用hbase shell进行测试:

hbase shell
hbase(main):001:0>create 'myTestTable','info'
0 row(s) in 2.2460 seconds
=> Hbase::Table - myTestTable
hbase(main):003:0>list
TABLE                                                                                                                    
testTable                                                                                                                
1 row(s) in 0.1530 seconds

=> ["myTestTable"]

至此,hbase搭建成功,访问以下hadoop的页面,查看file system(菜单栏Utilities->Browse the file system),这时可以看见base的相关文件已经载hadoop的文件系统中。

六、安装spark

下载spark并解压

wget http://mirrors.hust.edu.cn/apache/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz
tar -zxvf spark-2.2.1-bin-hadoop2.7.tgz -C /usr/local
vim /etc/profile
export SPARK_HOME=/usr/local/spark-2.2.1-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
source /etc/profile

七、测试

至此,环境基本搭建完成,以上搭建的环境仅是服务器生产环境的一部分,涉及服务器信息、具体调优信息以及集群的搭建就不写在这里了,下面我们写一段代码整体测试一下从kafka生产消息到spark streaming接收到,然后处理消息并写入HBase。先写一个HBase的连接类HBaseHelper:

public class HBaseHelper {
    private static HBaseHelper ME;
    private static Configuration config;
    private static Connection conn;
    private static HBaseAdmin admin;

    public static HBaseHelper getInstances() {
        if (null == ME) {
            ME = new HBaseHelper();
            config = HBaseConfiguration.create();
            config.set("hbase.rootdir", "hdfs://tsk1:9000/hbase");
            config.set("hbase.zookeeper.quorum", "tsk1");
            config.set("hbase.zookeeper.property.clientPort", "2181");
            config.set("hbase.defaults.for.version.skip", "true");
        }
        if (null == conn) {
            try {
                conn = ConnectionFactory.createConnection(config);
                admin = new HBaseAdmin(config);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return ME;
    }

    public Table getTable(String tableName) {
        Table table = null;
        try {
            table = conn.getTable(TableName.valueOf(tableName));
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        return table;
    }

    public void putAdd(String tableName, String rowKey, String cf, String column, Long value) {
        Table table = this.getTable(tableName);
        try {
            table.incrementColumnValue(rowKey.getBytes(), cf.getBytes(), column.getBytes(), value);
            System.out.println("OK!");
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
 //......以下省略
}

再写一个测试类KafkaRecHbase用来做spark-submit提交

package com.test.spark.spark_test;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaRecHbase {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("kafkaRecHbase");
        sparkConf.setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }
        JavaPairReceiverInputDStream<String, String> kafkaStream =
                KafkaUtils.createStream(ssc, args[0], args[1], topicMap);
        JavaDStream<String> lines = kafkaStream.map(Tuple2::_2);
        JavaDStream<String> lineStr = lines.map(line -> {
            if (null == line || line.equals("")) {
                return "";
            }
            String[] strs = SPACE.split(line);
            if (strs.length < 1) {
                return "";
            }
            try {
                for (String str : strs) {
                    HBaseHelper.getInstances().putAdd("myTestTable", str, "info", "wordCunts", 1l);
                }
                return "strs:" + line;
            } catch (Exception ex) {
                System.out.println(line);
                return "报错了:" + ex.getMessage();
            }
        });
        lineStr.print();
        ssc.start();
        System.out.println("spark 启动!!!");
        ssc.awaitTermination();
    }
}

编译提交到服务器,执行命令:

spark-submit --jars $(echo /usr/local/hbase-1.2.6/lib/*.jar | tr ' ' ',') --class com.test.spark.spark_test.KafkaRecHbase --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 /opt/FileTemp/streaming/spark-test-0.1.1.jar tsk1:2181 test testTopic 1

没报错的话执行kafka的producer,输入几行数据在HBase内就能看到结果了!

八、装一个Flume实时采集Nginx日志写入Kafka

Flume是一个用来日志采集的框架,安装和配置都比较简单,可以支持多个数据源和输出,具体可以参考Flume的文档,写的比较全 传送门

下载Flume并配置环境

wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local
vim /etc/profile
export FLUME_HOME=/usr/local/apache-flume-1.8.0-bin/
export PATH=$FLUME_HOME/bin:$PATH
source /etc/profile

写一个Flume的配置文件在flume的conf目录下:

vim nginxStreamingKafka.conf
agent1.sources=r1
agent1.channels=logger-channel
agent1.sinks=kafka-sink

agent1.sources.r1.type=exec
agent1.sources.r1.deserializer.outputCharset= UTF-8
agent1.sources.r1.command=tail -F /opt/data/nginxLog/nginxLog.log

agent1.channels.logger-channel.type=memory

agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = flumeKafka
agent1.sinks.kafka-sink.brokerList = tsk1:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20

agent1.sources.r1.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

kafka创建一个名为flumeKafka的topic用来接收,然后启动flume:

flume-ng agent --name agent1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/nginxStreamingKafka.conf -Dflume.root.logger=INFO,console

如果没有报错,Flume将开始采集opt/data/nginxLog/nginxLog.log中产生的日志并实时推送给kafka,再按照上面方法写一个spark streaming的处理类进行相应的处理就好。

分享到:
评论

相关推荐

    Hadoop+Spark生态系统操作与实战指南.epub

    第2部分(第8~11章)讲解Spark的原生态组件,包括SparkCore、SparkSQL、SparkStreaming、DataFrame,以及介绍Scala、SparkAPI、SparkSQL、SparkStreaming、DataFrame原理和CDH版本环境下实战操作,其中Flume和Kafka...

    基于Hadoop3.2搭建大数据平台

    搭建基于Hadoop3.2的大数据平台的过程通常包括以下步骤: 1. **环境准备**:首先,需要安装一个支持虚拟化的操作系统,例如Ubuntu 16.04,并配置好网络环境,包括主机名、静态IP地址、网关和DNS设置。 2. **JDK...

    Hadoop+Spark+Hive+HBase+Oozie+Kafka+Flume+Flink+ES+Redash等详细安装部署

    在大数据领域,构建一个完整的生态系统是至关重要的,其中包括多个组件,如Hadoop、Spark、Hive、HBase、Oozie、Kafka、Flume、Flink、Elasticsearch和Redash。这些组件协同工作,提供了数据存储、处理、调度、流...

    《大数据开发工程师系列:Hadoop & Spark大数据开发实战》1

    Spark Streaming则用于实时数据流处理,适合处理连续的数据流。 为了增强实践性,书中会包含大量的案例分析和实践项目,帮助读者将理论知识转化为实际操作技能。同时,配套的学习资源如视频教程、案例素材、学习...

    Hadoop Spark生态系统操作与实战指南

    《Hadoop Spark生态系统操作与实战指南》是一本深入解析大数据处理技术的专业书籍,主要围绕Hadoop和Spark两大核心组件展开,旨在帮助读者掌握在实际环境中运用这两个工具进行数据处理和分析的能力。本书不仅介绍了...

    Hadoop+Spark+Kafka+jar包

    通过这些jar包,开发者可以在一个已经搭建好的Hadoop集群上部署Spark应用,实现对MySQL数据库的数据读写,以及利用Spark Streaming从Kafka获取实时数据流进行处理。这在实时大数据分析、监控系统或者在线推荐系统等...

    大数据环境搭建,本项目为大数据基础镜像组件,其中包括Hadoop、Spark、Hive、Tez、Hue、Flink、.zip

    Spark不仅支持批处理,还支持交互式查询(Spark SQL)、实时流处理(Spark Streaming)和机器学习(MLlib),使其成为大数据处理的全能工具。 Hive是基于Hadoop的数据仓库工具,可以将结构化的数据文件映射为一张...

    本项目为大数据基础镜像组件,其中包括Hadoop、Spark、Hive、Tez、Hue、Flink、MySQL等

    Spark支持批处理、交互式查询(通过Spark SQL)、实时流处理(通过Spark Streaming)和机器学习(通过MLlib库)等多种数据处理模式。 3. **Hive**:Hive是基于Hadoop的数据仓库工具,可将结构化的数据文件映射为...

    spark core、spark sql以及spark streaming 的Scala、java项目混合框架搭建以及大数据案例

    3. **Spark Streaming**:Spark Streaming是Spark处理实时数据流的组件,它将数据流分解为小批次,然后使用Spark Core进行快速处理。项目中可能包含创建DStream(Discretized Stream),设置窗口操作,以及实现复杂...

    spark企业级大数据项目实战.docx

    此外,Spark还包含了丰富的组件,如Spark SQL用于结构化数据处理,Spark Streaming用于实时流处理,MLlib用于机器学习,GraphX用于图计算。这些组件使得Spark能广泛应用于数据分析、实时监控、推荐系统等领域。教程...

    基于Hadoop平台的搭建及应用研究.rar

    Hadoop生态中还包括许多其他组件,如Hive(用于数据仓库和SQL查询)、Pig(提供数据分析工具)、HBase(NoSQL数据库)、Spark(更快的计算框架)等,它们共同构建了一个完整的数据处理平台。 六、Hadoop应用实例 1....

    spark-2.3.0-bin-hadoop2.7版本.zip

    6. **Spark Streaming**:Spark Streaming提供了一个高层次的API,可以处理实时数据流。它将数据流处理分解为一系列微批处理,这使得Spark Streaming能够利用Spark的并行计算能力处理实时数据。 7. **MLlib**:...

    机器学习算法的实现,对Hadoop,Spark,Hive等的搭建及其使用.zip

    在现代大数据处理领域,Hadoop、Spark和Hive是三个至关重要的工具,它们共同构建了高效的数据处理生态系统。本文将深入探讨如何实现机器学习算法,并介绍如何在这些平台上进行搭建和应用。 首先,Hadoop是一个开源...

    基于Spark的分布式实时推荐系统.pdf

    Spark Streaming是Spark生态系统中的流处理模块,它允许用户对实时数据流进行处理。通过微批处理(micro-batching)的方式,Spark Streaming将实时数据流分割成一系列小批次,并对这些批次进行处理,从而实现了对...

    基于Spark框架的新闻网大数据实时分析可视化系统项目.zip

    实时数据处理:Kafka + Spark Streaming 数据应用层:MLlib 产生一个模型 als算法 数据展示和对接:Zeppelin 选用考量: HDFS不管是在存储的性能,稳定性 吞吐量 都是在主流文件系统中很占有优势的 如果感觉HDFS...

    基于Spark2.2的新闻网大数据实时分析系统设计与实现.zip

    实时数据处理:Kafka + Spark Streaming 数据应用层:MLlib 产生一个模型 als算法 数据展示和对接:Zeppelin 选用考量: HDFS不管是在存储的性能,稳定性 吞吐量 都是在主流文件系统中很占有优势的 如果感觉HDFS...

    大数据spark搭建,spark安装包

    - **Spark Streaming**:提供了一个高级抽象来处理实时数据流,基于微批处理的概念,可以处理来自多种源的流数据。 - **MLlib**:Spark的机器学习库,提供了各种算法和实用工具,包括分类、回归、聚类、协同过滤等。...

    基于Spark的工业大数据处理可视化平台应用研究.pdf

    该平台的建立涉及分析工业大数据与传统数据的差异、采用Spark MLlib机器学习算法进行数据处理、解决技术问题以及制定平台搭建的流程。 工业大数据的特点包括数据体量巨大、类型多样、价值密度低和处理速度快。研究...

    基于spark的外卖大数据平台分析系统.zip

    实时数据处理:Kafka + Spark Streaming 数据应用层:MLlib 产生一个模型 als算法 数据展示和对接:Zeppelin 选用考量: HDFS不管是在存储的性能,稳定性 吞吐量 都是在主流文件系统中很占有优势的 如果感觉HDFS...

Global site tag (gtag.js) - Google Analytics