I'm trying to understand the data pipelines talk presented at Google I/O: http://www.youtube.com/watch?v=zSDC_TU7rtc

I don't see why fan-in work indexes are necessary if I'm just going to batch through input-sequence markers.

Can't the optimistically-enqueued task grab all unapplied markers, churn through as many of them as possible (repeatedly fetching a batch of, say, 10, then transactionally updating the materialized view entity), and re-enqueue itself if it times out before working through all the markers?
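
Concretely, here's the kind of task I have in mind. This is just a rough sketch: the model names (TerrainView, Marker) and the handler URL are made up, and I'm assuming markers are stored as children of the view entity so a single (pre-XG) transaction can cover both:

    from google.appengine.api import taskqueue
    from google.appengine.ext import db

    BATCH_SIZE = 10

    class TerrainView(db.Model):
        total = db.IntegerProperty(default=0)

    class Marker(db.Model):
        # Created with parent=view so the transaction below can touch both.
        delta = db.IntegerProperty()
        applied = db.BooleanProperty(default=False)

    def apply_markers(view_key, out_of_time=lambda: False):
        """Churn through unapplied markers in batches of BATCH_SIZE."""
        while not out_of_time():
            markers = (Marker.all().ancestor(view_key)
                       .filter('applied =', False).fetch(BATCH_SIZE))
            if not markers:
                return  # all caught up

            def txn():
                view = db.get(view_key)
                for m in markers:
                    view.total += m.delta
                    m.applied = True  # the extra write: flip the flag
                db.put([view] + markers)

            db.run_in_transaction(txn)
        # Ran out of time: re-enqueue a continuation of this task.
        taskqueue.add(url='/tasks/apply_markers',
                      params={'view': str(view_key)})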

Do the work indexes have something to do with the efficiency of querying for all unapplied markers? I.e., is it better to query for "markers with work_index = <N>" than for "markers with applied = False"? If so, why is that?

For reference, the question+answer which led me to the data pipelines talk is here: http://stackoverflow.com/questions/3092660/app-engine-datastore-model-for-progressively-updated-terrain-height-map

+2  A: 

One disadvantage of this approach is that it requires an extra write: you have to flip the flag on the original marker entity when you've completed the update, which is something Brett's original approach doesn't require.

You still need some sort of work index, too, or you run into the race condition Brett talked about, where the task that should apply an update runs before the update transaction has committed, so its query misses the brand-new marker. In your system the update would still get applied eventually, but it could be an arbitrary amount of time before the next update task runs and picks it up.
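
For reference, the "optimistically insert named fan-in tasks" line from the slides corresponds to something like this. This is my rough reconstruction, not Brett's actual code; the task name, URL, queue name, and countdown value are all invented:

    from google.appengine.api import taskqueue

    def enqueue_fan_in_task(batch_index):
        try:
            taskqueue.Task(
                name='apply-view-batch-%d' % batch_index,  # idempotent name
                url='/tasks/apply_markers',
                countdown=1,  # small delay so the insert txn can commit first
            ).add(queue_name='fan-in')
        except (taskqueue.TaskAlreadyExistsError,
                taskqueue.TombstonedTaskError):
            pass  # a task for this batch already exists: the common case

Because the name encodes the batch index, every batch is guaranteed exactly one task, even if an earlier task raced ahead of an uncommitted insert.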

Still, I'm not the expert on this (yet ;). I've forwarded your question to Brett, and I'll let you know what he says - I'm curious as to his answer, too!

Nick Johnson
+2  A: 

A few things:

  • My approach assumes multiple workers (see ShardedForkJoinQueue here: http://code.google.com/p/pubsubhubbub/source/browse/trunk/hub/fork_join_queue.py), where the inbound rate of tasks exceeds what a single thread can process. With that in mind, how would you use a simple "applied = False" filter to split work across N threads? Probably by adding another field to your model and assigning each item a random shard_number; your query would then be "shard_number=N AND applied=False" (requiring another composite index). Okay, that should work; there's a sketch after this list.

  • But then how do you know how many worker shards/threads you need? With the approach above you have to configure them statically, keeping shard_number between 1 and N, and only one thread can query a given shard_number at a time or you get contention. I want the system to figure out the shard/thread count at runtime. My approach batches work into reasonably sized chunks (like the 10 items) and then enqueues a continuation task to take care of the rest. Using query cursors, I know each continuation's batch won't overlap the previous one's, so there's no contention. This gives me a dynamic number of threads working in parallel on the same shard's work items.

  • Now say your queue backs up. How do you ensure the oldest work items are processed first? Put another way: how do you prevent starvation? You could add another field to your model recording the time of insertion; call it add_time. Now your query would be "shard_number=N AND applied=False ORDER BY add_time" (ascending, so the oldest items come back first). This works fine for low-throughput queues.

  • What if your work-item write rate goes up a ton? You're going to be writing many, many rows with roughly the same add_time, which means the composite-index rows for your entities all share a Bigtable row prefix like "shard_number=1|applied=False|add_time=2010-06-24T9:15:22". So every work-item insert hits the same Bigtable tablet server, the one that currently owns the region of the index where new timestamps sort. Fundamentally, you're limited to the throughput of a single machine for each work shard's Datastore writes.

  • With my approach, the only Bigtable index rows are prefixed by a hash of the incrementing work sequence number. This work_index value is scattered across Bigtable's lexical rowspace each time the sequence number is incremented (sketched below). Thus each sequential work-item enqueue will likely go to a different tablet server (given enough data), spreading the load of my queue beyond a single machine. With this approach the write rate should effectively be bounded only by the number of physical Bigtable machines in the cluster.
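
To make the two index layouts concrete, here's a rough sketch. The model and field names are illustrative, and the real ShardedForkJoinQueue implementation differs in its details:

    import hashlib

    from google.appengine.ext import db

    class WorkItem(db.Model):
        # Fields for the query-based scheme in the first bullets.
        shard_number = db.IntegerProperty()
        applied = db.BooleanProperty(default=False)
        add_time = db.DateTimeProperty(auto_now_add=True)
        # Field for the hashed scheme in the last bullet.
        work_index = db.StringProperty()

    # Query-based scheme: needs a composite index over
    # (shard_number, applied, add_time). Fresh inserts for a shard all sort
    # next to one another in that index, so one tablet server absorbs
    # every write.
    q = (WorkItem.all()
         .filter('shard_number =', 3)
         .filter('applied =', False)
         .order('add_time'))  # ascending: oldest first
    batch = q.fetch(10)
    cursor = q.cursor()  # a continuation task resumes via q.with_cursor(cursor)

    # Hashed scheme: derive work_index from the current batch sequence
    # number, so consecutive enqueues scatter across the Bigtable rowspace.
    def work_index_for(sequence_number):
        # Any uniformly distributing hash works; sha1 is just illustration.
        return hashlib.sha1('queue-%d' % sequence_number).hexdigest()

    WorkItem(work_index=work_index_for(42)).put()
    # The fan-in task later fetches exactly its own batch:
    batch = WorkItem.all().filter('work_index =', work_index_for(42)).fetch(1000)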

Brett
Thanks, I'm clearer on much of this now! Specifically for "fan-in with materialized view": am I wrong in thinking the fan-in queue serves as a fallback? You wrote in the slides, "Optimistically insert named fan-in tasks - Guarantees completion; ignored in the common case". And since the aggregations live in the same entity group, I thought the fan-in couldn't benefit from sharding, since each batch being processed locks the group. Is "fan-in with materialized view" intended to deal with situations where the "inbound rate of tasks exceeds the amount of work a single thread can do"?
v_y