一.背景
这个和demo1类似,只是提供另外一种实现方式,类似kafka 利用streamTableSource 来做。
二.代码
@Data @ToString public class UserInfo implements Serializable { private Timestamp pTime; private String userId; private String itemId; public UserInfo() { } public UserInfo(String userId, String itemId) { this.userId = userId; this.itemId = itemId; this.pTime = new Timestamp(System.currentTimeMillis()); } }
public class UserTableSource implements StreamTableSource<UserInfo>, DefinedRowtimeAttributes { /** * 返回类型 * @return */ @Override public TypeInformation<UserInfo> getReturnType() { return TypeInformation.of(UserInfo.class); } @Override public TableSchema getTableSchema() { // 可以 这样定义 // TableSchema schema = new TableSchema( // new String[]{"pTime","userId","itemId"}, // new TypeInformation[]{Types.SQL_TIMESTAMP,Types.STRING,Types.STRING}); return TableSchema.fromTypeInfo(getReturnType()); } @Override public String explainSource() { return "userSource"; } @Override public DataStream<UserInfo> getDataStream(StreamExecutionEnvironment execEnv) { UserDataSource source = new UserDataSource(); DataStream<UserInfo> userInfoDataStream = execEnv.addSource(source); return userInfoDataStream; } @Override public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() { RowtimeAttributeDescriptor descriptor = new RowtimeAttributeDescriptor("pTime", new ExistingField("pTime"), new AscendingTimestamps()); return Collections.singletonList(descriptor); } }
public class UserStreamTableApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // create a TableEnvironment StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); tEnv.registerTableSource("test", new UserTableSource()); Table result = tEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM test" + " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId "); // deal with (Tuple2<Boolean, Row> value) -> out.collect(row) SingleOutputStreamOperator allClick = tEnv.toRetractStream(result, Row.class) .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> { out.collect(value.f1); }).returns(Row.class); // add sink or print allClick.print(); env.execute("test"); } }
相关推荐
总结来说,“flink-sql-demo-data-part2.tar.gz”压缩包中的数据是Flink SQL实战的重要素材,通过解析和操作这些数据,开发者可以熟悉Flink SQL的语法和功能,进而应用到实际的大数据处理任务中。无论是简单的数据...
Flink SQL支持标准的SQL语法,并且通过Table API提供了丰富的函数和操作符,包括聚合、窗口、连接等。 **RabbitMQ** 是基于AMQP(Advanced Message Queuing Protocol)协议的消息代理,能够可靠地路由和存储消息。...
通过Flink的Table & SQL API或DataStream API,我们可以将Doris作为数据源或数据接收器,实现数据的实时导入导出。 4. **Flink操作Doris的Java API** 在Java环境中,我们需要引入Flink的相应依赖,并使用`...
4. **Flink-day07**:可能包含Flink的高级功能,如事件时间处理、水印机制,或者Flink的SQL接口和Table API。 5. **Flink-day02**:通常会介绍基本的Flink概念,如DataStream API,以及基本的算子和连接操作。 6. **...
6. **Table & SQL API**:这是Flink用于统一处理流和批数据的API,它允许用户使用SQL查询数据,同时也支持Java和Scala的表API进行声明式编程。 7. **窗口操作**:Flink支持多种窗口操作,如滑动窗口、会话窗口、 ...
在"flink-sclas-demo-master"这个项目中,你可能会看到以下关键部分: 1. **数据源(Sources)**:Flink允许从多种数据源(如Kafka、RabbitMQ、文件等)获取数据流。在Scala代码中,会定义SourceFunction,用于定义...
.tableList("testdb.userdemo") // 指定监听的表 .deserializer(new JsonDebeziumDeserializationSchema()) // 用Json格式化输出 .startupOptions(StartupOptions.initial()) // 设置启动模式 .build()); // ...
它提供了TableAPI、Flink SQL以及DataStream API的实例,同时也涵盖了从socket、Kafka和MySQL数据源进行数据处理的场景。 【描述】"flinkDemo.zip"是一个压缩包,其中包含了实现上述功能的所有源代码。这个项目不仅...
sqlSubmit 基于flinkSQL提交程序现在只支持flink 1.12.0 了解有关Flink的更多信息 源自Jark的博客特征提交flink sql到集群例子SQL文件demo.sql像这样: -- sourceCREATE TABLE user_log ( user_id VARCHAR ,item_id ...
2. **创建 Iceberg 表**:在 Flink 中创建 Iceberg 表,可以使用 `TableEnvironment` 的 `create` 方法。例如: ```java TableEnvironment tableEnv = TableEnvironment.create(...); String createTableDdl = ...
2. 在flinksql中实现HTTP请求 3. 在flinksql中自定义函数调用过程 3.1. 编译打包代码,生成的jar复制到flink安装目录下的lib目录。 例如:/flink/lib 3.2. flinksql脚本注册自定义函数 CREATE FUNCTION ...
2. **有状态流处理**:Flink 支持处理过程中保持状态,这使得它能够进行复杂的计算,如窗口聚合、滑动窗口等。 3. **流批一体化**:Flink的流水线运行时系统能够统一处理批量和流式数据,避免了批处理和流处理之间...
8. **Flink Table & SQL API**:这是Flink的高级API,提供了一种统一的方式来处理批处理和流处理作业,简化了开发流程。 9. **生态集成**:Flink能够与Hadoop、Spark等其他大数据工具无缝集成,增强了整个大数据...