13.1、Flink Table API & SQL - Hive之hive概览
Apache Hive已经成为数据仓库生态系统的焦点。它不仅是一个用于大数据分析和ETL的SQL引擎,也是一个数据管理平台,在这里数据被发现、定义和发展。
Flink提供了与Hive的双重集成。第一种方法是利用Hive的Metastore作为一个持久目录,在会话之间存储Flink特定的元数据。第二个是提供Flink作为读取和写入Hive表的替代引擎。
hive目录被设计为与现有hive安装兼容的“开箱即用”。您不需要修改现有的Hive Metastore或更改表的数据位置或分区。
支持的Hive版本
Flink支持Hive 2.3.4和1.2.1,并依赖于Hive对其他低版本的兼容性保证。
如果您使用不同的Hive低版本,比如1.2.2或2.3.1,那么选择最近的版本1.2.1(适用于1.2.2)或2.3.4(适用于2.3.1)也可以。例如,您想使用Flink在sql client中集成2.3.1 hive版本,只需在YAML配置中将hive版本设置为2.3.4。类似地,在通过表API创建HiveCatalog实例时传递版本号。
欢迎用户尝试使用此解决方案的不同版本。由于只测试了2.3.4和1.2 .1,所以可能会出现一些意想不到的问题。我们将在未来的版本中测试和支持更多的版本。
Hive Maven依赖
要与Hive集成,用户需要在他们的项目中使用以下依赖项。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive__2.11</artifactId>
<version>1.10-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<!-- Hadoop Dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility-1.10-SNAPSHOT</artifactId>
<version>1.10-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<!-- Hive 2.3.4 is built with Hadoop 2.7.2. We pick 2.7.5 which flink-shaded-hadoop is pre-built with, but users can pick their own hadoop version, as long as it's compatible with Hadoop 2.7.2 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber-2.7.5-1.10-SNAPSHOT</artifactId>
<version>1.10-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<!-- Hive Metastore -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.3.4</version>
</dependency>
连接到Hive
通过表元数据或YAML配置文件使用Hive Catalog连接到现有的Hive集群。
String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
String version = "2.3.4"; // or 1.2.1
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog(hive);
catalogs:
- name: myhive
type: hive
property-version: 1
hive-conf-dir: /opt/hive-conf
hive-version: 2.3.4 # or 1.2.1
支持的数据类型
目前HiveCatalog通过以下映射支持大多数Flink数据类型:
Flink Data Type | Hive Data Type |
---|---|
CHAR(p) | CHAR(p) |
VARCHAR(p) | VARCHAR(p) |
STRING | STRING |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | LONG |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) | DECIMAL(p, s) |
DATE | DATE |
BYTES | BINARY |
ARRAY<T> | LIST<T> |
MAP<K, V> | MAP<K, V> |
ROW | STRUCT |
局限性
以下类型限制了Flink和Hive数据类型之间的映射:
- CHAR(p)最大长度为255
- VARCHAR(p)的最大长度为65535
- Hive的MAP的key只支持基本类型,而Flink的key可以是任意数据类型
- Hive的UNION类型不支持
- Flink的INTERVAL不支持Hive的INTERVAL的映射
- Hive不支持Flink的TIMESTAMP_WITH_TIME_ZONE和TIMESTAMP_WITH_LOCAL_TIME_ZONE
- 由于精度不同,Flink的TIMESTAMP_WITHOUT_TIME_ZONE类型不能映射到Hive的时间戳类型。
- Hive不支持Flink的MULTISET
还没有评论,来说两句吧...