1. Introduction
With new big data processing frameworks appearing one after another, Storm has gradually faded from view; its community has seen almost no maintenance activity for nearly three years, and it has largely been superseded by Flink. In what follows, we adopt Flink as our core big data processing framework and port our existing Storm code over to it.
Before diving in, it is worth reviewing Flink's strengths to understand why choosing it is both worthwhile and necessary.
A detailed overview of Flink's advantages and features can be found online; see, for example:
https://www.infoq.cn/article/fRt1RF1pxu_ZtmeObOoJ
This article is hands-on: we first build a Flink runtime environment, then run real code to get a feel for how Flink behaves.
2. Hello World: First Try
2.1 Setting up the runtime environment
We use Docker to quickly set up and start a Flink environment. For Docker itself, please refer to the documentation available online.
docker-compose.yml
version: "2.1"
services:
  jobmanager:
    image: flink    # consider pinning a tag (e.g. flink:1.7.0) to match the pom's flink.version
    expose:
      - "6123"      # JobManager RPC port, visible to linked containers only
    ports:
      - "8081:8081" # Flink web UI, published on the host
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink
    expose:
      - "6121"      # TaskManager data port
      - "6122"      # TaskManager RPC port
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
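With this file saved, the cluster comes up with the usual Compose commands (a minimal sketch; exact flags may vary across Docker/Compose versions):
docker-compose up -d                 # start jobmanager and taskmanager in the background
docker-compose ps                    # check that both containers are running
docker-compose logs -f jobmanager    # follow the JobManager log
The Flink web UI is then reachable at http://localhost:8081 on the Docker host.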
2.2 Writing the Hello World job
Maven project configuration file: pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tchen</groupId>
    <artifactId>flink</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>flink</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.7.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <!-- Example:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Entry point: must match the class defined below. -->
                                    <mainClass>com.tchen.flink.helloworld.SocketWindowWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ. -->
    <!-- It adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided'. -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>
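With this pom, packaging is the standard Maven flow; the shade plugin builds a fat jar whose name follows the artifactId and version above:
mvn clean package
# produces target/flink-0.0.1-SNAPSHOT.jar, with SocketWindowWordCount set as its entry point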
Java program: SocketWindowWordCount.java (the file name must match the public class name)
package com.tchen.flink.helloworld;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // TCP source to read from
        final int port = 9008;             // TCP port that nc listens on
        final String host = "10.10.50.70"; // IP of the Docker host

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(host, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                // sliding window: 5 seconds long, evaluated every second
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count; the public no-arg constructor and
    // public fields make it usable as a Flink POJO (required by keyBy("word")).
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
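To try the job end to end (a sketch under the assumptions of my setup: the host IP above, and the jar made available inside the jobmanager container), first start a netcat listener on the Docker host, since the job connects out to it:
nc -lk 9008
Then build and submit the job, either by uploading the fat jar through the web UI at http://localhost:8081, or with the Flink CLI inside the container:
docker cp target/flink-0.0.1-SNAPSHOT.jar $(docker-compose ps -q jobmanager):/job.jar
docker-compose exec jobmanager flink run -c com.tchen.flink.helloworld.SocketWindowWordCount /job.jar
Typing words into the nc session should then produce counts such as "hello : 1" on the TaskManager's stdout, visible via docker-compose logs -f taskmanager.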
3. Local Debugging
Sample code: FlinkLocal.java
package com.tchen.flink.local;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FlinkLocal {

    public static void main(String[] args) throws Exception {
        // createLocalEnvironment() runs the job inside the current JVM,
        // which makes it easy to set breakpoints and debug in the IDE
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        DataSet<String> data = env.readTextFile("file:///d:/flink-data.txt");

        data
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) {
                        // keep only lines that start with "http://"
                        return value.startsWith("http://");
                    }
                })
                .writeAsText("file:///d:/flink-data-out.txt");

        JobExecutionResult res = env.execute();
    }
}
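Note that because the Flink dependencies are declared provided in the pom, running this class straight from the IDE requires the add-dependencies-for-IDEA profile to be active (or the scopes temporarily switched to compile). As a quick sanity check, given a hypothetical d:/flink-data.txt such as:
http://www.example.com/a
some plain text
ftp://example.com/b
http://www.example.com/c
only the two http:// lines end up in flink-data-out.txt.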
References:
Official:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/local_setup.html
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/pom.xml
Others:
https://flink-docs-cn.gitbook.io/project/05-ying-yong-kai-fa/batch-dataset-api/ben-di-zhi-hang
https://www.iamle.com/archives/2572.html
https://juejin.im/post/5c4f16dbe51d454f342fb7e7
https://www.infoq.cn/article/zbBAGroBgtytDiBs*Xq9