Welcome to aparke’s blog
Mapreduce经典案例之GroupingComparator
题目描述
用户在某个位置从某个时刻开始停留了多长时间
现有如下数据文件需要处理
格式:input.csv
字段:用户ID,位置ID,开始时间,停留时长(分钟)
4行样例:
UserA,LocationA,2018-01-01 08:00:00,60
UserA,LocationA,2018-01-01 09:00:00,60
UserA,LocationB,2018-01-01 10:00:00,60
UserA,LocationA,2018-01-01 11:00:00,60
解读:
样例数据中的数据含义是:
用户UserA,在LocationA位置,从8点开始,停留了60分钟
用户UserA,在LocationA位置,从9点开始,停留了60分钟
用户UserA,在LocationB位置,从10点开始,停留了60分钟
用户UserA,在LocationA位置,从11点开始,停留了60分钟该样例期待输出:
UserA,LocationA,2018-01-01 08:00:00,120
UserA,LocationB,2018-01-01 10:00:00,60
UserA,LocationA,2018-01-01 11:00:00,60处理逻辑:
1 对同一个用户,在同一个位置,连续的多条记录进行合并
2 合并原则:开始时间取最早时间,停留时长加和
处理逻辑代码
Mapper类
package cn.aparke.mr; |
Reducer类
package cn.aparke.mr; |
Job提交类
package cn.aparke.mr; |
UserLocationBean对象类
package cn.aparke.mr; |
GroupingComparator类
描述
GroupingComparator是在reduce阶段分组来使用的,由于reduce阶段,如果key相同的一组,只取第一个key作为key,迭代所有的values。
如果reduce的key是自定义的bean,我们只需要bean里面的某个属性相同就认为这样的key是相同的,这是我们就需要之定义GroupCoparator来“欺骗”reduce了。 我们需要理清楚的还有map阶段你的几个自定义:
parttioner中的getPartition()这个是map阶段自定义分区, bean中定义CopmareTo()是在溢出和merge时用来来排序的。我们就需要使用GroupingComparatorClass来自定义分组方式。我们需要定义一个Comparator函数,令其继承WritableComparator,并重写compare方法。
在compare方法方法中,我们定义规约器的key分组方式。
参考文章:
https://www.cnblogs.com/jingpeng77/p/10098847.html
https://www.jianshu.com/p/b36d8c890e5c
package cn.aparke.mr; |