1、Flink Table API和Flink SQL的全面解析1. 整体概述1.1 什么是 Table API 和 Flink SQLApache Flink是批流统一的处理框架,具有两个关系API-Table API和SQL-用于统一流和批处理的上层API。Table API是Java,Scala和Python的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接)的查询。Flink的SQL就是可以在代码中写sql,实现一些查询操作,支持基于实现SQL标准的Apache Calcite(Apache开源SQL解析工具)。无论输入是连续的(流式)还是有界的(批处理)
2、,在两个接口中指定的查询都具有相同的语义,并指定相同的结果。官网介绍:https:/ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/overview/1.2 为什么需要Table API & SQLl 声明式:属于设定式语言,用户只要表达清楚需求即可,不需要了解底层执行;l 高性能:可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;l 简单易学:易于理解,不同行业和领域的人都懂,学习成本较低;l 标准稳定:语义遵循SQL标准,非常稳定,在数据库 30 多年的历史中,SQL 本
3、身变化较少;l 流批统一:可以做到API层面上流与批的统一,相同的SQL逻辑,既可流模式运行,也可批模式运行,Flink底层Runtime本身就是一个流与批统一的引擎1.3 Table API& SQL发展历程l 架构升级自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。随着版本的不断更新,API 也出现了很多不兼容的地方。在 Flink 1.9 中,Tab
4、le 模块迎来了核心架构的升级,引入了阿里巴巴Blink团队贡献的诸多功能图示,在Flink 1.9 之前,Flink API 层 一直分为DataStream API 和 DataSet API,Table API & SQL 位于 DataStream API 和 DataSet API 之上。可以看出流处理和批处理有各自独立的api (流处理DataStream,批处理DataSet)。而且有不同的执行计划解析过程,codegen过程也完全不一样,完全没有流批一体的概念,面向用户不太友好。在Flink1.9之后新的架构中,有两个查询处理器:Flink Query Processor,也称
5、作Old Planner和Blink Query Processor,也称作Blink Planner。为了兼容老版本Table及SQL模块,插件化实现了Planner,Flink原有的Flink Planner不变,后期版本会被移除。新增加了Blink Planner,新的代码及特性会在Blink planner模块上实现。批或者流都是通过解析为Stream Transformation来实现的,不像Flink Planner,批是基于Dataset,流是基于DataStream。l 查询处理器的选择 查询处理器是 Planner 的具体实现,通过parser、optimizer、codeg
6、en(代码生成技术)等流程将 Table API & SQL作业转换成 Flink Runtime 可识别的 Transformation DAG,最终由 Flink Runtime 进行作业的调度和执行。Flink Query Processor查询处理器针对流计算和批处理作业有不同的分支处理,流计算作业底层的 API 是 DataStream API, 批处理作业底层的 API 是 DataSet APIBlink Query Processor查询处理器则实现流批作业接口的统一,底层的 API 都是Transformation,这就意味着我们和Dataset完全没有关系了Flink1.1
7、1之后Blink Query Processor查询处理器已经是默认的了1.4 两种planner(old & blink)的区别l Blink 将批处理作业视作流处理的一种特例。严格来说,Table和DataSet之间不支持相互转换,并且批处理作业也不会转换成DataSet程序而是转换成DataStream程序,流处理作业也一样。l Blink 计划器不支持BatchTableSource,而是使用有界的StreamTableSource来替代。l 旧计划器和 Blink 计划器中FilterableTableSource的实现是不兼容的。旧计划器会将PlannerExpression下推至
8、FilterableTableSource,而 Blink 计划器则是将Expression下推。l PlannerConfig在两种计划器中的实现(CalciteConfig)是不同的。l Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment和StreamTableEnvironment都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。l 旧计划器目前不支持 catalog 统计数据,而 Blink 支持。https:/ci.apache.org/projects/flink/fl
9、ink-docs-release-1.13/zh/docs/dev/table/common/2. 概念和通用APITable API 和 SQL 集成在同一套 API 中。这套 API 的核心概念是Table,用作查询的输入和输出。本文介绍了 Table API 和 SQL 查询程序的通用结构、如何注册Table、如何查询Table以及如何输出Table。2.1 需要引入的依赖参考官网:https:/ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/overview/ org.apache.flink
10、 flink-table-api-scala-bridge_2.11 $flink.version org.apache.flink flink-table-api-java-bridge_2.11 $flink.version org.apache.flink flink-table-planner_2.11 $flink.version org.apache.flink flink-table-planner-blink_2.11 $flink.version org.apache.flink flink-table-common $flink.version org.apache.fli
11、nk flink-runtime-web_2.11 $flink.version l flink-table-common:这个包中主要是包含 Flink Planner 和 Blink Planner一些共用的代码。l flink-table-api-java:这部分是用户编程使用的 API,包含了大部分的 API。l flink-table-api-scala:这里只是非常薄的一层,仅和 Table API 的 Expression 和 DSL 相关。l 两个 Planner:flink-table-planner 和 flink-table-planner-blink。l 两个 Brid
12、ge:flink-table-api-scala-bridge 和 flink-table-api-java-bridge,Flink Planner 和 Blink Planner 都会依赖于具体的 JavaAPI,也会依赖于具体的 Bridge,通过 Bridge 可以将 API 操作相应的转化为Scala 的 DataStream、DataSet,或者转化为 JAVA 的 DataStream 或者Data Set2.2 基本程序结构所有用于批处理和流处理的 Table API 和 SQL 程序结构,与流式处理的程序结构类似;也可以近似地认为有这么几步:首先创建执行环境,然后定义sour
13、ce、transform和sink。具体操作流程如下:/ 创建表的执行环境TableEnvironment tableEnv = .;/ 创建一张表,用来读取数据tableEnv.executeSql(CREATE TEMPORARY TABLE table1 . WITH ( connector = . );/注册一张表,用来输出计算结果数据tableEnv.executeSql(CREATE TEMPORARY TABLE outputTable . WITH ( connector = . );/通过Table API查询算子,得到一张结果表Table table2 = tableEnv
14、.from(table1).select(.);/通过SQL查询语句,得到一张结果表Table table3 = tableEnv.sqlQuery(SELECT . FROM table1 . );/将结果表写入到输出表中TableResult tableResult = table2.executeInsert(outputTable);tableResult.2.3 创建 TableEnvironmentTableEnvironment是 Table API 和 SQL 的核心概念。它负责:l 注册catalogl 在内部 catalog 中注册表l 执行 SQL 查询l 注册用户自定义
15、函数l 将DataStream转换为表l 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用在创建TableEnv的时候,可以多传入一个EnvironmentSettings或者TableConfig参数,可以用来配置TableEnvironment 的一些特性。比如:l 配置老版本的流式查询(Flink-Streaming-Query): / */ FLINK STREAMING QUERY/ *import org.apache.flink.streaming.api.environment.StreamExecutionEn
16、vironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner() / 使用老版本planner.inStreamingMode() / 流处理模式.build();StreamExecutionEnvironment fsEnv =
17、StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);/ or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);l 基于老版本的批处理环境(Flink-Batch-Query):/ */ FLINK BATCH QUERY/ *import org.apache.flink.api.java.
18、ExecutionEnvironment;import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);l 基于blink版本的流处理环境(Blink-Streaming-Query):/ */ BLINK STREAMING QU
19、ERY/ *import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();Enviro
20、nmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner()/ 使用新版本planner.inStreamingMode()/ 流处理模式.build();StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);/ or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);l 基于blink版本的批处理环境(
21、Blink-Batch-Query):/ */ BLINK BATCH QUERY/ *import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.TableEnvironment;EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner()/ 使用新版本planner.inBatchMode()/ 批处理模式.build();TableEnvironment bbTabl
22、eEnv = TableEnvironment.create(bbSettings);2.4 在Catalog中注册表2.4.1 表(Table)的概念TableEnvironment 维护着一个由标识符(identifier)创建的表 catalog 的映射。标识符由三个部分组成:Catalog名、数据库(database)名和对象名(表名)。如果 catalog 或者数据库没有指明,就会使用当前默认值。Table 可以是虚拟的(视图 VIEWS)也可以是常规的(表 TABLES)。视图 VIEWS可以从已经存在的Table中创建,一般是 Table API 或者 SQL 的查询结果。 表T
23、ABLES描述的是外部数据,例如文件、数据库表或者消息队列。2.4.2 临时表(Temporary Table)和永久表(Permanent Table)表可以是临时的,并与单个 Flink 会话(session)的生命周期相关表也可以是永久的,并且在多个 Flink 会话和群集(cluster)中可见永久表需要catalog(例如 Hive Metastore)以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除。另一方面,临时表通常保存于内存中并且仅在创建它们的 Flink 会话持续期间存在。这些表对于其它会话是不可见的。
24、它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。2.4.3 创建表2.4.3.1 虚拟表在 SQL 的术语中,Table API 的对象对应于视图(虚拟表)。它封装了一个逻辑查询计划。它可以通过以下方法在 catalog 中创建:/ get a TableEnvironmentTableEnvironment tableEnv = .; / see Create a TableEnvironment section/ table is the result of a simple projection
25、 queryTable projTable = tableEnv.from(X).select(.);/ register the Table projTable as table projectedTabletableEnv.createTemporaryView(projectedTable, projTable);l 注意事项从传统数据库系统的角度来看,Table对象与VIEW视图非常像。也就是,定义了Table的查询是没有被优化的, 而且会被内嵌到另一个引用了这个注册了的Table的查询中。如果多个查询都引用了同一个注册了的Table,那么它会被内嵌每个查询中并被执行多次, 也就是说注
26、册了的Table的结果不会被共享(注:Blink 计划器的TableEnvironment会优化成只执行一次)2.4.3.2 Connector Tables另外一个方式去创建TABLE是通过connector声明。Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。tableEnvironment .connect(.) .withFormat(.) .withSchema(.) .inAppendMode() .createTemporaryTable(MyTable)2.4.3.3 扩展表标识符表总是通过三元
27、标识符注册,包括 catalog 名、数据库名和表名。用户可以指定一个 catalog 和数据库作为 “当前catalog” 和当前数据库。有了这些,那么刚刚提到的三元标识符的前两个部分就可以被省略了。如果前两部分的标识符没有指定, 那么会使用当前的 catalog 和当前数据库。用户也可以通过 Table API 或 SQL 切换当前的 catalog 和当前的数据库。标识符遵循 SQL 标准,因此使用时需要用反引号()进行转义。TableEnvironment tEnv = .;tEnv.useCatalog(custom_catalog);tEnv.useDatabase(custom_
28、database);Table table = .;/在custom_catalog”的目录中注册名称“exampleView”的视图/在“custom_database”的数据库中tableEnv.createTemporaryView(exampleView, table);/在“custom_catalog”的目录中注册名称“exampleView”的视图/在“other_database”的数据库中tableEnv.createTemporaryView(other_database.exampleView, table);/在“custom_catalog”的目录中注册名称“exam
29、ple.view”的视图/在“custom_database”的数据库中tableEnv.createTemporaryView(example.View, table);/在“other_catalog”的目录中注册名称“exampleView”的视图/在“other_database”的数据库中tableEnv.createTemporaryView(other_catalog.other_database.exampleView, table);2.5 查询表利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。Flin
30、k给我们提供了两种查询方式:Table API和 SQL。2.5.1 Table API的调用Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API。这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果。有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如table.select().filter(),其中select()表示选择表中指定的字段,filter()表示筛选条件。注意:通过导入可以启用Java Table APIorg.apache.flink.table.api.java.*。下面的示例演示如何构造Java T
31、able API程序以及如何将表达式指定为字符串。对于Expression DSL,还必须导入静态org.apache.flink.table.api.Expressions.*import static org.apache.flink.table.api.Expressions.*;代码中的实现如下:/ 3. 表的查询转换Table orderTable = tableEnv.from(inputTable);/ 3.1 简单查询转换Table resultTable = orderTable .select($(id),$(timestamp),$(category),$(areaNam
32、e),$(money) .filter($(areaName).isEqual(北京);也可以加上聚合操作,比如我们统计每个城市订单数据出现的个数,做个count统计:/ 3. 表的查询转换Table orderTable = tableEnv.from(inputTable);/ 3.2 聚合转换Table aggResultSqlTable = orderTable .groupBy($(areaName) .select($(areaName), $(id).count().as(cnt);2.5.2 SQL查询Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL标
33、准。在Flink中,用常规字符串来定义SQL查询语句。SQL 查询的结果,是一个新的 Table。l 下面的示例演示了如何指定查询并将结果作为Table对象返回/ 3. 表的查询转换Table orderTable = tableEnv.from(inputTable);/ 3.1 简单查询转换Table resultTable2 = tableEnv.sqlQuery(select id,timestamp,category,areaName,money from inputTable where areaName=北京);也可以加上聚合操作,比如我们统计每个城市订单数据出现的个数,做个co
34、unt统计:/ 3. 表的查询转换Table orderTable = tableEnv.from(inputTable);/ 3.2 聚合转换Table aggResultSqlTable2 = tableEnv.sqlQuery(select areaName, count(1) as cnt from inputTable group by areaName);l 如下的示例展示了如何指定一个更新查询,将查询的结果插入到已注册的表中/ get a TableEnvironmentTableEnvironment tableEnv = .; / see Create a TableEnvi
35、ronment section/ register Orders table/ register RevenueFrance output table/ compute revenue for all customers from France and emit to RevenueFrancetableEnv.executeSql( INSERT INTO RevenueFrance + SELECT cID, cName, SUM(revenue) AS revSum + FROM Orders + WHERE cCountry = FRANCE + GROUP BY cID, cName
36、);2.6 将DataStream 转换成表Flink允许我们把Table和DataStream做转换:可以基于一个DataStream,先流式地读取数据源,然后map成样例类,再把它转成Table。Table的列字段(column fields),就是样例类里的字段,这样就不用再麻烦地定义schema了。2.6.1 代码表达代码中实现非常简单,直接用tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次ma
37、p操作(或者Table API的 select操作)。代码具体如下: / 从文件读取数据 String filePath = TableApiTest.class.getClassLoader().getResource(order.csv).getPath(); DataStreamSource inputStream = env.readTextFile(filePath); / map成样例类类型 SingleOutputStreamOperator dataStream = inputStream.map(new MapFunction() Override public OrderI
38、nfo map(String data) throws Exception String dataArray = data.split(,); return new OrderInfo(dataArray0, dataArray1, Double.parseDouble(dataArray2), dataArray3); ); / 2. 基于tableEnv,将流转换成表 Table dataTable = tableEnv.fromDataStream(dataStream);2.6.2 数据类型与Table schema的对应在上面的例子中,DataStream 中的数据类型,与表的 Sc
39、hema 之间的对应关系,是按照样例类中的字段名来对应的(name-based mapping),所以还可以用as做重命名。另外一种对应方式是,直接按照字段的位置来对应(position-based mapping),对应的过程中,就可以直接指定新的字段名了。l 基于名称的对应:Table dataTable = tableEnv.fromDataStream(dataStream,$(id).as(ts), $(timestamp).as(myId), $(money), $(category);l 基于位置的对应:Table dataTable3 = tableEnv.fromDataSt
40、ream(dataStream,$(id), $(timestamp), $(money), $(category);Flink的DataStream和 DataSet API支持多种类型。组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问。其他类型,则被视为原子类型。元组类型和原子类型,一般用位置对应会好一些;如果非要用名称对应,也是可以的:元组类型,默认的名称是 “_1”, “_2”;而原子类型,默认名称是 ”f0”。2.6.3 使用案例/* * 将DataS
41、tream 转换成表 */public class DataStreamToTable public static void main(String args) throws Exception / 0. 创建流执行环境,读取数据并转换成样例类类型 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inS
42、treamingMode().build(); env.getConfig().setRestartStrategy(RestartStrategies.noRestart(); env.setParallelism(1); / 从文件读取数据 String filePath = TableApiTest.class.getClassLoader().getResource(order.csv).getPath(); DataStreamSource inputStream = env.readTextFile(filePath); / map成样例类类型 SingleOutputStreamOperator dataStream = inputStream.map(new MapFunction() Override public OrderInfo map(String data) throws Exception String dataArray = data.split(,);