8. Hive-2

分区表

创建表的时候可以使用 PARTITIONED BY 来创建分区表。分区表实际上就是对应一个 HDFS 文件系统上的独立的文件夹,该文件夹下是该分区所有的数据文件。Hive 中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。在查询时通过 WHERE 子句中的表达式选择查询所需要的指定的分区,这样的查询效率会提高很多。

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name 
[(col_name data_type [COMMENT col_comment], ...)]   //表中的字段信息
[COMMENT table_comment]  //表的注释

[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 
[CLUSTERED BY (col_name, col_name, ...) 
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 

[ROW FORMAT row_format]   // 表中数据每行的格式,定义数据字段的分隔符,集合元素的分隔符等
[STORED AS file_format]   //表中的数据要以哪种文件格式来存储,默认为TEXTFILE(文本文件)
[LOCATION hdfs_path]  //表在hdfs上的位置

分区表的创建方式

直接使用 create 创建分区表
create external table if not exists default.deptpart1(
deptno int,
dname string,
loc int
)
PARTITIONED BY(area string)
row format delimited fields terminated by '\t';

创建数据

partition_data1

1       测试    1
2       dev     2
3       data    3
4       test    4

partition_data2

5       测试    5
6       dev     6
7       data    7
8       test    8

加载数据到分区表中

load data local inpath '/home/rexyan/test_file/partition_data1' into table default.deptpart1 partition(area='shanghai');

load data local inpath '/home/rexyan/test_file/partition_data2' into table default.deptpart1 partition(area='beijing');

上面命令会将数据放在不同的分区中,partition_data1 的数据放在分区 area=’shanghai’ 中,partition_data2 的数据放在 area=’beijing’ 中。查询数据时会发现字段中默认增加了一列 area,area 的值就是分区的值。可以使用where 条件过滤该列的值。在 hdfs 中,会发现 deptpart1 目录下有 area=’shanghai’ 和 area=’beijing’ 两个分区目录。

hive (default)> select * from deptpart1;
OK
deptpart1.deptno        deptpart1.dname deptpart1.loc   deptpart1.area
5                           测试            5               beijing
6                           dev            6                 beijing
7                           data           7                 beijing
8                           test           8                 beijing
1                           测试             1               shanghai
2                           dev            2                 shanghai
3                           data           3                 shanghai
4                           test           4                 shanghai
Time taken: 0.201 seconds, Fetched: 8 row(s)
使用 alter 增加分区字段

创建普通表后或者在目前分区表的基础上使用 alter 增加分区字段

hive (default)> alter table deptpart1 add partition(area="shandong");
hive (default)> show partitions deptpart1;  # 查看添加的分区
OK
partition
area=beijing
area=shandong
area=shanghai
Time taken: 0.183 seconds, Fetched: 3 row(s)
使用 load 直接加载数据

使用 load 命令向分区加载数据,如果分区不存在,load 时自动帮我们生成分区

创建一张分区表

create external table if not exists default.deptpart2(
deptno int,
dname string,
loc int
)
PARTITIONED BY(area string)
row format delimited fields terminated by '\t';

使用 load 命令加载数据,将加载时指定分区,因为上面刚创建的表是没有分区 area=’jiangshu’ 和 area=’wuhan’ 的,使用下面方式导入后就能直接创建分区,并将数据导入。

load data local inpath '/home/rexyan/test_file/partition_data1' into table default.deptpart2 partition(area='jiangshu');

load data local inpath '/home/rexyan/test_file/partition_data2' into table default.deptpart2 partition(area='wuhan');
修复分区命令

创建一张分区表,直接将数据上传到 hdfs 目录下,这时使用 hive 是查询不了数的,需要使用命令修复分区自动生成分区的元数据。

创建分区表

create external table if not exists default.deptpart3(
deptno int,
dname string,
loc int
)
PARTITIONED BY(area string)
row format delimited fields terminated by '\t';

上传数据到 hdfs

[[email protected] ~]$ hadoop fs -mkdir -p /hive/deptpart3/area=guiyang
[[email protected] ~]$ hadoop fs -mkdir -p /hive/deptpart3/area=guangxi
[[email protected] ~]$ hadoop fs -put test_file/partition_data1 /hive/deptpart3/area=guiyang
[[email protected] ~]$ hadoop fs -put test_file/partition_data2 /hive/deptpart3/area=guangxi

修复分区

hive (default)> msck repair table deptpart3;  # 修复分区
OK
Partitions not in metastore:    deptpart3:area=guangxi  deptpart3:area=guiyang
Repair: Added partition to metastore deptpart3:area=guangxi
Repair: Added partition to metastore deptpart3:area=guiyang
Time taken: 0.395 seconds, Fetched: 3 row(s)

分桶表

建表时指定了 CLUSTERED BY,这个表就为分桶表,分桶本质上也是为了分散数据,在分桶后,可以结合 hive 提供的抽样查询,只查询指定桶的数据。在分桶时,也可以指定将每个桶的数据根据一定的规则来排序,如果需要排序,那么可以在 CLUSTERED BY 后根 SORTED BY

创建分桶表

create table stu_buck(id int, name string)
clustered by(id)  -- 根据 id 进行分桶
SORTED BY (id desc)  --根据 id 进行排序
into 4 buckets  -- 分 4 个桶
row format delimited fields terminated by '\t';  -- 指定数据之间分隔符为 '\t'

向分桶表导入数据时,必须运行 MR 程序,才能实现分桶操作,之前使用的 load 操作只是相当于执行的 put 操作,无法满足分桶表导入数据,必须使用 insert into (hive 会自动转换为 MR 程序运行)。

在导入数据前还需要打开强制分桶开关和强制排序开关

打开强制分桶开关: set hive.enforce.bucketing=true;
打开强制排序开关: set hive.enforce.sorting=true;

插入数据除了使用 insert into 一条条的插入外,还可以使用查询插入的方式,即查询一张表的结果,并将这正表的数据插入到分桶表中。这里使用查询插入的方式实现。

create table stu_buck_tmp(id int, name string)  -- 创建普通表 stu_buck_tmp
row format delimited fields terminated by '\t';

将数据加载到表 stu_buck_tmp 中

[[email protected] test_file]$ hadoop fs -put ~/test_file/stu_buck_tmp_data /hive/stu_buck_tmp

将数据从 stu_buck_tmp 中查出插入到 stu_buck 中

insert into table stu_buck select * from stu_buck_tmp;

因为 stu_buck 表分桶数量是 4, 所以会看到在插入数据的时候有 4 个 reduce task 执行,查看 hdfs 结构会看到在 stu_buck 下有四个数据文件,每个代表一个桶,每个里面存放一部分数据,且数据都是按照 hash 取模的方式分配的,且数据是倒序存在的。

抽样查询

抽样查询的表必须是分桶表,语法格式为 select * from 分桶表 tablesample(bucket x out of y on 分桶表分桶字段); 以上面创建的分桶表为例,查询示例如下

select * from stu_buck tablesample(bucket 1 out of 2 on id);  --代表从第1桶(0号桶)开始抽,每隔2桶抽一次,一共抽2桶(4/2,4为总桶数,2为每次抽取桶的间隔数)。即最后抽取的桶为 0号桶,1号桶

select * from stu_buck tablesample(bucket 1 out of 1 on id);  --代表从第1桶(0号桶)开始抽,每个1桶抽一次,一共抽4桶(4/1,4为总桶数,1为每次抽取桶的间隔数)。即最后抽取的桶为 0号桶,1号桶,2号桶,3号桶

select * from stu_buck tablesample(bucket 2 out of 4 on id);  --代表从第2桶(1号桶)开始抽,每个4桶抽一次,一共抽1桶(4/4,4为总桶数,4为每次抽取桶的间隔数)。即最后抽取的桶为 1号桶

select * from stu_buck tablesample(bucket 2 out of 8 on id);  --代表从第2桶(1号桶)开始抽,每个8桶抽一次,一共抽0.5桶(4/8,4为总桶数,8为每次抽取桶的间隔数)。即最后抽取的桶为 1号桶的一半

数据导入

load

将数据直接加载到表目录中,语法:load data [local] inpath 路径 into table 表名 partition(xx=xx),local 参数的作用是将本地文件系统的文件上传到 hdfs 中。不加 local 代表着,文件在 hdfs 上,并且将文件从 hdfs 上将源文件移动到目标目录。

insert

insert 的方式会运行 MR 程序,通过程序将数据输出到表目录。在某些场景, 必须使用 insert 的方式来导入数据:

  1. 向分桶表插入数据
  2. 如果指定表中的数据,不是以纯文本(TextFile)形式存储,需要使用 insert 方式导入

语法:insert into/overwrite table 表名 select xxx/values(),(),() 可以使用 insert into 或者 insert overwrite 两种方式来插入数据,insert into 代表向表中追加新的数据,insert overwrite 代表先清空表中所有的数据,再向表中添加新的数据。后面数据的来源方式可以自己写在 values 中,也可以使用 select 来将查询结果进行插入。

import

不仅可以导入数据还可以顺便导入元数据(表结构)。Import 只能导入 export 导出的内容。

语法格式为 import external table 表名 from HDFS路径 ,使用 import 导入要遵循这些约束,如果向一个新表中导入数据,hive 会根据要导入表的元数据自动创建表。如果向一个已经存在的表导入数据,在导入之前会先检查表的结构和属性是否一致,只有在表的结构和属性一致时,才会执行导入。不管表是否为空,要导入的分区必须是不存在的。

数据导出

insert

将一条 sql 运算的结果,插入到指定的路径,语法格式为 insert overwrite [local] directory 目录 row format 格式。这里 local 的含义和导入的时候一样,不加 local 代表着将文件导出到 hdfs 上,加上则是导出到文件系统中。

export

导出数据和元数据(表结构),export 会在 hdfs 的导出目录中,生成数据和元数据,并且导出的元数据是和 RDMS无关的。如果是分区表,可以选择将分区表的部分分区进行导出

语法格式为 export table 表名 [partiton(分区信息) ] to HDFS路径

export table deptpart1 to "/export";  -- 导出 deptpart1
import table deptpart1_import from "/export";  -- 导入 deptpart1_import

元数据信息还在

hive (default)> show partitions deptpart1_import;
OK
partition
area=beijing
area=shandong
area=shanghai
Time taken: 0.21 seconds, Fetched: 3 row(s)

排序

Hive 的本质是 MR,MR 中的排序有以下几种:

  1. 全排序: 结果只有一个(只有一个分区),所有的数据整体有序
  2. 部分排序: 结果有多个(有多个分区),每个分区内部有序
  3. 二次排序: 在排序时,比较的条件有多个

在 MR 中,排序在 reduce 之前就已经排好序了,排序是 shuffle 阶段的主要工作。分区是指使用 Partitioner 来进行分区,当 reduceTaskNum>1,使用用户自己定义的分区器,如果没有就使用 HashParitioner,HashParitioner 只根据 key 的 hashcode 来分区。

Hive 中可以使用以下排序:

  1. ORDER BY 列名: 全排序
  2. SORT BY 列名: 部分排序,如果希望自定定义使用哪个字段分区,需要使用 DISTRIBUTE BY
  3. DISTRIBUTE BY 列名: 指定按照哪个字段分区, 结合 sort by 使用
  4. CLUSTER BY 列名: 如果分区的字段和排序的字段一致,且是正序排序,那么可以用 CLUSTER BY,即DISTRIBUTE BY 列名 sort by 列名 asc 等价于 CLUSTER BY 列名 , CLUSTER BY 后不能写排序方式,只能使用默认的按照 asc 排序。

操作实例:

创建表 emp1,并插入数据,数据如下

create table default.emp1(
empno int,
ename string,
job string,
mgr int,
hiredate string, 
sal double, 
comm double,
deptno int)
row format delimited fields terminated by '\t';
7369    SMITH   CLERK   7902    1980-12-17      800.00
207499  ALLEN   SALESMAN        7698    1981-2-20       1600.00 300.00
307521  WARD    SALESMAN        7698    1981-2-22       1250.00 500.00
307566  JONES   MANAGER 7839    1981-4-2        2975.00
207654  MARTIN  SALESMAN        7698    1981-9-28       1250.00 1400.00
307698  BLAKE   MANAGER 7839    1981-5-1        2850.00
307782  CLARK   MANAGER 7839    1981-6-9        2450.00
107788  SCOTT   ANALYST 7566    1987-4-19       3000.00
207839  KING    PRESIDENT               1981-11-17      5000.00
107844  TURNER  SALESMAN        7698    1981-9-8        1500.00 0.00
307876  ADAMS   CLERK   7788    1987-5-23       1100.00
207900  JAMES   CLERK   7698    1981-12-3       950.00
307902  FORD    ANALYST 7566    1981-12-3       3000.00
207934  MILLER  CLERK   7782    1982-1-23       1300.00         10

ORDER BY 查询示例

select * from empno order by empno desc;

结果显示如下

emp1.empno      emp1.ename      emp1.job        emp1.mgr        emp1.hiredate   emp1.sal        emp1.comm       emp1.deptno
307902  FORD    ANALYST 7566    1981-12-3       3000.0  NULL    NULL
307876  ADAMS   CLERK   7788    1987-5-23       1100.0  NULL    NULL
307782  CLARK   MANAGER 7839    1981-6-9        2450.0  NULL    NULL
307698  BLAKE   MANAGER 7839    1981-5-1        2850.0  NULL    NULL
307566  JONES   MANAGER 7839    1981-4-2        2975.0  NULL    NULL
307521  WARD    SALESMAN        7698    1981-2-22       1250.0  500.0   NULL
207934  MILLER  CLERK   7782    1982-1-23       1300.0  NULL    10
207900  JAMES   CLERK   7698    1981-12-3       950.0   NULL    NULL
207839  KING    PRESIDENT       NULL    1981-11-17      5000.0  NULL    NULL
207654  MARTIN  SALESMAN        7698    1981-9-28       1250.0  1400.0  NULL
207499  ALLEN   SALESMAN        7698    1981-2-20       1600.0  300.0   NULL
107844  TURNER  SALESMAN        7698    1981-9-8        1500.0  0.0     NULL
107788  SCOTT   ANALYST 7566    1987-4-19       3000.0  NULL    NULL
7369    SMITH   CLERK   7902    1980-12-17      800.0   NULL    NULL
7369    SMITH   CLERK   7902    1980-12-17      800.0   NULL    207499
Time taken: 38.149 seconds, Fetched: 15 row(s)

SORT BY 和 DISTRIBUTE BY 查询示例

set mapreduce.job.reduces=3; -- 设置 reduce task 的数量为 3
insert overwrite local directory "/tmp/orderby" ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' select * from emp1 distribute by mgr sort by empno desc;

上面 HQL 的意思是对 empno 进行降序排序,并且按照 mgr 字段进行分区【如果不加 distribute by,那么就不知道会对哪个字段进行分区】,mgr 字段是 int 类型的,所以会将该列的值 %3 进行分区,所以结果文件为 3 个。因为 mgr 这一列的值 %3 都等于 0,所以只有第一个分区(分区0)有值,其他的两个分区都是空的。

CLUSTER BY 查询示例

set mapreduce.job.reduces=3; -- 设置 reduce task 的数量为 3
insert overwrite local directory "/tmp/orderby" ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' select * from emp1 cluster by empno;

cluster by empno 相当于 distribute by empno sort by empno desc

函数

函数有所属库的概念,系统提供的除外。系统提供的函数可以在任意库中使用。

查看当前库所有的函数:show functions;
查看函数的使用: desc function 函数名
查看函数的详细使用: desc function extended 函数名

函数的来源

  1. 系统函数,自带的,直接使用即可
  2. 用户自定义的函数

函数按照特征分类

  1. UDF: 用户定义的函数。 一进一出。 输入单个参数,返回单个结果
  2. UDTF: 用户定义的表生成函数。 一进多出。传入一个参数(集合类型),返回一个结果集
  3. UDAF: 用户定义的聚集函数。 多进一出。 传入一列多行的数据,返回一个结果(一列一行)

常用日期函数(hive 默认解析的日期格式必须是: 2019-11-24 08:09:10)

unix_timestamp:返回当前或指定时间的时间戳    
from_unixtime:将时间戳转为日期格式
current_date:当前日期
current_timestamp:当前的日期加时间
* to_date:抽取日期部分
year:获取年
month:获取月
day:获取日
hour:获取时
minute:获取分
second:获取秒
weekofyear:当前时间是一年中的第几周
dayofmonth:当前时间是一个月中的第几天
months_between: 两个日期间的月份,前-后
add_months:日期加减月
datediff:两个日期相差的天数,前-后
date_add:日期加天数
date_sub:日期减天数
last_day:日期的当月的最后一天
date_format格式化日期

常用取整函数

round: 四舍五入
ceil:  向上取整
floor: 向下取整

常用字符串操作函数

upper: 转大写
lower: 转小写
length: 长度
trim:  前后去空格
lpad: 向左补齐,到指定长度
rpad:  向右补齐,到指定长度
regexp_replace: 使用正则表达式匹配目标字符串,匹配成功后替换。SELECT regexp_replace('100-200', '(\d+)', 'num')='num-num

集合操作

size: 集合(map和list)中元素的个数
map_keys: 返回map中的key
map_values:返回map中的value
array_contains:判断array中是否包含某个元素
sort_array: 将array中的元素排序

查询语句和 MySQL 的不同点

A <=> B :  ①A,B都为null,返回true
                   ②A,B一方为null,返回null
                   ③A,B都不为null,等同于A=B

A Rlike B :  B是一个正则表达式,判断A是否负责B表达式的要求,返回true和false

在关联时,只支持等值连接

在管理时,支持满连接,使用full join