我想实现一个mapreduce作业,该作业读取具有以下架构的Parquet文件:
{
optional int96 dropoff_datetime;
optional float dropoff_latitude;
optional float dropoff_longitude;
optional int32 dropoff_taxizone_id;
optional float ehail_fee;
optional float extra;
optional float fare_amount;
optional float improvement_surcharge;
optional float mta_tax;
optional int32 passenger_count;
optional binary payment_type (UTF8);
optional int96 pickup_datetime;
optional float pickup_latitude;
optional float pickup_longitude;
optional int32 pickup_taxizone_id;
optional int32 rate_code_id;
optional binary store_and_fwd_flag (UTF8);
optional float tip_amount;
optional float tolls_amount;
optional float total_amount;
optional float trip_distance;
optional binary trip_type (UTF8);
optional binary vendor_id (UTF8);
required int64 trip_id;
}
这项工作的主要目标是计算出租车每天每小时的平均速度(0->23)。
我的mapper类计算每一小时的速度,因此它提供以下几点(小时,速度)。
减速器类通常应该计算每小时的平均速度。
但是,我想知道是否可以使用组合器类来促进数据处理,因为我了解到组合器类只能用于交换和关联操作,而这不是平均值的情况,对吗?
任何帮助都将不胜感激。
谢谢:)
1条答案
按热度按时间b5buobof1#
合并器可以帮助计算平均值。你基本上想使用合并器来给你一个运行总数,你可以用它来计算平均值。
作为输入,组合器将得到
(hour, (speed, 1))
,并作为它应该产生的输出(hour, (sum_speed, num_records))
. 然后,减速机可以通过除以sum_speed
由num_records
.例如,如果组合器1作为输入接收:
然后它会输出:
如果组合器2作为输入接收:
然后它会输出:
然后,在除以之前,减速机将再次求和:
给你答案的形式
(hour, average_speed)
.