一、前言
随着新兴的大数据处理框架不断推陈出新,storm已经逐渐淡出了人们的视野,storm的社区已经有近3年无人维护。storm已经被Flink替代,接下来,我们就以Flink框架作为大数据处理的核心框架进行研究,并把storm中的代码移植到Flink中。
先分析一下Flink的优势,了解一下选用Flink的优势和必要性。
具体的Flink优势和特点,可以从网上进行搜索去查看。具体内容,参考以下几个链接:
https://www.infoq.cn/article/fRt1RF1pxu_ZtmeObOoJ
本文主要以实战为主,先构建Flink的运行环境,通过实际运行代码来感受和分析Flink。
二、helloworld 初试
1.1 搭建运行环境
使用docker快速搭建和启动Flink环境。docker的具体使用,请网上自行检索。
docker-compose.yml
version: "2.1"
services:
jobmanager:
image: flink
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
1.2 编写 helloword
maven工程配置文件:pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tchen</groupId>
<artifactId>flink</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>flink</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.7.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
-->
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.dalong.app.StreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- Its adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
java程序:helloworld.java
package com.tchen.flink.helloworld;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// 输入tcp流
final int port;
final String host;
port = 9008; // nc监听的tcp端口
host = "10.10.50.70"; // docker宿主机ip
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(host, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
// @Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
// @Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
三、本地调试
示例代码:FlinkLocal.java
package com.tchen.flink.local;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
public class FlinkLocal {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
DataSet<String> data = env.readTextFile("file:///d:/flink-data.txt");
data
.filter(new FilterFunction<String>() {
public boolean filter(String value) {
return value.startsWith("http://");
}
})
.writeAsText("file:///d:/flink-data-out.txt");
JobExecutionResult res = env.execute();
}
}
参考资料:
官方:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/local_setup.html
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/pom.xml
https://flink-docs-cn.gitbook.io/project/05-ying-yong-kai-fa/batch-dataset-api/ben-di-zhi-hang
https://www.iamle.com/archives/2572.html
https://juejin.im/post/5c4f16dbe51d454f342fb7e7
https://www.infoq.cn/article/zbBAGroBgtytDiBs*Xq9
分享到:
相关推荐
Flink有一个非常重要的...课程内容包括了Flink安装部署,入门实战案例,Flink原理初探,流处理的教学,Flink高级API和Flink-Table-SQL-案例,Flink高级特性和新特性,Flink多语言开发,Flink监控与优化。 视频大小:4G
13-[掌握]-Flink原理初探-角色分工-执行流程-DataFlow 14-[掌握]-Flink原理初探-TaskSlot和TaskSlotSharing 15-[掌握]-Flink原理初探-执行流程图生成 Flink-day02 01-[理解]-流处理核心概念说明 02-[掌握]-Source-...
借鉴电商的流处理技术(如Kafka、Flink),可以实现实时数据流分析,为调度决策提供即时支持。 4. 用户画像构建:电商通过用户购物行为构建用户画像,为个性化推荐服务。公交大数据平台同样可以构建乘客画像,根据...
智能 Web 研发初探 一站式业务稳定性保障平台的 AIOps 实践 信息流广告的排序算法演进 小游戏质量保证测试实践之路 物流仓储数据分发平台架构实践及挑战 万亿规模下高吞吐低时延查询系统架构设计 数字化转型提升企业...
项目资源包含:可运行源码+数据集+文档 python + numpy, pandas, matplotlib, pyecharts, wordcloud 适用人群:学习不同技术领域的小白或进阶学习者;可作为课程设计、大作业、工程实训或初期项目立项。 数据来源:数据集taxis.csv从网络下载 数据清洗:异常值与缺失值的处理:有一些数据distance(乘车距离)为零而且上下车地点为空,还有些一些数据的payment(支付方式)为空。 数据预处理:将列名更改成中文 标准化与归一化: 数据分析: 数据可视化:
TypeScript 入门教程
人脸识别项目实战
本资源汇总了 历届全国电子设计竞赛(电赛)真题+模拟题,涵盖 电路设计、嵌入式系统、信号处理、自动控制等核心考点,并提供详细解析及综合测评,帮助参赛者高效备赛、查漏补缺、提升实战能力。 适用人群: 适合 准备参加电子设计竞赛的大学生、电赛爱好者、电子信息类相关专业的学生,以及希望提高电子设计和电路分析能力的工程师。 能学到什么: 电赛考察重点:熟悉往届竞赛的命题方向及考核重点。 电路设计与仿真:提升模拟电路、数字电路、单片机等核心技能。 问题分析与解决能力:通过综合测评找到薄弱点并针对性提升。 实战经验:掌握竞赛策略,提高应试效率和设计能力。 阅读建议: 建议先 通读真题,了解题型与解题思路,然后 结合模拟题实战演练,查找不足并通过测评强化练习,逐步提升竞赛能力。
2024人工智能如何塑造未来产业:AI对各行业组织带来的的变革研究研究报告.pdf
人脸识别项目源码实战
给大家分享一套课程——Vulkan原理与实战课程
c语言学习
海豚鲸鱼数据集 5435张图 正确识别率可达92.6% 可识别:海豚 虎鲸 蜥蜴 海豹 鲨鱼 龟 支持yolov8格式标注
答谢中书书教学设计.docx
人脸识别项目源码实战
c语言学习
人脸识别项目源码实战
人脸识别项目实战
本美发门店管理系统有管理员和用户两个角色。用户功能有项目预定管理,产品购买管理,会员充值管理,余额查询管理。管理员功能有个人中心,用户管理,美容项目管理,项目类型管理,项目预定管理,产品库存管理,产品购买管理,产品入库管理,会员卡管理,会员充值管理,余额查询管理,产品类型管理,系统管理等。因而具有一定的实用性。 本站是一个B/S模式系统,采用SSM框架,MYSQL数据库设计开发,充分保证系统的稳定性。系统具有界面清晰、操作简单,功能齐全的特点,使得美发门店管理系统管理工作系统化、规范化。本系统的使用使管理人员从繁重的工作中解脱出来,实现无纸化办公,能够有效的提高美发门店管理系统管理效率。 关键词:美发门店管理系统;SSM框架;MYSQL数据库;Spring Boot 1系统概述 1 1.1 研究背景 1 1.2研究目的 1 1.3系统设计思想 1 2相关技术 2 2.1 MYSQL数据库 2 2.2 B/S结构 3 2.3 Spring Boot框架简介 4 3系统分析 4 3.1可行性分析 4 3.1.1技术可行性 4 3.1.2经济可行性 5 3.1.3操作可行性 5 3.2系
内容概要:本文档介绍了基于SSA-CNN-GRU麻雀算法优化卷积门控循环单元数据分类预测的详细项目实例,重点讲述了该项目的背景、目标、挑战与解决方案、技术特点、应用领域等方面的内容。文档详细记录了从项目启动、数据预处理、算法设计(SSA优化CNN-GRU模型)、构建与评估模型到实现美观的GUI界面整个过程,并讨论了防止过拟合的技术如正则化、早停和超参数优化。另外还涵盖了项目扩展的可能性、部署和应用策略、需要注意的地方以及未来改进的方向。全文强调了模型的泛化能力和计算效率,展示了该混合算法模型在实际应用中的优越性能。 适合人群:具备一定的Python编程经验及机器学习基础知识的研究人员和技术人员;对深度学习、智能优化算法及实际应用感兴趣的学者和从业者;寻求提升数据分析和预测准确性的金融分析师、数据科学家等相关专业人士。 使用场景及目标:本文档非常适合用作学习和参考资料,以掌握如何将SSA、CNN与GRU三种先进技术结合起来进行复杂的分类和预测问题求解。具体应用场景包括但不限于以下几个方面:金融领域——股票价格预测;医疗保健领域——辅助诊断;工业制造——预防性维护;智能家居——个性化服务;以及其他涉及到时序数据分析和多模态数据处理的场合。文档既包含了理论知识又提供了完整的源代码示例,可以帮助读者理解算法原理并通过实践中加深对其的认识。 其他说明:该项目不仅仅是关于算法的设计实现,更是有关于系统的整体架构规划以及工程上的考量,比如环境准备(确保环境洁净、必要包的安装等)、数据准备、GPU配置支持等等。同时文中给出了详细的代码片段,方便开发者理解和复现实验成果。值得注意的是,虽然文中提供了一套通用解决方案,但在真实场景下还需要针对性的调整参数或修改网络结构来达到最好的性能效果。此外,对于追求更高的预测精度或解决更大规模的问题,作者建议进一步探索深度强化学习等高级技术和多任务学习策略,并且考虑使用增量学习让模型能够适应新数据而不必重新训练整个模型。最后提到安全性和隐私保护也是项目实施过程中的重要因素,要妥善保管用户的敏感信息并且做到合法合规地收集和使用数据。