星尘

任汝千圣现,我有天真佛

  • 首页
  • 归档
  • 分类
  • 标签
  • 瞎扯
  • 关于
  • 搜索

10. Hive-4

发表于 2020-09-18 | 分类于 Java 大数据进阶 , Hive

存储和压缩

存储

hive 支持 TEXTFILE ,SequenceFile, ORC, Parquet
行式存储:TEXTFILE, SequenceFile
列式存储:ORC, Parquet 都是行列结合存储,将一部分行(stripe或行组)按照列式存储。行式和列式存储的区别

TextFile:默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合Gzip、Bzip2使用,但使用Gzip这种方式,hive不会对数据进行切分,从而无法对数据进行并行操作。

ORC:hive独有。性能略好!

Parquet:希望整个hadoop生态圈都可以支持Parquet格式!实用性更广!

常见使用组合:使用ORC+snappy组合,使用Parquet+LZO组合!

压缩

hive基于hadoop,hive支持的压缩格式也是hadoop支持的压缩格式

自定义函数

  1. 自定义类,继承UDF
  2. 提供多个evaluate(),此方法不能返回void,但是可以返回null值
  3. 打包,上传到HIVE_HOME/auxlib下
  4. 在hive中使用 create [temporary] function 函数名 as 全类名
  5. 注意函数在哪个库创建,只能在哪个库下使用,如果要跨库使用,需要加上库名作为前缀!

严格模式

Hive 提供了一个严格模式,可以防止用户执行那些可能意想不到的不好的影响的查询。通过设置属性hive.mapred.mode 值为默认是非严格模式 nonstrict 。开启严格模式需要修改 hive.mapred.mode 值为 strict,开启严格模式可以禁止3种类型的查询。

  1. 对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。
  2. 对于使用了order by语句的查询,要求必须使用limit语句。
  3. 限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行 JOIN 查询的时候不使用 ON 语句而是使用 where 语句,这样关系数据库的执行优化器就可以高效地将 WHERE 语句转化成那个 ON 语句。不幸的是,Hive 并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。

性能调优

Fetch抓取

Fetch 抓取是指,Hive 中对某些情况的查询可以不必使用 MapReduce 计算。例如:SELECT * FROM employees;在这种情况下,Hive 可以简单地读取 employee 对应的存储目录下的文件,然后输出查询结果到控制台。
把 hive.fetch.task.conversion 设置成 none,然后执行查询语句,都会执行 mapreduce 程序。
把 hive.fetch.task.conversion 设置成 more,然后执行查询语句,如下查询方式都不会执行 mapreduce 程序。

hive (default)> set hive.fetch.task.conversion=more;
hive (default)> select * from emp;
hive (default)> select ename from emp;
hive (default)> select ename from emp limit 3;

本地模式

大多数的 Hadoop Job 是需要 Hadoop 提供的完整的可扩展性来处理大数据集的。不过,有时 Hive 的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际 job 的执行时间要多的多。对于大多数这种情况,Hive 可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。

用户可以通过设置 hive.exec.mode.local.auto 的值为 true,来让 Hive 在适当的时候自动启动这个优化。

//开启本地mr
set hive.exec.mode.local.auto=true;  
//设置local mr的最大输入数据量,当输入数据量小于这个值时采用local mr的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
//设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;

关联查询空KEY过滤

有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同的 reducer 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key,很多情况下,这些 key 对应的数据是异常数据,我们需要在SQL 语句中进行过滤。例如存在很多 key 对应的字段为空的数据就会出现上述现象。解决方法是在查询的时候进行过滤

insert overwrite table jointable2 select n.* from (select * from nullidtable where id is not null ) n left join ori o on n.id = o.id;

关联查询空KEY转换

有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在 join 的结果中,此时我们可以将表中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上

insert overwrite table jointable2
select n.* from nullidtable n full join ori o on 
case when n.id is null then concat('hive', rand()) else n.id end = o.id;

Group By

默认情况下,Map 阶段同一Key数据分发给一个 reduce,当一个key数据过大时就倾斜了。并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果。

# 是否在Map端进行聚合,默认为True
hive.map.aggr = true
# 在Map端进行聚合操作的条目数目
hive.groupby.mapaggr.checkinterval = 100000
# 有数据倾斜的时候进行负载均衡(默认是false)
hive.groupby.skewindata = true

当选项设定为 true,生成的查询计划会有两个MR Job。第一个 MR Job中,Map 的输出结果会随机分布到 Reduce中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

去重统计

数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个 Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成,一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换

select count(distinct id) from bigtable;
# 使用下面方式替换上面的方式
select count(id) from (select id from bigtable group by id) a;

先过滤再关联

select o.id from bigtable b join ori o on o.id = b.id where o.id <= 10;
# 使用下面方式替换上面的方式
select b.id from bigtable b join (select id from ori where id <= 10 ) o on b.id = o.id;

动态分区

关系型数据库中,对分区表Insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用Hive的动态分区,需要进行相应的配置

# 开启动态分区功能(默认true,开启)
hive.exec.dynamic.partition=true

# 设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区)
hive.exec.dynamic.partition.mode=nonstrict

# 在所有执行MR的节点上,最大一共可以创建多少个动态分区。
hive.exec.max.dynamic.partitions=1000

# 在每个执行MR的节点上,最大可以创建多少个动态分区。
hive.exec.max.dynamic.partitions.pernode=100

# 整个MR Job中,最大可以创建多少个HDFS文件。
hive.exec.max.created.files=100000

# 当有空分区生成时,是否抛出异常。一般不需要设置。
hive.error.on.empty.partition=false

创建分区表

create table ori_partitioned_target(
    id bigint, 
    time bigint, 
    uid string,
    keyword string, 
    url_rank int, 
    click_num int, 
    click_url string) 
    PARTITIONED BY (p_time STRING) row format delimited fields terminated by '\t';

使用 insert 导入数据的时候,就会自动的根据 p_time 的值来进行分区了。

合理设置Map数

是不是map数越多越好?

答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map 任务来完成,而一个 map 任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的 map 数是受限的。这种情况下就应该减少 map 数

是不是保证每个map处理接近128m的文件块,就高枕无忧了?

答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。这种情况下就应该增加 map 数。

Python + Flask + Consul

发表于 2020-09-16 | 分类于 Python

安装

三台机器

192.168.16.161
192.168.16.69
192.168.16.47

161

docker run --net=host --name consul -v /app/consul/data:/consul/data -v /app/consul/conf:/consul/config -d consul consul agent -server -bind=192.168.16.161 -client 0.0.0.0 -ui -bootstrap-expect=3 -data-dir /consul/data -config-dir /consul/config

69

docker run --net=host --name consul -v /app/consul/data:/consul/data -v /app/consul/conf:/consul/config -d consul consul agent -server -bind=192.168.16.69 -client 0.0.0.0 -ui -bootstrap-expect=3 -data-dir /consul/data -config-dir /consul/config -join 192.168.16.161 

47

docker run --net=host --name consul -v /app/consul/data:/consul/data -v /app/consul/conf:/consul/config -d consul consul agent -server -bind=192.168.16.47 -client 0.0.0.0 -ui -bootstrap-expect=3 -data-dir /consul/data -config-dir /consul/config -join 192.168.16.161 

参数说明

--net=host:采用主机网络配置,若采用默认的bridge模式,则会存在容器跨主机间通信失败的问题
-v /data/consul_data/data:/consul/data:主机的数据目录挂载到容器的/consul/data下,因为该容器默认的数据写入位置即是/consul/data
-v /data/consul_data/conf:/consul/config:主机的配置目录挂载到容器的/consul/conf下,因为该容器默认的数据写入位置即是/consul/conf
consul agent -server:consul 的 server 启动模式
consul agent -bind=192.168.16.161:consul 绑定到主机的ip上
consul agent -bootstrap-expect=3:server 要想启动,需要至少3个server
consul agent -data-dir /consul/data:consul 的数据目录
consul agent -config-dir /consul/config:consul 的配置目录
consul agent -join 192.168.16.161:对于主机 69、47 来说,需要加入到这个集群里
-client 0.0.0.0 -ui : 启动ui界面

进入容器,查看集群信息

docker exec -it 46267ed9474c /bin/sh

/ # consul members
Node            Address              Status  Type    Build  Protocol  DC   Segment
192-168-16-161  192.168.16.161:8301  alive   server  1.8.4  2         dc1  
192-168-16-47   192.168.16.47:8301   alive   server  1.8.4  2         dc1  
192-168-16-69   192.168.16.69:8301   alive   server  1.8.4  2         dc1  

访问 web 界面 http://192.168.16.161:8500/ui/

Python 使用

参考 github 上的项目进行了简单的修改,修改后地址为 新地址,使用方式:

  1. 启动 consul
  2. 将 consul 地址配置到代码中
  3. 启动项目(注意修改启动 IP 为本机 IP,因为 consul 会对项目做健康检查,检查不通过无法调用服务)
  4. 查看 consul web 页面,查看服务是否正常
  5. 执行 aaaa.py 脚本,调用 consul 上已经注册的服务

参考资料:

https://www.jianshu.com/p/565a1f24d730

https://www.cnblogs.com/lfzm/p/10633595.html

9. Hive-3

发表于 2020-08-29 | 分类于 Java 大数据进阶 , Hive

使用 beeline 和 hiveserver2 连接

在一个窗口中使用 hiveserver2,让窗口一直不关闭,在另一个窗口进入 beeline 后使用 JDBC 进行连接,使用 beeline 的好处是查询结果格式以更方便的形式的显示

常用函数

nvl

NVL( string1, replace_with): 判断 string1 是否为null,如果为null,使用 replace_with 替换null,否则就返回 string1

select avg(nvl(comm, 0)) from emp1;

例如上述例子中,对 comm 列计算平均值,如果 comm 为 null,则使用 0 代替。如果这里不使用 nvl 函数的话,那么计算平均值是不算上 null 的个数的,数据如下所示,不使用 nvl ,平均值为 (300+500+1400+0)/ 4 = 550。使用 nvl 平均值就是 146.66666666666666

concat

字符串拼接。 可以在参数中传入多个 string 类型的字符串,但是一旦有一个参数为 null,则返回 null

select concat("a","v"); -- av
select concat("a","v",null); -- NULL

concat_ws

使用指定的分隔符完成字符串拼接,格式为 concat_ws(分隔符,[string | array<string>]+),第一个参数为分隔符,后面为要拼接的字符串或者数组。

select concat_ws(".","zhansan",array("lisi","wangwu"));  -- zhansan.lisi.wangwu

collect_set

collect_set(列名) 作用是将此列的多行记录合并为一个 set 集合,去重

collect_list

collect_list(列名) 作用是将此列的多行记录合并为一个 set 集合,不去重

explode

explode(列名) 参数只能是 array 或 map, 将 array 类型参数转为1列N行, 将 map 类型参数转为2列N行

练习

练习一

有以下数据,求出不同部门男女各多少人

期望查询结果如下

A     2       1
B     1       2

方法1,按照部门分组,单独查询出男女的个数,然后在将结果进行汇总。下面方式 HQL 性能不高,有两个字查询,一个汇总,所以需要三个 job 才能运行完成。

 select a.dept_id, male_count, female_count from
 (select dept_id, count(1) male_count from emp_sex where sex="男" group by dept_id) a
 join
 (select dept_id, count(1) female_count from emp_sex where sex="女" group by dept_id) b
 on a.dept_id = b.dept_id;

方法2,利用 sum 函数,在求男性总人数时,如果当前人的性别为男,记1,否则记 0,sum 该字段就得到了男性的值。在求女性总人数时,如果当前人的性别为女,记1,否则记0,sum 该字段就得到了男性的值。

case when 的语法如下,这一列中出现的 值1 会被替换为 值2 , 值3 会被替换为 值4, 其他的替换为值5

case  列名 
    when  值1  then  值2
    when  值3  then  值4
    ...
    else 值5
end
select dept_id,
sum(case sex when "男" then 1 else 0 end) male_count,
sum(case sex when "女" then 1 else 0 end) female_count
from emp_sex group by dept_id;

使用 case when 的优化 HQL 如上,只需要一个 job 就能完成。

HQL 中除了 case when 能做判断外,if 也能做判断,语法为 if(判断表达式,值1,值2),值1当表达式为 true 时获取,值2当表达式为 false 时获取。实现 HQL 如下:

select dept_id,
sum(if(sex == "男", 1, 0)) male_count,
sum(if(sex == "女", 1, 0)) female_count
from emp_sex group by dept_id;

这个 HQL 也是只需要一个 job 就能完成。

练习二

把星座和血型一样的人归类到一起。结果如下:

射手座,A            大海|凤姐
白羊座,A            孙悟空|猪八戒
白羊座,B            宋宋

这里需要使用函数 concat_ws 进行字符串的拼接,还需要使用 collect_list 将某列的多行记录合并为一个 list 集合,不去重。

select  concat(constellation,',',blood_type),concat_ws('|',collect_list(name))
from person_info
group by constellation,blood_type

练习三

将电影分类中的数组数据展开。结果如下:

《疑犯追踪》      悬疑
《疑犯追踪》      动作
《疑犯追踪》      科幻
《疑犯追踪》      剧情
《Lie to me》   悬疑
《Lie to me》   警匪
《Lie to me》   动作
《Lie to me》   心理
《Lie to me》   剧情
《战狼2》        战争
《战狼2》        动作
《战狼2》        灾难

将 category 列进行 explode,即转换为 1列N行

0: jdbc:hive2://hadoop10:10000> select explode(category) from movie_info;
+------+--+
| col  |
+------+--+
| 悬疑   |
| 动作   |
| 科幻   |
| 剧情   |
| 悬疑   |
| 警匪   |
| 动作   |
| 心理   |
| 剧情   |
| 战争   |
| 动作   |
| 灾难   |
+------+--+
select movie,col1
from movie_info lateral view explode(category) tmp1 as col1

lateral view 的作用是将炸裂后的1列N行,在逻辑上依然视作1列1行,实际是1列N行,然后查询时和movie进行笛卡尔集这个操作在hive中称为侧写(lateral view)

练习四

有以下数据

person_info2.names      person_info2.tags       person_info2.hobbys
["jack","tom","jerry"]  ["阳光男孩","肌肉男孩","直男"]  ["晒太阳","健身","说多喝热水"]
["marry","nancy"]       ["阳光女孩","肌肉女孩","腐女"]  ["晒太阳","健身","看有内涵的段子"]

希望得到以下查询结果

期望结果:
jack    阳光男孩    晒太阳
jack    阳光男孩    健身
jack    阳光男孩    说多喝热水
jack    肌肉男孩    晒太阳
jack    肌肉男孩    健身
jack    肌肉男孩    说多喝热水
.....

查询 SQL

select name,tag,hobby
from person_info2
lateral view explode(names) tmp1 as name
lateral view explode(tags) tmp1 as tag
lateral view explode(hobbys) tmp1 as hobby;

窗口函数

即可以在函数在运行时通过改变窗口的大小,来控制计算的数据集的范围。

文档地址 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics

常用函数

窗口函数有以下三类

第一类

LEAD: LEAD (scalar_expression [,offset] [,default]): 返回当前行以下N行的指定列的列值,如果找不到,就采用默认值

LAG: LAG (scalar_expression [,offset] [,default]): 返回当前行以上N行的指定列的列值,如果找不到,就采用默认值

FIRST_VALUE: FIRST_VALUE(列名,[false(默认)]):  返回当前窗口指定列的第一个值,第二个参数如果为true,代表加入第一个值为null,跳过空值,继续寻找!

LAST_VALUE: LAST_VALUE(列名,[false(默认)]):  返回当前窗口指定列的最后一个值,第二个参数如果为true,代表加入第一个值为null,跳过空值,继续寻找!

第二类

统计类的函数(一般都需要结合over使用): min,max,avg,sum,count

第三类

排名分析函数:RANK,ROW_NUMBER,DENSE_RANK,CUME_DIST,PERCENT_RANK,NTILE

语法

相关语法解释

OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化
CURRENT ROW:当前行
n PRECEDING:往前n行数据
n FOLLOWING:往后n行数据
UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点
LAG(col,n):往前第n行数据
LEAD(col,n):往后第n行数据
NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。

窗口函数的使用格式为 函数 over( partition by 字段 ,order by 字段 window_clause ) window_clause 为窗口的大小,窗口大小常用格式如下

(rows | range) between (unbounded | [num]) preceding and ([num] preceding | current row | (unbounded | [num]) following)

(rows | range) between current row and (current row | (unbounded | [num]) following)

(rows | range) between [num] following and (unbounded | [num]) following

特殊情况:

  1. 在 over() 中既没有出现 windows_clause,也没有出现 order by,窗口默认为 rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING
  2. 在over()中没有出现 windows_clause,但指定了 order by,窗口默认为 rows between UNBOUNDED PRECEDING and CURRENT ROW

窗口函数和分组有什么区别:

  1. 如果是分组操作,select 后只能写分组后的字段
  2. 如果是窗口函数,窗口函数是在指定的窗口内,对每条记录都执行一次函数
  3. 如果是分组操作,有去重效果,而窗口函数中的 partition不去重

注意⚠️:不是所有的函数在运行都是可以通过改变窗口的大小,来控制计算的数据集的范围,所有的排名函数和LAG,LEAD,支持使用 over(),但是在 over() 中不能定义 window_clause。

查询示例

例如有以下数据

0: jdbc:hive2://hadoop10:10000> select * from business;
+----------------+---------------------+----------------+--+
| business.name  | business.orderdate  | business.cost  |
+----------------+---------------------+----------------+--+
| jack           | 2017-01-01          | 10             |
| tony           | 2017-01-02          | 15             |
| jack           | 2017-02-03          | 23             |
| tony           | 2017-01-04          | 29             |
| jack           | 2017-01-05          | 46             |
| jack           | 2017-04-06          | 42             |
| tony           | 2017-01-07          | 50             |
| jack           | 2017-01-08          | 55             |
| mart           | 2017-04-08          | 62             |
| mart           | 2017-04-09          | 68             |
| neil           | 2017-05-10          | 12             |
| mart           | 2017-04-11          | 75             |
| neil           | 2017-06-12          | 80             |
| mart           | 2017-04-13          | 94             |
+----------------+---------------------+----------------+--+

需要进行以下查询

(1)查询在2017年4月份购买过的顾客及总人数

select name,count(*) over(rows between UNBOUNDED  PRECEDING and UNBOUNDED  FOLLOWING)
from business
where substring(orderdate,1,7)='2017-04'
group by name

上面的写法等价于

select name,count(*) over()
from business
where substring(orderdate,1,7)='2017-04'
group by name

查询结果为

+-------+-----------------+--+
| name  | count_window_0  |
+-------+-----------------+--+
| mart  | 2               |
| jack  | 2               |
+-------+-----------------+--+

(2)查询顾客的购买明细及月购买总额

 select name,orderdate,cost,sum(cost) over(partition by name,substring(orderdate,1,7) ) 
 from business

查询 name, orderdate, cost,并对 cost 进行按月份的累加求和, 结果如下

+-------+-------------+-------+---------------+--+
| name  |  orderdate  | cost  | sum_window_0  |
+-------+-------------+-------+---------------+--+
| jack  | 2017-01-05  | 46    | 111           |
| jack  | 2017-01-08  | 55    | 111           |
| jack  | 2017-01-01  | 10    | 111           |
| jack  | 2017-02-03  | 23    | 23            |
| jack  | 2017-04-06  | 42    | 42            |
| mart  | 2017-04-13  | 94    | 299           |
| mart  | 2017-04-11  | 75    | 299           |
| mart  | 2017-04-09  | 68    | 299           |
| mart  | 2017-04-08  | 62    | 299           |
| neil  | 2017-05-10  | 12    | 12            |
| neil  | 2017-06-12  | 80    | 80            |
| tony  | 2017-01-04  | 29    | 94            |
| tony  | 2017-01-02  | 15    | 94            |
| tony  | 2017-01-07  | 50    | 94            |
+-------+-------------+-------+---------------+--+

(3)查询顾客的购买明细, 并将cost按照日期进行累加

 select name,orderdate,cost,sum(cost) over(partition by name order by orderdate ) 
 from business

查询 name, orderdate, cost,并对 cost 进行按名称的累加,结果如下

+-------+-------------+-------+---------------+--+
| name  |  orderdate  | cost  | sum_window_0  |
+-------+-------------+-------+---------------+--+
| jack  | 2017-01-01  | 10    | 10            |
| jack  | 2017-01-05  | 46    | 56            |
| jack  | 2017-01-08  | 55    | 111           |
| jack  | 2017-02-03  | 23    | 134           |
| jack  | 2017-04-06  | 42    | 176           |
| mart  | 2017-04-08  | 62    | 62            |
| mart  | 2017-04-09  | 68    | 130           |
| mart  | 2017-04-11  | 75    | 205           |
| mart  | 2017-04-13  | 94    | 299           |
| neil  | 2017-05-10  | 12    | 12            |
| neil  | 2017-06-12  | 80    | 92            |
| tony  | 2017-01-02  | 15    | 15            |
| tony  | 2017-01-04  | 29    | 44            |
| tony  | 2017-01-07  | 50    | 94            |
+-------+-------------+-------+---------------+--+

(4)查询顾客的购买明细及顾客上次的购买时间

select name,orderdate,cost,lag(orderdate,1,'无数据') over(partition by name order by orderdate) from business

LAG(col,n) 表示取之前的第n行的 col 列的数据

上面查询代表查询 name,orderdate,cost 信息,并且取 orderdate 的上一条数据,如果没数据就显示 “无数据”,并按 name 分组,orderdate 排序

+-------+-------------+-------+---------------+--+
| name  |  orderdate  | cost  | lag_window_0  |
+-------+-------------+-------+---------------+--+
| jack  | 2017-01-01  | 10    | 无数据           |
| jack  | 2017-01-05  | 46    | 2017-01-01    |
| jack  | 2017-01-08  | 55    | 2017-01-05    |
| jack  | 2017-02-03  | 23    | 2017-01-08    |
| jack  | 2017-04-06  | 42    | 2017-02-03    |
| mart  | 2017-04-08  | 62    | 无数据           |
| mart  | 2017-04-09  | 68    | 2017-04-08    |
| mart  | 2017-04-11  | 75    | 2017-04-09    |
| mart  | 2017-04-13  | 94    | 2017-04-11    |
| neil  | 2017-05-10  | 12    | 无数据           |
| neil  | 2017-06-12  | 80    | 2017-05-10    |
| tony  | 2017-01-02  | 15    | 无数据           |
| tony  | 2017-01-04  | 29    | 2017-01-02    |
| tony  | 2017-01-07  | 50    | 2017-01-04    |
+-------+-------------+-------+---------------+--+

(5)查询顾客的购买明细及顾客下次的购买时间

和上面差不多,只不过需要将函数 lag 换为 lead

select name,orderdate,cost,lead(orderdate,1,'无数据') over(partition by name order by orderdate ) from business

结果如下

+-------+-------------+-------+----------------+--+
| name  |  orderdate  | cost  | lead_window_0  |
+-------+-------------+-------+----------------+--+
| jack  | 2017-01-01  | 10    | 2017-01-05     |
| jack  | 2017-01-05  | 46    | 2017-01-08     |
| jack  | 2017-01-08  | 55    | 2017-02-03     |
| jack  | 2017-02-03  | 23    | 2017-04-06     |
| jack  | 2017-04-06  | 42    | 无数据            |
| mart  | 2017-04-08  | 62    | 2017-04-09     |
| mart  | 2017-04-09  | 68    | 2017-04-11     |
| mart  | 2017-04-11  | 75    | 2017-04-13     |
| mart  | 2017-04-13  | 94    | 无数据            |
| neil  | 2017-05-10  | 12    | 2017-06-12     |
| neil  | 2017-06-12  | 80    | 无数据            |
| tony  | 2017-01-02  | 15    | 2017-01-04     |
| tony  | 2017-01-04  | 29    | 2017-01-07     |
| tony  | 2017-01-07  | 50    | 无数据            |
+-------+-------------+-------+----------------+--+

(6)查询顾客的购买明细及顾客本月第一次购买的时间

第一次购买的时间只需要按照购买时间拍正序,然后取第一条记录即可,这里使用函数 FIRST_VALUE 取第一条记录。

select name,orderdate,cost,FIRST_VALUE(orderdate,true) over(partition by name,substring(orderdate,1,7) order by orderdate ) from business

查询结果如下

+-------+-------------+-------+-----------------------+--+
| name  |  orderdate  | cost  | first_value_window_0  |
+-------+-------------+-------+-----------------------+--+
| jack  | 2017-01-01  | 10    | 2017-01-01            |
| jack  | 2017-01-05  | 46    | 2017-01-01            |
| jack  | 2017-01-08  | 55    | 2017-01-01            |
| jack  | 2017-02-03  | 23    | 2017-02-03            |
| jack  | 2017-04-06  | 42    | 2017-04-06            |
| mart  | 2017-04-08  | 62    | 2017-04-08            |
| mart  | 2017-04-09  | 68    | 2017-04-08            |
| mart  | 2017-04-11  | 75    | 2017-04-08            |
| mart  | 2017-04-13  | 94    | 2017-04-08            |
| neil  | 2017-05-10  | 12    | 2017-05-10            |
| neil  | 2017-06-12  | 80    | 2017-06-12            |
| tony  | 2017-01-02  | 15    | 2017-01-02            |
| tony  | 2017-01-04  | 29    | 2017-01-02            |
| tony  | 2017-01-07  | 50    | 2017-01-02            |
+-------+-------------+-------+-----------------------+--+

(7)查询顾客的购买明细及顾客本月最后一次购买的时间

和上面类似,只是使用函数不同, 因为是取本月最后一次购买的时间,所以这里要限制窗口的位置,窗口的位置就是当前行到本次分组的最后一行。

select name,orderdate,cost,LAST_VALUE(orderdate,true) over(partition by name,substring(orderdate,1,7) order by orderdate rows between CURRENT row and UNBOUNDED  FOLLOWING) from business 

查询结果如下

+-------+-------------+-------+----------------------+--+
| name  |  orderdate  | cost  | last_value_window_0  |
+-------+-------------+-------+----------------------+--+
| jack  | 2017-01-01  | 10    | 2017-01-08           |
| jack  | 2017-01-05  | 46    | 2017-01-08           |
| jack  | 2017-01-08  | 55    | 2017-01-08           |
| jack  | 2017-02-03  | 23    | 2017-02-03           |
| jack  | 2017-04-06  | 42    | 2017-04-06           |
| mart  | 2017-04-08  | 62    | 2017-04-13           |
| mart  | 2017-04-09  | 68    | 2017-04-13           |
| mart  | 2017-04-11  | 75    | 2017-04-13           |
| mart  | 2017-04-13  | 94    | 2017-04-13           |
| neil  | 2017-05-10  | 12    | 2017-05-10           |
| neil  | 2017-06-12  | 80    | 2017-06-12           |
| tony  | 2017-01-02  | 15    | 2017-01-07           |
| tony  | 2017-01-04  | 29    | 2017-01-07           |
| tony  | 2017-01-07  | 50    | 2017-01-07           |
+-------+-------------+-------+----------------------+--+

(8)查询顾客的购买明细及顾客最近三次cost花费

最近三次可以是 当前和之前两次 或 当前+前一次+后一次

当前和之前两次:

select name,orderdate,cost,sum(cost) over(partition by name order by orderdate rows between 2 PRECEDING and CURRENT row) from business

当前+前一次+后一次:

select name,orderdate,cost,sum(cost) over(partition by name order by orderdate rows between 1 PRECEDING and 1 FOLLOWING) from business

查询结果如下

+-------+-------------+-------+---------------+--+
| name  |  orderdate  | cost  | sum_window_0  |
+-------+-------------+-------+---------------+--+
| jack  | 2017-01-01  | 10    | 56            |
| jack  | 2017-01-05  | 46    | 111           |
| jack  | 2017-01-08  | 55    | 124           |
| jack  | 2017-02-03  | 23    | 120           |
| jack  | 2017-04-06  | 42    | 65            |
| mart  | 2017-04-08  | 62    | 130           |
| mart  | 2017-04-09  | 68    | 205           |
| mart  | 2017-04-11  | 75    | 237           |
| mart  | 2017-04-13  | 94    | 169           |
| neil  | 2017-05-10  | 12    | 92            |
| neil  | 2017-06-12  | 80    | 92            |
| tony  | 2017-01-02  | 15    | 44            |
| tony  | 2017-01-04  | 29    | 94            |
| tony  | 2017-01-07  | 50    | 79            |
+-------+-------------+-------+---------------+--+

(9)查询前20%时间的订单信息

排名函数

RANK: 允许并列,一旦有并列跳号! 
ROW_NUMBER: 行号! 连续的,每个号之间差1!
DENSE_RANK: 允许并列,一旦有并列不跳号!
CUME_DIST:  从排序后的第一行到当前值之间数据 占整个数据集的百分比!
PERCENT_RANK:  rank-1/ 总数据量-1   
NTILE(x):  将数据集均分到X个组中,返回每条记录所在的组号

示例数据如下

+-------------+----------------+--------------+--+
| score.name  | score.subject  | score.score  |
+-------------+----------------+--------------+--+
| 孙悟空         | 语文             | 87           |
| 孙悟空         | 数学             | 95           |
| 孙悟空         | 英语             | 68           |
| 大海          | 语文             | 94           |
| 大海          | 数学             | 56           |
| 大海          | 英语             | 84           |
| 宋宋          | 语文             | 64           |
| 宋宋          | 数学             | 86           |
| 宋宋          | 英语             | 84           |
| 婷婷          | 语文             | 65           |
| 婷婷          | 数学             | 85           |
| 婷婷          | 英语             | 78           |
+-------------+----------------+--------------+--+

查询示例如下

select  *,rank() over(order by score) ranknum,
ROW_NUMBER() over(order by score) rnnum,
DENSE_RANK() over(order by score) drnum,
CUME_DIST() over(order by score) cdnum,
PERCENT_RANK() over(order by score) prnum
from score;

查询结果

score.name      score.subject   score.score     ranknum rnnum   drnum   cdnum   prnum
大海    数学    56      1       1       1       0.08333333333333333     0.0
宋宋    语文    64      2       2       2       0.16666666666666666     0.09090909090909091
婷婷    语文    65      3       3       3       0.25    0.18181818181818182
孙悟空  英语    68      4       4       4       0.3333333333333333      0.2727272727272727
婷婷    英语    78      5       5       5       0.4166666666666667      0.36363636363636365
宋宋    英语    84      6       6       6       0.5833333333333334      0.45454545454545453
大海    英语    84      6       7       6       0.5833333333333334      0.45454545454545453
婷婷    数学    85      8       8       7       0.6666666666666666      0.6363636363636364
宋宋    数学    86      9       9       8       0.75    0.7272727272727273
孙悟空  语文    87      10      10      9       0.8333333333333334      0.8181818181818182
大海    语文    94      11      11      10      0.9166666666666666      0.9090909090909091
孙悟空  数学    95      12      12      11      1.0     1.0
Time taken: 38.666 seconds, Fetched: 12 row(s)

查询示例

(1) 按照科目进行排名

select *,rank() over(partition by subject order by score desc) from score

查询结果如下

+-------------+----------------+--------------+----------------+--+
| score.name  | score.subject  | score.score  | rank_window_0  |
+-------------+----------------+--------------+----------------+--+
| 孙悟空         | 数学             | 95           | 1              |
| 宋宋          | 数学             | 86           | 2              |
| 婷婷          | 数学             | 85           | 3              |
| 大海          | 数学             | 56           | 4              |
| 宋宋          | 英语             | 84           | 1              |
| 大海          | 英语             | 84           | 1              |
| 婷婷          | 英语             | 78           | 3              |
| 孙悟空         | 英语             | 68           | 4              |
| 大海          | 语文             | 94           | 1              |
| 孙悟空         | 语文             | 87           | 2              |
| 婷婷          | 语文             | 65           | 3              |
| 宋宋          | 语文             | 64           | 4              |
+-------------+----------------+--------------+----------------+--+

(2)给每个学生的总分进行排名

select name,sumscore,rank()  over( order by sumscore desc)
from
(select name,sum(score) sumscore
from  score
group by  name) tmp
+-------+-----------+----------------+--+
| name  | sumscore  | rank_window_0  |
+-------+-----------+----------------+--+
| 孙悟空   | 250       | 1              |
| 宋宋    | 234       | 2              |
| 大海    | 234       | 2              |
| 婷婷    | 228       | 4              |
+-------+-----------+----------------+--+

(3)求每个学生的成绩明细及给每个学生的总分和总分排名

select *,DENSE_RANK() over(order by tmp.sumscore desc)
from
(select *,sum(score) over(partition by name)  sumscore
from score) tmp
+-----------+--------------+------------+---------------+----------------------+--+
| tmp.name  | tmp.subject  | tmp.score  | tmp.sumscore  | dense_rank_window_0  |
+-----------+--------------+------------+---------------+----------------------+--+
| 孙悟空       | 语文           | 87         | 250           | 1                    |
| 孙悟空       | 数学           | 95         | 250           | 1                    |
| 孙悟空       | 英语           | 68         | 250           | 1                    |
| 宋宋        | 语文           | 64         | 234           | 2                    |
| 宋宋        | 数学           | 86         | 234           | 2                    |
| 宋宋        | 英语           | 84         | 234           | 2                    |
| 大海        | 语文           | 94         | 234           | 2                    |
| 大海        | 数学           | 56         | 234           | 2                    |
| 大海        | 英语           | 84         | 234           | 2                    |
| 婷婷        | 语文           | 65         | 228           | 3                    |
| 婷婷        | 数学           | 85         | 228           | 3                    |
| 婷婷        | 英语           | 78         | 228           | 3                    |
+-----------+--------------+------------+---------------+----------------------+--+

(4)只查询每个科目的成绩的前2名

select *
from
(select *,rank() over(partition by subject order by score desc) rn
from score) tmp
where rn<=2
+-----------+--------------+------------+---------+--+
| tmp.name  | tmp.subject  | tmp.score  | tmp.rn  |
+-----------+--------------+------------+---------+--+
| 孙悟空       | 数学           | 95         | 1       |
| 宋宋        | 数学           | 86         | 2       |
| 宋宋        | 英语           | 84         | 1       |
| 大海        | 英语           | 84         | 1       |
| 大海        | 语文           | 94         | 1       |
| 孙悟空       | 语文           | 87         | 2       |
+-----------+--------------+------------+---------+--+

(5)查询学生成绩明细,并显示当前科目最高分

select *,max(score) over(partition by subject)
from score

或者

select *,FIRST_VALUE(score) over(partition by subject order by score desc)
from score
+-------------+----------------+--------------+---------------+--+
| score.name  | score.subject  | score.score  | max_window_0  |
+-------------+----------------+--------------+---------------+--+
| 婷婷          | 数学             | 85           | 95            |
| 宋宋          | 数学             | 86           | 95            |
| 大海          | 数学             | 56           | 95            |
| 孙悟空         | 数学             | 95           | 95            |
| 婷婷          | 英语             | 78           | 84            |
| 宋宋          | 英语             | 84           | 84            |
| 大海          | 英语             | 84           | 84            |
| 孙悟空         | 英语             | 68           | 84            |
| 婷婷          | 语文             | 65           | 94            |
| 宋宋          | 语文             | 64           | 94            |
| 大海          | 语文             | 94           | 94            |
| 孙悟空         | 语文             | 87           | 94            |
+-------------+----------------+--------------+---------------+--+

8. Hive-2

发表于 2020-08-27 | 分类于 Java 大数据进阶 , Hive

分区表

创建表的时候可以使用 PARTITIONED BY 来创建分区表。分区表实际上就是对应一个 HDFS 文件系统上的独立的文件夹,该文件夹下是该分区所有的数据文件。Hive 中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。在查询时通过 WHERE 子句中的表达式选择查询所需要的指定的分区,这样的查询效率会提高很多。

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name 
[(col_name data_type [COMMENT col_comment], ...)]   //表中的字段信息
[COMMENT table_comment]  //表的注释

[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 
[CLUSTERED BY (col_name, col_name, ...) 
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 

[ROW FORMAT row_format]   // 表中数据每行的格式,定义数据字段的分隔符,集合元素的分隔符等
[STORED AS file_format]   //表中的数据要以哪种文件格式来存储,默认为TEXTFILE(文本文件)
[LOCATION hdfs_path]  //表在hdfs上的位置

分区表的创建方式

直接使用 create 创建分区表
create external table if not exists default.deptpart1(
deptno int,
dname string,
loc int
)
PARTITIONED BY(area string)
row format delimited fields terminated by '\t';

创建数据

partition_data1

1       测试    1
2       dev     2
3       data    3
4       test    4

partition_data2

5       测试    5
6       dev     6
7       data    7
8       test    8

加载数据到分区表中

load data local inpath '/home/rexyan/test_file/partition_data1' into table default.deptpart1 partition(area='shanghai');

load data local inpath '/home/rexyan/test_file/partition_data2' into table default.deptpart1 partition(area='beijing');

上面命令会将数据放在不同的分区中,partition_data1 的数据放在分区 area=’shanghai’ 中,partition_data2 的数据放在 area=’beijing’ 中。查询数据时会发现字段中默认增加了一列 area,area 的值就是分区的值。可以使用where 条件过滤该列的值。在 hdfs 中,会发现 deptpart1 目录下有 area=’shanghai’ 和 area=’beijing’ 两个分区目录。

hive (default)> select * from deptpart1;
OK
deptpart1.deptno        deptpart1.dname deptpart1.loc   deptpart1.area
5                           测试            5               beijing
6                           dev            6                 beijing
7                           data           7                 beijing
8                           test           8                 beijing
1                           测试             1               shanghai
2                           dev            2                 shanghai
3                           data           3                 shanghai
4                           test           4                 shanghai
Time taken: 0.201 seconds, Fetched: 8 row(s)
使用 alter 增加分区字段

创建普通表后或者在目前分区表的基础上使用 alter 增加分区字段

hive (default)> alter table deptpart1 add partition(area="shandong");
hive (default)> show partitions deptpart1;  # 查看添加的分区
OK
partition
area=beijing
area=shandong
area=shanghai
Time taken: 0.183 seconds, Fetched: 3 row(s)
使用 load 直接加载数据

使用 load 命令向分区加载数据,如果分区不存在,load 时自动帮我们生成分区

创建一张分区表

create external table if not exists default.deptpart2(
deptno int,
dname string,
loc int
)
PARTITIONED BY(area string)
row format delimited fields terminated by '\t';

使用 load 命令加载数据,将加载时指定分区,因为上面刚创建的表是没有分区 area=’jiangshu’ 和 area=’wuhan’ 的,使用下面方式导入后就能直接创建分区,并将数据导入。

load data local inpath '/home/rexyan/test_file/partition_data1' into table default.deptpart2 partition(area='jiangshu');

load data local inpath '/home/rexyan/test_file/partition_data2' into table default.deptpart2 partition(area='wuhan');
修复分区命令

创建一张分区表,直接将数据上传到 hdfs 目录下,这时使用 hive 是查询不了数的,需要使用命令修复分区自动生成分区的元数据。

创建分区表

create external table if not exists default.deptpart3(
deptno int,
dname string,
loc int
)
PARTITIONED BY(area string)
row format delimited fields terminated by '\t';

上传数据到 hdfs

[[email protected] ~]$ hadoop fs -mkdir -p /hive/deptpart3/area=guiyang
[[email protected] ~]$ hadoop fs -mkdir -p /hive/deptpart3/area=guangxi
[[email protected] ~]$ hadoop fs -put test_file/partition_data1 /hive/deptpart3/area=guiyang
[[email protected] ~]$ hadoop fs -put test_file/partition_data2 /hive/deptpart3/area=guangxi

修复分区

hive (default)> msck repair table deptpart3;  # 修复分区
OK
Partitions not in metastore:    deptpart3:area=guangxi  deptpart3:area=guiyang
Repair: Added partition to metastore deptpart3:area=guangxi
Repair: Added partition to metastore deptpart3:area=guiyang
Time taken: 0.395 seconds, Fetched: 3 row(s)

分桶表

建表时指定了 CLUSTERED BY,这个表就为分桶表,分桶本质上也是为了分散数据,在分桶后,可以结合 hive 提供的抽样查询,只查询指定桶的数据。在分桶时,也可以指定将每个桶的数据根据一定的规则来排序,如果需要排序,那么可以在 CLUSTERED BY 后根 SORTED BY

创建分桶表

create table stu_buck(id int, name string)
clustered by(id)  -- 根据 id 进行分桶
SORTED BY (id desc)  --根据 id 进行排序
into 4 buckets  -- 分 4 个桶
row format delimited fields terminated by '\t';  -- 指定数据之间分隔符为 '\t'

向分桶表导入数据时,必须运行 MR 程序,才能实现分桶操作,之前使用的 load 操作只是相当于执行的 put 操作,无法满足分桶表导入数据,必须使用 insert into (hive 会自动转换为 MR 程序运行)。

在导入数据前还需要打开强制分桶开关和强制排序开关

打开强制分桶开关: set hive.enforce.bucketing=true;
打开强制排序开关: set hive.enforce.sorting=true;

插入数据除了使用 insert into 一条条的插入外,还可以使用查询插入的方式,即查询一张表的结果,并将这正表的数据插入到分桶表中。这里使用查询插入的方式实现。

create table stu_buck_tmp(id int, name string)  -- 创建普通表 stu_buck_tmp
row format delimited fields terminated by '\t';

将数据加载到表 stu_buck_tmp 中

[[email protected] test_file]$ hadoop fs -put ~/test_file/stu_buck_tmp_data /hive/stu_buck_tmp

将数据从 stu_buck_tmp 中查出插入到 stu_buck 中

insert into table stu_buck select * from stu_buck_tmp;

因为 stu_buck 表分桶数量是 4, 所以会看到在插入数据的时候有 4 个 reduce task 执行,查看 hdfs 结构会看到在 stu_buck 下有四个数据文件,每个代表一个桶,每个里面存放一部分数据,且数据都是按照 hash 取模的方式分配的,且数据是倒序存在的。

抽样查询

抽样查询的表必须是分桶表,语法格式为 select * from 分桶表 tablesample(bucket x out of y on 分桶表分桶字段); 以上面创建的分桶表为例,查询示例如下

select * from stu_buck tablesample(bucket 1 out of 2 on id);  --代表从第1桶(0号桶)开始抽,每隔2桶抽一次,一共抽2桶(4/2,4为总桶数,2为每次抽取桶的间隔数)。即最后抽取的桶为 0号桶,1号桶

select * from stu_buck tablesample(bucket 1 out of 1 on id);  --代表从第1桶(0号桶)开始抽,每个1桶抽一次,一共抽4桶(4/1,4为总桶数,1为每次抽取桶的间隔数)。即最后抽取的桶为 0号桶,1号桶,2号桶,3号桶

select * from stu_buck tablesample(bucket 2 out of 4 on id);  --代表从第2桶(1号桶)开始抽,每个4桶抽一次,一共抽1桶(4/4,4为总桶数,4为每次抽取桶的间隔数)。即最后抽取的桶为 1号桶

select * from stu_buck tablesample(bucket 2 out of 8 on id);  --代表从第2桶(1号桶)开始抽,每个8桶抽一次,一共抽0.5桶(4/8,4为总桶数,8为每次抽取桶的间隔数)。即最后抽取的桶为 1号桶的一半

数据导入

load

将数据直接加载到表目录中,语法:load data [local] inpath 路径 into table 表名 partition(xx=xx),local 参数的作用是将本地文件系统的文件上传到 hdfs 中。不加 local 代表着,文件在 hdfs 上,并且将文件从 hdfs 上将源文件移动到目标目录。

insert

insert 的方式会运行 MR 程序,通过程序将数据输出到表目录。在某些场景, 必须使用 insert 的方式来导入数据:

  1. 向分桶表插入数据
  2. 如果指定表中的数据,不是以纯文本(TextFile)形式存储,需要使用 insert 方式导入

语法:insert into/overwrite table 表名 select xxx/values(),(),() 可以使用 insert into 或者 insert overwrite 两种方式来插入数据,insert into 代表向表中追加新的数据,insert overwrite 代表先清空表中所有的数据,再向表中添加新的数据。后面数据的来源方式可以自己写在 values 中,也可以使用 select 来将查询结果进行插入。

import

不仅可以导入数据还可以顺便导入元数据(表结构)。Import 只能导入 export 导出的内容。

语法格式为 import external table 表名 from HDFS路径 ,使用 import 导入要遵循这些约束,如果向一个新表中导入数据,hive 会根据要导入表的元数据自动创建表。如果向一个已经存在的表导入数据,在导入之前会先检查表的结构和属性是否一致,只有在表的结构和属性一致时,才会执行导入。不管表是否为空,要导入的分区必须是不存在的。

数据导出

insert

将一条 sql 运算的结果,插入到指定的路径,语法格式为 insert overwrite [local] directory 目录 row format 格式。这里 local 的含义和导入的时候一样,不加 local 代表着将文件导出到 hdfs 上,加上则是导出到文件系统中。

export

导出数据和元数据(表结构),export 会在 hdfs 的导出目录中,生成数据和元数据,并且导出的元数据是和 RDMS无关的。如果是分区表,可以选择将分区表的部分分区进行导出

语法格式为 export table 表名 [partiton(分区信息) ] to HDFS路径

export table deptpart1 to "/export";  -- 导出 deptpart1
import table deptpart1_import from "/export";  -- 导入 deptpart1_import

元数据信息还在

hive (default)> show partitions deptpart1_import;
OK
partition
area=beijing
area=shandong
area=shanghai
Time taken: 0.21 seconds, Fetched: 3 row(s)

排序

Hive 的本质是 MR,MR 中的排序有以下几种:

  1. 全排序: 结果只有一个(只有一个分区),所有的数据整体有序
  2. 部分排序: 结果有多个(有多个分区),每个分区内部有序
  3. 二次排序: 在排序时,比较的条件有多个

在 MR 中,排序在 reduce 之前就已经排好序了,排序是 shuffle 阶段的主要工作。分区是指使用 Partitioner 来进行分区,当 reduceTaskNum>1,使用用户自己定义的分区器,如果没有就使用 HashParitioner,HashParitioner 只根据 key 的 hashcode 来分区。

Hive 中可以使用以下排序:

  1. ORDER BY 列名: 全排序
  2. SORT BY 列名: 部分排序,如果希望自定定义使用哪个字段分区,需要使用 DISTRIBUTE BY
  3. DISTRIBUTE BY 列名: 指定按照哪个字段分区, 结合 sort by 使用
  4. CLUSTER BY 列名: 如果分区的字段和排序的字段一致,且是正序排序,那么可以用 CLUSTER BY,即DISTRIBUTE BY 列名 sort by 列名 asc 等价于 CLUSTER BY 列名 , CLUSTER BY 后不能写排序方式,只能使用默认的按照 asc 排序。

操作实例:

创建表 emp1,并插入数据,数据如下

create table default.emp1(
empno int,
ename string,
job string,
mgr int,
hiredate string, 
sal double, 
comm double,
deptno int)
row format delimited fields terminated by '\t';
7369    SMITH   CLERK   7902    1980-12-17      800.00
207499  ALLEN   SALESMAN        7698    1981-2-20       1600.00 300.00
307521  WARD    SALESMAN        7698    1981-2-22       1250.00 500.00
307566  JONES   MANAGER 7839    1981-4-2        2975.00
207654  MARTIN  SALESMAN        7698    1981-9-28       1250.00 1400.00
307698  BLAKE   MANAGER 7839    1981-5-1        2850.00
307782  CLARK   MANAGER 7839    1981-6-9        2450.00
107788  SCOTT   ANALYST 7566    1987-4-19       3000.00
207839  KING    PRESIDENT               1981-11-17      5000.00
107844  TURNER  SALESMAN        7698    1981-9-8        1500.00 0.00
307876  ADAMS   CLERK   7788    1987-5-23       1100.00
207900  JAMES   CLERK   7698    1981-12-3       950.00
307902  FORD    ANALYST 7566    1981-12-3       3000.00
207934  MILLER  CLERK   7782    1982-1-23       1300.00         10

ORDER BY 查询示例

select * from empno order by empno desc;

结果显示如下

emp1.empno      emp1.ename      emp1.job        emp1.mgr        emp1.hiredate   emp1.sal        emp1.comm       emp1.deptno
307902  FORD    ANALYST 7566    1981-12-3       3000.0  NULL    NULL
307876  ADAMS   CLERK   7788    1987-5-23       1100.0  NULL    NULL
307782  CLARK   MANAGER 7839    1981-6-9        2450.0  NULL    NULL
307698  BLAKE   MANAGER 7839    1981-5-1        2850.0  NULL    NULL
307566  JONES   MANAGER 7839    1981-4-2        2975.0  NULL    NULL
307521  WARD    SALESMAN        7698    1981-2-22       1250.0  500.0   NULL
207934  MILLER  CLERK   7782    1982-1-23       1300.0  NULL    10
207900  JAMES   CLERK   7698    1981-12-3       950.0   NULL    NULL
207839  KING    PRESIDENT       NULL    1981-11-17      5000.0  NULL    NULL
207654  MARTIN  SALESMAN        7698    1981-9-28       1250.0  1400.0  NULL
207499  ALLEN   SALESMAN        7698    1981-2-20       1600.0  300.0   NULL
107844  TURNER  SALESMAN        7698    1981-9-8        1500.0  0.0     NULL
107788  SCOTT   ANALYST 7566    1987-4-19       3000.0  NULL    NULL
7369    SMITH   CLERK   7902    1980-12-17      800.0   NULL    NULL
7369    SMITH   CLERK   7902    1980-12-17      800.0   NULL    207499
Time taken: 38.149 seconds, Fetched: 15 row(s)

SORT BY 和 DISTRIBUTE BY 查询示例

set mapreduce.job.reduces=3; -- 设置 reduce task 的数量为 3
insert overwrite local directory "/tmp/orderby" ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' select * from emp1 distribute by mgr sort by empno desc;

上面 HQL 的意思是对 empno 进行降序排序,并且按照 mgr 字段进行分区【如果不加 distribute by,那么就不知道会对哪个字段进行分区】,mgr 字段是 int 类型的,所以会将该列的值 %3 进行分区,所以结果文件为 3 个。因为 mgr 这一列的值 %3 都等于 0,所以只有第一个分区(分区0)有值,其他的两个分区都是空的。

CLUSTER BY 查询示例

set mapreduce.job.reduces=3; -- 设置 reduce task 的数量为 3
insert overwrite local directory "/tmp/orderby" ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' select * from emp1 cluster by empno;

cluster by empno 相当于 distribute by empno sort by empno desc

函数

函数有所属库的概念,系统提供的除外。系统提供的函数可以在任意库中使用。

查看当前库所有的函数:show functions;
查看函数的使用: desc function 函数名
查看函数的详细使用: desc function extended 函数名

函数的来源

  1. 系统函数,自带的,直接使用即可
  2. 用户自定义的函数

函数按照特征分类

  1. UDF: 用户定义的函数。 一进一出。 输入单个参数,返回单个结果
  2. UDTF: 用户定义的表生成函数。 一进多出。传入一个参数(集合类型),返回一个结果集
  3. UDAF: 用户定义的聚集函数。 多进一出。 传入一列多行的数据,返回一个结果(一列一行)

常用日期函数(hive 默认解析的日期格式必须是: 2019-11-24 08:09:10)

unix_timestamp:返回当前或指定时间的时间戳    
from_unixtime:将时间戳转为日期格式
current_date:当前日期
current_timestamp:当前的日期加时间
* to_date:抽取日期部分
year:获取年
month:获取月
day:获取日
hour:获取时
minute:获取分
second:获取秒
weekofyear:当前时间是一年中的第几周
dayofmonth:当前时间是一个月中的第几天
months_between: 两个日期间的月份,前-后
add_months:日期加减月
datediff:两个日期相差的天数,前-后
date_add:日期加天数
date_sub:日期减天数
last_day:日期的当月的最后一天
date_format格式化日期

常用取整函数

round: 四舍五入
ceil:  向上取整
floor: 向下取整

常用字符串操作函数

upper: 转大写
lower: 转小写
length: 长度
trim:  前后去空格
lpad: 向左补齐,到指定长度
rpad:  向右补齐,到指定长度
regexp_replace: 使用正则表达式匹配目标字符串,匹配成功后替换。SELECT regexp_replace('100-200', '(\d+)', 'num')='num-num

集合操作

size: 集合(map和list)中元素的个数
map_keys: 返回map中的key
map_values:返回map中的value
array_contains:判断array中是否包含某个元素
sort_array: 将array中的元素排序

查询语句和 MySQL 的不同点

A <=> B :  ①A,B都为null,返回true
                   ②A,B一方为null,返回null
                   ③A,B都不为null,等同于A=B

A Rlike B :  B是一个正则表达式,判断A是否负责B表达式的要求,返回true和false

在关联时,只支持等值连接

在管理时,支持满连接,使用full join

7. Hive-1

发表于 2020-08-25 | 分类于 Java 大数据进阶 , Hive

介绍

描述

Hive 是一个数据仓库软件,可以使用 SQL 来促进对已经存在在分布式设备中的数据进行读,写和管理等操作。Hive 在使用时,需要对已经存储的数据进行结构的投影(映射),Hive 提供了一个命令行和 JDBC 的方式,让用户可以连接到 Hive 。注意:Hive 只能分析结构化的数据,Hive 在 Hadoop 之上,使用hive的前提是先要安装 Hadoop

特点

  1. Hive 并不是一个关系型数据库

  2. 不是基于OLTP(在线事务处理。OLTP 设计的软件,侧重点在事务的处理,和在线访问。一般 RDMS 都是基于OLTP设计)设计的

  3. Hive无法做到实时查询,不支持行级别更新(update,delete)
  4. Hive 要分析的数据存储在 HDFS 中,Hive 为数据创建的表结构(schema),存储在 RDMS 中
  5. Hive 基于OLAP(在线分析处理。OLAP 设计的软件,侧重点在数据的分析上,不追求分析的效率)设计的
  6. Hive 使用类 SQL,称为 HQL 对数据进行分析
  7. Hive 容易使用,可扩展,有弹性

安装

保证有 JAVA_HOME, HADOOP_HOME

将 bin 配置到 PATH 中,在环境变量中提供 HIVE_HOME

JAVA_HOME=/opt/module/jdk1.8.0_121
HADOOP_HOME=/opt/module/hadoop-2.7.2
HIVE_HOME=/opt/module/apache-hive-1.2.1-bin

PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin

export JAVA_HOME PATH HADOOP_HOME HIVE_HOME

简单操作

创建表和插入数据

创建一张表

create table person(name varchar(20), age int);

向表中插入数据。hive 执行 HQL 其实是将 HQL 翻译为 MR 程序在 Hadoop 上运行。

insert into person values("张三", 20);
insert into person values("李四", 20);
insert into person values("王五", 20);

查询插入的数据信息

select * from person;

在hdfs 的 /user/hive/warehouse/person 下,有很多文件,文件中存储的就是插入的数据信息,路径中的 person 代表表名。路径下的文件为存储的数据文件

自定义数据文件

创建表的时候如果没有指定数据的分割符,那么结构化的数据分割符默认是 ^ 。还可以在创建表的时候直接指定数据的分隔符,这里使用 \t

create table student1(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

创建数据文件 test_file

1       zhangsan
2       lisi
3       wangwu
4       liqiang
5       zhaoliu

将数据文件上传到 hdfs 中 student 目录下

hadoop fs -put test_file /user/hive/warehouse/student

执行查询就能看到查询结果

hive> select * from student;
OK
1       zhangsan
2       lisi
3       wangwu
4       liqiang
5       zhaoliu
Time taken: 1.33 seconds, Fetched: 5 row(s)

元数据管理

hive 默认的元数据管理是使用 derby,在 hive 中使用 derby 管理元数据的特点是只能允许一个 hive 的连接存在。在输入 hive 命令的目录下会生成一个 metastore_db 文件夹,该文件夹中存储的就是元数据。当切换目录执行 hive 命令时,又会在新目录下生成一个新的 metastore_db 文件夹,查看之前创建的表信息时会发现不存在(因为是新的 metastore_db 文件夹)

当已经有一个 hive 连接存在时,再次创建一个新的连接出现的错误信息

Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /opt/module/apache-hive-1.2.1-bin/metastore_db.

安装 MySQL

查看是否安装了 mysql,如果安装了需要将其卸载

[[email protected] apache-hive-1.2.1-bin]$ rpm -qa | grep mysql
mysql-libs-5.1.73-7.el6.x86_64
[[email protected] apache-hive-1.2.1-bin]$ sudo rpm -e --nodeps mysql-libs-5.1.73-7.el6.x86_64

上传解压得到 mysql 安装包

[[email protected] soft]$ cd mysql-libs
[[email protected] mysql-libs]$ ls -la
总用量 76060
drwxrwxr-x. 3 rexyan rexyan     4096 8月  24 23:17 .
drwxr-xr-x. 3 rexyan rexyan     4096 8月  24 23:13 ..
-rw-r--r--. 1 rexyan rexyan 18509960 8月  24 23:14 MySQL-client-5.6.24-1.el6.x86_64.rpm
drwxrwxr-x. 4 rexyan rexyan     4096 8月  24 23:14 mysql-connector-java-5.1.27
-rw-r--r--. 1 rexyan rexyan  3575135 8月  24 23:15 mysql-connector-java-5.1.27.tar.gz
-rw-r--r--. 1 rexyan rexyan 55782196 8月  24 23:18 MySQL-server-5.6.24-1.el6.x86_64.rpm

安装 mysql 服务器端

[[email protected] mysql-libs]$ sudo rpm -ivh MySQL-server-5.6.24-1.el6.x86_64.rpm

查看默认密码

[[email protected] mysql-libs]$ sudo cat /root/.mysql_secret
# The random password set for the root user at Tue Aug 25 14:05:01 2020 (local time): eHKx4R0nvli58uB3

启动 mysql 服务端程序

[[email protected] mysql-libs]$ sudo service mysql start 
Starting MySQL.....[确定]
[[email protected] mysql-libs]$ sudo service mysql status 
MySQL running (24737)[确定]
[[email protected] mysql-libs]$

安装 mysql 客户端程序

[[email protected] mysql-libs]$ sudo rpm -ivh MySQL-client-5.6.24-1.el6.x86_64.rpm

连接登录修改密码

[[email protected] mysql-libs]$ mysql -uroot -peHKx4R0nvli58uB3
mysql> SET PASSWORD=PASSWORD('123456');

允许远程登录

管理元数据

拷贝 mysql 驱动 jar 包到 hive lib 目录下

[[email protected] mysql-connector-java-5.1.27]$ cp mysql-connector-java-5.1.27-bin.jar /opt/module/apache-hive-1.2.1-bin/lib/

在 hive conf 目录下创建 hive-site.xml 文件,内容如下

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://hadoop10:3306/metastore?createDatabaseIfNotExist=true</value>
      <description>JDBC connect string for a JDBC metastore</description>
    </property>

    <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
      <description>Driver class name for a JDBC metastore</description>
    </property>

    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>root</value>
      <description>username to use against metastore database</description>
    </property>

    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>123456</value>
      <description>password to use against metastore database</description>
    </property>
</configuration>

手动创建库 metastore,注意编码要使用 latin1。再次进入 hive 交互终端,就可以看到在 mysql 的 metastore 库中创建了很多表,hive 中创建的表的信息都存储在 tbls 表中,通过 db_id 和 dbs 表中的库进行外键约束,hive 中库的信息都存储在 dbs 表中,hive 中的字段信息存在 column_v2 表中,通过 CD_ID 和表的主键进行外键约束。

常见属性配置

Default 数据仓库的最原始位置是在hdfs上的 /user/hive/warehouse 路径下,如果某张表属于 default 数据库,直接在数据仓库目录下创建一个文件夹。修改 default 数据仓库原始位置可以在 hive-site.xml 文件中添加如下信息。其中 value 就是存储数据的地方,该路径为 hdfs 的路径。

<property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/hive</value>
    <description>location of default database for the warehouse</description>
</property>

如果想在 hive 交互式命令行显示当前数据库,以及查询表的头信息,那么可以在 hive-site.xml 中添加以下配置

<property>
    <name>hive.cli.print.header</name>
    <value>true</value>
</property>

<property>
    <name>hive.cli.print.current.db</name>
    <value>true</value>
</property>

hive 的日志默认存储在 /tmp/用户名/hive.log 中,如果要修改此目录,那么应该复制一份 hive-exec-log4j.properties.template 文件,将新文件名称修改为 hive-log4j.properties。并将其中的 hive.log.dir 值修改为存储日志的地址,重新进入交互式命令行即可。

hive.log.dir=/opt/module/apache-hive-1.2.1-bin/logs

交互式命令参数

在使用 hive 进入交互式操作的时候,可以在 hive 后面跟上一些参数

-d   定义一个变量,在hive启动后,可以使用${变量名}引用变量
--database   指定使用哪个库
-e   后跟一条引号引起来的sql,执行完返回结果后退出cli
-f   执行一个文件中的sql语句,执行完返回结果后退出cli
--hiveconf   在cli运行之前,定义一对属性,hive在运行时,先读取 hadoop的全部8个配置文件,读取之后,再读取  hive-default.xml再读取hive-site.xml, 如果使用--hiveconf,可以定义一组属性,这个属性会覆盖之前读到的参数的值
--hivevar   作用和-d是一致的,定义一个变量
-i   和 -e 类似,区别在于执行完返回结果后不退出 cli
-S   不打印和结果无关的信息

hive 中执行 hdfs 和 linux 命令

!ls /tmp;  # 执行 linux 命令
dfs -ls /user;   # 执行 hdfs 命令

数据类型

基本数据类型

Hive数据类型 Java数据类型 长度
TINYINT byte 1byte有符号整数
SMALINT short 2byte有符号整数
INT int 4byte有符号整数
BIGINT long 8byte有符号整数
BOOLEAN boolean 布尔类型,true或者false
FLOAT float 单精度浮点数
DOUBLE double 双精度浮点数
STRING string 字符系列。可以指定字符集。可以使用单引号或者双引号。
TIMESTAMP 时间类型
BINARY 字节数组

集合数据类型

数据类型 描述
struct 和c语言中的struct类似,都可以通过“点”符号访问元素内容。例如,如果某个列的数据类型是STRUCT{first STRING, last STRING},那么第1个元素可以通过字段.first来引用。
map MAP是一组键-值对元组集合,使用数组表示法可以访问数据。例如,如果某个列的数据类型是MAP,其中键->值对是’first’->’John’和’last’->’Doe’,那么可以通过字段名[‘last’]获取最后一个元素
array 数组是一组具有相同类型和名称的变量的集合。这些变量称为数组的元素,每个数组元素都有一个编号,编号从零开始。例如,数组值为[‘John’, ‘Doe’],那么第2个元素可以通过数组名[1]进行引用。

创表示例

例如有如下两条数据

songsong,bingbing_lili,xiao song:18_xiaoxiao song:19,hui long guan_beijing
yangyang,caicai_susu,xiao yang:18_xiaoxiao yang:19,chao yang_beijing

数据期待的 json 格式如下 ( Map和 Struct 的区别: Struct 中属性名是不变的, Map 中 key 可以变化的)

{
    "name": "songsong",
    "friends": ["bingbing" , "lili"] ,       //列表Array, 
    "children": {                      //键值Map,
        "xiao song": 18 ,
        "xiaoxiao song": 19
    },
    "address": {                      //结构Struct,
        "street": "hui long guan" ,
        "city": "beijing" 
    }
}

根据上面需求,建表语句如下

create table people(
name string,   -- name 字段,类型为 string
friends array<string>,   -- friends 字段,类型为 array,array 中存储 string 类型数据
children map<string,int>,  -- children 字段,类型为 map,key 为 string,value 为 int 类型数据
address struct<street:string,city:string>)  -- address 字段,类型为 struct。有两个属性 street city
row format delimited fields terminated by ','  -- 指定数据字段之间使用 “,” 进行分割
collection items terminated by '_'  -- 字段中得数据使用 '_' 进行分割
map keys terminated by ':'  -- map 中的 key 和 value 使用 ':' 进行分割
lines terminated by '\n';  -- 每行数据使用换行进行分割

将数据上传至 hdfs 指定目录下

[[email protected] test_file]$ hadoop fs -put create_table_test_data_file /hive/people

在 hive 中查询,得到如下结果

hive (default)> select * from people;
OK
people.name     people.friends  people.children people.address
songsong        ["bingbing","lili"]     {"xiao song":18,"xiaoxiao song":19}     {"street":"hui long guan","city":"beijing"}
yangyang        ["caicai","susu"]       {"xiao yang":18,"xiaoxiao yang":19}     {"street":"chao yang","city":"beijing"}
Time taken: 0.749 seconds, Fetched: 2 row(s)

可以看到 friends 列的值是 array 类型,children 是以 map 类型,address 是 struct 类型。

获取 friends 中的第一个值

hive (default)> select friends[0] from people;
OK
_c0
bingbing
caicai
Time taken: 0.166 seconds, Fetched: 2 row(s)

获取 children 中指定 key 的值

hive (default)> select children["xiao song"] from people;
OK
_c0
18
NULL
Time taken: 0.186 seconds, Fetched: 2 row(s)

获取 address 中某个属性的值

hive (default)> select address.city from people;
OK
city
beijing
beijing
Time taken: 0.163 seconds, Fetched: 2 row(s)

库操作

增

CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name  // 库名
[COMMENT database_comment]  // 库的注释说明
[LOCATION hdfs_path]        // 库在hdfs上的路径
[WITH DBPROPERTIES (property_name=property_value, ...)]; // 库的属性

例如创建一个库 mydb2,指定 hdfs 路径位置,并且设置库的属性值

create database  if not exists mydb2 comment 'this is my db' location 'hdfs://hadoop101:9000/mydb2' with dbproperties('ownner'='jack','tel'='12345','department'='IT');

删

drop database 库名 // 只能删除空库
drop database 库名 cascade // 删除非空库

改

use 库名  // 切换使用的库        
dbproperties  // 修改库的属性值
例如:alter database mydb2 set dbproperties('ownner'='tom','empid'='10001'); 同名的属性值会覆盖,之前没有的属性会新增

查

show databases: 查看当前所有的库
show tables in database: 查看库中所有的表
desc database 库名: 查看库的描述信息
desc database extended 库名: 查看库的详细描述信息

表操作

增

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name 
[(col_name data_type [COMMENT col_comment], ...)]   //表中的字段信息
[COMMENT table_comment]  //表的注释

[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 
[CLUSTERED BY (col_name, col_name, ...) 
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 

[ROW FORMAT row_format]   // 表中数据每行的格式,定义数据字段的分隔符,集合元素的分隔符等
[STORED AS file_format]   //表中的数据要以哪种文件格式来存储,默认为TEXTFILE(文本文件)
[LOCATION hdfs_path]  //表在hdfs上的位置

建表时,如果不带 EXTERNAL ,那么创建的表是一个内部表或者叫做管理表。如果带有 EXTERNAL,那么创建的表是一个外部表。外部表和内部表的区别是内部表(管理表)在执行删除操作时,会将表的元数据(schema, 存在 MySQL 中的)和表位置的数据一起删除,外部表在执行删除表操作时,只删除表的元数据(schema, 存在 MySQL 中的)。在企业中,创建的都是外部表,因为在hive中表是廉价的,数据是珍贵的。

内部表和外部表的相互转换:
将表改为外部表 alter table p1 set tblproperties('EXTERNAL'='TRUE');
将表改为管理表 alter table p1 set tblproperties('EXTERNAL'='FALSE');

删

drop table 表名  // 删除表
truncate table 表名 // 清空表数据,只能情况管理表

改

alter table p1 set tblproperties(属性名=属性值);  // 修改表的属性
alter table change 旧列名 新列名 新列类型;  // 修改列的名称和类型

查

desc  表名  // 查看表的描述
desc formatted 表名  // 查看表的详细描述

6. Hadoop HA

发表于 2020-08-25 | 分类于 Java 大数据进阶 , Hadoop

Zookeeper 集群搭建

ZK 的写流程,客户端可以连接任意的 zkserver 实例,向 server 发送写请求命令,如果当前连接的 server 不是Leader,server 会将写命令发送给 Leader,Leader 将写操作命令广播到集群的其他节点,所有节点都执行写操作命令,一旦集群中半数以上的节点写数据成功,Leader 会响应当前 Server,让当前 Server 响应客户端,写操作完成。

集群搭建流程可参考之前 文章), 启动后效果如下:

Hadoop HA

之前搭建 hadoop 集群的时候,NN 和 RM 都只有一个节点,那么实现 hadoop 的 HA,必须保证在 NN 和 RM 故障时,采取容错机制,可以让集群继续使用。

HDFS HA

元数据同步过程

NN 的高可用中元数据的同步过程为,在 active【使用 active 状态来标记主节点,使用 standby 状态标记备用节点】的 NN 格式化后,将空白的 fsimage 文件拷贝到所有的 NN 的机器上,active 的 NN 在启动后,将 edits 文件中的内容发送给 Journalnode 进程,standby 状态的 NN 主动从 Journalnode 进程拷贝数据,保证元数据的同步。Journalnode 在设计时,采用 paxos 协议, Journalnode 适合在奇数台机器上启动,在 hadoop 中,要求至少需要3个 Journalnode 进程,如果开启了 hdfs 的 ha, 就不能再启动 2NN。在同一时刻,最多只能有一个 NN 作为主节点,对外提供服务,其余的 NN,都作为备用节点,不对外提供服务。

搭建过程

1. 修改 core-site.xml 中的 fs.defaultFS 地址

<property>
   <name>fs.defaultFS</name>
   <value>hdfs://mycluster</value>
</property>

mycluster 是自定义的集群名称

2. 修改 hdfs-site.xml 文件,配置 N 个 NN 运行的主机和端口。配置 JournalNode

<!-- 完全分布式集群名称 -->
    <property>
        <name>dfs.nameservices</name>
        <value>mycluster</value>
    </property>

    <!-- 集群中NameNode节点都有哪些 -->
    <property>
        <name>dfs.ha.namenodes.mycluster</name>
        <value>nn1,nn2</value>
    </property>

    <!-- nn1的RPC通信地址 -->
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        <value>hadoop10:9000</value>
    </property>

    <!-- nn2的RPC通信地址 -->
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn2</name>
        <value>hadoop11:9000</value>
    </property>

    <!-- nn1的http通信地址 -->
    <property>
        <name>dfs.namenode.http-address.mycluster.nn1</name>
        <value>hadoop10:50070</value>
    </property>

    <!-- nn2的http通信地址 -->
    <property>
        <name>dfs.namenode.http-address.mycluster.nn2</name>
        <value>hadoop11:50070</value>
    </property>

    <!-- 指定NameNode元数据在JournalNode上的存放位置(JournalNode 至少三个) -->
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://hadoop10:8485;hadoop11:8485;hadoop12:8485/mycluster</value>
    </property>

    <!-- 配置隔离机制,即同一时刻只能有一台服务器对外响应。当发生自动故障转移的时候,使用 ssh 发送命令的方式来杀死之前的服务,防止脑裂的情况 -->
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>sshfence</value>
    </property>

    <!-- 使用隔离机制时需要ssh无秘钥登录 -->
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/rexyan/.ssh/id_rsa</value>
    </property>

    <!-- 声明journalnode服务器存储目录-->
    <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>/opt/module/hadoop-2.7.2/data/jn</value>
    </property>

    <!-- 关闭权限检查-->
    <property>
        <name>dfs.permissions.enable</name>
        <value>false</value>
    </property>

    <!-- 访问代理类:client,mycluster,active配置失败自动切换实现方式-->
    <property>
          <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>

上面配置中配置了两个NN 节点,分别在 hadoop10 和 hadoop11 机器上,分别配置了两个节点的 RPC 地址和 HTTP 地址,配置了 Journal 服务所在的位置等。

启动过程

1. 在所有机器上启动 JournalNode

sh xcall hadoop-daemons.sh start journalnode

查看状态

sh xcall jps

2. 格式化 NN,将格式化后的 fsimage 文件同步到其他 NN 节点,启动所有 NN,将其中一个 NN 的状态转换为 active 状态

上面配置了两个 NN,格式化 hadoop10 上的 NN 并格式化

hadoop namenode -format  # 格式化 hadoop 10 上的 NN
hadoop-daemon.sh start namenode   # 启动 hadoop 10 上的 NN

在 hadoop11 上同步 hadoop 10 上的数据,包括 fsimage 文件等

hdfs namenode -bootstrapStandby   # 在 hadoop11 上同步 hadoop 10 上的数据
hadoop-daemon.sh start namenode   # 启动 hadoop11 上的 namenode

启动 nn1 和 nn2 的 datanode

sh xcall hadoop-daemons.sh start datanode

访问 http://hadoop11:50070/ 和 http://hadoop10:50070/ 两个 nn 的 namenode web 地址,两者都显示为 standby

手动将某个 nn 节点修改为 active 状态

hdfs haadmin -transitionToActive nn1  # 手动将 nn1 节点状态改为 active

再次访问 http://hadoop10:50070/ 就能看到状态从 standby 变成了 active。

3. 文件上传测试

上传一个文件到 hdfs 中,因为只有 nn1 是 active 的,所以只有 nn1 提供服务。在 web 页面中只有 nn1 可以看到上传的文件信息。如果手动将 nn1 状态修改为 standby,将 nn2 状态修改为 active,那么就只有 nn2 可以看到上传的文件信息而 nn1 则不可以。

hdfs haadmin -transitionToStandby nn1  # 将 nn1 切换为 Standby
hdfs haadmin -transitionToActive nn2   # 将 nn2 切换为 Active
手动故障转移

在上面的过程中 nn2 已经成为了 active 的状态,现在手动杀死 nn2 的 namenode 进程。因为 nn2 是 actice,且已经被杀死了,所以现在是无法正常提供服务的。

jps  # 先获取 namenode 进程号
kill -9 3596   # 结束 namenode 进程

强制将 nn1 的状态修改为 Active

hdfs haadmin -transitionToActive --forceactive nn1  # 强制将 nn1 修改为 Active 

修改成功后 hdfs 就能正常的提供服务了

自动故障转移

自动故障转移为 HDFS 部署增加了两个新组件:ZooKeeper 和 ZKFailoverController(ZKFC)进程。ZKFC 使用一个健康检查命令定期地 ping 与之在相同主机的 NameNode,只要该 NameNode 及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的。并且其他 NameNode 的 ZKFC 进程在 ZooKeeper 抢夺分布式锁,抢到的则成为 Active 状态。

为了防止脑裂情况的发生,hadoop HDFS 提供了两种解决方法,一种是配置 ssh 发送 kill 命令,即其他机器会通过 ssh 的方式来给你发送 kill 命令,防止你是假死。另外一种是自己配置一个脚本,当自己的 ZKFC 进程检测到自己处于不健康的状态时,那么就调用最的脚本将自己杀死。

自动故障转移配置

在 hdfs-site.xml 中配置自动故障转移

<property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
</property>

在 core-site.xml 中配置 zk 的集群地址信息

<property>
    <name>ha.zookeeper.quorum</name>
    <value>hadoop10:2181,hadoop11:2181,hadoop12:2181</value>
</property>

分发修改的两个文件,然后启动 zk 服务

sh xcall /opt/module/apache-zookeeper-3.6.1-bin/bin/zkServer.sh start   # 启动
sh xcall /opt/module/apache-zookeeper-3.6.1-bin/bin/zkServer.sh status  # 查看状态

初始化 HA 在 Zookeeper 中状态(其实就是在 zk 中新增一个 znode 信息,下面存放 hadoop ha 的信息)

hdfs zkfc -formatZK

启动 hdfs 服务

start-dfs.sh

查看 http://hadoop11:50070/ 和 http://hadoop10:50070/ 发现 hadoop11 成为了active

模拟故障,将 hadoop11 namenode 进程杀死,然后发现 hadoop10 自动成为了 active 状态。

YARN HA

在 yarn-site.xml 中增加下面配置

    <!--启用resourcemanager ha-->
    <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>

    <!--声明两台resourcemanager的地址-->
    <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>cluster-yarn1</value>
    </property>

    <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>

    <property>
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>hadoop10</value>
    </property>

    <property>
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>hadoop11</value>
    </property>

    <!--指定zookeeper集群的地址--> 
    <property>
        <name>yarn.resourcemanager.zk-address</name>
        <value>hadoop10:2181,hadoop11:2181,hadoop12:2181</value>
    </property>

    <!--启用自动恢复--> 
    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
    </property>

    <!--指定resourcemanager的状态信息存储在zookeeper集群--> 
    <property>
        <name>yarn.resourcemanager.store.class</name>     <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

上面配置中声明了两台 resourcemanager 的地址分别是 hadoop10 和 hadoop11。还配置了 zookeeper 集群的地址,配置了自动恢复等。

在 hadoop10 和 hadoop11 上启动 rm

yarn-daemon.sh start resourcemanager  # hadoop10 上执行
yarn-daemon.sh start resourcemanager  # hadoop11 上执行

访问 http://hadoop10:8088 和 http://hadoop11:8088 会发现 http://hadoop11:8088 会跳转到 http://hadoop10:8088。所以 hadoop10 上的 Yarn 就变成了 active 对外服务。

将 hadoop10 上的 Yarn 进程杀死会发现只有访问 http://hadoop11:8088 才能成功,因为 http://hadoop11:8088 成为了 active。若再次将 hadoop10 上的 yarn 重新启动后,访问 hadoop10 会跳转到 hadoop11.

压缩

目的和原则

压缩的目的:压缩的目的是在 MR 运行期间,提高 MR 运行的效率,压缩可以减少 MR 运行期间的磁盘IO 和网络IO。

压缩的原则:IO 密集型,多用压缩。计算密集型,CPU 负载过重,少用压缩。

Hadoop 默认支持的压缩格式有 deflate, bzip2, gzip。需要额外安装的有 lzo, snappy。特点是 bzip2 压缩比最高,压缩速度最慢。snappy 压缩速度最快,压缩比凑合。deflate,gzip 折中。

常用配置

压缩常用配置项如下:

io.compression.codecs: 代表整个Job运行期间,可以使用哪些压缩格式,配置这个参数后,配置的压缩格式会被自动初始化,默认值 deflate,gzip,bzip2
mapreduce.map.output.compress: map阶段输出的key-value是否采用压缩,默认值 false
mapreduce.map.output.compress.codec: map阶段输出的key-value采用何种压缩,默认值 deflate
mapreduce.output.fileoutputformat.compress: job在reduce阶段最终的输出是否采用压缩, 默认值 false
mapreduce.output.fileoutputformat.compress.codec: job在reduce阶段最终的输出采用何种压缩,默认值deflate
mapreduce.output.fileoutputformat.compress.type: 如果Job输出的文件以SequenceFile格式,SequenceFile 中的数据,要以何种形式进行压缩。NONE:是否压缩及如何压缩取决于操作系统,RECORD(默认):每个key-value对作为一个单位,压缩一次。BLOCK:SequenceFile中的block,SequenceFile中的block默认为64K,每个block压缩一次!

压缩场景

什么时候需要考虑压缩:

  1. Mapper 的输入: 主要考虑每个文件的大小,如果文件过大,需要使用可以切片的压缩格式。
  2. Reducer 的输出: reducer 的输出主要考虑,输出之后,是否需要下一个 Job 继续处理,如果需要被下个 Job 继续处理,且单个文件过大,也要使用可以切片的压缩格式。
  3. shuffle阶段:能加速即可

调度器

FIFO调度器

FIFO 调度器的特点就是单队列,所有的 Job 按照客户端提交的先后顺序,先到先服务。弊端是如果当前队列中有一个大的 Job,非常消耗资源,那么这个 Job 之后的其他 Job 都需要付额外的等待时间。造成集群的资源利用率不足。

容量调度器

容量调度器的本质是多个 FIFO 的队列组成,Hadoop 默认使用就是容量调度器。

特点是每个队列可以配置一定的容量,空闲的资源可以匀给其他队列临时使用。可以配置每个job使用的容量的限制,防止一个大的 job 独占所有资源。可以配置每个用户可以使用的容量限制,防止当个用户占用所有资源。

公平调度器

公平调度器的设置和容量调度器大致相同,也是多条队列,每天队列都可以设置一定的容量,每个 Job,用户可以设置容量。区别在于公平调度器在调度策略上,采用最大最小公平算法,来调度 Job,这个算法会保证同一个队列中,所有已经提交,未运行结束的 Job,获取到队列中的资源是平等的。

Hadoop的优化

小文件的优化

源头上处理,在上传到集群之前,提前处理小文件
小文件已经在 HDFS 存在,可以使用 hadoop archieve 进行归档
在运行 MR 时,可以使用 CombineTextInputFormat 将多个小文件规划到一个切片中
小文件过多,可以开启 JVM 重用

MR 的优化

合理设置 MapTask 和 ReduceTask 的数量

避免数据倾斜。如果 Map 端的数据发生倾斜,那么在切片时,注意每片数据尽量均匀,防止有些不可切片的数据。Reduce 端的数据倾斜,提前对数据进行抽样调查,统计出大致的分布范围,根据分布范围,合理编写Partitioner,让每个分区的数据尽量均衡。

优化磁盘 IO 和网络 IO。可以启用 combiner。启动压缩。调大 MapTask 缓冲区的大小,减少溢写次数。调大MapTask 中 merge 阶段一次合并的片段数,减少合并花费的时间。调大 reduceTask 中 shuffle 线程可以使用的内存,减少溢写次数。调大 reduceTask 中,input.buffer 的大小,提前缓存部分数据到 buffer 中。

5. MapReduce-2

发表于 2020-08-20 | 分类于 Java 大数据进阶 , Hadoop

MapTask 工作机制

MR 核心阶段:map,sort,copy,sort,reduce

MapTask 分为两个阶段,分别是 map 和 sort。在执行 context.write(keyout-valueout) 之前都属于 map 阶段,然后如果该 job 有 ReduceTask,那么在进行 sort 排序。

在执行 context.write() 并不是直接将 key-value 写出,而是先攒到一个缓存区 MapOutPutBuffer 中。每个记录在进入缓冲区时,先调用 Partitioner(分区器)为记录计算一个区号。例如以单词统计为例,数据进入缓冲区后格式可能如下,index 为索引,partition为分区号,keystart,valuestart 分别表示 key 的起始位置和 value 的起始位置,key 为统计的单词,value 单词出现的个数。

index    partition  keystart  valuestart   key      value
  0          1          0        6        hadoop     1
  1          1          7        11       hive       1
  2          0              xx       xx       spark      1

缓存区有两个线程,一个为收集线程,收集线程负责将 Mapper 写出的 key-value 收集到缓冲区。第二个为溢写线程,溢写线程会在缓冲区已经收集了 80% 空间的数据时【缓冲区大小默认为 100M,80% 即 80M】,被唤醒,唤醒后负责将缓冲区收集的数据溢写到磁盘。一旦缓冲区满足溢写条件,先对缓冲区的所有数据,进行一次排序,利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号 Partition 进行排序,然后按照 key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。排序时,只排索引(记录有序的索引的顺序),不移动数据。达到溢写条件后,按照分区,进行溢写,每次溢写生成一个临时文件 spillx.out【x为分区号】。溢写多次,生成多个临时文件。当所有的数据全部被溢写结束,最后一批数据不满足溢写条件会执行一次 flush。

写结束后,会对所有的溢写片段执行一次 merge (将多个临时文件合并成一个最终结果)操作。合并时,将所有临时文件同一个分区的数据进行汇总,汇总后再排序【归并排序】,最后合并为一个文件,这个文件每个分区中的 key-value 都是有序的。

ReduceTask 工作机制

  1. Copy 阶段:ReduceTask 从各个MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
  2. Merge 阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
  3. Sort 阶段:按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按 key 进行聚集的一组数据。为了将key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
  4. Reduce阶段:reduce() 函数将计算结果写到HDFS上。

Partition分区

默认的分区策略是根据 key 的 hashCode 对 ResultTasks 个数取模得到的。可以从配置中 mapreduce.job.partitioner.class 参数来设置分区器,如果没有设置,就使用 HashPartitioner 作为分区器。分区的数量和 ReduceTask 的数量一致,一个 ReduceTask 对应着一个分区,所以如果想设置多个分区,那么就需要设置 ReduceTask 的数量。

分区案例

将统计结果按照手机归属地不同省份输出到不同文件中(分区)。期望输出数据,手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

PartitionerFlowBeanDriver 完整代码如下,需要在里面设置 ReduceTask 的数量和自定义使用的分区器。

package com.yanrs.mr.Partitioner;

import com.yanrs.mr.flowbean.FlowBean;
import com.yanrs.mr.flowbean.FlowBeanMapper;
import com.yanrs.mr.flowbean.FlowBeanReducer;
import com.yanrs.mr.wordcount.WCMapper;
import com.yanrs.mr.wordcount.WCReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class PartitionerFlowBeanDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mrinput/flowbean");
        Path outPath = new Path("/mroutput/partition");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置 ReduceTask 的数量为  5
        job.setNumReduceTasks(5);
        // 设置使用自定义分区器
        job.setPartitionerClass(MyPatitioner.class);

        // 设置 job 名称
        job.setJobName("PartitionerFlowBean");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(PartitionerFlowBeanMapper.class);
        job.setReducerClass(PartitionerFlowBeanReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PartitionerFlowBean.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

MyPatitioner 完整代码如下,接收到每个 key 之后,根据需求将不同的 key 划分到不同的 Partitioner 中。

package com.yanrs.mr.Partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * KEY , VALUE 是 Mapper 输出的 key,value 的类型
 */
public class MyPatitioner extends Partitioner<Text, PartitionerFlowBean> {

    // getPartition 方法就是计算分区,numPartitions 为总的分区数,也就是 ReduceTask 的数量
    @Override
    public int getPartition(Text text, PartitionerFlowBean partitionerFlowBean, int numPartitions) {
        int partitionNum = 0;
        // 获取手机号的前三位
        String suffix = text.toString().substring(0, 3);
        switch(suffix){
            case "136":
                partitionNum = 1;  // 手机号如果是 136 开头,那么就分到 1 分区
                break;
            case "137":
                partitionNum = 2;  // 手机号如果是 137 开头,那么就分到 2 分区
                break;
            case "138":
                partitionNum = 3;  // 手机号如果是 138 开头,那么就分到 3 分区
                break;
            case "139":
                partitionNum = 4;  // 手机号如果是 139 开头,那么就分到 4 分区
                break;
        }
        return partitionNum;
    }
}

运行之后可以看到结果文件为 5 份,136 开头的号码在 part-r-00001 中,137 开头的号码在 part-r-00002 中,138 开头的号码在 part-r-00003 中,139 开头的号码在 part-r-00004 中,其余的在 part-r-00000 中。

完整代码

排序

自定义比较器的两种方法

1. 定义 Mappper 输出的key,让 key 实现 WritableComparable, 实现 CompareTo()

2. 自定义类时,继承 WriableComparator 或实现 RawCompartor,使用时设置 mapreduce.job.output.key.comparator.class=自定义的类

排序案例

Mapper 输出 key 为内置类型

对消耗的总流量进行升序排序。在之前 flowbean 案例的结果的基础上,对用户手机号所消耗的总流量升序排序。之前 flowbean 结果如下。

13470253144    FlowBean{upFlow=180, downFlow=180, sumFlow=360}
13509468723    FlowBean{upFlow=7335, downFlow=110349, sumFlow=117684}
13560439638    FlowBean{upFlow=918, downFlow=4938, sumFlow=5856}
13568436656    FlowBean{upFlow=3597, downFlow=25635, sumFlow=29232}
13590439668    FlowBean{upFlow=1116, downFlow=954, sumFlow=2070}
...

因为之前案例手机号是 key ,所以输出结果默认是按照手机号 key 进行排序的。即需要注意的一点是,排序只针对 key 进行 什么是 key,那么就对这个字段进行排序。

现在的需求的是根据总流量排序,所以要将总流量做为 key 。上述结果文件总流量可以通过 = 拆分,然后去除 } 获取到。Sort1Mapper 代码如下,这里需要注意的是 mapper 的输出,因为排序只针对 mapper 输出的 key 排序,所以这里 key 是总流量的大小,即类型为 LongWritable【内置类型】,value 为手机号。还需要注意如何拆分字符串能获取到总流量和手机号。

package com.yanrs.mr.sort1;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Sort1Mapper extends Mapper<LongWritable, Text, LongWritable, Text>{
    private LongWritable outKey = new LongWritable();
    private Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split("=");
        // 封装总流量为 key
        outKey.set(Long.parseLong(words[3].replace("}", "")));
        // 封装手机号为 value
        outValue.set(words[0].split("\t")[0]);
        context.write(outKey, outValue);
    }
}

Sort1Reducer 中需要注意的是,将 mapper 的输出结果进行顺序对换,即 reducer 的输出 key 为手机号,输出 value 为排序好的总流量。

package com.yanrs.mr.sort1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Sort1Reducer extends Reducer <LongWritable, Text, Text, LongWritable>{
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value: values) {
            // 这里写出数据的时候,还是将手机号放在前面,排序好的总流量放在后面
            context.write(value, key);
        }
    }
}

Sort1Driver 中需要注意设置 mapper 的输出类型和 reducer 的输出类型,因为现在两者不一致了,所以需要单独设置

package com.yanrs.mr.sort1;

import com.yanrs.mr.flowbean.FlowBean;
import com.yanrs.mr.flowbean.FlowBeanMapper;
import com.yanrs.mr.flowbean.FlowBeanReducer;
import com.yanrs.mr.wordcount.WCMapper;
import com.yanrs.mr.wordcount.WCReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class Sort1Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mroutput/flowbean/part-r-00000");
        Path outPath = new Path("/mroutput/flowbean/sort1");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置 job 名称
        job.setJobName("sort1");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(Sort1Mapper.class);
        job.setReducerClass(Sort1Reducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

运行输出结果

13966251146    240
13729199489    240
13768778790    240
13846544121    264
13470253144    360
13956435636    1644
13590439668    2070
15959002129    2118

示例代码

继承 WriableComparator

对消耗的总流量进行降序排序。上面例子 对消耗的总流量进行升序排序 的例子中,因为 mapper 的输出是总流量,类型为 LongWritable,而且 LongWritable 实现了 WritableComparable 接口,并且有 CompareTo 方法,而且 CompareTo 方法是按照升序排序的,所以我在上述例子中使用的就是 LongWritable 实现的比较器,得到的是升序排序的结果。

在对消耗的总流量进行降序排序的例子中,我们需要自己实现一个比较器,实现比较器有两种方式,这里采用自定义类继承 WriableComparator,使用时在 Driver 中设置自定义的比较器即可。

Sort2Mapper

package com.yanrs.mr.sort2;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Sort2Mapper extends Mapper<LongWritable, Text, LongWritable, Text>{
    private LongWritable outKey = new LongWritable();
    private Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split("=");
        // 封装总流量为 key
        outKey.set(Long.parseLong(words[3].replace("}", "")));
        // 封装手机号为 value
        outValue.set(words[0].split("\t")[0]);
        context.write(outKey, outValue);
    }
}

Sort2Mapper

package com.yanrs.mr.sort2;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Sort2Mapper extends Mapper<LongWritable, Text, LongWritable, Text>{
    private LongWritable outKey = new LongWritable();
    private Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split("=");
        // 封装总流量为 key
        outKey.set(Long.parseLong(words[3].replace("}", "")));
        // 封装手机号为 value
        outValue.set(words[0].split("\t")[0]);
        context.write(outKey, outValue);
    }
}

Sort2Driver

package com.yanrs.mr.sort2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class Sort2Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mroutput/flowbean/part-r-00000");
        Path outPath = new Path("/mroutput/flowbean/sort2");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置 job 名称
        job.setJobName("sort1");

        // 设置使用自定义的比较器
        job.setSortComparatorClass(MyDescComparator.class);

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(Sort2Mapper.class);
        job.setReducerClass(Sort2Reducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

MyDescComparator

package com.yanrs.mr.sort2;


import org.apache.hadoop.io.WritableComparator;

public class MyDescComparator extends WritableComparator {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        long thisValue = readLong(b1, s1);
        long thatValue = readLong(b2, s2);
        return thisValue < thatValue ? 1 : (thisValue == thatValue ? 0 : -1);
    }
}

运行输出结果

13509468723    117684
13975057813    59301
13568436656    29232
13736230513    27162
15043685818    7197
.....

示例代码

Mapper 输出 key 为自定义类型

对消耗的总流量进行降序排序。自定义 key 的时候需要实现 WritableComparable 接口,而不是以前的 Writable 接口。实现 WritableComparable 后重写 compareTo 方法,在里面实现要比较的逻辑即可。

FlowBeanSort3Mapper

package com.yanrs.mr.sort3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class FlowBeanSort3Mapper extends Mapper<LongWritable, Text, FlowBeanSort3, Text>{
    private Text outValue = new Text();
    private FlowBeanSort3 outKey = new FlowBeanSort3();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key 为序列号,value 为每行的内容
        String[] words = value.toString().split("\t");
        // 上行流量
        outKey.setUpFlow(Long.parseLong(words[words.length - 3]));
        // 下行流量
        outKey.setDownFlow(Long.parseLong(words[words.length - 2]));
        outKey.setSumFlow(Long.parseLong(words[words.length - 2]) + Long.parseLong(words[words.length - 3]));
        // 封装手机号
        outValue.set(words[1]);
        context.write(outKey, outValue);
    }
}

FlowBeanSort3Reducer

package com.yanrs.mr.sort3;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowBeanSort3Reducer extends Reducer <FlowBeanSort3, Text, Text, FlowBeanSort3>{
    @Override
    protected void reduce(FlowBeanSort3 key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value:values) {
            context.write(value, key);
        }
    }
}

FlowBeanSort3Driver

package com.yanrs.mr.sort3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * 启动这个进程,那么就会运行该 job
 */
public class FlowBeanSort3Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mrinput/flowbean");
        Path outPath = new Path("/mroutput/flowbean/sort3");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置 job 名称
        job.setJobName("sort3");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(FlowBeanSort3Mapper.class);
        job.setReducerClass(FlowBeanSort3Reducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        job.setMapOutputKeyClass(FlowBeanSort3.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBeanSort3.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

FlowBeanSort3

package com.yanrs.mr.sort3;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class FlowBeanSort3 implements WritableComparable<FlowBeanSort3> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /**
     * 序列化, 在写出属性时,如果属性为引用数据类型,那么属性不能为 null
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    /**
     * 反序列化,反序列化和序列化的顺序要一致
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public FlowBeanSort3() {
    }

    @Override
    public String toString() {
        return "PartitionerFlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
        }

    @Override
    public int compareTo(FlowBeanSort3 flowBeanSort3) {
        return this.sumFlow -flowBeanSort3.getSumFlow() > 0?-1:1;
    }
}

示例代码

实现 RawCompartor 接口

对消耗的总流量进行降序排序。实现 RawCompartor 接口后会重写两个 compare 方法,在一个方法中获取比较的对象,另一个方法中进行比较。

MyDescRawCompartor

package com.yanrs.mr.sort4;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;

import java.io.IOException;

public class MyDescRawCompartor implements RawComparator<FlowBeanSort4> {
    private  FlowBeanSort4 key1 = new FlowBeanSort4();
    private  FlowBeanSort4 key2 = new FlowBeanSort4();
    private  DataInputBuffer buffer = new DataInputBuffer();
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            this.buffer.reset(b1, s1, l1);
            this.key1.readFields(this.buffer);
            this.buffer.reset(b2, s2, l2);
            this.key2.readFields(this.buffer);
            this.buffer.reset((byte[])null, 0, 0);
        } catch (IOException var8) {
            throw new RuntimeException(var8);
        }

        return this.compare(this.key1, this.key2);
    }

    @Override
    public int compare(FlowBeanSort4 o1, FlowBeanSort4 o2) {
        return o1.getSumFlow() - o2.getSumFlow() > 0?-1:1;
    }
}

示例代码

Combiner

Combiner 实际上本质是一个 Reducer 类,Conbiner 只有在设置了之后,才会运行。combiner 的作用是在shuffle 阶段对相同 key 的 key-value 进行提前合并,以便在传输中可以减少磁盘 IO 和网络 IO。

Combiner 和 Reducer 的区别

Reducer 是在 reduce 阶段调用,Combiner 是在 shuffle 阶段调用【既有可能在 MapTask 端,也可能在ReduceTask 端】。但本质都是 Reducer 类,作用都是对有相同 key 的 key-value 进行合并。

Combiner 使用条件

Combiner 用在+,- 操作的场景,不能用在 *,/ 操作的场景。使用 Combiner 必须保证不能影响处理逻辑和结果。

使用时在 Driver 中设置Combiner 类即可 job.setCombinerClass(Reducer 类.class);

调用时机

MapTask 端调用:

  1. 每次溢写前会调用 Combiner 对溢写的数据进行局部合并。
  2. 在merge时如果溢写的分区数 >=3,如果设置了 Combiner,Combiner 会再次对数据进行 Combine。

ReduceTask 端调用:

  1. shuffle 线程拷贝多个 MapTask 同一分区的数据,拷贝后执行 merge 和 sort, 如果数据量过大,需要将部分数据先合并排序后,溢写到磁盘。如果设置了Combiner,Combiner 会再次运行。

案例

在之前的 flowbean 案例上,只需要在 Driver 中添加配置 job.setCombinerClass(FlowBeanReducer.class); 即可。

未添加 combine 之前结果如下:

添加 combine 之后结果如下

分组

分组通过分组比较器,对进入reduce的key进行对比,key相同的分为一组,一次性进入Reducer,被调用reduce方法。

自定义分组比较器

用户可以自定义 key 的分组比较器,自定义的比较器必须是一个 RawComparator类型的类然后实现compareTo()方法。如果没有设置 key 的分组比较器,默认采取在 Map 阶段排序时,key 的比较器。

分组案例

样例数据如上所示,现在需要求出每一个订单中最贵的商品。思路:将订单数据分装为 bean 对象,然后将 bean 做为 mapper 的输出 key,并让 bean 实现 WritableComparable 接口,重写 compareTo 方法,compareTo 方法中先对 订单 id 进行排序,若订单 id 相同再对成交金额进行排序。这样数据就是按照订单中金额有序排序的了。

总结

分区

总的分区数取决于reduceTask的数量,一个Job要启动几个reduceTask,取决于期望产生几个分区,每个分区最后都会生成一个结果文件。

当 reduceTask>1,尝试获取用户设置的 Partionner,如果没有设置使用内置的 HashPartitoner。如果reduceTask<=1, 系统默认提供一个 Partionner,它会将所有记录都分到0号区。

排序

每次溢写前,使用快速排序最后merge时,使用归并排序

比较器

如果用户自定义了比较器,MR 就使用用户自定义的比较器(RawComparator 类型),如果用户没有自定义,那么Mapper 输出的 Key 需要实现 WriableComparable 接口系统会自动提供比较器。不管是自己提供比较器还是实现WriableComparable 接口,最后在比较时,都是在调用自己实现的CompareTo 方法。

执行流程

  1. Partitioner计算分区
  2. 满足溢写条件,对所有数据进行排序,排序时用比较器对比 key,每次溢写前的排序,默认使用的快排。如果设置了 Combiner,在溢写前,排好序的结果会先被 Combiner 进行 combine 再溢写。
  3. 2过程会发生 N 次
  4. 所有的溢写片段需要 merge 为一个总的文件,合并时,使用归并排序,对 key 进行排序。如果溢写片段数量超过 3,在溢写成一个最终的文件时,Combiner 再次调用,执行Combine,combine 后再溢写。

4. MapReduce-1

发表于 2020-07-28 | 分类于 Java 大数据进阶 , Hadoop

MR 相关概念

  1. Job(作业) : 一个MR程序称为一个Job
  2. MRAppMaster(MR任务的主节点): 一个Job在运行时,会先启动一个进程,这个进程为 MRAppMaster。负责 Job 中执行状态的监控,容错,和 RM 申请资源,提交 Task 等。
  3. Task(任务): Task是一个进程,负责某项计算。
  4. Map(Map阶段): Map 是 MapReduce 程序运行的第一个阶段。Map阶段的目的是将输入的数据,进行切分。将一个大数据,切分为若干小部分。切分后,每个部分称为1片(split),每片数据会交给一个Task(进程)进行计算,负责 Map 阶段的 Task 称为 MapTask。在一个 MR 程序的 Map 阶段,会启动N(取决于切片数,多少个切片就会启动多少个 MapTask)个 MapTask。每个 MapTask 是并行运行。
  5. Reduce(Reduce阶段): Reduce 是MapReduce 程序运行的第二个阶段(最后一个阶段),Reduce 阶段的目的是将 Map 阶段,每个 MapTask 计算后的结果进行合并汇总,得到最终结果。Reduce阶段是可选的,不一定有。负责 Reduce 阶段的 Task 称为ReduceTask。一个Job可以通过设置,启动N个ReduceTask,这些ReduceTask也是并行运行,每个ReduceTask最终都会产生一个结果。

MR 相关组件

  1. Mapper: map 阶段核心的处理逻辑
  2. Reducer: reduce 阶段核心的处理逻辑
  3. InputFormat: 输入格式。MR 程序必须指定一个输入目录,一个输出目录,InputFormat 代表输入目录中文件的格式。如果是普通文件,可以使用FileInputFormat。如果是SequeceFile(hadoop提供的一种文件格式),可以使用 SequnceFileInputFormat,如果处理的数据在数据库中,需要使用 DBInputFormat。
  4. RecordReader: 记录读取器。RecordReader 负责从输入格式中,读取数据,读取后封装为一组记录(k-v)。
  5. OutPutFormat: 输出格式。OutPutFormat 代表 MR 处理后的结果,要以什么样的文件格式写出。将结果写出到一个普通文件中,可以使用 FileOutputFormat,将结果写出到数据库中,可以使用 DBOutPutFormat,将结果写出到 SequeceFil e中,可以使用 SequnceFileOutputFormat。
  6. RecordWriter: 记录写出器。将处理的结果以什么样的格式写出到输出文件中。
  7. Partitioner: 分区器。负责在 Mapper 将数据写出时,为每组 keyout-valueout 打上标记,进行分区。一个ReduceTask只会处理一个分区的数据。

MR 流程

  1. InputFormat 调用 RecordReader,从输入目录的文件中,读取一组数据,封装为 keyin-valuein 对象
  2. 将封装好的 key-value,交给 Mapper.map() ——>将处理的结果写出 keyout-valueout
  3. ReduceTask 启动 Reducer,使用 Reducer.reduce() 处理 Mapper写出的 keyout-valueout,
  4. OutPutFormat 调用 RecordWriter,将 Reducer 处理后的 keyout-valueout 写出到文件

Map阶段(MapTask): 切片(Split) —– 读取数据(Read) —– 交给Mapper处理(Map) —– 分区和排序(sort)
Reduce阶段(ReduceTask): 拷贝数据(copy) —– 排序(sort) —– 合并(reduce) —– 写出(write)

MR 编程

MR的编程只需要将自定义的组件和系统默认组件进行组合,组合之后运行即可,步骤:

  1. Map 阶段的核心处理逻辑需要编写在 Mapper 中
  2. Reduce 阶段的核心处理逻辑需要编写在 Reducer 中
  3. 将编写的 Mapper 和 Reducer 进行组合,组合成一个 Job
  4. 对 Job 进行设置,设置后运行

wordcount

InputFormat 的实现类很多

InputFormat 的作用:

  1. 验证输入目录中的文件格式,是否符合当前 Job 的要求
  2. 生成切片,每个切片都会交给一个 MapTask 处理
  3. 提供 RecordReader,由 RecordReader 从切片中读取记录,交给 Mapper 处理

InputFormat 中的 List<InputSplit> getSplits 方法的功能就是切片。ecordReader<K,V> createRecordReader 的功能是创建 RecordReader。默认 Hadoop 使用的是 TextInputFormat,而 TextInputFormat 创建的 RecordReader 是 LineRecordReader。所以 Hadoop 默认的 InputFormat 使用 TextInputFormat,默认是 Reader 使用 LineRecordReader。

本地模式

WCMapper 完整代码

package com.yanrs.mr.wordcount;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
 * KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key 是行号,value 是一行的文本内容
        System.out.println("keyin: " + key + " valuein: " + value);
        // 将文本内容进行拆分,得到一个个单词组成的数组
        String[] words = value.toString().split("\t");
        // 遍历数组,并输出,输出格式为(单词,1)
        for (String word:words) {
            outKey.set(word);
            context.write(outKey, outValue);
        }
    }
}

WCReducer 完整代码

package com.yanrs.mr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * KEYIN,VALUEIN: Mapper 的输出做为这里的输入
 * KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
 */
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outValue = new IntWritable();

    //reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
        for (IntWritable value:values) {
            sum+=value.get();
        }

        outValue.set(sum);
        // 将结果写出,key 是单词,outValue 是累加的次数
        context.write(key, outValue);
    }
}

WCDriver 完整代码

package com.yanrs.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class WCDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/wcinput");
        Path outPath = new Path("/mroutput");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置 job 名称
        job.setJobName("wordcount");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

直接在 idea 中运行 WCDriver 的 main 方法即可。上面设置连接的是 Hadoop10 的文件系统,但是是在本地运行的。

代码地址

yarn 上运行

WCMapper 完整代码,同上

WCReducer 完整代码,同上

在 yarn 上运行,需要指定运行方式为 yarn,且指定 resourcemanager 的地址

// 设置在 yarn 上运行
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hadoop11");

还需要设置 job 所在的 jar 包

// yarn 运行时候还需要设置 job 所在的 jar 包
job.setJarByClass(WCDriver.class);
// 或者使用

将代码打包,上传到 hadoop 上,使用 hadoop jar 命令运行

hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.yanrs.mr.wordcount.WCDriver

WCDriver 完整代码

package com.yanrs.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class WCDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
        // 设置在 yarn 上运行
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "hadoop11");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/wcinput");
        Path outPath = new Path("/mroutput");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // yarn 运行时候还需要设置 job 所在的 jar 包
        job.setJarByClass(WCDriver.class);
        // 或者使用
        // job.setJar("mapreduce-test-1.0-SNAPSHOT.jar");

        // 设置 job 名称
        job.setJobName("wordcount");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

代码地址

自定义 Bean

数据格式如上所示,需要统计每个手机消耗的上行,下行,总流量信息

FlowBeanMapper 代码如下,mapper 输入参数 key 为行号,value 为一行的文本。mapper 输出参数 key 手机号,value 为 bean 对象(对象中分别有上行,下行,总流量三个属性)

package com.yanrs.mr.flowbean;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * mapper 输入参数 key 为行号,value 为一行的文本
 * mapper 输出参数 key 手机号,value bean 对象(对象中分别有上行,下行,总流量三个属性)
 */
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    private Text outKey = new Text();
    private FlowBean flowBean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key 为序列号,value 为每行的内容
        String[] words = value.toString().split("\t");

        // 封装手机号
        outKey.set(words[1]);
        // 上行流量
        flowBean.setUpFlow(Long.parseLong(words[words.length - 3]));
        // 下行流量
        flowBean.setDownFlow(Long.parseLong(words[words.length - 2]));
        context.write(outKey, flowBean);
    }
}

FlowBean 为实体类,有三个属性,需要实现 hadoop 的序列化方法。需要重写 write(称为序列化) 和 readFields(称为反序列化) 方法。并且反序列化和序列化的顺序要一致,并且提供属性的 get,set 方法,空参构造,toString 方法。

package com.yanrs.mr.flowbean;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /**
     * 序列化, 在写出属性时,如果属性为引用数据类型,那么属性不能为 null
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    /**
     * 反序列化,反序列化和序列化的顺序要一致
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public FlowBean() {
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
        }
}

FlowBeanReducer 处理 FlowBeanMapper 输出的数据,所以输入 key 和 value 的类型分别为 Text 和 FlowBean。输出也为 Text, FlowBean

package com.yanrs.mr.flowbean;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 输入 key 和 value 的类型分别为 Text 和 FlowBean
 *
 */
public class FlowBeanReducer extends Reducer <Text, FlowBean, Text, FlowBean>{

    private FlowBean outValue = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // 累加每个手机号的上行流量和下行流量,并计算总流量
        long sumUpFlow = 0;
        long sumDownFlow = 0;

        for (FlowBean flowBean: values) {
            sumUpFlow += flowBean.getUpFlow();
            sumDownFlow += flowBean.getDownFlow();
        }

        // 将值封装进入 FlowBean 中
        outValue.setDownFlow(sumDownFlow);
        outValue.setUpFlow(sumUpFlow);
        outValue.setSumFlow(sumDownFlow + sumUpFlow);

        context.write(key, outValue);
    }
}

FlowBeanDriver 中设置输入和输出目录,设置 MapperClass 和 ReducerClass。设置 Mapper,Reducer 的输出 key 和 value 类型。

package com.yanrs.mr.flowbean;

import com.yanrs.mr.wordcount.WCMapper;
import com.yanrs.mr.wordcount.WCReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class FlowBeanDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mrinput/flowbean");
        Path outPath = new Path("/mroutput/flowbean");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置 job 名称
        job.setJobName("FlowBean");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(FlowBeanMapper.class);
        job.setReducerClass(FlowBeanReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

因为没有配置在 yarn 上运行,所以直接 idea 运行即可。结果如下

13470253144    FlowBean{upFlow=180, downFlow=180, sumFlow=360}
13509468723    FlowBean{upFlow=7335, downFlow=110349, sumFlow=117684}
13560439638    FlowBean{upFlow=918, downFlow=4938, sumFlow=5856}
13568436656    FlowBean{upFlow=3597, downFlow=25635, sumFlow=29232}
13590439668    FlowBean{upFlow=1116, downFlow=954, sumFlow=2070}
13630577991    FlowBean{upFlow=6960, downFlow=690, sumFlow=7650}
13682846555    FlowBean{upFlow=1938, downFlow=2910, sumFlow=4848}
......

代码地址

默认的切片流程

片和块的关系

片:在计算MR程序时,才会切片。在运行程序时,临时将文件从逻辑上划分为若干部分(所以只是逻辑上的切片,并不是真正的切分),使用的输入格式不同(不同的 InputFormat),切片的方式不同,切片的数量也不同。每片的数据最终也是以块的形式存储在 HDFS。

块: 在向HDFS写文件时,文件中的内容以块为单位存储,块是实际的物理存在。

建议: 片大小最好等于块大小,将片大小设置和块大小一致,可以最大限度减少因为切片带来的磁盘IO和网络IO,MR计算框架速度慢的原因在于在执行MR时,会发生频繁的磁盘IO和网络IO。理论上来说:如果文件的数据量是一定的话,片越大,切片数量少,启动的 MapTask 少,Map 阶段运算慢,片越小,切片数量多,启动的MapTask多,Map阶段运算快。默认情况下片大小就是块大小,即文件的块大小默认为 128M,默认每片就是128M。MapTask的数量只取决于切片数,有多少切片就有多少个 MapTask

如果需要调节片大小 > 块大小:那么需要配置 mapreduce.input.fileinputformat.split.minsize > 128M

如果需要调节片大小 < 块大小:那么需要配置 mapreduce.input.fileinputformat.split.maxsize < 128M

FileInputFormat的切片策略(默认)
  1. 获取当前输入目录中所有的文件
  2. 以文件为单位切片,如果文件为空文件,默认创建一个空的切片
  3. 如果文件不为空,尝试判断文件是否可切(不是压缩文件,都可切)
  4. 如果文件不可切,整个文件作为1片
  5. 如果文件可切,先获取片大小(默认等于块大小),循环判断 待切部分/ 片大小 > 1.1倍,如果大于先切去一片,再判断…
  6. 剩余部分整个作为1片

常见的输入格式

FileInputFormat 中有六个子类,下面总结一下常见的四个子类的切片策略和 RecordReader

TextInputFormat

TextInputFormat 常用于输入目录中全部是文本文件

切片策略: 默认的切片策略

RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为key,一行内容作为value,即 key 的类型为 LongWritable,value 的类型为 Text

上面的 wordcount 例子就是使用的默认的 TextInputFormat

NlineInputFormat

切片策略: 以文件为单位,读取配置中 mapreduce.input.lineinputformat.linespermap 参数(默认为1),每次这么多行切为一片。

RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为key,一行内容作为value,即 key 的类型为 LongWritable,value 的类型为 Text

NLMapper 完整代码

package com.yanrs.mr.nline;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
 * KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
 */
public class NLMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key 是行号,value 是一行的文本内容
        System.out.println("keyin: " + key + " valuein: " + value);
        // 将文本内容进行拆分,得到一个个单词组成的数组
        String[] words = value.toString().split("\t");
        // 遍历数组,并输出,输出格式为(单词,1)
        for (String word:words) {
            outKey.set(word);
            context.write(outKey, outValue);
        }
    }
}

NLReducer 完整代码

package com.yanrs.mr.nline;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * KEYIN,VALUEIN: Mapper 的输出做为这里的输入
 * KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
 */
public class NLReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outValue = new IntWritable();

    //reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
        for (IntWritable value:values) {
            sum+=value.get();
        }

        outValue.set(sum);
        // 将结果写出,key 是单词,outValue 是累加的次数
        context.write(key, outValue);
    }
}

NLDriver 完整代码。在 Driver 中新增设置使用 NLineInputFormat。默认是一行切分为一片,如果需要设置可以在 conf 中设置 mapreduce.input.lineinputformat.linespermap 值即可。

package com.yanrs.mr.nline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class NLDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
        // 设置几行为一片,默认一行一片
        // conf.set("mapreduce.input.lineinputformat.linespermap", "2");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mrinput/nline");
        Path outPath = new Path("/mroutput/nline");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置使用 NLineInputFormat
        job.setInputFormatClass(NLineInputFormat.class);

        // 设置 job 名称
        job.setJobName("nline");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(NLMapper.class);
        job.setReducerClass(NLReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

代码地址

KeyValueTextInputFormat

针对文本文件,使用分割字符,将每一行分割为 key 和 value,如果没有找到分隔符,当前行的内容作为 key,value 为空串。默认分隔符为 \t,可以通过参数 mapreduce.input.keyvaluelinerecordreader.key.value.separator 指定。

切片策略:默认的切片策略

RecordReader : key 和 value 的类型都是 Text

KVMapper 完整代码

package com.yanrs.mr.keyvalue;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
 * KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
 */
public class KVMapper extends Mapper<Text, Text, Text, IntWritable> {

    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // key 是 * 之前的姓名,value 是计数1
        context.write(key, outValue);
    }
}

KVReducer 完整代码

package com.yanrs.mr.keyvalue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * KEYIN,VALUEIN: Mapper 的输出做为这里的输入
 * KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
 */
public class KVReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outValue = new IntWritable();

    //reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
        for (IntWritable value:values) {
            sum+=value.get();
        }

        outValue.set(sum);
        // 将结果写出,key 是单词,outValue 是累加的次数
        context.write(key, outValue);
    }
}

KVDriver 完整代码如下,需要设置使用 KeyValueTextInputFormat,并且需要设置分隔符,需要注意的是分隔符只是一个 byte 类型的数据,即便传入的是一个字符串,也只会读取第一个字符。

package com.yanrs.mr.keyvalue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class KVDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
        // 设置分隔符(需要注意的是分隔符只是一个 byte 类型的数据,即便传入的是一个字符串,也只会读取第一个字符)
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "*");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mrinput/keyvalue");
        Path outPath = new Path("/mroutput/keyvalue");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置使用 KeyValueTextInputFormat
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // 设置 job 名称
        job.setJobName("keyvalue");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(KVMapper.class);
        job.setReducerClass(KVReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

代码地址

CombineTextInputFormat

改变了传统的切片方式。将多个小文件,划分到一个切片中,适合小文件过多的场景。

切片策略: 先确定片的最大值 maxSize,maxSize 通过参数 mapreduce.input.fileinputformat.split.maxsize 设置。流程是以文件为单位,将每个文件划分为若干 part,如果文件的待切部分的大小小于等于 maxSize, 则整个待切部分作为1个 part,如果文件的待切部分的大小大于 maxsize 但是小于等于 2 maxSize, 那么将整个待切部分均匀的切分为2个 part。如果文件的待切部分的大小大于 2 maxSize, 那么先切去 maxSize 大小,得到 1个 part,剩余待切部分继续判断

RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为 key,一行内容作为 value,即 key 的类型为 LongWritable,value 的类型为 Text

CMMapper 完整代码

package com.yanrs.mr.combine;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
 * KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
 */
public class CMMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key 是行号,value 是一行的文本内容
        System.out.println("keyin: " + key + " valuein: " + value);
        // 将文本内容进行拆分,得到一个个单词组成的数组
        String[] words = value.toString().split("\t");
        // 遍历数组,并输出,输出格式为(单词,1)
        for (String word:words) {
            outKey.set(word);
            context.write(outKey, outValue);
        }
    }
}

CMReducer 完整代码

package com.yanrs.mr.combine;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * KEYIN,VALUEIN: Mapper 的输出做为这里的输入
 * KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
 */
public class CMReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outValue = new IntWritable();

    //reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
        for (IntWritable value:values) {
            sum+=value.get();
        }

        outValue.set(sum);
        // 将结果写出,key 是单词,outValue 是累加的次数
        context.write(key, outValue);
    }
}

CMDriver 完整代码。需要设置多大文件切为一片,设置使用 CombineTextInputFormat

package com.yanrs.mr.combine;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class CMDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
        // 设置多大文件切为一片
        conf.set("mapreduce.input.fileinputformat.split.maxsize", "2048");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mrinput/combine");
        Path outPath = new Path("/mroutput/combine");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置使用 CombineTextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);

        // 设置 job 名称
        job.setJobName("combine");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(CMMapper.class);
        job.setReducerClass(CMReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

代码地址

MR 核心阶段划分

MapTask 阶段

  1. map
  2. sort

RedcueTask 阶段

  1. copy
  2. sort
  3. reduce

shuffle 阶段

上面的 2-4 又称为 shuffle 阶段。Shuffle 阶段横跨 MapTask 和 RedcueTask,在MapTask端也有 Shuffle,在RedcueTask 也有 Shuffle。具体 Shuffle 阶段指 MapTask 的 map 方法运行之后到 RedcuceTask 的 reduce 方法运行之前。

总结

mapper 的输出,为 reducer 的输入,mapper 的输出由不同的 InputFormat 的 RecordReader 决定。

不同的 InputFormat 有着不同的切片策略,默认如果不设置,那么使用的是 TextInputFormat。

reduce 方法一次处理一组数据,key 相同的数据为一组。

mapper 和 reducer 的输出数据格式由自己根据需求来设置,可以是 hadoop 内置的类型,也可以自定义 bean。

如果要将编写好的程序在 yarn 上运行,那么需要配置 yarn 的地址,设置 job 所在的 jar 包,将程序打包为 jar 之后运行。

123…41

Rex

322 日志
35 分类
162 标签
RSS
Links
  • CSDN博客
  • rexyan.github.io
© 2017 — 2021 Rex
由 Hexo 强力驱动
|
主题 — NexT.Gemini v5.1.4
本站访客数 人次 本站总访问量 次