- 浏览: 25409 次
- 性别:
- 来自: 深圳
文章分类
最新评论
一,flume配置
# Name the components on this agent
a1.sources = tailsource-1
a1.sinks = remotesink
a1.channels = memoryChnanel-1
# Describe/configure the source
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.command = tail -F /var/log/test/raw_data.txt
a1.sources.tailsource-1.channels = memoryChnanel-1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.memoryChnanel-1.type = memory
a1.channels.memoryChnanel-1.keep-alive = 10
a1.channels.memoryChnanel-1.capacity = 100000
a1.channels.memoryChnanel-1.transactionCapacity = 100000
# Bind the source and sink to the channel
a1.sinks.remotesink.type = avro
a1.sinks.remotesink.hostname = 172.18.203.137
a1.sinks.remotesink.port = 9999
a1.sinks.remotesink.channel = memoryChnanel-1
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
producer.sources.s.type = avro
producer.sources.s.bind = 172.18.203.137
producer.sources.s.port = 9999
producer.sources.s.channels = c
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.topic = mytopic
producer.sinks.r.brokerList = master1:9092,master2:9092,slave2:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 20
producer.sinks.r.channel = c1
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = org.apache.flume.channel.kafka.KafkaChannel
producer.channels.c.capacity = 10000
producer.channels.c.transactionCapacity = 1000
producer.channels.c.brokerList=master1:9092,master2:9092,slave2:9092
producer.channels.c.topic=channel1
producer.channels.c.zookeeperConnect=master2:2181,slave2:2181,slave4:2181
二, Spark代码
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author: david
* Date : 3/7/17
*/
object StreamingDataTest {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);
val conf = new SparkConf().setAppName("StreamingDataTest").setMaster("local[4]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
// Kafka的topic
val topics = Set("mytopic")
//kafka brokers列表
val brokers = "master1:9092,master2:9092,slave3:9092"
//kafka查询参数
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
//创建direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
//kafkaStream这个tuple的第二部分为接收kafka topic里的文本流
val rawDStream = kafkaStream.flatMap(_._2.split("\\s+")).map((_, 1))
val resDStream = rawDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => {
v1 + v2
},
Seconds(8),
Seconds(4));
resDStream.print();
ssc.start()
ssc.awaitTermination()
}
}
三,注意事项
查看/var/log/flume-ng下面的日志报错信息
avro端口号绑定大于公共端口1024
注意linux防火墙service iptables stop
注意运行scala依赖的scope为 provided编译可以,但本机运行找不到class
# Name the components on this agent
a1.sources = tailsource-1
a1.sinks = remotesink
a1.channels = memoryChnanel-1
# Describe/configure the source
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.command = tail -F /var/log/test/raw_data.txt
a1.sources.tailsource-1.channels = memoryChnanel-1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.memoryChnanel-1.type = memory
a1.channels.memoryChnanel-1.keep-alive = 10
a1.channels.memoryChnanel-1.capacity = 100000
a1.channels.memoryChnanel-1.transactionCapacity = 100000
# Bind the source and sink to the channel
a1.sinks.remotesink.type = avro
a1.sinks.remotesink.hostname = 172.18.203.137
a1.sinks.remotesink.port = 9999
a1.sinks.remotesink.channel = memoryChnanel-1
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
producer.sources.s.type = avro
producer.sources.s.bind = 172.18.203.137
producer.sources.s.port = 9999
producer.sources.s.channels = c
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.topic = mytopic
producer.sinks.r.brokerList = master1:9092,master2:9092,slave2:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 20
producer.sinks.r.channel = c1
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = org.apache.flume.channel.kafka.KafkaChannel
producer.channels.c.capacity = 10000
producer.channels.c.transactionCapacity = 1000
producer.channels.c.brokerList=master1:9092,master2:9092,slave2:9092
producer.channels.c.topic=channel1
producer.channels.c.zookeeperConnect=master2:2181,slave2:2181,slave4:2181
二, Spark代码
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author: david
* Date : 3/7/17
*/
object StreamingDataTest {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);
val conf = new SparkConf().setAppName("StreamingDataTest").setMaster("local[4]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
// Kafka的topic
val topics = Set("mytopic")
//kafka brokers列表
val brokers = "master1:9092,master2:9092,slave3:9092"
//kafka查询参数
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
//创建direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
//kafkaStream这个tuple的第二部分为接收kafka topic里的文本流
val rawDStream = kafkaStream.flatMap(_._2.split("\\s+")).map((_, 1))
val resDStream = rawDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => {
v1 + v2
},
Seconds(8),
Seconds(4));
resDStream.print();
ssc.start()
ssc.awaitTermination()
}
}
三,注意事项
查看/var/log/flume-ng下面的日志报错信息
avro端口号绑定大于公共端口1024
注意linux防火墙service iptables stop
注意运行scala依赖的scope为 provided编译可以,但本机运行找不到class
发表评论
-
Canal相关理解
2017-12-29 16:18 459转载:http://www.importnew.com/251 ... -
kettle部署
2017-12-26 16:04 7191.将jmbi sql先上生产环境, 参考附件jmbi.sql ... -
crontab定时运行MR不行,手动shell可以执行成功问题排查过程
2017-12-26 15:48 858设置了定时任务,但MR任务没有执行。 第一步:手动执行she ... -
Flume+Kafka+Spark Steaming demo
2017-11-21 15:21 441一.准备flume配置 a1.sources = r1 a1. ... -
HBase表导出成HDFS
2017-10-19 19:40 896导出步骤:在old cluster上/opt/cloudera ... -
zepplin实战
2017-10-13 16:10 360一句话介绍Zeppelin 以笔记(Note)的形式展示的数据 ... -
Azkaban安装
2017-10-10 18:32 905一.下载 https://github.com/azkaban ... -
KYKIN安装
2017-09-30 17:35 121. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
KYKIN安装
2017-09-30 17:40 3591. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
Logstash安装部署配置
2017-04-28 10:24 1021为了实现各业务平台日志信息采集到大数据平台hdf ... -
HBASE API
2017-04-18 11:01 471package org.jumore.test; impor ... -
Ambari卸载shell
2017-03-28 17:28 465#!/bin/bash # Program: # uni ... -
linux ssh 相互密码登录
2017-02-22 13:40 4131.修改集群各机器名称 vim /etc/sysconfig/ ... -
Kettle Linux 安装部署
2017-02-15 17:20 1352一.安装JDK环境:根据自己的linux系统选择相应的版本,比 ... -
hadoop环境搭建
2017-01-23 17:31 351192.168.23.231 server1 192.168. ... -
环境安装
2017-01-17 16:26 391物理机部署分配 3台物理机上部署 Zookeeper 3个,F ... -
Storm demo
2016-12-19 15:50 439public class SentenceSpout exte ... -
运行Hadoop jar 第三方jar包依赖
2016-08-22 13:47 1016将自己编写的MapReduce程序打包成jar后,在运行 ha ... -
windows10下运行MR错误
2016-07-05 13:45 1654当在windows下运行MR程序时,会报各种错误。现把这次碰到 ... -
HBase问题
2016-06-16 17:02 3051.java.net.UnknownHostException ...
相关推荐
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
这里提到的"基于Flume+kafka+spark大型电商网站日志分析系统(离线+实时)"就是一个这样的解决方案,结合了三个关键组件:Apache Flume、Apache Kafka和Apache Spark。下面将详细介绍这三个技术及其在系统中的作用。...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络安全...
### Flume+kafka+Storm整合知识点详解 #### 一、Flume、Kafka与Storm概述 在大数据领域,数据采集、传输与实时处理是至关重要的环节。本篇内容重点介绍了如何利用Flume、Kafka与Storm这三个开源工具实现一套完整的...
本文将深入探讨这些技术及其在"flume+kafka+flink+mysql数据统计"中的应用。 首先,Flume 是 Apache Hadoop 的一个子项目,主要用于收集、聚合和移动大量日志数据。在本场景中,Flume 可以从 Nginx 服务器收集 Web ...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目源代码+文档说明。项目架构: 主要是基于Flume+Kafka+Sparkstreaming +HBase+ES来实现实时的用户信息存储轨迹查询任务。 含有代码注释,满分...
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
通过flume监控文件,让kafka消费flume数据,再将sparkstreaming连接kafka作为消费者进行数据处理,文档整理实现
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络安全...
在大数据领域,构建一个完整的生态系统是至关重要的,其中包括多个组件,如Hadoop、Spark、Hive、HBase、Oozie、Kafka、Flume、Flink、Elasticsearch和Redash。这些组件协同工作,提供了数据存储、处理、调度、流...
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例.txt基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例.txt基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例.txt基于Flume+Kafka+Hbase+Flink+FineBI的实时综合...
搭建Hadoop集群,并使用flume+kafka+storm+hbase实现日志抓取分析,使用一个主节点master、两个slave节点
《LNMP环境构建与Flume+Kafka+Storm+HDFS实时系统集成详解》 在当前的互联网时代,数据量的急剧增长使得大数据处理成为一项关键任务。本篇将深入探讨如何在Linux环境下搭建LNMP(Linux + Nginx + MySQL + PHP)...
这里提到的"人工智能-spark"项目,结合了Flume、Kafka、HBase以及Spark-Streaming,构建了一个高效且实时的数据处理管道。下面我们将详细探讨这些组件各自的作用及其相互间的协同工作原理。 1. **Apache Flume**:...
基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码。 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: ...
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip...