How to Map/Reduce with a sum and a max date?

v7pvogib posted on 2021-06-04 in Hadoop

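I have a file that I need to map/reduce, and the output needs to contain a sum and the maximum of a date. I have the sum part working, but I don't know how to include the max date as part of the reduce output.
The input data looks like this: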

ID1,  ID2, date,                count
3000, 001, 2014-12-30 18:00:00, 2
3000, 001, 2015-01-01 10:00:00, 1
3000, 002, 2014-11-18 12:53:00, 5
3000, 002, 2014-12-20 20:14:00, 3

My mapper concatenates ID1 and ID2 so that records are grouped on both. Its output looks like this:

key (ID1|ID2), value (count)
3000|001,      2
3000|001,      1
3000|002,      5
3000|002,      3
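As a rough illustration of that mapper step, here is a minimal sketch for the simplified comma-separated sample above. It assumes the columns are exactly `ID1, ID2, date, count` and that the header line has already been skipped; `map_line` is a hypothetical helper, not part of the question's code (which reads tab-separated files with different column positions).

```ruby
# Hypothetical helper: turn one line of the simplified sample input
# ("ID1, ID2, date, count") into a tab-separated "ID1|ID2<TAB>count" record.
# The composite key makes Hadoop group records on both IDs at once.
def map_line(line)
  id1, id2, _date, count = line.split(',').map(&:strip)
  "#{id1}|#{id2}\t#{count}"
end
```

For example, `map_line('3000, 001, 2014-12-30 18:00:00, 2')` produces the first mapper output row shown above, with a tab between key and value.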

The reducer output looks like this:

key (ID1|ID2), value (sum)
3000|001,      3
3000|002,      8

What I actually need is output like this:

key (ID1|ID2), value (sum), date (max)
3000|001,      3,           2015-01-01 10:00:00
3000|002,      8,           2014-12-20 20:14:00
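To pin down the expected result before involving Hadoop, the computation can be sketched in plain Ruby over the sample rows. This is a hypothetical in-memory check (`ROWS` and `sum_and_max` are illustrative names), and it assumes all dates share one time zone, so the fixed-width date strings compare chronologically.

```ruby
# Sample rows from the question: [ID1, ID2, date, count]
ROWS = [
  ['3000', '001', '2014-12-30 18:00:00', 2],
  ['3000', '001', '2015-01-01 10:00:00', 1],
  ['3000', '002', '2014-11-18 12:53:00', 5],
  ['3000', '002', '2014-12-20 20:14:00', 3]
].freeze

# Group by the composite key, then sum the counts and take the latest date.
# Fixed-width "YYYY-MM-DD HH:MM:SS" strings in one time zone sort
# chronologically, so plain String comparison is enough for max here.
def sum_and_max(rows)
  rows.group_by { |id1, id2, _date, _count| "#{id1}|#{id2}" }
      .map { |key, group| [key, group.sum { |r| r[3] }, group.map { |r| r[2] }.max] }
end
```

`sum_and_max(ROWS)` returns the two rows of the desired output above as `[key, sum, max_date]` arrays.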

The mapper and reducer are written in Ruby, but I'll take a working example written in Python (I'll translate it into Ruby).
Here is the mapper code:

require 'csv'

pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt')

Dir.glob(pattern).each do |file|
  # pass the CSV options as keyword arguments
  CSV.foreach(file, col_sep: "\t", headers: false) do |row|
    puts [
      "#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2
      row[7]                               # value = count
    ].join("\t")
  end
end

And the reducer:

prev_key  = nil
key_total = 0

ARGF.each do |line|
  line = line.chomp
  next if line.empty? # chomp never returns nil, so test for an empty line

  key, value = line.split("\t")

  # check for new key (regardless of whether the previous total was 0)
  if prev_key && key != prev_key

    # output total for previous key
    puts [prev_key, key_total].join("\t")

    # reset key total for new key
    prev_key  = key
    key_total = 0

  elsif !prev_key
    prev_key = key

  end

  # add to count for this current key
  key_total += value.to_i

end

# this is to catch the final counts after all records have been received
# (skipped when there was no input at all)
puts [prev_key, key_total].join("\t") if prev_key
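Since Hadoop streaming sorts reducer input by key, the manual `prev_key` bookkeeping can also be expressed with `Enumerable#chunk`, which groups consecutive elements that share a key. A sketch under that assumption; the method name `each_key_sum` is hypothetical, and the input is assumed to be tab-separated `key<TAB>count` lines as produced by the mapper above.

```ruby
# Yields "key<TAB>sum" for each run of consecutive lines sharing a key.
# Assumes the lines arrive sorted by key, as Hadoop streaming guarantees.
def each_key_sum(lines)
  return enum_for(__method__, lines) unless block_given?

  records = lines.lazy
                 .map(&:chomp)
                 .reject(&:empty?)
                 .map { |line| line.split("\t") }

  # chunk groups consecutive [key, value] records with the same key
  records.chunk { |key, _value| key }.each do |key, pairs|
    yield [key, pairs.sum { |_k, v| v.to_i }].join("\t")
  end
end
```

In a reducer script it could be driven with `each_key_sum(ARGF.each_line) { |out| puts out }`. Only the group for the current key is held in memory, and there is no separate "final flush" step to forget.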

Update
Here are the new mapper and reducer, based on the suggestion in the accepted answer:
Mapper:

require 'csv'

pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt')

Dir.glob(pattern).each do |file|
  CSV.foreach(file, col_sep: "\t", headers: false) do |row|
    date_time = "#{row[0]} #{row[1]}:00:00#{row[2]}" # %Y-%m-%d %H:%M:%S%z
    puts [
      "#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2
      "#{row[7]}|#{date_time}"             # value = count | date_time
    ].join("\t")
  end
end
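For the simplified sample data in the question, the same idea (count and date packed into one value) might look like the sketch below. The sample dates have no UTC offset, so this hypothetical `map_line_with_date` appends a fixed `+0000` by default; that default is an assumption made only so the value matches the reducer's `'%Y-%m-%d %H:%M:%S%z'` parse format.

```ruby
# Hypothetical mapper for the simplified sample format ("ID1, ID2, date, count"):
# emits "ID1|ID2<TAB>count|date" so the date travels together with the count.
# utc_offset is an assumed value; the sample dates carry no offset of their own.
def map_line_with_date(line, utc_offset = '+0000')
  id1, id2, date, count = line.split(',').map(&:strip)
  "#{id1}|#{id2}\t#{count}|#{date}#{utc_offset}"
end
```

The `|` separator is safe inside the value because neither the count nor the date format contains that character, so the reducer can split on it.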

Reducer:

require 'date'

prev_key  = nil
key_total = 0
dates = []

ARGF.each do |line|
  line = line.chomp
  next if line.empty? # chomp never returns nil, so test for an empty line

  key, values = line.split("\t")
  value, date_time = values.split('|')

  # check for new key (regardless of whether the previous total was 0)
  if prev_key && key != prev_key

    # output total for previous key
    puts [prev_key.split('|'), key_total, dates.max].join("\t")

    # reset key total for new key
    prev_key  = key
    key_total = 0

    # reset dates array for new key
    dates.clear

  elsif !prev_key
    prev_key = key

  end

  # add date to array for this current key
  dates << DateTime.strptime(date_time, '%Y-%m-%d %H:%M:%S%z')

  # add to count for this current key
  key_total += value.to_i

end

# this is to catch the final counts after all records have been received
# (skipped when there was no input at all)
puts [prev_key.split('|'), key_total, dates.max].join("\t") if prev_key
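Storing every date in `dates` is not strictly necessary: only the largest one is used, so the reducer can keep a running maximum instead. A sketch of that variant, written as a method over an enumerable of lines so it can be exercised without Hadoop (`reduce_with_max` and `DATE_FORMAT` are hypothetical names; input lines are assumed to be `ID1|ID2<TAB>count|date_time`, sorted by key).

```ruby
require 'date'

DATE_FORMAT = '%Y-%m-%d %H:%M:%S%z'.freeze

# Yields "ID1<TAB>ID2<TAB>sum<TAB>max_date" per key, keeping only a
# running maximum date instead of an array of all dates.
def reduce_with_max(lines, &emit)
  return enum_for(__method__, lines) unless emit

  prev_key = nil
  total = 0
  max_date = nil
  # closure over the locals above; emits the record for the current key
  flush = -> { emit.call([*prev_key.split('|'), total, max_date].join("\t")) }

  lines.each do |line|
    line = line.chomp
    next if line.empty?

    key, value = line.split("\t", 2)
    count, date_time = value.split('|', 2)
    date = DateTime.strptime(date_time, DATE_FORMAT)

    # key changed: emit the previous key and start a new group
    if key != prev_key
      flush.call if prev_key
      prev_key = key
      total = 0
      max_date = nil
    end

    total += count.to_i
    max_date = date if max_date.nil? || date > max_date
  end

  flush.call if prev_key
end
```

Inside the reducer script this would be called as `reduce_with_max(ARGF.each_line) { |out| puts out }`. The date is printed via `DateTime#to_s`, i.e. in ISO 8601 form such as `2015-01-01T10:00:00+00:00`, the same way `dates.max` is printed in the code above.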

Answer #1 by uqdfh47h:

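You just need to put the date and the count into a <date, count> pair and emit it as a single value from the mapper. Then, in the reducer, extract the date and count from the pair value. Sum the counts the same way as before, and keep track of the maximum date across the input values (per key).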
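The pair idea can be sketched in memory as follows: each mapper value is a `[date, count]` pair, and the reducer folds the pairs for one key by taking the later date and adding the counts. `merge_pairs`, `reduce_all` and `MAPPED` are hypothetical names, and the `group_by` step merely stands in for Hadoop's shuffle/sort.

```ruby
require 'date'

# Fold the <date, count> pairs for one key: keep the later date, add the counts.
def merge_pairs(pairs)
  pairs.reduce do |(date_a, count_a), (date_b, count_b)|
    [[date_a, date_b].max, count_a + count_b]
  end
end

# Hypothetical mapper output for the question's sample rows: [key, [date, count]]
MAPPED = [
  ['3000|001', [DateTime.parse('2014-12-30 18:00:00'), 2]],
  ['3000|001', [DateTime.parse('2015-01-01 10:00:00'), 1]],
  ['3000|002', [DateTime.parse('2014-11-18 12:53:00'), 5]],
  ['3000|002', [DateTime.parse('2014-12-20 20:14:00'), 3]]
].freeze

# Group the values by key (standing in for the shuffle/sort) and reduce each group.
def reduce_all(mapped)
  mapped.group_by(&:first).transform_values { |kvs| merge_pairs(kvs.map(&:last)) }
end
```

Because `max` and `+` are both associative and commutative, the order in which the pairs reach the reducer does not change the result.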
