Background:

I'm cleaning tab-delimited files that are too large to hold in memory. As I clean the input file, I build up a list in memory; when it reaches 1,000,000 entries (about 1GB in memory), I sort it (using the default key below) and write the list to a file. This class puts the sorted files back together. It works on the files I have encountered so far; my largest case to date is merging 66 sorted files.
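
For context, the chunking step described above might look roughly like the sketch below. It is not the code from the actual cleaning script: the names `write_sorted_chunks`, `chunk_size`, and the `chunk_NNNN.txt` naming scheme are hypothetical, and the cleaning itself is left out.

```python
import os

def write_sorted_chunks(lines, out_dir, keyfunc, chunk_size=1000000):
    """Buffer lines, sort each full buffer, and spill it to its own file.

    Returns the list of chunk file paths (hypothetical naming scheme).
    """
    paths = []
    buffer = []

    def flush():
        # Sort with the same key that the later merge step will use.
        buffer.sort(key=keyfunc)
        path = os.path.join(out_dir, "chunk_%04d.txt" % len(paths))
        with open(path, "w") as f:
            f.writelines(buffer)
        paths.append(path)
        del buffer[:]  # empty the buffer in place

    for line in lines:
        buffer.append(line)
        if len(buffer) >= chunk_size:
            flush()
    if buffer:  # leftover partial chunk
        flush()
    return paths
```

Each returned path is then one of the sorted inputs to the merge.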

Questions:

  1. Are there holes in my logic (where is it fragile)?
  2. Have I implemented the merge-sort algorithm correctly?
  3. Are there any obvious improvements that could be made?

Example Data:

This is an abstraction of a line in one of these files:

'hash_of_SomeStringId\tSome String Id\t\t\twww.somelink.com\t\tOtherData\t\n'

The takeaway is that I use the second tab-separated field, 'Some String Id'.lower().replace(' ', ''), as my sort key.
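
As a quick illustration, here is the default key applied to the example line. The function name `sort_key` is only used for this sketch.

```python
def sort_key(line):
    # Second tab-separated field, lower-cased, with spaces removed.
    return line.split('\t')[1].lower().replace(' ', '')

example = 'hash_of_SomeStringId\tSome String Id\t\t\twww.somelink.com\t\tOtherData\t\n'
print(sort_key(example))  # somestringid
```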

Original Code:

import os  # needed by _isLastLine()

class SortedFileMerger():
    """ A one-time use object that merges any number of smaller sorted 
        files into one large sorted file.

        ARGS:
            paths - list of paths to sorted files
            output_path - string path to desired output file
            dedup - (boolean) remove lines with duplicate keys, default = True
            key - use to override sort key, default = "line.split('\t')[1].lower().replace(' ', '')"
                  will be prepended by "lambda line: ".  This should be the same 
                  key that was used to sort the files being merged!
    """
    def __init__(self, paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
        self.key = eval("lambda line: %s" % key)
        self.dedup = dedup
        self.handles = [open(path, 'r') for path in paths]
        # holds one line from each file
        self.lines = [file_handle.readline() for file_handle in self.handles]
        self.output_file = open(output_path, 'w')
        self.lines_written = 0
        self._mergeSortedFiles() #call the main method

    def __del__(self):
        """ Clean-up file handles.
        """
        for handle in self.handles:
            if not handle.closed:
                handle.close()
        if self.output_file and (not self.output_file.closed):
            self.output_file.close()

    def _mergeSortedFiles(self):
        """ Merge the small sorted files to 'self.output_file'. This can 
            and should only be called once.
            Called from __init__().
        """
        previous_comparable = ''
        min_line = self._getNextMin()
        while min_line:
            index = self.lines.index(min_line)
            comparable = self.key(min_line)
            if not self.dedup:                      
                #not removing duplicates
                self._writeLine(index)
            elif comparable != previous_comparable: 
                #removing duplicates and this isn't one
                self._writeLine(index)
            else:                                   
                #removing duplicates and this is one
                self._readNextLine(index)
            previous_comparable = comparable
            min_line = self._getNextMin()
        #finished merging
        self.output_file.close()

    def _getNextMin(self):
        """ Returns the next "smallest" line in sorted order.
            Returns None when there are no more values to get.
        """
        while '' in self.lines:
            index = self.lines.index('')
            if self._isLastLine(index):
                # file.readline() is returning '' because 
                # it has reached the end of a file.
                self._closeFile(index)
            else:
                # an empty line got mixed in
                self._readNextLine(index)
        if len(self.lines) == 0:
            return None
        return min(self.lines, key=self.key)

    def _writeLine(self, index):
        """ Write line to output file and update self.lines
        """
        self.output_file.write(self.lines[index])
        self.lines_written += 1
        self._readNextLine(index)

    def _readNextLine(self, index):
        """ Read the next line from handles[index] into lines[index]
        """
        self.lines[index] = self.handles[index].readline()

    def _closeFile(self, index):
        """ If there are no more lines to get in a file, it 
            needs to be closed and removed from 'self.handles'.
            Its entry in 'self.lines' also needs to be removed.
        """
        handle = self.handles.pop(index)
        if not handle.closed:
            handle.close()
        # remove entry from self.lines to preserve order
        _ = self.lines.pop(index)

    def _isLastLine(self, index):
        """ Check that handles[index] is at the eof.
        """
        handle = self.handles[index]            
        if handle.tell() == os.path.getsize(handle.name):
            return True
        return False

Edit: Implementing the suggestions from Brian I came up with the following solution:

Second Edit: Updated the code per John Machin's suggestion:

import heapq26  # heapq.py copied from a python2.6 installation (see Brian's answer)

def decorated_file(f, key):
    """ Yields an easily sortable tuple. 
    """
    for line in f:
        yield (key(line), line)

def standard_keyfunc(line):
    """ The standard key function in my application.
    """
    return line.split('\t', 2)[1].replace(' ', '').lower()

def mergeSortedFiles(paths, output_path, dedup=True, keyfunc=standard_keyfunc):
    """ Does the same thing SortedFileMerger class does. 
    """
    files = [open(path) for path in paths] #open defaults to mode='r'
    output_file = open(output_path, 'w')
    lines_written = 0
    previous_comparable = ''
    for line in heapq26.merge(*[decorated_file(f, keyfunc) for f in files]):
        comparable = line[0]
        # write every line unless dedup is on and the key repeats
        if not dedup or previous_comparable != comparable:
            output_file.write(line[1])
            lines_written += 1
        previous_comparable = comparable
    # clean up file handles
    output_file.close()
    for f in files:
        f.close()
    return lines_written

Rough Test

Using the same input files (2.2 GB of data):

  • SortedFileMerger class took 51 minutes (3068.4 seconds)
  • Brian's solution took 40 minutes (2408.5 seconds)
  • After adding John Machin's suggestions, the solution code took 36 minutes (2214.0 seconds)
+7  A: 

Note that in python2.6, heapq has a new merge function which will do this for you.

To handle the custom key function, you can just wrap the file iterator with something that decorates it so that it compares based on the key, and strip it out afterwards:

import heapq

def decorated_file(f, key):
    for line in f: 
        yield (key(line), line)

# keyfunc is the key function that was used to sort the input files
filenames = ['file1.txt','file2.txt','file3.txt']
files = map(open, filenames)
outfile = open('merged.txt', 'w')

for line in heapq.merge(*[decorated_file(f, keyfunc) for f in files]):
    outfile.write(line[1])
outfile.close()
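
For readers on newer interpreters: since Python 3.5, heapq.merge accepts a key argument, so the decoration step is optional there. A minimal sketch, assuming hypothetical names `merge_files` and `keyfunc` (this does not apply to python2.5 or 2.6):

```python
import heapq

def merge_files(filenames, out_path, keyfunc):
    """Merge already-sorted text files into out_path, ordered by keyfunc."""
    files = [open(name) for name in filenames]
    try:
        with open(out_path, 'w') as outfile:
            # merge() is lazy, so only one pending line per input file
            # is held in memory at a time.
            outfile.writelines(heapq.merge(*files, key=keyfunc))
    finally:
        for f in files:
            f.close()
```

One difference from the decorated version: lines with equal keys come out in input-file order here, whereas the (key, line) tuples fall back to comparing the lines themselves.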

[Edit] Even in earlier versions of python, it's probably worthwhile simply to take the implementation of merge from the later heapq module. It's pure python and runs unmodified in python2.5, and since it uses a heap to get the next minimum, it should be very efficient when merging large numbers of files.

You should be able to simply take the heapq.py from a python2.6 installation, copy it into your source as "heapq26.py" and use "from heapq26 import merge" - there are no 2.6 specific features used in it. Alternatively, you could just copy the merge function (rewriting the heappop etc calls to reference the python2.5 heapq module).
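
To show why a heap helps here, the following is a simplified sketch of a heap-based k-way merge. It is not the standard library source, and the name `kway_merge` is made up for illustration; it only uses heapify, heappop and heapreplace from the heapq module.

```python
import heapq

def kway_merge(*iterables):
    """Yield items from several sorted iterables in sorted order."""
    done = object()  # sentinel marking an exhausted iterator
    heap = []
    for index, iterable in enumerate(iterables):
        it = iter(iterable)
        first = next(it, done)
        if first is not done:
            # index breaks ties, so the iterators are never compared
            heap.append((first, index, it))
    heapq.heapify(heap)
    while heap:
        value, index, it = heap[0]  # smallest pending item
        yield value
        nxt = next(it, done)
        if nxt is done:
            heapq.heappop(heap)     # this input is finished
        else:
            heapq.heapreplace(heap, (nxt, index, it))
```

With k inputs, each output item costs O(log k) heap work, whereas scanning all pending lines with min() costs O(k) per item. That gap grows with the number of files being merged.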

Brian
Actually, I'm still using python 2.5.
tgray
This is a great answer though, I searched Google for weeks and couldn't find this.
tgray
+2  A: 

<< This "answer" is a comment on the original questioner's resultant code >>

Suggestion: using eval() is ummmm and what you are doing restricts the caller to using lambda -- key extraction may require more than a one-liner, and in any case don't you need the same function for the preliminary sort step?

So replace this:

def mergeSortedFiles(paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
    keyfunc = eval("lambda line: %s" % key)

with this:

def my_keyfunc(line):
    return line.split('\t', 2)[1].replace(' ', '').lower()
    # minor tweaks may speed it up a little

def mergeSortedFiles(paths, output_path, keyfunc, dedup=True):
John Machin
Thanks, the eval() felt weird to me too, but I didn't know the alternative. I had gotten the method from this recipe: http://code.activestate.com/recipes/576755/
tgray
That recipe provides the eval() gimmick only as an optional feature for those who are brave enough to type their key extraction function's source into the command-line when they're running a multi-GB sort :-) You'll notice that this was cleanly separated; both the merge and sort functions take a function for the key arg, not a string.
John Machin