
I have a large-ish file (4-5 GB compressed) of small messages that I wish to parse into approximately 6,000 files by message type. Messages are small; anywhere from 5 to 50 bytes depending on the type.

Each message starts with a fixed-size type field (a 6-byte key). If I read a message of type '000001', I want to append its payload to 000001.dat, and so on. The input file contains a mixture of messages; I want N homogeneous output files, where each output file contains only the messages of a given type.

What's an efficient, fast way of writing these messages to so many individual files? I'd like to use as much memory and processing power as it takes to get it done as fast as possible. I can write compressed or uncompressed files to the disk.

I'm thinking of using a HashMap with the message type as the key and an OutputStream as the value, but I'm sure there's a better way to do it.

Thanks!

+4  A: 

You might not need a hash map. You could just...

  1. Read a message
  2. Open the new file in append mode
  3. Write the message to the new file
  4. Close the new file

Not sure if this would be faster though because you'd be doing a lot of opens and closes.
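
For illustration, a minimal sketch of steps 2-4 in Java (the class and method names are made up for the sketch; the surrounding read loop depends on your message format):

import java.io.FileOutputStream;
import java.io.IOException;

public class SimpleAppender {
    // Steps 2-4 for a single message: open the type's file in append mode, write, close.
    static void appendMessage(String messageKey, byte[] payload) throws IOException {
        FileOutputStream out = new FileOutputStream(messageKey + ".dat", true); // true = append
        try {
            out.write(payload);
        } finally {
            out.close();
        }
    }
}

Every call pays for one open and one close, which is exactly the cost mentioned above.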

Pace
+1, this is the first thing to try because it is simple and straightforward. Only if performance is unacceptable would I proceed to something more complicated, like caching open outputstreams.
GregS
Good simple solution but if some output files end up being opened 1000s of times this is certainly not the fastest!
Carl Smotricz
@Carl: how do you know? A modern OS (or more precisely OS/filesystem combination) wouldn't need any physical I/O to re-open a file that's been opened seconds ago.
Joachim Sauer
If there's a 5 GB file being read and thousands of files being written at the same time, it's a safe bet that even a modern OS will see no alternative but to ditch its IO buffers post haste. But even without physical IO, opening a file is expensive because of the several system calls needed to do so, especially with append.
Carl Smotricz
+1, it would be reasonably trivial to add stream pooling to this if it turns out to be slow but you'd be surprised how well the OS will help you.
PSpeed
@Carl, that may be true and depends on several factors, but the rest of the code wouldn't change much if pools are added, and the OP would be sure that it all works before adding the complexity. Though it's not very complicated to inject a simple LRU cache in step 2.
PSpeed
+2  A: 

I'd recommend some kind of intelligent pooling: keep the largest/most frequently used files open to improve performance and close the rest to conserve resources.

If the main file is made up mostly of record types 1-5, keep those files open as long as they're needed. The others can be opened and closed as required so that you don't starve the system of resources.
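
One way to sketch this in Java (my own illustration, not from the answer): an LRU pool built on LinkedHashMap that closes the least recently used stream when the pool is full, which approximates keeping the hottest files open without explicit frequency tracking. The class name and pool size are assumptions.

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedHashMap;
import java.util.Map;

public class StreamPool {
    private static final int MAX_OPEN = 500;   // stay well under the OS file-handle limit

    // Access-ordered LinkedHashMap: the eldest entry is the least recently used stream.
    private final LinkedHashMap<String, OutputStream> pool =
        new LinkedHashMap<String, OutputStream>(MAX_OPEN, 0.75f, true) {
            protected boolean removeEldestEntry(Map.Entry<String, OutputStream> eldest) {
                if (size() > MAX_OPEN) {
                    try { eldest.getValue().close(); } catch (IOException ignored) { }
                    return true;   // evict the coldest stream
                }
                return false;
            }
        };

    public OutputStream get(String key) throws IOException {
        OutputStream out = pool.get(key);
        if (out == null) {
            // Append mode, so a re-opened file picks up where it left off.
            out = new BufferedOutputStream(new FileOutputStream(key + ".dat", true));
            pool.put(key, out);
        }
        return out;
    }

    public void closeAll() throws IOException {
        for (OutputStream out : pool.values()) {
            out.close();
        }
        pool.clear();
    }
}

Frequently used streams keep getting touched, so they tend to stay in the pool, while rarely seen types cycle in and out.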

Rory
Certain types do account for the vast majority (99.9%) of messages, so this is a good suggestion.
Rudiger
It should be easy to make it adaptive so that you don't have to modify it if those ratios change. Just keep track of how many of each record you've written, periodically order them by those values, close off the ones you no longer need, and open the ones at the top of the list.
Rory
Just now saw this comment. I believe my approach handles this situation "automatically" with very little "intelligence".
Carl Smotricz
A: 

Since you're doing many small writes to many files you want to minimize the number of writes, especially given that the simplest design would pretty much guarantee that each new write would involve a new file open/close.

Instead, why not map each key to a buffer? At the end, write each buffer to disk. Or, if you're concerned that you'll be holding too much memory, you could structure your buffers to flush every 1K, or 5K, or however many lines. For example:

import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;

public class HashLogger {

    private final HashMap<String, MessageBuffer> logs = new HashMap<String, MessageBuffer>();

    public void write(String messageKey, String message) throws IOException {
        if (!logs.containsKey(messageKey)) {
            logs.put(messageKey, new MessageBuffer(messageKey));
        }
        logs.get(messageKey).write(message);
    }

    // Flush all the buffers when you're done.
    public void flush() throws IOException {
        for (MessageBuffer buffer : logs.values()) {
            buffer.flush();
        }
    }

    private static class MessageBuffer {
        private static final int FLUSH_THRESHOLD = 64 * 1024; // flush once ~64 KB has accumulated

        private final String name;
        private final StringBuilder buffer = new StringBuilder();

        private MessageBuffer(String name) { this.name = name; }

        void write(String message) throws IOException {
            buffer.append(message);
            if (buffer.length() >= FLUSH_THRESHOLD) {
                flush(); // flush early if the internal buffer gets too large
            }
        }

        void flush() throws IOException {
            if (buffer.length() == 0) return;
            FileWriter out = new FileWriter(name + ".dat", true); // open in append mode
            try {
                out.write(buffer.toString());
            } finally {
                out.close();
            }
            buffer.setLength(0);
        }
    }
}

You could even create separate Log4j loggers, which can be configured to use buffered logging; I'd be surprised if more modern logging frameworks like slf4j didn't support this as well.
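
If you went the logging route, a sketch along these lines might work, assuming Log4j 1.2's FileAppender (which takes append and bufferedIO flags); the logger naming and buffer size are my own choices:

import org.apache.log4j.FileAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class Log4jSplitter {
    // One logger per message type, each writing to its own buffered, appending file.
    static Logger loggerFor(String messageKey) throws java.io.IOException {
        Logger logger = Logger.getLogger("msg." + messageKey);
        if (!logger.getAllAppenders().hasMoreElements()) {
            FileAppender appender = new FileAppender(
                new PatternLayout("%m%n"),   // just the message, one per line
                messageKey + ".dat",
                true,                        // append
                true,                        // bufferedIO
                8 * 1024);                   // buffer size in bytes
            logger.addAppender(appender);
            logger.setAdditivity(false);     // don't also log to the root logger
        }
        return logger;
    }
}

You'd then log each payload with loggerFor(key).info(message) and call LogManager.shutdown() at the end so the buffered appenders flush.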

Steve B.
Seriously, a logging framework for this problem???
erikkallen
With SO postings it's better to assume, I think, that the poster is giving you the simplest definition of their problem; I know I do, because I don't want to bore people with the details. Hey, maybe for the poster it's a better option because of the built-in message formatting. Whatever.
Steve B.
+4  A: 

A Unix-like system will typically have a limit on the number of file handles open at any given time; on my Linux, for example, it's currently at 1024, though I could change it within reason. But there are good reasons for these limits, as open files are a burden to the system.

You haven't yet responded to my question on whether there are multiple occurrences of the same key in your input, meaning that several separate batches of data may need to be concatenated into each file. If this isn't the case, Pace's answer would be handily the best you can do, as all that work needs to be done and there's no sense in setting up a huge administration around such a simple sequence of events.

But if there are multiple messages in your input for the same key, it would be efficient to keep a large number of files open. I'd advise against trying to keep all 6000 open at once, though. Instead, I'd go for something like 500, opened on a first-come-first-served basis; i.e. you open up files for the first 500 (or so) distinct message keys and then chew through your entire input file looking for stuff to add into those 500, then close them all upon hitting EOF on input. You will also need to keep a HashSet of keys already processed, because you then proceed to re-read your input file, processing the next batch of 500 keys you didn't catch on the first round.

Rationale: Opening and closing a file is (usually) a costly operation; you do NOT want to open and close thousands of files more than once each if you can help it. So you keep as many handles open as possible, all of which end up filled on a single pass through your input. On the other hand, streaming sequentially through a single input file is quite efficient, and even if you have to make 12 passes through your input file, the time to do so will be almost negligible compared to the time needed to open/close 6000 other files.

Pseudocode:

processedSet = [ ]
keysWaiting = true
MAXFILE = 500
handlesMap = [ ]
while (keysWaiting) {
  keysWaiting = false
  open/rewind input file
  while (not EOF(input file)) {
    read message
    if (handlesMap.containsKey(messageKey)) {
      write data to handlesMap.get(messageKey)
    } else if (processedSet.contains(messageKey)) {
      continue // already handled on an earlier pass
    } else if (handlesMap.size < MAXFILE) {
      handlesMap.put(messageKey, new FileOutputStream(messageKey + ".dat"))
      processedSet.add(messageKey)
      write data to handlesMap.get(messageKey)
    } else {
      keysWaiting = true // leave this key for a later pass
    }
  }
  for all handlesMap.values() {
    close file handle
  }
  handlesMap.clear()
}
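
A Java rendering of the same idea might look like this (Message and MessageSource are placeholders of my own, since the exact record format isn't given):

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MultiPassSplitter {
    private static final int MAX_FILES = 500;

    public static void split(MessageSource source) throws IOException {
        Set<String> processed = new HashSet<String>();
        boolean keysWaiting = true;
        while (keysWaiting) {
            keysWaiting = false;
            Map<String, OutputStream> handles = new HashMap<String, OutputStream>();
            source.rewind();                               // re-read the input from the start
            Message msg;
            while ((msg = source.readMessage()) != null) {
                OutputStream out = handles.get(msg.key);
                if (out != null) {
                    out.write(msg.payload);
                } else if (processed.contains(msg.key)) {
                    // handled on an earlier pass
                } else if (handles.size() < MAX_FILES) {
                    out = new BufferedOutputStream(new FileOutputStream(msg.key + ".dat"));
                    handles.put(msg.key, out);
                    processed.add(msg.key);
                    out.write(msg.payload);
                } else {
                    keysWaiting = true;                    // leave this key for a later pass
                }
            }
            for (OutputStream out : handles.values()) {
                out.close();
            }
        }
    }

    // Hypothetical types standing in for the actual input format.
    public static class Message { String key; byte[] payload; }
    public interface MessageSource {
        void rewind() throws IOException;
        Message readMessage() throws IOException;          // returns null at EOF
    }
}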
Carl Smotricz
I was writing exactly this. I think this is the best option - divide and conquer.
Ravi Wallau
Thank you for your support! I was beginning to think I was crazy because my solution was seeing no upvotes. +1 to you for your troubles, I'm (almost!) sorry I beat you to it ;)
Carl Smotricz
Keep in mind that you could essentially waste a pass (or more) if your first 500 distinct record types are also distinct across the entire file. While it might be unlikely that this is the case, it's impossible to tell without taking a look at the data. While your answer has merit, neither of us could possibly say that we have the best solution without seeing the data and/or the system the process is to be run on.
Rory
My solution assures that each output file will be opened and closed exactly once. In doing so, it's optimal for the general case. It is wasteful only in the corner case where the input file is sorted, or almost so, on the message keys. In that case, the gamble of closing output files early and making room for new ones would indeed pay off.
Carl Smotricz
No, I understand the logic. What I'm saying is that if the first 500 records you read (i.e. the ones you select for the current pass) are distinct across the entire file, there is no point in reading to the end of the file, since you will not find more records to be written out. In other words, you've wasted time reading the entire file, looking for something that isn't there. Again, I would like to make it clear that this is just a possibility and nobody can say that it will or will not happen without seeing the data.
Rory
OK, agreed. It's the OP's call what he wants to do, I'm done flogging this dead horse.
Carl Smotricz
Thanks everyone. Carl's solution worked excellently; incidentally, my client only wanted the top 100 messages, so it ended up much quicker.
Rudiger
+1  A: 

I'm going to make some assumptions about your question:

  • Each message starts with the message type, as a fixed-size field
  • You have a heterogeneous input file, containing a mixture of messages; you want N homogeneous output files, where each output file contains only the messages of a given type.

The approach that jumps to mind is functor based: you create a mapping of message types to objects that handle that particular message. Your main() is a dispatch loop that reads the fixed message header, finds the appropriate functor from the map, then calls it.

You probably won't be able to hold 6,000 files (one per message type) open at once; most operating systems have a limit of around 1,024 simultaneous open files (although with Linux you can change the kernel parameters that control this). So this implies that you'll be opening and closing files repeatedly.

Probably the best approach is to set a fixed-count buffer on every functor, so that it opens, writes, and closes after, say, 10 messages. If your messages are at most 50 bytes, then that's 500 bytes (10 x 50) x 6,000, or roughly 3 MB, that will remain in memory at any given time.

I'd probably write my functors to hold fixed-size byte arrays, and create a generic functor class that reads N bytes at a time into that array:

public class MessageProcessor
{
    int _msgSize;                   // the number of bytes to read per message
    byte[] _buf = new byte[1024];   // bigger than I said, but it's only ~6 MB total across 6,000 instances
    int _curSize;                   // when this approaches _buf.length, write the buffer out and reset it
}
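
To fill in the rest of that sketch (the process/flush bodies and the dispatch loop are my own additions; in particular, I'm assuming the payload size for each type is known up front, e.g. from a lookup table, since the question says sizes depend on the type):

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class FunctorSplitter {

    // One functor per message type; reads its fixed-size payload and buffers it for batched appends.
    static class MessageProcessor {
        private final String _key;
        private final int _msgSize;                  // the number of bytes to read per message
        private final byte[] _buf = new byte[1024];  // per-type buffer, flushed when nearly full
        private int _curSize;

        MessageProcessor(String key, int msgSize) {
            _key = key;
            _msgSize = msgSize;
        }

        void process(DataInputStream in) throws IOException {
            if (_curSize + _msgSize > _buf.length) {
                flush();                             // make room before reading the next payload
            }
            in.readFully(_buf, _curSize, _msgSize);  // read this type's fixed-size payload
            _curSize += _msgSize;
        }

        void flush() throws IOException {
            if (_curSize == 0) return;
            FileOutputStream out = new FileOutputStream(_key + ".dat", true); // append
            try { out.write(_buf, 0, _curSize); } finally { out.close(); }
            _curSize = 0;
        }
    }

    // Dispatch loop: read the fixed 6-byte type field, look up the functor, let it consume its payload.
    public static void run(DataInputStream in, Map<String, Integer> sizesByType) throws IOException {
        Map<String, MessageProcessor> functors = new HashMap<String, MessageProcessor>();
        byte[] keyBytes = new byte[6];
        while (true) {
            try { in.readFully(keyBytes); } catch (EOFException eof) { break; }
            String key = new String(keyBytes, "US-ASCII");
            MessageProcessor p = functors.get(key);
            if (p == null) {
                p = new MessageProcessor(key, sizesByType.get(key)); // per-type size assumed known
                functors.put(key, p);
            }
            p.process(in);
        }
        for (MessageProcessor p : functors.values()) {
            p.flush();                               // write out whatever is still buffered
        }
    }
}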
kdgregory
A: 

There are usually limits on open files in the system, and in any case, accessing thousands of little files in a more or less random order is going to bog your system down very badly.

Consider breaking the large file up into a file (or some sort of in-memory table, if you've got the memory) of individual messages, and sorting that by message type. Once that is done, write the messages out to their appropriate files.
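
A rough sketch of the in-memory variant, assuming the decompressed messages fit in RAM (the Message holder is hypothetical; a TreeMap does the grouping by type, which has the same effect as sorting):

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SortThenWrite {
    // Hypothetical holder for one parsed message.
    public static class Message {
        String key;
        byte[] payload;
    }

    // Group messages by type in memory, then write each group out,
    // so that every output file is opened exactly once and written sequentially.
    public static void write(Iterable<Message> messages) throws IOException {
        Map<String, List<byte[]>> byType = new TreeMap<String, List<byte[]>>();
        for (Message msg : messages) {
            List<byte[]> group = byType.get(msg.key);
            if (group == null) {
                group = new ArrayList<byte[]>();
                byType.put(msg.key, group);
            }
            group.add(msg.payload);
        }
        for (Map.Entry<String, List<byte[]>> e : byType.entrySet()) {
            OutputStream out = new BufferedOutputStream(new FileOutputStream(e.getKey() + ".dat"));
            try {
                for (byte[] payload : e.getValue()) {
                    out.write(payload);
                }
            } finally {
                out.close();
            }
        }
    }
}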

David Thornley