Welcome to aparke’s blog
Mapreduce经典案例之Partitioner
题目描述
统计不同归属地的用户上网流量
解读:
一组数据中是样例数据中属性分别是
时间戳 手机号 MAC地址 ip地址 访问网站域名 上行数据包 下行数据包 上行流量 下行流量 访问响应状态码处理逻辑:
1 先切分出需要的数据列第二列以及倒数第二三列。
2 通过Partitioner类,实现getPartition方法,在Mapper阶段,当Mapper处理好数据后,这些数匀的分据需要经过Partitioner进行分区,来选择不同的Reducer处理,从而将Mapper的输出结果按自定义分区在Reducer上面执行,然后设置Reduce个数job.setNumReduceTasks();将不同类型结果输出到不同文件中。
处理逻辑代码
Mapper类
package cn.aparke.mr.flow; |
Reducer类
package cn.aparke.mr.flow; |
Job提交类
package cn.aparke.mr.flow; |
FlowBean对象类
package cn.aparke.mr.flow; |
Partitioner类
描述
Partitioner决定了Map Task 输出的每条数据交给哪个Reduce Task 来处理。Partitioner 有两个功能:
1) 均衡负载。它尽量将工作均匀地分配给不同的 Reduce。
2) 效率。它的分配速度一定要非常快。Partitioner 的默认实现:hash(key) mod R,这里的R代表Reduce Task 的数目,意思就是对key进行hash处理然后取模。很多情况下,用户需要自定义 Partitioner,比如“hash(hostname(URL)) mod R”,它确保相同域名下的网页交给同一个 Reduce Task 来处理。 用户自定义Partitioner,需要继承Partitioner类,实现它提供的一个方法。
自定义partitioner
每一个Reduce的输出都是有序的,但是将所有Reduce的输出合并到一起却并非是全局有序的,如果要做到全局有序,我们该怎么做呢?最简单的方式,只设置一个Reduce task,但是这样完全发挥不出集群的优势,而且能应对的数据量也很受限。最佳的方式是自己定义一个Partitioner,用输入数据的最大值除以系统Reduce task数量的商作为分割边界,也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍,这样就能保证执行partition后的数据是整体有序的。解决数据倾斜
另一种需要我们自己定义一个Partitioner的情况是各个Reduce task处理的键值对数量极不平衡。对于某些数据集,由于很多不同的key的hash值都一样,导致这些键值对都被分给同一个Reducer处理,而其他的Reducer处理的键值对很少,从而拖延整个任务的进度。当然,编写自己的Partitioner必须要保证具有相同key值的键值对分发到同一个Reducer。自定义的Key包含了好几个字段,比如自定义key是一个对象,包括type1,type2,type3,只需要根据type1去分发数据,其他字段用作二次排序。
package cn.aparke.mr.flow; |