`
greemranqq
  • 浏览: 976948 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

flink-table-sql-demo1

阅读更多

一.背景

     flink 这个东西,后面会尝试走纯SQL 统计路线,这个阿里和华为都搞了一套,这里就简单记录下测试效果。

用SQL统计用户点击数,每隔5秒统计一次。暂时去掉了复杂逻辑。

 

二.直接看代码

 

// lombok 插件,这里主要写一个简单的数据产生的对象
// 表是时间,用户,以及商品3个字段
@Data
@ToString
public class UserInfo implements Serializable {
    private Timestamp pTime;
    private String userId;
    private String itemId;

    public UserInfo() {
    }

    public UserInfo(String userId, String itemId) {
        this.userId = userId;
        this.itemId = itemId;
        this.pTime = new Timestamp(System.currentTimeMillis());
    }
}

 

/**
 * 模拟数据产生,没隔1秒发送一个数据
 * @author <a href="mailto:huoguo@2dfire.com">火锅</a>
 * @time 2019/2/22
 */
public class UserDataSource implements SourceFunction<UserInfo> {
    static String[] items = {"i-1", "i-2", "i-3"};
    static String[] users = {"a", "b", "c"};
    @Override
    public void run(SourceContext sc) throws Exception {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            int m = (int) (System.currentTimeMillis() % 3);
            sc.collect(new UserInfo(users[m], items[m]));
        }
    }
    @Override
    public void cancel() {
        System.out.println("cancel to do ...");
    }
}

 

 

/**
 * 这就主函数,负责统计,引用是java的,别引错了
 */
public class UserApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserInfo> userInfoDataStream = env.addSource(new UserDataSource());

        DataStream<UserInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserInfo>() {
            @Override
            public long extractAscendingTimestamp(UserInfo element) {
                return element.getPTime().getTime();
            }
        });
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // pTime.rowtime  = pTime as rowTime(proctime)
        tableEnv.registerDataStream("test", timedData, "userId,itemId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");
    }

}

 

结果:

c,2019-03-06 13:55:20.0,4

 

a,2019-03-06 13:55:20.0,1

--- 手动分开好看

c,2019-03-06 13:55:25.0,2

b,2019-03-06 13:55:25.0,2

a,2019-03-06 13:55:25.0,1

---

a,2019-03-06 13:55:30.0,2

c,2019-03-06 13:55:30.0,2

b,2019-03-06 13:55:30.0,1

 

小结:

        1.demo很简单,仅为了测试使用

        2.具体的原理,和很多东西 有时间再写吧

 

0
0
分享到:
评论

相关推荐

    flink-sql-demo-data-part2.tar.gz

    总结来说,“flink-sql-demo-data-part2.tar.gz”压缩包中的数据是Flink SQL实战的重要素材,通过解析和操作这些数据,开发者可以熟悉Flink SQL的语法和功能,进而应用到实际的大数据处理任务中。无论是简单的数据...

    flink-sql集成rabbitmq

    Flink SQL支持标准的SQL语法,并且通过Table API提供了丰富的函数和操作符,包括聚合、窗口、连接等。 **RabbitMQ** 是基于AMQP(Advanced Message Queuing Protocol)协议的消息代理,能够可靠地路由和存储消息。...

    flink-doris-demo.rar

    通过Flink的Table & SQL API或DataStream API,我们可以将Doris作为数据源或数据接收器,实现数据的实时导入导出。 4. **Flink操作Doris的Java API** 在Java环境中,我们需要引入Flink的相应依赖,并使用`...

    flink学习demo代码库

    4. **Flink-day07**:可能包含Flink的高级功能,如事件时间处理、水印机制,或者Flink的SQL接口和Table API。 5. **Flink-day02**:通常会介绍基本的Flink概念,如DataStream API,以及基本的算子和连接操作。 6. **...

    基于flink1.12,使用java,flink sql的demo,包含Mylsql.zip

    6. **Table & SQL API**:这是Flink用于统一处理流和批数据的API,它允许用户使用SQL查询数据,同时也支持Java和Scala的表API进行声明式编程。 7. **窗口操作**:Flink支持多种窗口操作,如滑动窗口、会话窗口、 ...

    flink-sclas-demo:flink学习scala版

    在"flink-sclas-demo-master"这个项目中,你可能会看到以下关键部分: 1. **数据源(Sources)**:Flink允许从多种数据源(如Kafka、RabbitMQ、文件等)获取数据流。在Scala代码中,会定义SourceFunction,用于定义...

    Flink实战之 MySQL CDC.pdf

    .tableList("testdb.userdemo") // 指定监听的表 .deserializer(new JsonDebeziumDeserializationSchema()) // 用Json格式化输出 .startupOptions(StartupOptions.initial()) // 设置启动模式 .build()); // ...

    flinkDemo完整代码scala版 - flinkDemo.zip

    它提供了TableAPI、Flink SQL以及DataStream API的实例,同时也涵盖了从socket、Kafka和MySQL数据源进行数据处理的场景。 【描述】"flinkDemo.zip"是一个压缩包,其中包含了实现上述功能的所有源代码。这个项目不仅...

    sqlSubmit:基于 Flink 的 sqlSubmit 程序

    sqlSubmit 基于flinkSQL提交程序现在只支持flink 1.12.0 了解有关Flink的更多信息 源自Jark的博客特征提交flink sql到集群例子SQL文件demo.sql像这样: -- sourceCREATE TABLE user_log ( user_id VARCHAR ,item_id ...

    flink 操作iceberg 的示例代码

    Flink 提供了对 Iceberg 的原生支持,允许用户将 Flink SQL 或 DataStream API 直接应用于 Iceberg 表。这提供了高度的灵活性和性能,使得 Flink 可以方便地进行数据的读取、写入和更新操作。 1. **安装依赖**:在...

    实现一个简单的HTTP请求的Flink UDF函数

    1. flink自定义函数 2. 在flinksql中实现HTTP请求 3. 在flinksql中自定义函数调用过程 3.1. 编译打包代码,生成的jar复制到flink安装目录下的lib目录。 例如:/flink/lib 3.2. flinksql脚本注册自定义函数 CREATE ...

    Flink技术参考手册.docx

    1. **事件驱动(Event-driven)**:Flink以事件为基本单位进行处理,能够实时响应事件的发生,适合实时数据处理场景。 2. **有状态流处理**:Flink 支持处理过程中保持状态,这使得它能够进行复杂的计算,如窗口聚合...

    demodemodemo.zip

    8. **Flink Table & SQL API**:这是Flink的高级API,提供了一种统一的方式来处理批处理和流处理作业,简化了开发流程。 9. **生态集成**:Flink能够与Hadoop、Spark等其他大数据工具无缝集成,增强了整个大数据...

Global site tag (gtag.js) - Google Analytics