Esper之事件处理
此篇博文主要用于介绍Esper对事件的处理,即Esper的进程模型。
1.UpdateListener
UpdaterListener是Esper提供的一个接口,用于监听某个EPL在引擎中的运行情况,即事件进入并产生结果后会通知UpdateListener。接口如下
package com.espertech.esper.client; import com.espertech.esper.client.EventBean; public interface UpdateListener { public void update(EventBean[] newEvents, EventBean[] oldEvents); }
接口很简单,就一个update方法,其中包括两个EventBean数组,至于两个参数的含义稍后再说。EventBean中有一个最常用的get方法,是用来得到EPL中某个字段的值。例如:
EPL:select name from User //假设newEvents长度为一 newEvents[0].get("name")能得到进入的User事件的name属性值 EPL:select count(*) from User.win:time(5 sec) //假设newEvents长度为一 newEvents[0].get("count(*))能得到5秒内进入引擎的User事件数量有多少
get方法最常用,此外还有getUnderlying等方法,以后会专门写一篇介绍EventBean的。
2.Insert and Remove Stream
Insert表示进入引擎,Remove表示移出引擎,事件在Esper中会因为某类EPL才会经历这两种状态。对应于 UpdateListener接口就是newEvents和oldEvents,因为处于这两种状态的事件不一定只有一个,所以newEvents和 oldEvents就是数组形式。举个例子说明下:
EPL:select * from User
从此图可以看出,随着时间推移,每个进入到引擎的W事件都是newEvents,即Insert Stream。W后括号里的值为属性值,可忽略。
有人可能要问了,为什么这里oldeEvents什么都没有。那是因为EPL的关系。看下面的例子:
EPL:select * from User.win:length(5)
注:win:length(5)是个view,详细的后面会专门讲解,这里先暂时理解为Esper开放一个空间并最多可同时存放5个事件(此空间其实就是大小为5的数组)
由图可知,length window可存放w1,w2等事件,在w6事件进入之前,每个事件进入都属于newEvents。直到w6进入后,length window不能容纳w1~w6的事件,必须把w1事件移出,即w1为oldEvents。length window就像一个队列,每当事件进入队列时,就会触发updateListener并告知有新事件进入。当队列满了,再进入一个新事件时,Esper 会触发UpdateListener告知有新事件进入并且有旧事件移出,正如上图所示的w6和w1。
实际上这个EPL触发监听器都只能看到newEvents,看不到oldEvents。如果想看到oldEvents,EPL要改写一下:
EPL:select irstream * from User.win:length(5)
默认情况下,Esper认为你只想让newEvents触发监听器,即istream(insert stream)。如果想让oldEvents触发监听器,那么为rstream(remove stream)。如果两个都想,那么为irstream。当然这个默认情况是可以配置的,以后会说到这个问题。
不过对于rstream,在我看来他有个bug,因为在运行时我发现,oldEvents触发监听器时,理论上应该是oldEvents这个参数有值,并 且api里也明确说明了这一点(就算他没说明,按照常理推断也应该是oldEvents有值),但是实际上是newEvents有值,oldEvents 为null。虽然说数据没有错,但是这个似乎不合常理。
3.Filter and Where-Clause
EPL有两种过滤事件的方式,一种是过滤事件进入view(可以把view理解为一个窗口),即Filter。另一种是让事件都进入view,但不触发UpdateListener,即Where子句。关于这两种语法后面会详细讲解,这里就只是简单介绍。
Filter:
// Apple事件进入Esper,只有amount大于200的才能进入win:length,并且length长度为5 EPL:select * from Apple(amount>200).win:length(5)
// Apple事件进入Esper并进入win:length(5),但是只有amount大于200的才能触发UpdateListener EPL:select * from Apple.win:length(5) where amount>200
由图上可以看出,Apple事件先进入view,然后才被where子句过滤,以至于被过滤掉的事件不会作为newEvent触发UpdateListener
其实单看两个EPL,就能发现一个过滤是在进入view前,一个过滤是在view后,所以大家在应用的时候要注意。PS:在我写这段的时候才发现以前认为这两种是一样的效果是错误滴- -!
4.Aggregation and Grouping
之前说过EPL是类SQL语法,所以也会有聚合和分组的功能。语法和SQL基本一样,下面给大家展示一下:
// 统计进入的5个Apple事件,amount的总数是多少 select sum(amount) from Apple.win:length_batch(5) // 统计进入的5个Apple事件,amount的总数是多少,并按照price分组 select price, sum(amount) from Apple.win:length_batch(5) group by price // 统计进入的5个Apple事件,amount的总数和name,并按照price分组 select price, name, sum(amount) from Apple.win:length_batch(5) group by price
最后一个和前一个的区别在于name也在统计的范围内,所以当name和price都一样的两个事件进入Esper,会有两个一模一样的事件作为 newEvent触发UpdaterListener,即price,name,sum(amount)都一样。当然要是group by name, price的话,就只会有一个事件触发监听器了。
下一讲将继续介绍Esper的Context上下文。
相关推荐
复杂事件处理(Complex Event Processing,简称CEP)是一种用于实时分析和响应大量数据流中的模式的技术。Esper作为一款高性能的CEP引擎,在处理大量事件时表现出色。该章节首先介绍了CEP的基本概念以及它与传统事件...
### 复杂事件处理(Complex Event Processing, CEP)与 Esper 概览 #### 一、复杂事件处理(CEP) 复杂事件处理是一种用于实时跟踪和分析来自不同数据源的信息流的技术,它能够识别出有意义的事件(如机会或威胁)...
- **Esper**:Esper是一款专为事件流处理和复杂事件处理(Complex Event Processing, CEP)设计的强大引擎。它能够在事件流中检测到特定模式时触发预定义的动作。 - **应用场景**:Esper特别适用于需要处理大量数据...
### Esper官网文档中英文对照201-400页:事件流处理与复杂事件处理技术解析 #### 一、Esper概述 Esper是一款专为事件流处理和复杂事件处理(Complex Event Processing, CEP)设计的强大引擎。它能够在事件流中检测...
**Esper** 是一个事件流处理和复杂事件处理(Complex Event Processing,简称 CEP)的引擎。它提供了一个强大的实时事件驱动框架,能够监测并响应事件流中的特定事件。这使得Esper在诸如金融市场交易监控、网络安全...
Esper是一个流行的开源框架,它专门用于实现复杂的事件处理(Complex Event Processing,CEP)和事件流分析。Esper提供了一种高效的方式来识别、过滤和响应大量事件流中的复杂模式,适用于诸如事件监控、网络监控等...
- **事件流处理(ESP)**与**复杂事件处理(CEP)**:这类处理通常没有统一的标准,但市场上有一些成熟的产品和技术,如IBM、WebLogic以及Esper等。 #### 六、Esper介绍 **Esper**是一个基于JAVA的ESP/CEP容器,...
- **复杂事件处理引擎**(如 Esper、Drools):支持复杂的事件规则和模式匹配,适用于需要识别特定事件序列的场景。 ### 实时数据处理引擎的选择 选择合适的实时数据处理引擎需综合考虑以下因素: - **应用场景**...
本方案的数据流图表明,Mc 信令(实时)数据通过 Socket 消息适配模块接入至 Esper 计算引擎进行实时处理,向应用提供事件 API 服务,支撑实时营销应用。非实时数据通过非实时 ETL 方式装载到 Hadoop 的 HDFS 文件...
- **Esper集成**:Esper是一个流行的复杂事件处理引擎,可以用于实时数据分析。 - **事件处理**:定义事件处理规则。 ##### 5. OSGi控制台集成 - **OSGi框架**:Rifidi Edge Server基于OSGi框架构建。 - **控制台...