- 浏览: 25407 次
- 性别:
- 来自: 深圳
文章分类
最新评论
一.准备flume配置
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /var/log/test
a1.sources.r1.fileHeader = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = spark
a1.sinks.k1.brokerList = master1:9092,master2:9092,slave3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
二,spark代码
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkStreamDemo {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("spark_streaming")
conf.setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setCheckpointDir("D://checkpoints")
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(5))
val topics = Map("spark" -> 2)
val lines = KafkaUtils.createStream(ssc, "master2:2181,slave2:2181,slave4:2181", "spark", topics).map(_._2)
val ds1 = lines.flatMap(_.split(" ")).map((_, 1))
val ds2 = ds1.updateStateByKey[Int]((x:Seq[Int], y:Option[Int]) => {
Some(x.sum + y.getOrElse(0))
})
ds2.print()
ssc.start()
ssc.awaitTermination()
}
}
三,注意的事项
1.kafka的topic是自动创建的,如果启动了配置没有的话,会建一个新的
2.记得flume读取文件夹是有权限的chown -R flume:flume /var/log/test
3.echo "my my last test test test" > logs5
4.sc.setCheckpointDir("D://checkpoints")这里的文件路径
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /var/log/test
a1.sources.r1.fileHeader = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = spark
a1.sinks.k1.brokerList = master1:9092,master2:9092,slave3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
二,spark代码
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkStreamDemo {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("spark_streaming")
conf.setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setCheckpointDir("D://checkpoints")
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(5))
val topics = Map("spark" -> 2)
val lines = KafkaUtils.createStream(ssc, "master2:2181,slave2:2181,slave4:2181", "spark", topics).map(_._2)
val ds1 = lines.flatMap(_.split(" ")).map((_, 1))
val ds2 = ds1.updateStateByKey[Int]((x:Seq[Int], y:Option[Int]) => {
Some(x.sum + y.getOrElse(0))
})
ds2.print()
ssc.start()
ssc.awaitTermination()
}
}
三,注意的事项
1.kafka的topic是自动创建的,如果启动了配置没有的话,会建一个新的
2.记得flume读取文件夹是有权限的chown -R flume:flume /var/log/test
3.echo "my my last test test test" > logs5
4.sc.setCheckpointDir("D://checkpoints")这里的文件路径
发表评论
-
Canal相关理解
2017-12-29 16:18 459转载:http://www.importnew.com/251 ... -
kettle部署
2017-12-26 16:04 7191.将jmbi sql先上生产环境, 参考附件jmbi.sql ... -
crontab定时运行MR不行,手动shell可以执行成功问题排查过程
2017-12-26 15:48 858设置了定时任务,但MR任务没有执行。 第一步:手动执行she ... -
Flume+kafka+Spark Steaming demo2
2017-11-22 13:15 457一,flume配置 # Name the components ... -
HBase表导出成HDFS
2017-10-19 19:40 896导出步骤:在old cluster上/opt/cloudera ... -
zepplin实战
2017-10-13 16:10 359一句话介绍Zeppelin 以笔记(Note)的形式展示的数据 ... -
Azkaban安装
2017-10-10 18:32 905一.下载 https://github.com/azkaban ... -
KYKIN安装
2017-09-30 17:35 121. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
KYKIN安装
2017-09-30 17:40 3591. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
Logstash安装部署配置
2017-04-28 10:24 1021为了实现各业务平台日志信息采集到大数据平台hdf ... -
HBASE API
2017-04-18 11:01 471package org.jumore.test; impor ... -
Ambari卸载shell
2017-03-28 17:28 465#!/bin/bash # Program: # uni ... -
linux ssh 相互密码登录
2017-02-22 13:40 4131.修改集群各机器名称 vim /etc/sysconfig/ ... -
Kettle Linux 安装部署
2017-02-15 17:20 1352一.安装JDK环境:根据自己的linux系统选择相应的版本,比 ... -
hadoop环境搭建
2017-01-23 17:31 351192.168.23.231 server1 192.168. ... -
环境安装
2017-01-17 16:26 391物理机部署分配 3台物理机上部署 Zookeeper 3个,F ... -
Storm demo
2016-12-19 15:50 439public class SentenceSpout exte ... -
运行Hadoop jar 第三方jar包依赖
2016-08-22 13:47 1016将自己编写的MapReduce程序打包成jar后,在运行 ha ... -
windows10下运行MR错误
2016-07-05 13:45 1654当在windows下运行MR程序时,会报各种错误。现把这次碰到 ... -
HBase问题
2016-06-16 17:02 3051.java.net.UnknownHostException ...
相关推荐
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络安全...
这里提到的"基于Flume+kafka+spark大型电商网站日志分析系统(离线+实时)"就是一个这样的解决方案,结合了三个关键组件:Apache Flume、Apache Kafka和Apache Spark。下面将详细介绍这三个技术及其在系统中的作用。...
### Flume+kafka+Storm整合知识点详解 #### 一、Flume、Kafka与Storm概述 在大数据领域,数据采集、传输与实时处理是至关重要的环节。本篇内容重点介绍了如何利用Flume、Kafka与Storm这三个开源工具实现一套完整的...
本文将深入探讨这些技术及其在"flume+kafka+flink+mysql数据统计"中的应用。 首先,Flume 是 Apache Hadoop 的一个子项目,主要用于收集、聚合和移动大量日志数据。在本场景中,Flume 可以从 Nginx 服务器收集 Web ...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目源代码+文档说明。项目架构: 主要是基于Flume+Kafka+Sparkstreaming +HBase+ES来实现实时的用户信息存储轨迹查询任务。 含有代码注释,满分...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络安全...
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
通过flume监控文件,让kafka消费flume数据,再将sparkstreaming连接kafka作为消费者进行数据处理,文档整理实现
搭建Hadoop集群,并使用flume+kafka+storm+hbase实现日志抓取分析,使用一个主节点master、两个slave节点
《LNMP环境构建与Flume+Kafka+Storm+HDFS实时系统集成详解》 在当前的互联网时代,数据量的急剧增长使得大数据处理成为一项关键任务。本篇将深入探讨如何在Linux环境下搭建LNMP(Linux + Nginx + MySQL + PHP)...
这里提到的"人工智能-spark"项目,结合了Flume、Kafka、HBase以及Spark-Streaming,构建了一个高效且实时的数据处理管道。下面我们将详细探讨这些组件各自的作用及其相互间的协同工作原理。 1. **Apache Flume**:...
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例.txt基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例.txt基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例.txt基于Flume+Kafka+Hbase+Flink+FineBI的实时综合...
在大数据领域,构建一个完整的生态系统是至关重要的,其中包括多个组件,如Hadoop、Spark、Hive、HBase、Oozie、Kafka、Flume、Flink、Elasticsearch和Redash。这些组件协同工作,提供了数据存储、处理、调度、流...
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip...
主要讲的是Flume+Kafka+Storm的环境整合,并且全部都是最新版本 、、、你也可以到博客地址http://blog.csdn.net/u012185296中去学习相关的云技术,Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ......