关注公众号 登峰大数据 阅读完整版
基本API概念
Flink程序是在分布式集合上实现transformations的常规程序(例如,过滤、映射、更新状态、连接、分
组、定义窗口、聚合)。集合是从源(例如,从文件、kafka Topic或本地内存集合中读取数据)创建的。读
取的结果通过sinks返回,例如,sink可以将数据写入(分布式)文件,或者写入标准输出(例如,命令行终
端)。Flink程序在各种上下文中运行,standalone运行或嵌入到其他程序中。程序可以在本地JVM中执
行,也可以在许多机器的集群中进行。根据数据源的类型,即有界或无界数据源,可以编写一个批处理
程序或一个流式程序,其中DataSet API用于批处理程序,DataStream API用于流式处理程序。本指南
将介绍这两个API共有的基本概念,但请参阅我们的流处理指南和批处理指南,以获得关于使用每个API
编写程序的具体信息。
注意
在展示如何使用这些API的实际示例时,我们将使用StreamingExecutionEnvironment和DataStream
API。在DataSet API中,概念是完全相同的,只是被ExecutionEnvironment和DataSet替换了。
DataSet和DataStream
Flink程序剖析
延迟计算
key的指定
转换功能的指定
支持的数据类型
累加器和计数器
DataSet和DataStream
Flink用特殊的类DataSet和DataStream来表示程序中的数据。可以将它们视为不可变的数据集合,其
中可能包含重复的数据。对于DataSet,数据是有界的,而对于DataStream,元素的数量是无界的。
这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建了它们,就
不能添加或删除元素。也不能简单地检查其中的元素。
初始集合是通过在Flink程序中添加一个source(源)来创建的,而新的集合则是通过使用map、filter等
API方法对它们进行转换而派生出来的。
Flink程序剖析
Flink程序看起来像转换数据集合的常规程序。每个程序都由相同的基本部分组成:
1. 获取一个执行环境,
2. 加载/创建初始数据,
3. 指定对该数据的转换,
4. 指定放置计算结果的位置,
5. 触发程序执行
现在,我们将对每个步骤进行概述,请参阅相应的部分以了解更多细节。注意,Java DataSet API的所
有核心类都可以在org.apache.flink.api.java包中找到,而Java DataStream API的类可以在
org.apache.flink.streaming.api包中找到。
StreamExecutionEnvironment是所有Flink程序的基础。你可以获得一个
StreamExecutionEnvironment,使用其中的静态方法:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
通常,只需要使用getExecutionEnvironment(),因为这将根据上下文做正确的事:如果你执行程序在IDE
或普通Java程序将创建一个本地环境,将在本地机器上执行程序。如果将程序打包为一个JAR文件,并通
过命令行调用它,那么Flink集群管理器将执行您的main方法,而getExecutionEnvironment()将返回一
个用于在集群中执行程序的执行环境。
为了指定数据源,执行环境有几个方法,可以使用各种方式从文件中读取数据:可以逐行读取文件,如
CSV文件,或者使用完全定制的数据输入格式。要将文本文件作为行序列读取,可以使用以下命令:
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream text = env.readTextFile("file:///path/to/file");
这将为您提供一个DataStream,您可以在其上应用转换来创建新的派生DataStreams。
通过使用转换函数调用DataStream上的方法来应用转换。例如,一个map转换看起来像这样:
DataStream input = ...;
DataStream parsed = input.map(new MapFunction() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
这将通过将原始集合中的每个String转换为Integer来创建一个新的DataStream。
一旦有了包含最终结果的DataStream,就可以通过创建sink将其写入外部系统。这些只是一些创建sink
的示例方法:
writeAsText(String path)
print()
一旦开发了完整的程序,就需要通过调用StreamExecutionEnvironment上的execute()来触发程序执
行。根据ExecutionEnvironment的类型,可以在本地机器上执行程序,或将程序提交到集群上执行。
execute()方法返回一个JobExecutionResult,它包含执行时间和累加器结果。
有关流数据源和接收器(sink)的信息,以及关于DataStream上支持的转换的更深入的信息,请参阅流处
理指南。
有关批处理数据源和接收器(sink)的信息,以及有关DataSet上支持的转换的更深入的信息,请参阅批处
理指南。
延迟计算
所有的Flink程序都是延迟执行的:当程序的main方法执行时,数据加载和转换不会立即执行。相反,
每个操作都被创建并添加到程序的计划中。当执行环境中的execute()调用显式地触发执行时,这些操作
才被实际执行。程序是在本地执行还是在集群中执行,取决于执行环境的类型。
延迟计算允许构建复杂的程序,Flink将其作为一个整体计划单元执行。
key的指定
一些转换(join、coGroup、keyBy、groupBy)要求在元素集合上定义一个键(key)。其他转换(Reduce、
GroupReduce、Aggregate、Windows)允许在应用数据之前,按key对数据进行分组。
DataSet被分组为
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
而key也可以在DataStream上使用
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
Flink的数据模型不是基于键值对的。因此,您不需要硬性将数据集类型改为键和值类型。键是“虚拟
的”:它们被定义为指导分组操作符的实际数据上的函数。
注意:在接下来的讨论中,我们将使用DataStream API和keyBy。对于DataSet API,只需用DataSet和
groupBy替换即可。
为元组(Tuples)定义键
最简单的情况是对元组的一个或多个字段进行分组:
DataStream> input = // [...]
KeyedStream,Tuple> keyed = input.keyBy(0)
元组按第一个字段(Integer类型的字段)分组。
DataStream> input = // [...]
KeyedStream,Tuple> keyed = input.keyBy(0,1)
在这里,我们在一个由第一个和第二个字段组成的组合键上对元组进行分组。
关于嵌套元组的说明:如果您有一个嵌套元组的DataStream,例如:
DataStream,String,Long>> ds;
指定keyBy(0),Flink系统将使用完整的Tuple2作为键(整数和浮点数作为键)。如果想要“导航”到嵌套的
Tuple2,您必须使用下面演示的字段表达式键。
使用字段表达式定义键
可以使用基于String的字段表达式来说明嵌套字段,并为分组、排序、连接或联合分组定义键。
字段表达式使得在(嵌套的)复合类型(如Tuple和POJO类型)中选择字段变得非常容易。
在下面的示例中,我们有一个WC POJO,其中包含两个字段“word”和“count”。要按字段字分组,只需
将其名称传递给keyBy()函数。
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream words = // [...]
DataStream wordCounts = words.keyBy("word").window(/*window
specification*/);
字段表达式语法:
选择POJO字段:根据字段名选择POJO字段。例如,字段名“user”指的是POJO类型的“user”字段。
选择Tuple字段:根据字段名或0-offset字段索引选择Tuple字段。例如,“f0”和“5”分别引用Java元
组类型的第一个和第六个字段。
选择嵌套字段:可以选择pojo和元组中的嵌套字段。例如"user.zip"是指存储在POJO类型
的“user”字段中的POJO的“zip”字段。支持pojo和元组的任意嵌套和混合,如"f1.user.zip"
或"user.f3.1.zip"。
可以使用“*”通配符表达式选择完整类型。这也适用于非Tuple或POJO类型的类型。
字段表达式示例:
public static class WC {
public ComplexNestedClass complex; //nested POJO
private int count;
// getter / setter for private field (count)
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3 word;
public IntWritable hadoopCitizen;
}
下面这些是以上示例代码的有效字段表达式:
“count”:WC类中的count字段。
“complex”:递归选择POJO类型ComplexNestedClass的字段complex的所有字段。
“complex.word.f2”:选择嵌套Tuple3的最后一个字段。
“complex.hadoopCitizen":选择Hadoop IntWritable类型。
使用键选择器函数定义键
定义键的另一种方法是“键选择器”函数。键选择器函数接受单个元素作为输入并返回元素的键。key可以
是任何类型的,可以从确定性计算中得到。
下面的例子显示了键选择器函数(简单返回对象的字段):
// some ordinary POJO
public class WC {public String word; public int count;}
DataStream words = // [...]
KeyedStream keyed = words
.keyBy(new KeySelector() {
public String getKey(WC wc) { return wc.word; }
});
转换功能的指定
大多数转换需要用户定义的函数。本节列出了指定它们的不同方法
实现一个接口
最基本的方法是实现其中一个提供的接口:
class MyMapFunction implements MapFunction {
public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
匿名类
你可以传递一个函数作为一个匿名类:
data.map(new MapFunction () {
public Integer map(String value) { return Integer.parseInt(value); }
});
Java 8 Lambdas表达式
Flink还在Java API中支持Java 8 Lambdas 表达式。
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
Rich函数
所有需要用户定义函数的转换,都可以将一个Rich函数作为参数。例如可以将下列代码
class MyMapFunction implements MapFunction {
public Integer map(String value) { return Integer.parseInt(value); }
};
替换为:
class MyMapFunction extends RichMapFunction {
public Integer map(String value) { return Integer.parseInt(value); }
};
并像往常一样将函数传递给一个map转换:
data.map(new MyMapFunction());
Rich函数也可以定义为一个匿名类:
data.map (new RichMapFunction() {
public Integer map(String value) { return Integer.parseInt(value); }
});
除了用户定义的函数(map、reduce等)之外,Rich函数还提供四个方法:open、close、
getRuntimeContext和setRuntimeContext。它们对于参数化函数(请参阅将参数传递给函数)、创建和
初始化本地状态、访问广播变量(请参阅广播变量)以及访问诸如累加器和计数器之类的运行时信息(请参
阅累加器和计数器)和关于迭代的信息(请参阅迭代)非常有用。
支持的数据类型
Flink对DataSet或DataStream中的元素类型有一些限制。原因是系统会分析这些类型以确定有效的执
行策略。
数据类型有六种:
Java元组和Scala Case类
Java pojo
基本数据类型
常规类
值类型
Hadoop Writables类型
特殊类型
Java元组和Scala Case类
元组是包含固定数量的具有各种类型的字段的复合类型。Java API提供了从Tuple1到Tuple25的类。元
组的每个字段可以是任意的Flink类型,包括元组,嵌套元组。元祖的字段可以通过字段名称直接访问,
如:tuple.f4。或者使用通用的getter方法:tuple.getField(int position)。下标从0开始。注意,这与
Scala元组形成了对比,但是它与Java的常规索引一致。
DataStream> wordCounts = env.fromElements(
new Tuple2("hello", 1),
new Tuple2("world", 2));
wordCounts.map(new MapFunction, Integer>() {
@Override
public Integer map(Tuple2 value) throws Exception {
return value.f1;
}
});
wordCounts.keyBy(0); // also valid .keyBy("f0")
POJOs
Java和Scala类被Flink视为特殊的POJO数据类型,如果它们满足以下要求:
类必须是public的。
它必须有一个无参公共构造函数(默认构造函数)。
所有字段要么是public的,要么必须通过getter和setter函数进行访问。对于一个名为foo的字段,
getter和setter方法必须分别命名为getFoo()和setFoo()。
已注册的序列化器必须支持字段的类型。
POJOs通常用PojoTypeInfo表示,并使用PojoSerializer进行序列化(使用Kryo作为可配置)。当POJOs实
际上是Avro类型(Avro特定的记录)或作为“Avro反射类型”生成时除外。在这种情况下,POJO由
AvroTypeInfo表示,并使用AvroSerializer进行序列化。如果需要,还可以注册自己的定制序列化器;有
关更多信息,请参见序列化。
Flink分析POJO类型的结构,即,它学习POJO的字段。因此,POJO类型比一般类型更容易使用。此
外,与一般类型相比,Flink可以更有效地处理POJO。
下面的示例显示了一个简单的POJO,它有两个public字段。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}
DataStream wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"
基本数据类型
Flink支持所有Java和Scala基本数据类型,如Integer, String和Double。
常规类型
Flink支持大多数Java和Scala类(API和自定义)。限制适用于包含不能序列化的字段的类,如文件指针、
I/O流或其他native资源。通常,遵循Java bean约定的类工作得很好。
所有未标识为POJO类型的类(请参见上面的POJO需求)都由Flink作为常规类类型处理。Flink将这些数据
类型视为黑盒,因此无法访问它们的内容(例如,为了高效排序)。常规类型使用序列化框架Kryo序列化/
反序列化。
值类型
值类型手动描述它们的序列化和反序列化。它们不是通过通用的序列化框架,而是通过实现
org.apache.flinktypes.Value接口(包含读写方法)为这些操作提供定制代码。当通用序列化效率非常低
时,使用值类型是合理的。例如,数据类型将元素的稀疏向量实现为数组。知道数组大部分是零,就可
以对非零元素使用特殊的编码,而通用序列化需编写所有的数组元素。
The org.apache.flinktypes.CopyableValue接口以类似的方式支持手动内部克隆逻辑。
Flink附带了与基本数据类型相对应的预定义值类型。(ByteValue, ShortValue, IntValue, LongValue,
FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)这些值类型充当基本数据类型的可
变变体:它们的值可以更改,允许程序员重用对象并减轻垃圾收集器的压力。
Hadoop Writables
您可以使用实现org.apache.hadoop.Writable接口的类型。在write()和readFields()方法中定义的序列
化逻辑将用于序列化。
特殊类型
可以使用特殊类型,包括Scala的Either、Option和Try。Java API有自己的Either自定义实现。与Scala
的Either类似,它表示两个可能类型的值,Left 或 Right。对于错误处理或需要输出两种不同类型记录
的操作符,Either非常有用。
类型擦除和类型推断
注意:本节只与Java相关。