I need to go over a huge amount of text (> 2 TB, a full Wikipedia dump) and keep two counters for each token seen (each counter is incremented depending on the current event). The only operation I will need on these counters is increment. In a second phase, I should compute two floats based on these counters and store them.

It should perform the following steps:

  1. Go over huge amounts of text and increase two counters for each word it finds, depending on the current event.
  2. Go over all tokens and, for each of them, compute two additional floats based on these counters.
  3. Allow queries (getting the values for any given token).

Requirements and other details:

  • It must scale up to O(10^8) tokens.
  • The final result needs to be queried very fast!
  • While going over the texts, only the two counters will be incremented. This is one-time processing, so there will be no queries during it, only value updates.
  • No need for dynamic/updateable schema.

I have been trying CouchDB and MongoDB, without good results.

What do you think is the best approach to this problem?

Thank you!

EDIT 1: It has been suggested that I try a Patricia trie and test whether all the keys fit into memory (I suspect they do not). A custom Patricia trie with an extra operator for incrementing the values of each key in one step might be a possible solution.

EDIT 2: Clarified what I mean by "huge": > 2 TB of text. More clarifications.

EDIT 3: Unique token estimation. As suggested by Mike Dunlavey, I tried to make a quick estimate of the number of unique tokens. In the first 830 MB of the dataset, the number of unique tokens grows roughly linearly, reaching 52,134. Unless that growth slows down after processing more data (which is likely), extrapolating linearly over the full dataset gives on the order of O(10^8) unique tokens.
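
A rough sketch of the kind of script used for this estimation (whitespace tokenization and a plain-text input are simplifications here):

import sys

# Count distinct tokens seen so far and report the growth every ~100 MB,
# so the curve can be extrapolated to the full dataset.
seen = set()
bytes_read = 0
REPORT_EVERY = 100 * 1024 * 1024
next_report = REPORT_EVERY

with open(sys.argv[1]) as f:
    for line in f:
        bytes_read += len(line)
        for token in line.split():
            seen.add(token)
        if bytes_read >= next_report:
            print "%d bytes read, %d unique tokens" % (bytes_read, len(seen))
            next_report += REPORT_EVERY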

EDIT 4: Java and Python solutions are preferred but any other language is ok too.

EDIT 5: Tokens will usually contain only printable ASCII characters, but they may contain any printable Unicode character. I will run the same process twice: once with upper/lower case untouched, and once lower-cased only.

A: 

Must you use a DB, rather than reading a text file?

A simple parser written in a compiled C-like language can run in a fraction of the time it takes to read the file, so the whole thing should be essentially I/O-bound. It would be a program similar to the Unix wc, word count.

Sounds like the math is trivial and should not even be noticeable.

EDIT: OK, I didn't understand that you wanted to build a dictionary of the unique tokens, and count each one. In that case, a trie or hash-based dictionary should suffice. The storage size of that will depend on the typical length of tokens and how many different ones there are. This could be similar to the unix sort | uniq idiom.

Mike Dunlavey
I do not care whether it is a DB, a flat text file, or any other method, as long as it does the task. However, I fail to see the link with wc.
smmv
@smmv: `wc` is a simple program that reads files and counts up tokens. It shouldn't be too hard to get the source code for it, if you wanted to. Your program would be similar, it seems to me, but also not hard to write from scratch. If you wanted to get fancy, you could try asynchronous overlapped I/O (double buffering). That would change it from being mostly I/O bound to completely I/O bound.
Mike Dunlavey
@Mike: wc just counts words, characters or bytes. I need to keep counters for **each** token (depending on its type). With > 2 TB of text, not all the tokens will fit into memory as they are. So I doubt that the wc approach is related. (Sorry if I did not make myself clear about the fact that there are counters for each token, not overall.)
smmv
@smmv: Right. I misunderstood that. Thanks. Then if I were in your shoes, I would first try running `sort | uniq` on the dataset, just to see how big the dictionary actually is. You say it's multi-TB, but the number of unique tokens in the dictionary should climb rapidly at first, then taper off, and grow very slowly at the end, so you could estimate the dictionary size using only a fraction of the dataset.
Mike Dunlavey
@Mike: Thank you, I'm trying to estimate that and I'll report back.
smmv
+1  A: 

A strategy, rather than a solution:

There's no escaping a read-through of the input data by one process; i.e., I don't see how to parallelise the initial operation unless the file is on a parallel I/O system, and even then I think it would be difficult to tackle a 7z file in parallel.

However, what you could try is to implement a process which reads the input data and writes chunks of it across your file system, preferably onto enough different disks that the processes you are going to start next don't all queue for the same read/write heads.

As soon as the first chunk has been written you start up a process on another core (you do have a multicore machine, don't you? possibly even a cluster or a network of workstations?) to start digesting that chunk. This process writes partial results to file(s).

As soon as the second chunk has been written you start up a process on another core ...

... you get the picture

Once the entire input has been processed you then devise tasks to merge the results from the output of the tasks processing each chunk. You'd do this in some sort of cascade (e.g. if you had 32 chunks and 16 processors, you might have each processor merge 2 chunks, then 8 of them merge 2 of the merged chunks, and so on).
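
A minimal sketch of such a cascade, for illustration only; merge_two is a placeholder for whatever routine combines two partial results (files or dictionaries):

def cascade_merge(parts, merge_two):
    # Repeatedly merge results pairwise until a single result remains.
    while len(parts) > 1:
        next_round = []
        for i in range(0, len(parts) - 1, 2):
            next_round.append(merge_two(parts[i], parts[i + 1]))
        if len(parts) % 2:              # an odd part carries over to the next round
            next_round.append(parts[-1])
        parts = next_round
    return parts[0]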

My best guess is that you should be fine with flat files for this, not sure that the extra power of a DB is worth the extra cost (in terms of performance and complexity of programming). I suppose you might want to write the final results into a database to support queries.

EDIT: well, if all your queries are of the form 'get me the counters for token XXX' then you could get away with a binary search through a single sorted text file. I'm not suggesting that you should but it might point you in the direction of a solution. Forgetting for the time being that tokens might start with any character (which is just a matter of the alphabet) you could have 26 files, one for tokens starting with A, one for tokens starting with B, and so on.

Or you could construct an index into the master file with entries for A (offset 0 from start of file) B (offset 12456 from start) and so on.
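
A minimal sketch of building such a first-letter index, assuming a single master file sorted by token with "[token] count1 count2" lines (the file name and line format are assumptions for illustration):

index = {}          # first character -> byte offset of its first line
offset = 0
with open("master.txt") as master:
    for line in master:
        first = line[0]
        if first not in index:
            index[first] = offset
        offset += len(line)

# A query for a token then seeks to index[token[0]] and scans (or binary
# searches) only that letter's region of the file.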

Personally I'd play around a bit with the one-sorted-text-file-per-initial-letter approach until I had a working solution, then figure out whether it was fast enough. But I have access to large clusters with oodles of disk and lashings of RAM and your platform might dictate another, possibly more sophisticated, approach.

High Performance Mark
Thanks. I have only a quad core with just 1 TB of disk (a second 1.5 TB is on the way). I'm eventually setting up a cluster at work with some decent machines, but by the time I get that cluster it will be way past my deadline for this ;-) I have thought about this strategy... processing the dataset is not so problematic, but rather storing the results in an efficient way (both during processing and afterwards for querying). It is very likely that I will maintain partial results and merge them later, but still, I don't see the full picture yet.
smmv
@smmv: tell us more about the types of query you want to support: full-blown SQL (or similar) or 'find token XXX'. That will have a material effect on choice of data structure(s) for your processed data.
High Performance Mark
@High Performance Mark: All queries will be "give me the counters for token X".
smmv
@smmv: well, if you need the full power of SQL, that points towards an RDBMS solution, doesn't it? Or some other combination of query language and data structure?
High Performance Mark
+1  A: 

As I understand it, you only want to count tokens. The first solution could be just using a hash map in memory. 52-100k tokens (and the average length of an English word is about 5.1 characters) plus 4 bytes per token for keeping the count is not that much data. You can easily store the map in the memory of a developer machine.
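
A minimal sketch of such an in-memory map keeping two counters per token (the whitespace tokenization and the is_first_event predicate are just placeholders):

from collections import defaultdict

counters = defaultdict(lambda: [0, 0])    # token -> [counter1, counter2]

def process(stream, is_first_event):
    # is_first_event(line) is a hypothetical predicate deciding which of
    # the two counters the current event should increment.
    for line in stream:
        which = 0 if is_first_event(line) else 1
        for token in line.split():
            counters[token][which] += 1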

The second solution is to use Apache Lucene for storing the tokens -- unless you have more than ~1M entries, you do not need to partition the index -- and to store the counter values in a database, e.g. SQLite (because updating a Lucene index is not the best idea).

To speed up the process -- for both solutions -- I would just split your dataset into k*100 datasets and process them separately on different machines (or in parallel), then merge their results. The partial counts can simply be summed without any problems.
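
A hedged sketch of that merge step, assuming each partial result is a dict mapping a token to a (count1, count2) pair:

def merge(partials):
    merged = {}
    for partial in partials:
        for token, (c1, c2) in partial.items():
            old = merged.get(token, (0, 0))
            merged[token] = (old[0] + c1, old[1] + c2)
    return merged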

Your use case is a classic example in Apache Hadoop tutorials, but I think it would be over-engineering to deploy it here.

Skarab
+1  A: 

If you have a lot of memory you could just use plain Redis for storing the counters (10^8 unique tokens with two counters each would take around 12 GB, I guess).

If you don't have that much memory then you could still use Redis, but with a little hashing strategy and vm_enabled to make it fit in memory:

You could have tokens divided by their first and second letters (aa, ab, ac... zz) for the hash name, the actual word + counter identifier as the hash key, and the count as the value. It would look like this:

hash ab
- absence_c1 5
- absence_c2 2
- abandon_c1 2
- abandon_c2 10
hash st
- stack_c1 10
- stack_c2 14

But with this approach, as Redis can't "incr" values inside hashes, you would get the previous value, then increment it and set it back, this way (pseudo-code):

var last = redis("hget st stack_c1")
var actual = last + 1
redis("hset st stack_c1 actual")

Using this hash pattern and with vm enabled, Redis will keep memory usage low while still being fast enough. I was able to store 2 million tokens, with 15 chars each, using less than 100 MB of RAM and almost 4 GB of disk.
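
A minimal sketch of this scheme with the redis-py client; the function and key names are illustrative only, and on newer Redis servers HINCRBY can replace the read-modify-write:

import redis

r = redis.Redis()                       # assumes a local Redis server

def incr_counter(token, which):         # which is "c1" or "c2"
    bucket = token[:2]                  # hash name, e.g. "ab"
    field = token + "_" + which         # hash key, e.g. "absence_c1"
    # read-modify-write, as described above...
    last = int(r.hget(bucket, field) or 0)
    r.hset(bucket, field, last + 1)
    # ...or atomically, where the server supports it:
    # r.hincrby(bucket, field, 1)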

diogok
Thanks. I'm going ahead with this method. I noticed that there is also 'hincrby', so I can increment a value inside these hash tables. I'll use the first and last characters (instead of the first two) for a sparser distribution, and I modified the tokenizer to do some simplifications in order to reduce the number of unique tokens. Let's see if this works!
smmv
+1  A: 

High-level solution:

  1. Parse through the input, outputting "[token] +X +Y" lines to 1-of-N output files (each of these "sharded" output files is small enough to be processed in memory).
  2. [For each file] read it into memory, output a sorted file with "[token] [count1] [count2] ..." lines
  3. At query time, do a binary search on the correct file

Details: Here is Python pseudo-code for step 1)

NUM_SHARDS = 1000  # big enough to make each file fit in memory  
output_files = [open("file" + str(n), "w") for n in xrange(NUM_SHARDS)]
for token in input_stream:
   shard_id = hash(token) % NUM_SHARDS
   output_files[shard_id].write(token + " +0 +1\n")
   # TODO: output the correct +X and +Y as needed

Here is Python pseudo-code for step 2)

input_files = [open("file" + str(n)) for n in xrange(NUM_SHARDS)]
for file in input_files:
   counts = {}   # Key: token   Value: { "count1": 0, "count2": 0 }

   # read the file, and populate 'counts'
   for line in file:
      (token, count1, count2) = line.split(" ")
      # make sure we have a value for this token
      counts.setdefault(token, { "count1": 0, "count2": 0 })
      counts[token]["count1"] += int(count1)
      counts[token]["count2"] += int(count2)
      # TODO: compute those floats, and stuff those inside 'counts' also

   # now write 'counts' out to a file (in sorted order)
   output_file = open(file.name + ".index", "w")
   for token, token_counts in sorted(counts.items()):
      output_file.write(token + " " + str(token_counts["count1"]) + " " + str(token_counts["count2"]) + "\n")
      # TODO: also write out those floats in the same line

Here is some Python code for step 3):

import os

# assume 'token' contains the token you want to find
shard_id = hash(token) % NUM_SHARDS
filename = "file" + str(shard_id) + ".index"
binary_search(token, open(filename), 0, os.path.getsize(filename))

# print out the line in 'file' whose first token is 'token'
# begin/end always point to the start of a line
def binary_search(token, file, begin, end):
    # If we're close, just do brute force
    if end - begin < 10000:
            file.seek(begin)
            while file.tell() < end:
                    line = file.readline()
                    cur_token = line.strip().split(" ")[0]
                    if cur_token == token:
                            print line
                            return True
            return False  # not found

    # If we're not close, pivot based on a line near the middle
    file.seek((begin + end) / 2)
    partial_line = file.readline()  # ignore the first fractional line
    line = file.readline()

    cur_token = line.strip().split(" ")[0]
    if cur_token == token:
            print line
            return True
    elif cur_token < token:
            return binary_search(token, file, file.tell(), end)
    else:  # cur_token > token
            return binary_search(token, file, begin, file.tell() - len(line))
Dustin Boswell
+1  A: 

OK, if MongoDB and CouchDB don't work for you, then you basically have one problem: not enough power.

Let's look at the laundry list:

It must scale up to O(10^8) tokens.

How much RAM do you have? You're talking about hundreds of millions of tokens and you're talking about streaming out a 7zip file. If you want to issue "increments" quickly, you need to be able to keep the entire data structure in memory or the whole thing will go very slowly.

The final result needs to be queried very fast!

How fast? Microseconds, milliseconds, hundreds of milliseconds? If you want to query 500M records on a machine with 8 GB of RAM, you're pretty much hooped. The data just won't fit; it doesn't matter what DB you're using.

Dataset > 2Tb

OK, let's assume that your computer can average about 50 MB/second of sustained throughput and that your proc can actually decompress data at that pace. At that pace you're talking about 11+ hours of processing time just to stream the data (2,000,000 MB / 50 MB/s ≈ 40,000 seconds) -- and you wanted this done in a weekend?

50MB/s of throughput for 11 hours is not small potatoes, that's a real drive. And if you try to write anything to the disk while that's happening (or the OS swaps), then that is going to degrade fast.

Looking at it from a DB perspective, MongoDB can handle both the front-end updating and the back-end querying. But it needs to flush to disk every minute or so, and that's going to significantly extend your 11-hour run time.

That total run-time is just going to get worse and worse unless you can handle the entire DB in memory and the entire stream in memory.

My point...

is pretty simple, you need more power.

If you're not running this operation with 24GB+ of RAM, then everything you do is going to feel slow. If you don't have 24GB+ of RAM, then your final dataset is not going to be "lightning-quick"; at best it will be "200 ms-quick". You just can't index 500M rows and expect to find an entry quickly unless you can keep the index in RAM.

If you're not running this operation with awesome HDDs, then the operation is going to seem slow. I mean, you're talking about hours and hours of high-throughput sustained reads (and probably writes).

I know that you want help, I know that you've put a bounty on this question, but it's really hard to fix the following problem:

I have been trying CouchDB and MongoDB, without good results.

when it sounds like you haven't really gotten together the right gear to solve the problem.

Gates VP