Right now I have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is a module-level, multiprocessing-aware logger, LOG = multiprocessing.get_logger(). Per the docs, this logger has process-shared locks so that you don't garble things up in sys.stderr (or whatever filehandle) by having multiple processes writing to it simultaneously.

The issue I have now is that the other modules in the framework are not multiprocessing-aware. The way I see it, I need to make all dependencies on this central module use multiprocessing-aware logging. That's annoying within the framework, let alone for all clients of the framework. Are there alternatives I'm not thinking of?

+17  A: 

The only way to deal with this non-intrusively is to spawn each worker process such that its log goes to a different file descriptor (to disk or to a pipe). Ideally, all log entries should be timestamped. Your controller process can then, if using disk files, coalesce the log files at the end of the run (sorting by timestamp), or, if using pipes (the recommended approach), coalesce log entries on the fly from all pipes into a central log (e.g. periodically select on the pipes' fds, merge-sort the available log entries, flush them to the centralized log, and repeat).
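
For instance, a minimal sketch of the pipe-based coalescer (the function name and the line-oriented framing are illustrative; it assumes each child's stderr has already been redirected to the write end of its own pipe):

import os
import select
import sys

def coalesce(pipe_read_fds, out=sys.stderr):
    # Forward complete, line-oriented log entries from the per-child pipes
    # to a single output stream. A timestamp-based merge could be added
    # at the point where each line is written out.
    buffers = dict((fd, '') for fd in pipe_read_fds)
    open_fds = set(pipe_read_fds)
    while open_fds:
        readable, _, _ = select.select(list(open_fds), [], [])
        for fd in readable:
            chunk = os.read(fd, 4096)
            if not chunk:                      # child closed its end of the pipe
                open_fds.discard(fd)
                continue
            buffers[fd] += chunk
            while '\n' in buffers[fd]:
                line, buffers[fd] = buffers[fd].split('\n', 1)
                out.write(line + '\n')         # or buffer and merge-sort by timestamp
    out.flush()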

vladr
Nice, that was 35s before I thought of that (thought I'd use `atexit` :-). Problem is that it won't give you a realtime readout. This may be part of the price of multiprocessing as opposed to multithreading.
cdleary
@cdleary, using the piped approach it would be as near-realtime as one can get (especially if stderr is not buffered in the spawned processes.)
vladr
+1 I had this general thought too. I especially like your on-the-fly idea.
Adam Bernier
Okay, but then wouldn't you need the coalescer process to be a central dispatcher that gave each child process a new shared stderr pipe? That would mean that people couldn't use the libraries traditionally, but would have to hand a callback over to the coalescer/dispatcher.
cdleary
And by "shared stderr pipe" I don't mean shared among child processes, but shared between the coalescer and child process, as you're describing.
cdleary
Do you have control *between* forks? If so you just `dup` new per-child fd's over stderr (2) just before forking a new child; the child's stderr (2) output will automatically be picked up by the spawner process' coalescer through the corresponding per-child fd.
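
Roughly, that idea looks like this in plain os terms (a sketch only; real code would hook into multiprocessing's own spawning machinery rather than calling fork directly):

import os

read_fd, write_fd = os.pipe()       # one pipe per child
pid = os.fork()
if pid == 0:
    # child: point fd 2 (stderr) at the pipe's write end, so anything the
    # worker writes to stderr ends up in the pipe
    os.dup2(write_fd, 2)
    os.close(read_fd)
    # ... run the worker ...
else:
    # parent: keep read_fd for the coalescer, close the unused write end
    os.close(write_fd)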
vladr
Incidentally, big assumption here: not Windows. Are you on Windows?
vladr
I'm using POSIX (but os.dup is on both platforms). I don't see how you can get around the fact you need the `select` in a centralized event loop, which would presumably be where the coalescer lives. Am I missing something?
cdleary
So anyway, if you are on *nix (i.e. multiprocessing is using fork), you can dup a new fd over stderr (2) in Process.start (just before multiprocessing calls self._popen = Popen(self), where Popen will do the actual fork) - check out the source code in lib/process.py, lib/forking.py
vladr
Using Pool? If so then you'd have to use the async variants of map or apply and do the select loop until you get all results. Or spawn a thread (har-har) to do the select polling. :)
vladr
Wow... that's crazy, but it would work. Spawn a coalescer thread so that it shares the main process' stderr locks (hide the real sys.stderr in the coalescer and give sys a fake one for the coalescer to select on), have the coalescer terminate after joining on the subprocesses, and join on the coalescer `atexit`.
cdleary
Yes it will. :) Used this approach a while ago (but in perl, not python) to coalesce real-time log output from multiple remote ssh sessions. have fun!
vladr
Why not just use a multiprocessing.Queue and a logging thread in the main process instead? Seems simpler.
Brandon Craig Rhodes
+3  A: 

Just publish your instance of the logger somewhere. That way, the other modules and clients can use your API to get the logger without having to import multiprocessing.
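
For example (the module and function names here are hypothetical), the framework could expose a single accessor so that nothing else ever imports multiprocessing for logging:

# framework/log.py  (hypothetical module)
import multiprocessing

_LOG = multiprocessing.get_logger()

def get_logger():
    """Return the framework's shared, multiprocessing-aware logger."""
    return _LOG

# client code:
#   from framework.log import get_logger
#   get_logger().info('no multiprocessing import needed here')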

Javier
The problem with this is that the multiprocessing loggers appear unnamed, so you won't be able to decipher the message stream easily. Maybe it would be possible to name them after creation, which would make it more reasonable to look at.
cdleary
Well, publish one logger for each module, or better, export different closures that use the logger with the module name. The point is to let other modules use your API.
Javier
Definitely reasonable (and +1 from me!), but I would miss being able to just `import logging; logging.basicConfig(level=logging.DEBUG); logging.debug('spam!')` from anywhere and have it work properly.
cdleary
It's an interesting phenomenon that I see when I use Python, that we get so used to being able to do what we want in 1 or 2 simple lines that the simple and logical approach in other languages (eg. to publish the multiprocessing logger or wrap it in an accessor) still feels like a burden. :)
Kylotan
A: 

One alternative is to write the multiprocessing logging to a known file and register an atexit handler that joins on those processes and reads the file back out on stderr; however, you won't get a real-time flow of the output messages on stderr that way.
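
A rough sketch of that approach (the file name is arbitrary):

import atexit
import multiprocessing
import sys

LOG_PATH = '/tmp/framework.mplog'   # hypothetical file all workers log to

def _dump_log_to_stderr():
    # Runs at interpreter exit: wait for the workers, then replay their
    # combined log on stderr. No real-time output, as noted above.
    for p in multiprocessing.active_children():
        p.join()
    sys.stderr.write(open(LOG_PATH).read())

atexit.register(_dump_log_to_stderr)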

cdleary
+6  A: 

Yet another alternative might be the various non-file-based logging handlers in the logging package:

  • SocketHandler
  • DatagramHandler
  • SysLogHandler

(and others)

This way, you could easily have a logging daemon somewhere that you could write to safely and that would handle the results correctly. E.g. a simple socket server that just unpickles each message and emits it to its own rotating file handler.

The SysLogHandler would take care of this for you too. Of course, you could use your own instance of syslog rather than the system one.
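
As a minimal sketch of the socket-server idea (the port, file name, and sizes are arbitrary), the receiving daemon could look something like this, while each worker just attaches a plain SocketHandler:

import logging, logging.handlers
import pickle, struct, SocketServer

class LogRecordStreamHandler(SocketServer.StreamRequestHandler):
    # SocketHandler sends each record as a 4-byte big-endian length
    # followed by a pickled dict of the record's attributes.
    def handle(self):
        while True:
            header = self.rfile.read(4)
            if len(header) < 4:
                break
            length, = struct.unpack('>L', header)
            record = logging.makeLogRecord(pickle.loads(self.rfile.read(length)))
            logging.getLogger(record.name).handle(record)

if __name__ == '__main__':
    # The daemon is the only writer of the rotating file, so no locking issues.
    logging.getLogger().addHandler(
        logging.handlers.RotatingFileHandler('central.log', maxBytes=10**6, backupCount=3))
    server = SocketServer.ThreadingTCPServer(
        ('localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT), LogRecordStreamHandler)
    server.serve_forever()

# In each worker process (hypothetical setup):
#   handler = logging.handlers.SocketHandler(
#       'localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)
#   logging.getLogger().addHandler(handler)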

Ali A
+9  A: 

I just now wrote a log handler of my own that feeds everything to the parent process via a pipe. I've only been testing it for ten minutes, but it seems to work pretty well (note that this is hardcoded to RotatingFileHandler, which is my own use case).

Updated. This now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've now been using this in production for several months and the current version below works without issue.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
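
Hooking it up is then just a matter of installing it on whatever logger the rest of the code uses, e.g. (the file name and sizes below are arbitrary):

mp_handler = MultiProcessingLog('framework.log', 'a', 10 * 1024 * 1024, 5)
mp_handler.setFormatter(logging.Formatter('%(asctime)s %(process)s %(message)s'))
logging.getLogger().addHandler(mp_handler)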
zzzeek
One nit: you need to import traceback as well.
Jason Baker
+2  A: 

I liked zzzeek's answer. I would just substitute a Queue for the Pipe, since if multiple threads/processes use the same pipe end to generate log messages, they will get garbled.

André Cruz
I was having some issues with the handler, though it wasn't that messages were garbled; it's just that the whole thing would stop working. I changed the Pipe to a Queue since that is more appropriate. However, the errors I was getting weren't resolved by that - ultimately I added a try/except to the receive() method - very rarely, an attempt to log exceptions will fail and wind up being caught there. Once I added the try/except, it runs for weeks with no problem, and a standard-error file will grab about two errant exceptions per week.
zzzeek
+2  A: 
Mike Miller
A: 

A variant of the others that keeps the logging and queue thread separate.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import sys
import time
import traceback
import multiprocessing, threading, logging

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        # iterate over a copy, since removeHandler mutates logger.handlers
        for handler in list(logger.handlers):
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not reliable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
ironhacker
A: 

I have a solution that's similar to ironhacker's, except that I use logging.exception in some of my code and found that I needed to format the exception before passing it back over the Queue, since tracebacks aren't picklable:

import cStringIO
import logging
import traceback

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
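
In a subprocess the handler is attached like any other (the setup below is illustrative):

import multiprocessing

queue = multiprocessing.Queue(-1)
logging.getLogger().addHandler(QueueHandler(queue))
logging.getLogger().setLevel(logging.DEBUG)

try:
    1 / 0
except ZeroDivisionError:
    # safe across processes: the traceback is turned into text before queueing
    logging.exception('something went wrong')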
Richard Jones