If I understand correctly, the first step is to calculate a histogram, `[attr, value] => frequency`, where `frequency` is the number of times that `value` occurred in the `attr` column.
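To make that definition concrete, here is a minimal in-memory sketch in plain Ruby (no Hadoop or Wukong). The sample rows, the `ROWS` constant and the `histogram_for` helper are hypothetical illustrations; the column names reuse the `length width height` attributes from the code further down.

```ruby
# Hypothetical sample rows: [id, length, width, height]
ATTR_NAMES = %w[length width height]
ROWS = [
  %w[1 short narrow low],
  %w[2 short wide   low],
  %w[3 long  narrow low],
  %w[4 short narrow high]
]

# Count how often each value occurs in each attribute's column,
# i.e. build the [attr, value] => frequency histogram.
def histogram_for(rows, attr_names)
  hist = Hash.new(0)  # unseen [attr, value] pairs default to 0
  rows.each do |_id, *values|
    attr_names.zip(values).each { |attr, val| hist[[attr, val]] += 1 }
  end
  hist
end

hist = histogram_for(ROWS, ATTR_NAMES)
p hist[%w[length short]]  # → 3
p hist[%w[height high]]   # → 1
```

Keying on the `[attr, value]` pair rather than on the value alone keeps identical values in different columns (say, a `0` in two numeric columns) from being counted together.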
The next step is to take the histogram table and the original data, calculate the AVF for each line, and sort the lines by AVF.
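As a single-machine sketch of both steps together (again plain Ruby, with hypothetical sample data), the AVF of a row is the mean of the histogram frequencies of its values. Sorting ascending puts rows made of rare values, the usual outlier candidates, first.

```ruby
# Hypothetical sample rows: [id, length, width, height]
ATTR_NAMES = %w[length width height]
ROWS = [
  %w[1 short narrow low],
  %w[2 short narrow low],
  %w[3 short wide   low],
  %w[4 long  wide   high],
  %w[5 short narrow low]
]

# Step 1: [attr, value] => frequency
hist = Hash.new(0)
ROWS.each do |_id, *values|
  ATTR_NAMES.zip(values).each { |attr, val| hist[[attr, val]] += 1 }
end

# Step 2: AVF = average frequency of the row's values
scored = ROWS.map do |id, *values|
  freqs = ATTR_NAMES.zip(values).map { |attr, val| hist[[attr, val]] }
  [id, freqs.sum / ATTR_NAMES.length.to_f]
end

# Step 3: sort ascending, so the rows with the rarest values come first
ranked = scored.sort_by { |_id, avf| avf }
ranked.first(2).each { |id, avf| puts format("%s %.3f", id, avf) }
# prints:
# 4 1.333
# 3 3.333
```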
I'd do it in two passes: one map-reduce pass to calculate the histogram, and a second m-r pass to find the AVF using the histogram. I'd also feel guilt-free using a single constant hash (the histogram, loaded into memory by each mapper), since getting the histogram values and the cell values to the same locality would otherwise be a messy beast. (For example, you'd have map1 emit `[attr val id]` with `[attr val]` as key, and have reduce1 accumulate all records for each key, count them, and emit `[id attr val count]`. The second pass would then use `id` as key to reassemble and average each row.)
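To show why that join is more work than a shared hash, here is a local simulation of the two passes in plain Ruby. `group_by` stands in for Hadoop's shuffle/sort, and the variable names `map1_out` and `reduce1_out` are hypothetical; this is a sketch of the data flow, not Wukong code.

```ruby
# Hypothetical sample rows: [id, length, width, height]
ATTR_NAMES = %w[length width height]
ROWS = [
  %w[1 short narrow low],
  %w[2 short wide   low],
  %w[3 long  wide   low]
]

# map1: emit [attr, val, id] for every cell; [attr, val] is the key.
map1_out = ROWS.flat_map do |id, *values|
  ATTR_NAMES.zip(values).map { |attr, val| [attr, val, id] }
end

# shuffle + reduce1: group by [attr, val], count the group, and emit
# [id, attr, val, count] for every record in it.
reduce1_out = map1_out.group_by { |attr, val, _id| [attr, val] }.flat_map do |(attr, val), recs|
  recs.map { |_attr, _val, id| [id, attr, val, recs.size] }
end

# pass 2: regroup by id and average the counts to get each row's AVF.
avf = reduce1_out.group_by(&:first).map do |id, cells|
  [id, cells.sum(&:last) / cells.size.to_f]
end.to_h
```

Every cell travels through the shuffle twice here, whereas the shared-hash approach only ships the (much smaller) histogram to each mapper.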
To calculate a histogram, it helps to think of the middle step as 'group' rather than 'sort'. Here's how: since the reducer's input arrives sorted by key, have it accumulate all records for the current key and, as soon as it sees a different key, emit the count for the key it just finished. Wukong, the Ruby equivalent of Dumbo, has an `Accumulator`, and I assume Dumbo does too. (See below for working code.)
This leaves you with one tab-separated record per `[attr, val]` key:

```
attr1  val1a  frequency
attr1  val1b  frequency
attr2  val2a  frequency
...
attrN  valNz  frequency
```
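Stripped of the framework, the group-then-count reducer that produces those records is only a few lines. The following is a plain-Ruby sketch of the idea, not Wukong's `AccumulatingReducer` implementation; `count_sorted_keys` is a hypothetical helper, and the input is assumed to be the map output already sorted by key, one tab-separated `attr val` record per line (in a real streaming job you could pass `$stdin` instead of an array).

```ruby
# Count consecutive records that share the same [attr, val] key.
# Relies on the input being sorted, so equal keys are adjacent.
def count_sorted_keys(lines)
  out = []
  current_key = nil
  count = 0
  lines.each do |line|
    key = line.chomp.split("\t").first(2)
    if key != current_key
      out << [*current_key, count] unless current_key.nil?  # key changed: emit previous group
      current_key = key
      count = 0
    end
    count += 1
  end
  out << [*current_key, count] unless current_key.nil?      # flush the last group
  out
end

input = ["height\tlow", "height\tlow", "length\tlong", "length\tshort", "length\tshort"]
count_sorted_keys(input).each { |rec| puts rec.join("\t") }
```

Because only the current key and its count are held in memory, the reducer's memory use stays constant no matter how many records share a key.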
For the next pass, I'd load that data into a hash table (a simple `Hash`, i.e. a dictionary, if it fits in memory; a fast key-value store if not) and calculate each record's AVF just as you had it.
Here is working Ruby code to calculate the AVF; the full example is at http://github.com/mrflip/wukong/blob/master/examples/stats/avg_value_frequency.rb
First Pass

```ruby
require 'rubygems'
require 'wukong'

module AverageValueFrequency
  # Names for each column's attribute, in order
  ATTR_NAMES = %w[length width height]

  class HistogramMapper < Wukong::Streamer::RecordStreamer
    # emit one [attr, val] record per cell; the row id isn't needed here
    def process id, *values
      ATTR_NAMES.zip(values).each{|attr, val| yield [attr, val] }
    end
  end

  #
  # For an accumulator, you define a key that is used to group records
  #
  # The Accumulator calls #start! on the first record for that group,
  # then calls #accumulate on all records (including the first).
  # Finally, it calls #finalize to emit a result for the group.
  #
  class HistogramReducer < Wukong::Streamer::AccumulatingReducer
    attr_accessor :count

    # use the attr and val as the key
    def get_key attr, val, *_
      [attr, val]
    end

    # start the count at 0 for each key
    def start! *_
      self.count = 0
    end

    # ... and count the number of records for this key
    def accumulate *_
      self.count += 1
    end

    # emit [attr, val, count]
    def finalize
      yield [key, count].flatten
    end
  end
end

Wukong::Script.new(AverageValueFrequency::HistogramMapper, AverageValueFrequency::HistogramReducer).run
```
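Second Pass

```ruby
require 'rubygems'
require 'wukong'

module AverageValueFrequency
  # Names for each column's attribute; must match the order used in the first pass
  ATTR_NAMES = %w[length width height]

  class AvfRecordMapper < Wukong::Streamer::RecordStreamer
    # average the frequency of each value
    def process id, *values
      sum = 0.0
      ATTR_NAMES.zip(values).each do |attr, val|
        sum += histogram[ [attr, val] ].to_i   # values missing from the histogram count as 0
      end
      avf = sum / ATTR_NAMES.length.to_f
      yield [id, avf, *values]
    end

    # Load the histogram (the first pass's output) from a tab-separated file with
    #   attr  val  freq
    def histogram
      return @histogram if @histogram
      @histogram = { }
      File.foreach(options[:histogram_file]) do |line|   # closes the file when done
        attr, val, freq = line.chomp.split("\t")
        @histogram[ [attr, val] ] = freq
      end
      @histogram
    end
  end
end

# map-only job: no reducer class
Wukong::Script.new(AverageValueFrequency::AvfRecordMapper, nil).run
```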