Is the following architecture possible in Hadoop MapReduce?

A distributed key-value store (HBase) is used, so each value has a timestamp associated with it. Map and Reduce tasks are executed iteratively. In each iteration, Map should take in the values that were added to the store in the previous iteration (perhaps the ones with the latest timestamp?). Reduce should take in Map's output as well as the pairs from the store whose keys match the keys that Reduce has to process in the current iteration. The output of Reduce goes back to the store.

If this is possible, which classes (e.g., InputFormat, or run() of Reduce) should be extended so that the above operation takes place instead of the regular one? If this is not possible, are there any alternatives to achieve the same?

A: 

So your "store" in iteration n-1 could be this:

key (timestamp | value)

a 1|x, b 2|x, c 3|x, d 4|x

In iteration n, these pairs were added: ... b 5|x, d 6|x

The mapper would find these two records, because their timestamp is > 4, and put them into the intermediate results.

The reducer would then find that for these two records there are two more matching records in the n-1 store: b 2|x, d 4|x

So the output of the reduce phase would be (regardless of the order): b 5|x, d 6|x, b 2|x, d 4|x

Is that what you want?

Peter Wippermann
Pentius, thank you for the reply. In iteration n, the pairs that you mentioned would be coming from Map. Now Reduce has to pick the values with keys b, d from the store, since those two are the keys that Reduce is processing. So Reduce would have two sets of inputs - one from Map (which is normal) and one from the store. Another point is that when Reduce fetches the keys from the store, it should fetch only the latest value based on the timestamp. From what you said, I think two points differ (or I didn't understand what you said properly). In iteration n, you didn't say how the b, d values come. (next comment)
Raghava
From your example, taking the same iteration n-1 values: in iteration n the values given by Map would be something like b y, d y. Reduce would have iteration n-1's b, d key values with it. So it would have four inputs (b, d from the n-1 and n iterations): (b, <y,x>), (d, <y,x>). Using this, Reduce does some processing and writes the output back to the store for the b, d keys with a new timestamp. I hope this is making sense.
Raghava
I'm sorry, but I didn't have an eye on this thread. I'm afraid the topic isn't hot anymore!?
Peter Wippermann
A: 

So if I understood you right, I would design it like the following: you would use an IdentityMapper; no special logic is needed here.

I would combine the key and the timestamp into the Hadoop key of the pair, leaving your value as the Hadoop value (a sketch of such a composite key follows below):

  • HadoopKey = {key|timestamp}
  • HadoopValue = {value}
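
A minimal sketch of such a composite key, assuming the old org.apache.hadoop.mapred API and Text values; the class and member names are mine, not from the post:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    // Composite key: the original key plus the timestamp.
    public class KeyTimestampPair implements WritableComparable<KeyTimestampPair> {
        private Text key = new Text();
        private long timestamp;

        public void set(String k, long ts) { key.set(k); timestamp = ts; }
        public Text getKey() { return key; }
        public long getTimestamp() { return timestamp; }

        public void write(DataOutput out) throws IOException {
            key.write(out);
            out.writeLong(timestamp);
        }

        public void readFields(DataInput in) throws IOException {
            key.readFields(in);
            timestamp = in.readLong();
        }

        // Natural order: key ascending, then timestamp descending,
        // so the youngest record comes first within each key.
        public int compareTo(KeyTimestampPair other) {
            int cmp = key.compareTo(other.key);
            if (cmp != 0) {
                return cmp;
            }
            return timestamp == other.timestamp ? 0
                    : (timestamp > other.timestamp ? -1 : 1);
        }
    }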

You now have to implement your own comparator, so that pairs with the same original key but a different timestamp are recognized as having the same key and therefore go together. (GroupingComparator)

And it is important that the pairs for the reducer are ordered by timestamp, descending. (KeyComparator)
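
Sketches of both comparators, under the same assumptions as above (the class names are illustrative, and each class would normally live in its own source file):

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // GroupingComparator: compares only the original key, so all timestamped
    // versions of one key end up in the same reduce() call.
    class KeyGroupingComparator extends WritableComparator {
        protected KeyGroupingComparator() {
            super(KeyTimestampPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return ((KeyTimestampPair) a).getKey()
                    .compareTo(((KeyTimestampPair) b).getKey());
        }
    }

    // KeyComparator: delegates to the composite key's natural order
    // (key ascending, timestamp descending).
    class TimestampKeyComparator extends WritableComparator {
        protected TimestampKeyComparator() {
            super(KeyTimestampPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return ((KeyTimestampPair) a).compareTo((KeyTimestampPair) b);
        }
    }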

Have a look at

  • the RawComparator class,
  • JobConf's setOutputValueGroupingComparator() and setOutputKeyComparatorClass() methods (a wiring sketch follows below),
  • "Hadoop - The Definitive Guide", Chapter 4, page 100,
  • or just ask, if you need assistance ;-)
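
With the old mapred API, the wiring could look like the sketch below. One addition beyond what I said above: a custom Partitioner on the original key alone is also needed (the standard secondary-sort pattern), so that all timestamped versions of one key land on the same reducer. All class names are the illustrative ones from my sketches:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;
    import org.apache.hadoop.mapred.lib.IdentityMapper;

    // Partitioner on the original key only, ignoring the timestamp.
    class KeyPartitioner implements Partitioner<KeyTimestampPair, Text> {
        public void configure(JobConf job) {}

        public int getPartition(KeyTimestampPair key, Text value, int numPartitions) {
            return (key.getKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public class IterationDriver {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(IterationDriver.class);
            conf.setMapperClass(IdentityMapper.class);      // no special map logic
            conf.setMapOutputKeyClass(KeyTimestampPair.class);
            conf.setMapOutputValueClass(Text.class);
            conf.setPartitionerClass(KeyPartitioner.class);
            conf.setOutputKeyComparatorClass(TimestampKeyComparator.class);
            conf.setOutputValueGroupingComparator(KeyGroupingComparator.class);
            conf.setReducerClass(LatestTimestampReducer.class); // sketched further below
            // Input/output formats and paths omitted in this sketch.
            JobClient.runJob(conf);
        }
    }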

The Reducer will receive all pairs with the same key - oops, a spoiler... - and by now these are sorted by timestamp, descending. If the first - and youngest - timestamp is eligible for this iteration, then all key-value pairs for this reducer call are emitted. If that timestamp disqualifies, then no pairs with this key are emitted.
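
A sketch of that reduce() logic, assuming Text values and a job parameter (here called iteration.start.ts, my own name) that marks the oldest timestamp still belonging to the current iteration. Note that in the old API the key instance is reused while you iterate over the values, so the timestamp check should happen before consuming them, as below:

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class LatestTimestampReducer extends MapReduceBase
            implements Reducer<KeyTimestampPair, Text, Text, Text> {

        private long iterationStart;

        @Override
        public void configure(JobConf job) {
            iterationStart = job.getLong("iteration.start.ts", 0L);
        }

        public void reduce(KeyTimestampPair key, Iterator<Text> values,
                           OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // Pairs arrive youngest-first, so at this point the key carries
            // the youngest timestamp of this group.
            if (key.getTimestamp() < iterationStart) {
                return; // youngest record predates this iteration: emit nothing
            }
            while (values.hasNext()) {
                output.collect(key.getKey(), values.next());
            }
        }
    }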

I think this should do it.

Peter Wippermann