基本语法
from stream_def [as name] [, stream_def [as name]] [,...] |
[where search_conditions] |
[group by grouping_expression_list] |
[having grouping_search_conditions] |
[output output_specification] |
[order by order_by_expression_list] |
选择事件流
1.进入与流出
select [istream | irstream | rstream] [distinct] * from userbuy.win:time( 5 sec)
|
我们划分了一个5秒的滑动时间窗口,并往里边写入数据
select后的4个选项影响事件流的类型,分别对应进入时间窗口、从窗口移出、二者包含、以及不重复的事件
分别编写istream和rstream的规则,在我们插入一批数据后,istream在5秒后输出进入窗口的数据,5秒过后,rstream会输出同样的内容
将规则改为
select [istream | rstream] count(*) from userbuy.win:time( 15 sec)
|
每隔一秒插入一条数据,插入3次,会得到如下的统计结果:
time
istream
rstream
18:33:35 |
1 |
0 |
18:33:38 |
2 |
1 |
18:33:43 |
3 |
2 |
18:33:50 |
2 |
3 |
18:33:53 |
1 |
2 |
18:33:58 |
0 |
1 |
在滑动时间窗口中,CEP会在每条事件进入和移出窗口时,执行EPL语句,比如istream分析窗口内的事件,当有事件移出时,也会对当前窗口内的事件进行统计
rstream分析移出窗口的事件,但当有事件进入时,也会统计当前移出窗口的事件的总数,当窗口中无数据时,看上去像是把一个空数据移出了
2.过滤
select * from userbuy(lottery='ssc').win:time(15 sec)
在事件进入窗口前先进行了过滤
views
即数据窗口,分别针对时间、数量等因素制定了各类窗口
win
win:length
win:length(10) 滑动窗口,存最近的5条数据
win:length_batch
win:length_batch(10) 间隔窗口,存满10条触发查询
win:time
win:time(5 sec) 滑动窗口,存最近5秒内的数据
win:ext_timed
win:ext_timed(timestamp,10 sec) 滑动窗口,存最近10秒内的数据,不再基于EPL引擎,而是系统时间戳
win:time_batch
win:time_batch(10 sec) 10秒间隔的窗口
win:time_batch(10 sec,"FORCE_UPDATE, START_EAGER") 加上这两个参数后,会在窗口初始化时就执行查询,并且在没有数据进入和移出时,强制查询出结果
win:time_length_batch
win:time_length_batch(10 sec,5),当时间或数量任意一个满足条件时,触发查询
win:time_accum
select rstream * from userbuy.win:time_accum(15 sec) 阻塞输出,直到15秒内没有新进入的数据时,才输出并触发查询. 这些数据视为已移出窗口
win:keepall
win:keepall() 无参数,记录所有进入的数据,除非使用delete操作,才能从窗口移出数据,详见name window
win:firstlength
win:firstlength(10) 保留一批数据的前10条,需配合delete操作
win:firsttime
win:firsttime(10 sec) 保留窗口初始化后10秒内的所有数据
std
std:unique
std:unique(id) 对不同的id保留其最近的一条事件
std:groupwin
std:groupwin(id).win:length(5) 一般与其它窗口组合使用,此例子中将进入的数据按id分组,相同id的数据条目数不大于5
select sum(amount),id from std:groupwin(id).win:length(5) group by id 统计每组id最近5次消费的总金额数
std:size
select size from userbuy.win:time(1 min).std:size() 统计一分钟内事件总数
std:firstunique
std:firstunique(id) 相同id的事件只取第一条
stat
进行一些统计数学统计,统计项固定
stat:uni, stat:linest, stat:correl,stat:weighted_avg
ext
ext:sort
对指定的字段进行排序,限定个数
select sum(amount) from userbuy.ext:sort(3, amount desc) 将3个最高金额求和
select * from userbuy.ext:sort(3,amount desc,id asc) 找出最高金额的3条事件,并按id排序
output 与 stream
与数据库关联查询
EPL可以与各种关系型及非关系型数据库通讯,进行关联查询,基本语法
select custId, cust_name from CustomerCallEvent, |
sql:MyCustomerDB [ ' select cust_name from Customer where cust_id = ${custId} ' ]
|
name window
在EPL中可以创建命名窗口,此窗口可供很多语句来查询,并且可以对此窗口进行insert update delete操作,一些不自动移出数据的view可以派上用场了,当然,也可以对常用的那些view使用
create
create window USERBUY.win:keepall() select id,amount,lottery from userbuy |
创建命名窗口,并声明包含的字段和类型
insert
insert into USERBUY select id,amount,lottery from userbuy |
update
update和delete属于触发式操作,当有事件进入时产生
on lotterychange.win:time( 5 sec)
|
update USERBUY set amount = 1
|
where USERBUY.id = lotterychange.id |
delete
on ticket.win:time( 10 sec)
|
where USERBUY.amount != 1
|
trigger
与标准sql不同,EPL基于数据流,操作都需要事件触发,比如动态窗口的每次查询都是在事件发生的时候。
一个适合的例子是子查询
select (select count(*) from OLDUSERBUY) as oldcount, |
(select sum(amount) from NEWUSERBUY) as newsum
|
from trigger.win:time( 5 sec)
|
此查询只会在trigger接收到事件时触发
分割与复制
将事件进行条件判断,把数据流划分到不同命名窗口
insert into OLDUSERBUY select id,amount,lottery where id > 100
|
insert into NEWUSERBUY select id,amount,lottery where amount > 1000
|
当事件进入时,以从上往下的速度去执行,成功则停止,只进入一个命名窗口,实现分割;加上output all后,逐一判断,复制到多个命名窗口
自定义变量
创建一个变量,此值可以用在各种语句中作为条件判断,也可以被触发修改
Pattern
Match Recognize
以正则表达式的形式匹配事件,做出复杂分析,语法如下:
[ partition by partition_expression [, partition_expression] [,...] ]
|
measures measure_expression as col_name [, measure_expression as col_name ] [,...]
|
[ after match skip (past last row | to next row | to current row) ]
|
pattern ( variable_regular_expr [, variable_regular_expr] [,...] )
|
define variable as variable_condition [, variable as variable_condition] [,...]
|
假定现在有人对单个url逐页进行抓取,我们可以从日志中取出IP、URL、页码,发给cep进行分析
select * from webaccess.win:time_batch( 60 sec)
|
measures A.url as a_url,count(B.page) as b_count,A.ip as a_ip
|
B as B.page - prev( 1 ,B.page) = 1 )
|
从webaccess数据流接收事件,并开始匹配,按相同的ip和url来分组,需要获取访问的url、ip及访问次数
接下来要确认什么样的事件符合我们的需求,它在抓取第一页后,抓取第二页时,页码数都要比之前大1位,之后一直是这类情况,于是统计此情况发生的次数,
如果此次数在1分钟的窗口内大于100次,便认为它是抓取
分享到:
相关推荐
Java 使用 Esper 实现事件分批处理是大数据实时分析领域中的一种常见技术,Esper 是一个强大的事件处理引擎,尤其适用于复杂事件处理(CEP,Complex Event Processing)。在 Java 应用程序中集成 Esper 可以帮助我们...
Esper的核心功能是通过其特有的EPL(Esper事件处理语言)来编写规则和表达式,这些规则能够对事件流进行分析和响应。 在Java应用中集成Esper,首先需要在项目中引入Esper的依赖库。Esper提供了一个丰富的API,其中...
Esper 的核心功能在于其事件处理语言(Event Processing Language, EPL),这是一种类似 SQL 的查询语言,允许用户定义事件模式并根据这些模式进行实时分析。以下是一些主要的知识点: 1. **EPL 语言**:EPL 提供了...
Esper 支持丰富的事件处理语言 (EPL),使得开发者能够快速构建高性能的应用程序。 - **Esper 的特点**: - **表达性强的事件处理语言**:支持连续查询、聚合、连接、因果关系及缺失事件处理、与历史数据的连接等。...
Esper是一种强大的事件处理引擎,常用于实时分析和复杂事件处理(CEP,Complex Event Processing)。在IT领域,Esper被广泛应用于金融交易、物联网、监控系统等需要实时数据流分析的场景。Esper的核心功能是能够从...
1. **esper.jar**:这是Esper的核心库,包含了事件处理器、表达式语言、API接口等核心功能。 2. **esper-javadoc.jar**:提供了Esper API的Java文档,方便开发者查阅和理解API的用法。 3. **esper-distribution....
在IT领域,Esper是一种实时事件处理引擎,常用于复杂事件处理(CEP)。Esper Engine 支持Esper Process Language(EPL),一种专为流数据处理设计的SQL-like查询语言。EPL Builder,正如其名,是一个专门用于构建和...
Esper是一款广泛使用的开源复杂事件处理(CEP)引擎,它允许开发者在实时数据流中检测模式、趋势和异常,从而实现对动态业务环境的快速响应。Esper源码的分析对于理解其内部工作原理、优化性能以及进行自定义扩展至...
Esper是一种事件处理语言和引擎,用于处理实时数据流中的复杂事件。本文将对Esper的主要接口进行深入解析,并对服务商接口(ServiceProvider Interface)的使用方式进行详解。 #### 二、API概览 Esper提供了以下...
Esper是一种专为复杂事件处理(CEP)和事件流分析设计的引擎,适用于需要实时或准实时响应的场景。CEP技术的核心在于处理大量快速流动的数据,对事件进行关联、过滤、窗口分析等复杂运算,以识别模式并作出即时反应...
- **Esper**:Esper是一款专为事件流处理和复杂事件处理(Complex Event Processing, CEP)设计的强大引擎。它能够在事件流中检测到特定模式时触发预定义的动作。 - **应用场景**:Esper特别适用于需要处理大量数据...
2. **Esper API**:Esper提供了丰富的API,包括EventPattern语言,用于编写事件处理逻辑,以及EPL(Esper Processing Language),一种SQL-like的语言,用于声明性地定义事件模式。 3. **事件模型**:在Esper中,...
Esper支持多种事件表示方式,并提供了一个强大的查询语言EPL(Event Processing Language),使得用户可以轻松地定义事件模式并从中提取有价值的信息。 ##### 1.2 需要的第三方资源 为了运行Esper,除了需要JDK之外...
Esper是一个强大的事件流处理和复杂事件处理引擎。它能实现实时事件驱动的框架,当检测到特定事件时触发预定义的操作。 - **核心概念**: - **事件**:在系统中发生的任何有意义的行为或状态变化。 - **事件流**...
EPL 是 Esper 语言,Esper 是一个流行的企业级复杂事件处理引擎,它允许开发者定义对实时数据流中的模式进行监听和响应。EPL 的语法类似于 SQL,但针对时间序列和事件数据进行了优化,可以用于实时监控、报警、交易...
Esper 是一个强大的事件处理引擎,常用于实时流数据分析和复杂事件处理(CEP,Complex Event Processing)。在Java开发环境中,Esper 提供了丰富的API和工具,使得开发者能够构建实时应用程序,对持续流入的数据流...
4. **SQL-like查询语言**:Esper提供了一种类似SQL的查询语言EPL(Esper Processing Language),使得开发者能够方便地定义和管理事件处理规则。 EsperDist的分布式特性主要包括: 1. **水平扩展**:通过分布式部署...
Esper 是一个实时事件处理引擎,它用于处理连续的数据流并执行复杂的事件相关分析。在 Java 开发中,Esper 通常被用于大数据环境下的实时分析和决策支持。`EsperQueryTest` 可能是一个项目或代码示例,旨在演示如何...
Esper 提供了一种声明式语言(EPL,Esper Programming Language),使得开发者可以定义规则和事件处理逻辑,从而在数据流中触发即时响应。 **虚拟数据窗口** 是 Esper 中的一个关键概念,它允许用户定义一个时间...
Esper的工作原理是接收连续的数据流,然后通过内置的SQL-like语言EPL(Esper Processing Language)对这些数据进行模式匹配和规则定义。这种实时分析能力使得Esper成为监控和预测业务关键指标的理想选择。 **与...