下载完flume后,就可以在 https://flume.apache.org/FlumeUserGuide.html 中根据教程来启动agent console
启动完成后,在console中打印出现下面的日志信息:
2016-06-21 13:00:06,890 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.79.12:44444]
可以通过telnet 172.16.79.12 44444 的方式来发送数据,发送完成后就可以在启动的agent中查看到该日志输出,至此一个简单的agent示例就演示完成。
2016-06-21 13:00:28,905 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 61 62 63 64 65 0D abcde. }
规划配置flume用于日志收集
经过规划,我们使用flume用来收集日志的场景图如下所示,每台web服务器均配置一个agent用来传输日志,并上传至统一的agent4中,

对于每台web server上的agent,我们采用Exec Sources类型的source来配置简单的tail -f 来实现对日志进行处理,并打印到日志控制台中,配置方法如下,其中type需要声明为exec,需要指定执行的命令(tail -F,根据需要还可以以管道的方式加入grep等命令):
zhenmq-agent.sources = zhenmq-source zhenmq-agent.sinks = zhenmq-sink zhenmq-agent.channels = zhenmq-channel # Describe/configure the source zhenmq-agent.sources.zhenmq-source.type = exec zhenmq-agent.sources.zhenmq-source.command = tail -F /usr/local/tomcat/tomcat-zhenmq/logs/apilog/common-all.log # Describe the sink zhenmq-agent.sinks.zhenmq-sink.type = logger # Use a channel which buffers events in memory zhenmq-agent.channels.zhenmq-channel.type = memory zhenmq-agent.channels.zhenmq-channel.capacity = 1000 zhenmq-agent.channels.zhenmq-channel.transactionCapacity = 100 # Bind the source and sink to the channel zhenmq-agent.sources.zhenmq-source.channels = zhenmq-channel zhenmq-agent.sinks.zhenmq-sink.channel = zhenmq-channel
日志流经过channel(可以根据条件选择memory还是file)后,需要输出到统一的collector,这时候就需要指定使用flume中内置的序列化方式,这里我们使用比较通用的Avro Source/Sink,source用来接收其他服务端发送的日志流,sink用于将日志数据输出。
如果希望将flume进行分层设计,可以使用中间序列化方式将收集到的日志传输到不同的服务器中,此时可以使用flume中自带的avro source和sink组件,需要指定type为avro,以及hostname和port(端口号)。
# Describe the sink zhenmq-agent.sinks.zhenmq-sink.type = avro zhenmq-agent.sinks.zhenmq-sink.hostname = 192.168.1.12 zhenmq-agent.sinks.zhenmq-sink.port = 23004 collector-agent.sources.collector-source.type = avro collector-agent.sources.collector-source.bind= 192.168.1.13 collector-agent.sources.collector-source.port = 23004
注意,首先要在163服务器上启动flume服务,在先启动collector-source的情况下会报出拒绝连接的错误:
org.apache.flume.EventDeliveryException: Failed to send events at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:392) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: 192.168.1.163, port: 23004 }: RPC connection error at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:182) at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:121) at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:638) at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:89) at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127) at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:211) at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:272) at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:349) ... 3 more Caused by: java.io.IOException: Error connecting to /192.168.1.163:23004 at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261) at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203) at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152) at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:168) ... 10 more Caused by: java.net.ConnectException: 拒绝连接 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.connect(NioClientSocketPipelineSink.java:496) at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.processSelectedKeys(NioClientSocketPipelineSink.java:452) at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.run(NioClientSocketPipelineSink.java:365) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) ... 1 more
启动完成后,会在163 collector服务中看到如下的日志,说明已经启动成功。
2016-06-22 18:48:30,179 (New I/O server boss #1 ([id: 0xb85f59b4, /192.168.1.163:23004])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xf57de901, /192.168.1.162:52778 => /192.168.1.163:23004] OPEN 2016-06-22 18:48:30,181 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xf57de901, /192.168.1.162:52778 => /192.168.1.163:23004] BOUND: /192.168.1.163:23004 2016-06-22 18:48:30,181 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xf57de901, /192.168.1.162:52778 => /192.168.1.163:23004] CONNECTED: /192.168.1.162:52778
Flume的负载均衡与故障转移
由于在图中agent4为单点,加入agent4挂掉的话会导致日志无法正常输出,故采用flume的负载均衡/故障转移模式来避免这一单点生效。即每次按照一定的算法选择sink输出到指定地方,如果在文件输出量很大的情况下,负载均衡还是很有必要的,通过多个通道输出缓解输出压力。
flume内置的负载均衡的算法默认是round robin,轮询算法,按序选择。
source里的event流经channel,进入sink组,在sink组内部根据负载算法(round_robin、random)选择sink,后续可以选择不同机器上的agent实现负载均衡。

如果是采用故障转移,这组sinke将会组成一个failover sink processor,此时如果有一个sink处理失败,flume会将这个sink放到一个地方等待冷却时间,等到正常处理event的时候再拿回来。event通过通过一个channel流向一个sink组,在sink组内部根据优先级选择具体的sink,一个失败后再转向另一个sink,流程图如下:

鉴于我们当前的日志规模不算太大,先采用故障转移的方式来进行,后续如果处理不过来可以采用负载均衡。
配置故障转移
首先需要定义sinkgroups,定义group的处理类型,以及每个sink的优先级,此时先会往优先级较高的服务端发送日志,如果该服务不可用,则放到冷却池中,使用优先级较低的sink来处理。
注意启动顺序,一定是被依赖的flume先启动。
zhenmq-agent.sources = zhenmq-source zhenmq-agent.sinks = collector-sink1 collector-sink2 zhenmq-agent.channels = zhenmq-channel # Describe/configure the source zhenmq-agent.sources.zhenmq-source.type = exec zhenmq-agent.sources.zhenmq-source.command = tail -F /usr/local/tomcat/tomcat-zhenmq/logs/apilog/common-all.log # Describe the sink zhenmq-agent.sinks.collector-sink1.type = avro zhenmq-agent.sinks.collector-sink1.channel= zhenmq-channel zhenmq-agent.sinks.collector-sink1.hostname = 192.168.1.163 zhenmq-agent.sinks.collector-sink1.port = 23004 zhenmq-agent.sinks.collector-sink2.type = avro zhenmq-agent.sinks.collector-sink2.channel= zhenmq-channel zhenmq-agent.sinks.collector-sink2.hostname = 192.168.1.165 zhenmq-agent.sinks.collector-sink2.port = 23004 # Use a channel which buffers events in memory zhenmq-agent.channels.zhenmq-channel.type = memory zhenmq-agent.channels.zhenmq-channel.capacity = 1000 zhenmq-agent.channels.zhenmq-channel.transactionCapacity = 100 zhenmq-agent.sinkgroups = g1 zhenmq-agent.sinkgroups.g1.sinks = collector-sink1 collector-sink2 zhenmq-agent.sinkgroups.g1.processor.type = failover zhenmq-agent.sinkgroups.g1.processor.priority.collector-sink1 = 10 zhenmq-agent.sinkgroups.g1.processor.priority.collector-sink2 = 11
Flume连接到Storm
一般情况下,flume的数据需要经过一轮转换至kafka中,然后storm读取kafka中的消息,来达到实时分析的目的。但我们可以暂时跳过kafka,直接将flume的输出结果输出到strom中。
参考开源实现:https://github.com/rvisweswara/flume-storm-connector,但通过分析其源码可以看出,其内部通过启动一个flume agent组件(SourceRunner,Channel,SinkCounter)来通过avro协议接收flume传输出来的流来完成此目的,FlumeSpout类型的整体类型图如下:

由于原来的实例是三年前写的,jar包比较老,可能无法启动,可以clone下面的链接本地启动(master分支):https://github.com/clamaa/flume-storm-connector
测试用例的启动入口类型为:FlumeConnectorTopology,其main方法中首先需要配置一个topology.properties文件,用来指定在FlumeSpout启动的Agent source类型和端口(一般情况下的type为avro,只需要指定对应的bind和port即可)。
flume-agent.source.type=avro flume-agent.channel.type=memory flume-agent.source.bind=127.0.0.1 flume-agent.source.port=10101
根据MaterializedConfigurationProvider以及相关配置,生成启动agent对应的MaterializedConfiguration(flume相关),在FlumeSpout.open的方法中,MaterializedConfiguration可以生成 sourceRunner(avro类型), channel(内存级别的,可以从中直接获取数据)。
构造flume agent的过程,由于不需要sink,也不需要添加SinkRunner,只加入SinkCounter用于输出计数使用(MXBean类型,可以通过JMX Console监听其关键输出指标)。
flumeAgentProps = StormEmbeddedAgentConfiguration.configure( FLUME_AGENT_NAME, flumeAgentProps); MaterializedConfiguration conf = configurationProvider.get( getFlumePropertyPrefix(), flumeAgentProps); Map<String, Channel> channels = conf.getChannels(); if (channels.size() != 1) { throw new FlumeException("Expected one channel and got " + channels.size()); } Map<String, SourceRunner> sources = conf.getSourceRunners(); if (sources.size() != 1) { throw new FlumeException("Expected one source and got " + sources.size()); } this.sourceRunner = sources.values().iterator().next(); this.channel = channels.values().iterator().next(); if (sinkCounter == null) { sinkCounter = new SinkCounter(FlumeSpout.class.getName()); }
nextTurple方法中,定时对内部启动的Flume Channel进行take操作,获取最新event,
for (int i = 0; i < this.batchSize; i++) { Event event = channel.take(); if (event == null) { break; } batch.add(event); }
并将这些event包装成Values,由Collector进行emit(发射)操作,这里由于日志的格式可能会有多种类型,FlumeSpout可以设置TurpleProducer,根据对应的event自定义消息类型,以及声明的字段名称。
for (Event event : batch) { Values vals = this.getTupleProducer().toTuple(event); this.collector.emit(vals); this.pendingMessages.put( event.getHeaders().get(Constants.MESSAGE_ID), event); LOG.debug("NextTuple:" + event.getHeaders().get(Constants.MESSAGE_ID)); }
消息在发送之前会暂时存在FlumeSpout.pendingMessages中(ConcurrentHashMap),以支持消息确认,在确认完成后,会将其删除;如果确认失败,会根据消息id进行重发。
/* * When a message is succeeded remove from the pending list * * @see backtype.storm.spout.ISpout#ack(java.lang.Object) */ public void ack(Object msgId) { this.pendingMessages.remove(msgId.toString()); } /* * When a message fails, retry the message by pushing the event back to channel. * Note: Please test this situation... * * @see backtype.storm.spout.ISpout#fail(java.lang.Object) */ public void fail(Object msgId) { //on a failure, push the message from pending to flume channel; Event ev = this.pendingMessages.get(msgId.toString()); if(null != ev){ this.channel.put(ev); } }
同时,该connector中也提供AvroSinkBolt,用于将storm生成的消息通过avro的方式再传回至flume中,其基本原理就是维持一个与flume的avro agent的连接RpcClient,并可以自定义flume事件生成器,将storm产生的Turple转换成storm对应的Event,这里就不再详细说明。
private RpcClient rpcClient; private FlumeEventProducer flumeEventProducer;
Flume收集日志的agent进程仍然可能出现另一种情况,就是挂掉,此时日志中出现错误:
<!--?xml version="1.0" encoding="UTF-8" standalone="no"?-->
2016-07-06 11:14:19,951 (pool-5-thread-1) [INFO - org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:376)] Command [tail -F /usr/local/tomcat/tomcat-shopapi/logs/apilog/common-warn.log] exited with 137
<!--?xml version="1.0" encoding="UTF-8" standalone="no"?-->
exec source中有两个属性,用于处理当进程异常退出时尝试重启操作。
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
restart | false | Whether the executed cmd should be restarted if it dies |
相关推荐
- 收集来自各个系统的原始数据,并进行初步清洗和整理。 - 常用的技术栈包括Hadoop HDFS、Kafka等。 #### 2. 计算汇总层(DW) - 对原始数据进行深度加工和聚合,形成可供分析的数据模型。 - 使用的技术包括...
1. 数据采集传输:使用Flume、Kafka、Sqoop、Logstash或DataX等工具将数据从源系统抽取到目标存储。 2. 数据存储:数据可存储在MySql、HDFS、HBase、Redis或MongoDB等不同数据库中,根据数据类型和需求选择合适的...
Matlab领域上传的视频是由对应的完整代码运行得来的,完整代码皆可运行,亲测可用,适合小白; 1、从视频里可见完整代码的内容 主函数:main.m; 调用函数:其他m文件;无需运行 运行结果效果图; 2、代码运行版本 Matlab 2019b;若运行有误,根据提示修改;若不会,私信博主; 3、运行操作步骤 步骤一:将所有文件放到Matlab的当前文件夹中; 步骤二:双击打开main.m文件; 步骤三:点击运行,等程序运行完得到结果; 4、仿真咨询 如需其他服务,可私信博主; 4.1 博客或资源的完整代码提供 4.2 期刊或参考文献复现 4.3 Matlab程序定制 4.4 科研合作
《网页制作基础教程(Dreamweaver-CS3)》第11章-嵌入表单元素.ppt
V1_3_example.ipynb
《计算机应用基础项目教程》项目四-电子表格软件Excel2010的使用.pptx
《计算机系统结构》第4章-指令级并行.ppt
内容概要:本文详细介绍了基于西门子S7-1200 PLC和WinCC RT Pro的三部十层电梯联控系统的设计与实现。主要内容涵盖硬件配置、核心算法如电梯间协同算法、方向判断函数、状态机设计、呼叫调度算法以及WinCC画面设计中的动画效果和平滑移动实现方法。文中还讨论了常见的调试问题及其解决方案,如方向锁死、编码器干扰等。此外,强调了状态机在电梯控制中的重要性,并提供了具体的代码示例来解释各个功能模块的工作原理。 适合人群:自动化工程师、PLC程序员、HMI开发者、工业控制系统设计师。 使用场景及目标:适用于希望深入了解电梯控制系统设计原理和技术实现的专业人士。目标是帮助读者掌握电梯联控系统的编程技巧,提高对工业控制项目的理解和应用能力。 其他说明:文章不仅提供详细的代码片段,还分享了许多实践经验,有助于读者更好地理解和应对实际工程项目中的挑战。
飞猫智联u20一键打开adb并安装
DeepSeek:智能时代的全面到来和人机协作的新常态.pdf
内容概要:本文深入探讨了永磁同步电机(PMSM)无传感器控制技术中的高频谐波注入(HFI)方案及其滑模观测器仿真模型。主要介绍了HFI的工作原理,即通过向电机定子绕组注入高频信号并检测其响应来估算转子位置和速度。文中提供了详细的代码实现,包括高频信号生成、电流检测与处理、滑模观测器核心算法等。此外,还分享了实际工程项目中的调试经验和常见问题解决方案,如参数选择、硬件配置、滤波处理等。 适合人群:从事电机控制系统开发的技术人员,尤其是对PMSM无传感器控制感兴趣的工程师。 使用场景及目标:适用于需要提高PMSM电机控制性能的应用场合,如工业自动化设备、伺服系统等。目标是在低速条件下实现精确的转子位置和速度估算,从而提升系统的整体性能。 其他说明:文章不仅提供了理论和技术细节,还结合了大量实践经验,帮助读者更好地理解和应用HFI技术。同时强调了实际工程中需要注意的各种细节,如参数整定、硬件配置、滤波处理等,确保方案的可靠性和稳定性。
5G 6G NR-NTN A Hardware perspective && OAI-10th-Anniversary-Workshop-Qualcomm-2.pdf An SMEs Journey Towards 5G6G NTN Disruptive Technologies Lasting Software--OAI-10th-Anniversary-Workshop-Lasting-Software.pdf Deploying Private 5G with the Open Air Interface- OAI-10th-Anniversary-Workshop-Firecell-.pdf End-to-End Open-Source 5G and O-RAN Prototyping in ARA for Low-Latency Agriculture Applications OAI-10th-Anniversary-Workshop-Iowa-State-University.pdf OAI 5G NR NTN – PHY and MAC Layer Contributions 20240913_Fraunhofer_IIS_OAI_NTN_reduced.pdf Open Source in Telecoms Driving 5G innovatio--OAI-10th-Anniversary-Workshop-Canonical.pdf OpenAirInterface Recent Developments & Roadmap OAI-10th-Anniversary-Workshop-Florian-Kaltenberger.pdf The road of artificial intelligence towards the 6G
内容概要:本文详细介绍了如何使用三菱FX2N系列PLC通过编码器脉冲数计算输出距离的方法。首先,文章阐述了核心思路,即通过采集编码器产生的脉冲数,结合预先设定的脉冲数和输出长度参数,经过一系列浮点数运算得出输出距离。文中展示了详细的代码实现步骤,包括初始化部分、脉冲采集部分、浮点数运算部分和结果处理部分。此外,还讨论了一些常见的调试技巧和注意事项,如数据溢出处理、双编码器校验、方向信号处理等。 适合人群:从事自动化控制领域的工程师和技术人员,尤其是对PLC编程有一定基础的人群。 使用场景及目标:适用于需要精确测量距离的工业控制系统,如输送带系统、机械臂定位等。主要目标是帮助工程师掌握如何利用PLC和编码器实现高精度的距离测量。 其他说明:文章提供了丰富的代码示例和调试建议,有助于读者在实际项目中灵活运用相关技术和解决常见问题。同时强调了浮点运算在提高测量精度方面的优势及其潜在挑战。
内容概要:本文详细介绍了如何使用Matlab Simulink搭建无刷直流电机(BLDC)的转速电流双闭环调速系统。首先阐述了系统的基本架构,即外层转速环和内层电流环的作用及其相互关系。接着逐步讲解了各个模块的具体实现方法,包括电机模型参数设置、PI控制器参数配置、PWM信号生成、坐标变换等关键技术。通过仿真结果分析,展示了转速和电流响应曲线,探讨了参数调整对系统性能的影响。最终实现了对电机转速的精确控制,并提供了优化建议。 适合人群:从事电机控制领域的工程师和技术人员,尤其是有一定Matlab/Simulink基础的研究人员。 使用场景及目标:适用于希望深入了解BLDC电机控制原理及仿真的技术人员。目标是在理论基础上通过仿真工具掌握双闭环调速系统的构建与优化方法。 其他说明:文中不仅提供了详细的建模步骤,还分享了许多实用的经验技巧,如PI参数选择、PWM生成方式的选择、死区时间设置等,有助于提高实际应用中的系统性能。
多接口小猿题库等综合网课搜题微信小程序源码带流量主,网课搜题小程序, 可以开通流量主赚钱 搭建教程 1, 微信公众平台注册自己的小程序 2, 下载微信开发者工具和小程序的源码 3, 上传代码到自己的小程序
weixin287火锅店点餐系统的设计与实现+ssm(文档+源码)_kaic
《DotCom Secrets》是拉斯洛·布劳恩所著的一本关于如何使用在线营销策略来增长公司业务的书籍。本书揭示了在线营销巫师兄弟会不愿公开的秘密,强调了直接响应营销的重要性,并提供了构建和优化营销漏斗的系统方法。书中不仅介绍了如何通过各种在线渠道获取流量、提高转化率和销售,还提供了如何发现目标客户、构建价值阶梯以及如何利用“肥皂剧序列”等策略。拉斯洛·布劳恩通过本书,旨在帮助企业家在充满变数的互联网世界中找到坚实的营销基础。
包括:源程序工程文件、Proteus仿真工程文件、电路原理图文件、配套技术手册、论文资料等 1、采用51/52单片机(通用)作为主控芯片; 2、采用压力传感器+HX711模块检测压力; 3、可通过按键设置物品单价; 4、采用LCD1602对重量/单价/总价进行显示; 5、当物品超出传感器量程时,蜂鸣器进行过载报警。
First Word Fall Through FIFO特点是输出端口数据保持有效,读使能有效时即有数据输出。Standard FIFO在读使能有效时,数据一般延时1个时钟周期输出。通过仿真来查看这两种FIFO的特点。
内容概要:本文深入探讨了二极管箝位型三电平逆变器(NPC)的关键技术和挑战,主要包括三电平空间矢量调制(SVPWM)和中点电位平衡调制。SVPWM通过合成参考电压矢量使输出电压接近正弦波,而中点电位平衡则解决因直流侧电容充放电不平衡导致的问题。文中提供了详细的MATLAB代码示例,展示了如何实现这两种调制方法,并介绍了在MATLAB/Simulink中构建仿真模型的具体步骤。此外,文章还讨论了一些实用技巧,如使用坐标变换简化扇区判断、引入滞回比较稳定矢量位置、以及通过预测电流法改善中点电位平衡的动态响应。 适合人群:从事电力电子领域的研究人员和技术人员,尤其是对三电平逆变器有兴趣的工程师。 使用场景及目标:适用于需要深入了解NPC三电平逆变器工作原理的研究人员,帮助他们掌握SVPWM调制和中点电位平衡的技术细节,从而优化逆变器的设计和性能。 其他说明:文章不仅提供了理论分析,还包括大量实际工程中的经验和技巧,有助于读者更好地理解和应用相关技术。