`
shangboz
  • 浏览: 14211 次
文章分类
社区版块
存档分类
最新评论

CEP(siddhi)复杂事件流程引擎

阅读更多

Siddhi是一个轻量级的,简单的开源的复杂事件流程引擎。它使用类SQL的语言描述事件流任务,可以很好的支撑开发一个可扩展的,可配置的流式任务执行引擎。传统设计之中,为了支持不同的告警规则类型,我们需要编写不同的业务逻辑代码,但是使用了Siddhi之后,我们只需要配置不同的流任务Siddhiql,即可以支持不同的告警业务。

 

Why use Siddhi:

Overview

 

 1.maven依赖

<dependency>
    <groupId>org.wso2.siddhi</groupId>
    <artifactId>siddhi-core</artifactId>
    <version>4.2.0</version>
</dependency>

<dependency>
    <groupId>org.wso2.siddhi</groupId>
    <artifactId>siddhi-query-api</artifactId>
    <version>4.2.0</version>
</dependency>

<dependency>
    <groupId>org.wso2.siddhi</groupId>
    <artifactId>siddhi-query-compiler</artifactId>
    <version>4.2.0</version>
</dependency>

 2、siddhi代码执行实例

// Creating Siddhi Manager
SiddhiManager siddhiManager = new SiddhiManager();

String siddhiApp = "define stream cseEventStream (symbol string, price float, volume long); " +
        "" +
        "@info(name = 'query1') " +
        "from cseEventStream#window.length(0) " +
        "select symbol, price, avg(price) as ap, sum(price) as sp, count(price) as cp " +
        "group by symbol " +
        "output first every 4000 milliseconds "+
        "insert into outputStream;";

// Generating runtime
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
// Starting event processing
siddhiAppRuntime.start();
// Adding callback to retrieve output events from query
siddhiAppRuntime.addCallback("query1", new QueryCallback()
{
    @Override
    public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents)
    {
        System.out.println("============query callback============");
        EventPrinter.print(timeStamp, inEvents, removeEvents);
        /*System.out.println(timeStamp);
        System.out.println(inEvents);*/
    }
});
siddhiAppRuntime.addCallback("cseEventStream", new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        System.out.println("============input stream callback============");
        EventPrinter.print(events);
    }
});


// Retrieving InputHandler to push events into Siddhi
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");


int i = 1;
while (i <= 10) {
    float p = i*10;
    inputHandler.send(new Object[]{"WSO2", p, 100});
    System.out.println("\"WSO2\", " + p);
    inputHandler.send(new Object[] {"IBM", p, 100});
    System.out.println("\"IBM\", " + p);
    Thread.sleep(1000);
    i++;
}

// Shutting down the runtime
siddhiAppRuntime.shutdown();

// Shutting down Siddhi
siddhiManager.shutdown();

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics