logo资料库

Flink1.9官方文档(中文版).pdf

第1页 / 共17页
第2页 / 共17页
第3页 / 共17页
第4页 / 共17页
第5页 / 共17页
第6页 / 共17页
第7页 / 共17页
第8页 / 共17页
资料共17页,剩余部分请下载后查看
基本API概念
DataSet和DataStream
Flink程序剖析
延迟计算
key的指定
为元组(Tuples)定义键
使用字段表达式定义键
使用键选择器函数定义键
转换功能的指定
实现一个接口
匿名类
Java 8 Lambdas表达式
Rich函数
支持的数据类型
Java元组和Scala Case类
POJOs
基本数据类型
常规类型
值类型
Hadoop Writables
特殊类型
类型擦除和类型推断
累加器和计数器
如何使用累加器
自定义累加器
Flink DataStream API编程指南
示例程序
数据源
DataStream数据转换
数据Sinks
迭代
执行参数
容错
控制延迟
调试
本地执行环境
集合数据源
迭代器数据sink
下一步如何做?
事件时间
事件时间/处理时间/接入时间
设定时间特性
事件时间和水印
关注公众号 登峰大数据 阅读完整版   基本API概念 Flink程序是在分布式集合上实现transformations的常规程序(例如,过滤、映射、更新状态、连接、分 组、定义窗口、聚合)。集合是从源(例如,从文件、kafka Topic或本地内存集合中读取数据)创建的。读 取的结果通过sinks返回,例如,sink可以将数据写入(分布式)文件,或者写入标准输出(例如,命令行终 端)。Flink程序在各种上下文中运行,standalone运行或嵌入到其他程序中。程序可以在本地JVM中执 行,也可以在许多机器的集群中进行。根据数据源的类型,即有界或无界数据源,可以编写一个批处理 程序或一个流式程序,其中DataSet API用于批处理程序,DataStream API用于流式处理程序。本指南 将介绍这两个API共有的基本概念,但请参阅我们的流处理指南和批处理指南,以获得关于使用每个API 编写程序的具体信息。  注意 在展示如何使用这些API的实际示例时,我们将使用StreamingExecutionEnvironment和DataStream API。在DataSet API中,概念是完全相同的,只是被ExecutionEnvironment和DataSet替换了。
DataSet和DataStream Flink程序剖析 延迟计算 key的指定 转换功能的指定 支持的数据类型 累加器和计数器 DataSet和DataStream Flink用特殊的类DataSet和DataStream来表示程序中的数据。可以将它们视为不可变的数据集合,其 中可能包含重复的数据。对于DataSet,数据是有界的,而对于DataStream,元素的数量是无界的。 这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建了它们,就 不能添加或删除元素。也不能简单地检查其中的元素。 初始集合是通过在Flink程序中添加一个source(源)来创建的,而新的集合则是通过使用map、filter等 API方法对它们进行转换而派生出来的。 Flink程序剖析 Flink程序看起来像转换数据集合的常规程序。每个程序都由相同的基本部分组成: 1. 获取一个执行环境, 2. 加载/创建初始数据, 3. 指定对该数据的转换, 4. 指定放置计算结果的位置, 5. 触发程序执行 现在,我们将对每个步骤进行概述,请参阅相应的部分以了解更多细节。注意,Java DataSet API的所 有核心类都可以在org.apache.flink.api.java包中找到,而Java DataStream API的类可以在 org.apache.flink.streaming.api包中找到。 StreamExecutionEnvironment是所有Flink程序的基础。你可以获得一个 StreamExecutionEnvironment,使用其中的静态方法: getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(String host, int port, String... jarFiles) 通常,只需要使用getExecutionEnvironment(),因为这将根据上下文做正确的事:如果你执行程序在IDE 或普通Java程序将创建一个本地环境,将在本地机器上执行程序。如果将程序打包为一个JAR文件,并通 过命令行调用它,那么Flink集群管理器将执行您的main方法,而getExecutionEnvironment()将返回一 个用于在集群中执行程序的执行环境。 为了指定数据源,执行环境有几个方法,可以使用各种方式从文件中读取数据:可以逐行读取文件,如 CSV文件,或者使用完全定制的数据输入格式。要将文本文件作为行序列读取,可以使用以下命令: final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream text = env.readTextFile("file:///path/to/file");
这将为您提供一个DataStream,您可以在其上应用转换来创建新的派生DataStreams。 通过使用转换函数调用DataStream上的方法来应用转换。例如,一个map转换看起来像这样: DataStream input = ...; DataStream parsed = input.map(new MapFunction() {    @Override    public Integer map(String value) {        return Integer.parseInt(value);   } }); 这将通过将原始集合中的每个String转换为Integer来创建一个新的DataStream。 一旦有了包含最终结果的DataStream,就可以通过创建sink将其写入外部系统。这些只是一些创建sink 的示例方法: writeAsText(String path) print() 一旦开发了完整的程序,就需要通过调用StreamExecutionEnvironment上的execute()来触发程序执 行。根据ExecutionEnvironment的类型,可以在本地机器上执行程序,或将程序提交到集群上执行。 execute()方法返回一个JobExecutionResult,它包含执行时间和累加器结果。 有关流数据源和接收器(sink)的信息,以及关于DataStream上支持的转换的更深入的信息,请参阅流处 理指南。 有关批处理数据源和接收器(sink)的信息,以及有关DataSet上支持的转换的更深入的信息,请参阅批处 理指南。 延迟计算 所有的Flink程序都是延迟执行的:当程序的main方法执行时,数据加载和转换不会立即执行。相反, 每个操作都被创建并添加到程序的计划中。当执行环境中的execute()调用显式地触发执行时,这些操作 才被实际执行。程序是在本地执行还是在集群中执行,取决于执行环境的类型。 延迟计算允许构建复杂的程序,Flink将其作为一个整体计划单元执行。 key的指定 一些转换(join、coGroup、keyBy、groupBy)要求在元素集合上定义一个键(key)。其他转换(Reduce、 GroupReduce、Aggregate、Windows)允许在应用数据之前,按key对数据进行分组。 DataSet被分组为 DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/); 而key也可以在DataStream上使用
DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/); Flink的数据模型不是基于键值对的。因此,您不需要硬性将数据集类型改为键和值类型。键是“虚拟 的”:它们被定义为指导分组操作符的实际数据上的函数。 注意:在接下来的讨论中,我们将使用DataStream API和keyBy。对于DataSet API,只需用DataSet和 groupBy替换即可。 为元组(Tuples)定义键 最简单的情况是对元组的一个或多个字段进行分组: DataStream> input = // [...] KeyedStream,Tuple> keyed = input.keyBy(0) 元组按第一个字段(Integer类型的字段)分组。 DataStream> input = // [...] KeyedStream,Tuple> keyed = input.keyBy(0,1) 在这里,我们在一个由第一个和第二个字段组成的组合键上对元组进行分组。 关于嵌套元组的说明:如果您有一个嵌套元组的DataStream,例如: DataStream,String,Long>> ds; 指定keyBy(0),Flink系统将使用完整的Tuple2作为键(整数和浮点数作为键)。如果想要“导航”到嵌套的 Tuple2,您必须使用下面演示的字段表达式键。 使用字段表达式定义键 可以使用基于String的字段表达式来说明嵌套字段,并为分组、排序、连接或联合分组定义键。 字段表达式使得在(嵌套的)复合类型(如Tuple和POJO类型)中选择字段变得非常容易。 在下面的示例中,我们有一个WC POJO,其中包含两个字段“word”和“count”。要按字段字分组,只需 将其名称传递给keyBy()函数。 // some ordinary POJO (Plain old Java Object) public class WC {  public String word;  public int count; } DataStream words = // [...] DataStream wordCounts = words.keyBy("word").window(/*window specification*/); 字段表达式语法: 选择POJO字段:根据字段名选择POJO字段。例如,字段名“user”指的是POJO类型的“user”字段。 选择Tuple字段:根据字段名或0-offset字段索引选择Tuple字段。例如,“f0”和“5”分别引用Java元 组类型的第一个和第六个字段。
选择嵌套字段:可以选择pojo和元组中的嵌套字段。例如"user.zip"是指存储在POJO类型 的“user”字段中的POJO的“zip”字段。支持pojo和元组的任意嵌套和混合,如"f1.user.zip" 或"user.f3.1.zip"。 可以使用“*”通配符表达式选择完整类型。这也适用于非Tuple或POJO类型的类型。 字段表达式示例: public static class WC {  public ComplexNestedClass complex; //nested POJO  private int count;  // getter / setter for private field (count)  public int getCount() {    return count; }  public void setCount(int c) {    this.count = c; } } public static class ComplexNestedClass {  public Integer someNumber;  public float someFloat;  public Tuple3 word;  public IntWritable hadoopCitizen; } 下面这些是以上示例代码的有效字段表达式: “count”:WC类中的count字段。 “complex”:递归选择POJO类型ComplexNestedClass的字段complex的所有字段。 “complex.word.f2”:选择嵌套Tuple3的最后一个字段。 “complex.hadoopCitizen":选择Hadoop IntWritable类型。 使用键选择器函数定义键 定义键的另一种方法是“键选择器”函数。键选择器函数接受单个元素作为输入并返回元素的键。key可以 是任何类型的,可以从确定性计算中得到。 下面的例子显示了键选择器函数(简单返回对象的字段): // some ordinary POJO public class WC {public String word; public int count;} DataStream words = // [...] KeyedStream keyed = words .keyBy(new KeySelector() {     public String getKey(WC wc) { return wc.word; }   }); 转换功能的指定 大多数转换需要用户定义的函数。本节列出了指定它们的不同方法 实现一个接口 最基本的方法是实现其中一个提供的接口:
class MyMapFunction implements MapFunction {  public Integer map(String value) { return Integer.parseInt(value); } }; data.map(new MyMapFunction()); 匿名类 你可以传递一个函数作为一个匿名类: data.map(new MapFunction () {  public Integer map(String value) { return Integer.parseInt(value); } }); Java 8 Lambdas表达式 Flink还在Java API中支持Java 8 Lambdas 表达式。 data.filter(s -> s.startsWith("http://")); data.reduce((i1,i2) -> i1 + i2); Rich函数 所有需要用户定义函数的转换,都可以将一个Rich函数作为参数。例如可以将下列代码 class MyMapFunction implements MapFunction {  public Integer map(String value) { return Integer.parseInt(value); } }; 替换为: class MyMapFunction extends RichMapFunction {  public Integer map(String value) { return Integer.parseInt(value); } }; 并像往常一样将函数传递给一个map转换: data.map(new MyMapFunction()); Rich函数也可以定义为一个匿名类: data.map (new RichMapFunction() {  public Integer map(String value) { return Integer.parseInt(value); } }); 除了用户定义的函数(map、reduce等)之外,Rich函数还提供四个方法:open、close、 getRuntimeContext和setRuntimeContext。它们对于参数化函数(请参阅将参数传递给函数)、创建和 初始化本地状态、访问广播变量(请参阅广播变量)以及访问诸如累加器和计数器之类的运行时信息(请参 阅累加器和计数器)和关于迭代的信息(请参阅迭代)非常有用。 支持的数据类型
Flink对DataSet或DataStream中的元素类型有一些限制。原因是系统会分析这些类型以确定有效的执 行策略。 数据类型有六种: Java元组和Scala Case类 Java pojo 基本数据类型 常规类 值类型 Hadoop Writables类型 特殊类型 Java元组和Scala Case类 元组是包含固定数量的具有各种类型的字段的复合类型。Java API提供了从Tuple1到Tuple25的类。元 组的每个字段可以是任意的Flink类型,包括元组,嵌套元组。元祖的字段可以通过字段名称直接访问, 如:tuple.f4。或者使用通用的getter方法:tuple.getField(int position)。下标从0开始。注意,这与 Scala元组形成了对比,但是它与Java的常规索引一致。 DataStream> wordCounts = env.fromElements(    new Tuple2("hello", 1),    new Tuple2("world", 2)); wordCounts.map(new MapFunction, Integer>() {    @Override    public Integer map(Tuple2 value) throws Exception {        return value.f1;   } }); wordCounts.keyBy(0); // also valid .keyBy("f0") POJOs Java和Scala类被Flink视为特殊的POJO数据类型,如果它们满足以下要求: 类必须是public的。 它必须有一个无参公共构造函数(默认构造函数)。 所有字段要么是public的,要么必须通过getter和setter函数进行访问。对于一个名为foo的字段, getter和setter方法必须分别命名为getFoo()和setFoo()。 已注册的序列化器必须支持字段的类型。 POJOs通常用PojoTypeInfo表示,并使用PojoSerializer进行序列化(使用Kryo作为可配置)。当POJOs实 际上是Avro类型(Avro特定的记录)或作为“Avro反射类型”生成时除外。在这种情况下,POJO由 AvroTypeInfo表示,并使用AvroSerializer进行序列化。如果需要,还可以注册自己的定制序列化器;有 关更多信息,请参见序列化。 Flink分析POJO类型的结构,即,它学习POJO的字段。因此,POJO类型比一般类型更容易使用。此 外,与一般类型相比,Flink可以更有效地处理POJO。 下面的示例显示了一个简单的POJO,它有两个public字段。 public class WordWithCount {    public String word;
   public int count;    public WordWithCount() {}    public WordWithCount(String word, int count) {        this.word = word;        this.count = count;   } } DataStream wordCounts = env.fromElements(    new WordWithCount("hello", 1),    new WordWithCount("world", 2)); wordCounts.keyBy("word"); // key by field expression "word" 基本数据类型 Flink支持所有Java和Scala基本数据类型,如Integer, String和Double。 常规类型 Flink支持大多数Java和Scala类(API和自定义)。限制适用于包含不能序列化的字段的类,如文件指针、 I/O流或其他native资源。通常,遵循Java bean约定的类工作得很好。 所有未标识为POJO类型的类(请参见上面的POJO需求)都由Flink作为常规类类型处理。Flink将这些数据 类型视为黑盒,因此无法访问它们的内容(例如,为了高效排序)。常规类型使用序列化框架Kryo序列化/ 反序列化。 值类型 值类型手动描述它们的序列化和反序列化。它们不是通过通用的序列化框架,而是通过实现 org.apache.flinktypes.Value接口(包含读写方法)为这些操作提供定制代码。当通用序列化效率非常低 时,使用值类型是合理的。例如,数据类型将元素的稀疏向量实现为数组。知道数组大部分是零,就可 以对非零元素使用特殊的编码,而通用序列化需编写所有的数组元素。 The org.apache.flinktypes.CopyableValue接口以类似的方式支持手动内部克隆逻辑。 Flink附带了与基本数据类型相对应的预定义值类型。(ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)这些值类型充当基本数据类型的可 变变体:它们的值可以更改,允许程序员重用对象并减轻垃圾收集器的压力。 Hadoop Writables 您可以使用实现org.apache.hadoop.Writable接口的类型。在write()和readFields()方法中定义的序列 化逻辑将用于序列化。 特殊类型 可以使用特殊类型,包括Scala的Either、Option和Try。Java API有自己的Either自定义实现。与Scala 的Either类似,它表示两个可能类型的值,Left 或 Right。对于错误处理或需要输出两种不同类型记录 的操作符,Either非常有用。 类型擦除和类型推断 注意:本节只与Java相关。
分享到:
收藏