Consider the following log file format:

id        v1        v2        v3
1         15        30        25
2         10        10        20
3         50        30        30

We are to calculate the average value frequency (AVF) for each data row on a Hadoop cluster using dumbo. AVF for a data point with m attributes is defined as:

avf = (1/m) * sum(frequency of the value of attribute i, for i = 1..m)

so for the first row, avf = (1/3)*(1+2+1) ~= 1.33 (15 occurs once in v1, 30 occurs twice in v2, and 25 occurs once in v3). An outlier is identified by a low AVF.
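To sanity-check the definition, a few lines of throwaway Python reproduce the calculation for the whole sample table (counting frequencies per column, as the worked number above implies):

from collections import Counter

rows = {1: [15, 30, 25], 2: [10, 10, 20], 3: [50, 30, 30]}

# count how often each value occurs within its own column
freq = Counter((col, val)
               for vals in rows.values()
               for col, val in enumerate(vals))

for row_id, vals in sorted(rows.items()):
    avf = sum(freq[(col, val)] for col, val in enumerate(vals)) / float(len(vals))
    print("%s\t%.2f" % (row_id, avf))
# -> 1: 1.33, 2: 1.00, 3: 1.33; row 2 has the lowest AVF, so it is
#    the most outlier-like under this measure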

Programming problem

We have the following pseudo/python code:

H = {}  # stores attribute frequencies

map1(_, datapoint):
  for attr in datapoint.attrs:
    yield (attr, 1)

reduce1(attr, values):
  H[attr] = sum(values)

map2(_, datapoint):
  total = 0   # 'total' rather than 'sum', to avoid shadowing the builtin
  m = len(datapoint.attrs)
  for attr in datapoint.attrs:
    total += H[attr]

  yield total / float(m), datapoint   # float division; 1/m would truncate to 0

reduce2(avf, datapoints):  # identity reducer; the shuffle has already sorted by avf
  yield avf, datapoints

The problem is: how do we feed our set of data points into both map1 and map2, and how do we make the intermediate hash H available to map2? Having H globally defined as above seems to go against the MapReduce model.

A: 

If I understand correctly, the first step is to calculate a histogram:

[attr, value] => frequency

where frequency is the number of times that value occurred in the attr column.

The next step is to take the histogram table and the original data, calculate the AVF for each line, and sort the lines by it.

I'd do it in two passes: one map-reduce pass to calculate the histogram, and a second map-reduce pass to find the AVF using the histogram. I'd also use a single, read-only hash guilt-free, since getting the histogram values and the cell values to the same locality will be a messy beast. (For example, have map1 emit [attr, val, id] with [attr, val] as the key; have reduce1 accumulate all records for each key, count them, and emit [id, attr, val, count]; then the second pass uses id as the key to reassemble and average each row. A sketch of this variant follows.)
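To make that parenthetical concrete, here is a hypothetical dumbo-style sketch of the reassembly variant; the tab-separated input format and the job.additer/dumbo.main chaining follow my recollection of dumbo's multi-iteration interface, so treat the details as assumptions rather than tested code:

ATTR_NAMES = ['v1', 'v2', 'v3']

def mapper1(key, line):
    # input lines look like "id \t v1 \t v2 \t v3"
    fields = line.split('\t')
    row_id, values = fields[0], fields[1:]
    for attr, val in zip(ATTR_NAMES, values):
        # key on (attr, val); carry the row id along as the payload
        yield (attr, val), row_id

def reducer1(key, row_ids):
    # every row sharing this (attr, val) arrives together; count them,
    # then hand each row its share of the count. Buffering all ids for
    # a key in memory is exactly the "messy beast" aspect noted above.
    # (attr and val are dropped here: only the count matters for the mean.)
    ids = list(row_ids)
    for row_id in ids:
        yield row_id, len(ids)

def identity_mapper(key, value):
    yield key, value

def reducer2(row_id, freqs):
    # each row emitted one record per attribute, so this iterable has
    # exactly m entries; their mean is the row's AVF
    freqs = list(freqs)
    yield row_id, sum(freqs) / float(len(freqs))

def runner(job):
    job.additer(mapper1, reducer1)
    job.additer(identity_mapper, reducer2)

if __name__ == '__main__':
    import dumbo
    dumbo.main(runner)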


To calculate a histogram, it helps to think of the middle step as 'group' rather than 'sort'. Here's how: since the reduce input is sorted by key, have it accumulate all records for the given key, and as soon as it sees a different key, emit the count. Wukong, the Ruby equivalent of dumbo, has an Accumulator, and I assume dumbo does too. (See below for working code.)
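In plain Python the same group-then-emit pattern can be sketched with itertools.groupby, assuming (as the shuffle guarantees) that the reducer's input arrives sorted by key:

from itertools import groupby
from operator import itemgetter

def histogram_reducer(sorted_records):
    # sorted_records: an iterable of ((attr, val), 1) pairs, sorted by key
    for key, group in groupby(sorted_records, key=itemgetter(0)):
        yield key, sum(count for _key, count in group)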

This leaves you with

attr1    val1a      frequency
attr1    val1b      frequency
attr2    val2a      frequency
...
attrN    valNz      frequency

For the next pass, I'd load that data into a hash table -- a simple Hash (dictionary) if it fits in memory, a fast key-value store if not -- and calculate each record's AVF just as you had it.
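For the in-memory route, a hypothetical second-pass mapper in dumbo style might look like the following; the file name histogram.tsv is invented, and shipping it to every task (e.g. with Hadoop streaming's -file option) plus dumbo's acceptance of callable classes as mappers are assumptions on my part:

ATTR_NAMES = ['v1', 'v2', 'v3']

class AvfMapper(object):
    def __init__(self):
        # load the pass-one histogram into a plain dict, once per task
        self.histogram = {}
        for line in open('histogram.tsv'):
            attr, val, freq = line.rstrip('\n').split('\t')
            self.histogram[(attr, val)] = int(freq)

    def __call__(self, key, line):
        fields = line.split('\t')
        row_id, values = fields[0], fields[1:]
        total = sum(self.histogram.get((attr, val), 0)
                    for attr, val in zip(ATTR_NAMES, values))
        # emit the AVF as the key so the shuffle sorts rows by it
        yield total / float(len(ATTR_NAMES)), row_id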


Here is working Ruby code to calculate the AVF; see http://github.com/mrflip/wukong/blob/master/examples/stats/avg_value_frequency.rb

First Pass

module AverageValueFrequency
  # Names for each column's attribute, in order
  ATTR_NAMES = %w[length width height]

  class HistogramMapper < Wukong::Streamer::RecordStreamer
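    # emit one [attr, val] pair per cell, so pairs can be counted downstream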
    def process id, *values
      ATTR_NAMES.zip(values).each{|attr, val| yield [attr, val] }
    end
  end

  #
  # For an accumulator, you define a key that is used to group records
  #
  # The Accumulator calls #start! on the first record for that group,
  # then calls #accumulate on all records (including the first).
  # Finally, it calls #finalize to emit a result for the group.
  #
  class HistogramReducer < Wukong::Streamer::AccumulatingReducer
    attr_accessor :count

    # use the attr and val as the key
    def get_key attr, val, *_
      [attr, val]
    end

    # start the sum with 0 for each key
    def start! *_
      self.count = 0
    end
    # ... and count the number of records for this key
    def accumulate *_
      self.count += 1
    end
    # emit [attr, val, count]
    def finalize
      yield [key, count].flatten
    end
  end
end

Wukong::Script.new(AverageValueFrequency::HistogramMapper, AverageValueFrequency::HistogramReducer).run

Second Pass

module AverageValueFrequency
  class AvfRecordMapper < Wukong::Streamer::RecordStreamer
    # average the frequency of each value
    def process id, *values
      sum = 0.0
      ATTR_NAMES.zip(values).each do |attr, val|
        sum += histogram[ [attr, val] ].to_i
      end
      avf = sum / ATTR_NAMES.length.to_f
      yield [id, avf, *values]
    end

    # Load the histogram from a tab-separated file with
    #   attr    val   freq
    def histogram
      return @histogram if @histogram
      @histogram = { }
      File.open(options[:histogram_file]).each do |line|
        attr, val, freq = line.chomp.split("\t")
        @histogram[ [attr, val] ] = freq
      end
      @histogram
    end
  end
end
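As with the first pass, the script needs a line to kick it off; the original omits it here, and the nil reducer (making this a map-only job) is my assumption:

Wukong::Script.new(AverageValueFrequency::AvfRecordMapper, nil).run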