Hi!

(I'm using the pyprocessing module in this example, but replacing processing with multiprocessing should probably work if you run Python 2.6 or use the multiprocessing backport.)

I currently have a program that listens on a Unix socket (using a processing.connection.Listener), accepts connections, and spawns a thread to handle each request. At a certain point I want to quit the process gracefully, but the accept() call is blocking and I see no way of cancelling it nicely. I have one way that works here (OS X) at least: setting a signal handler and signalling the process from another thread, like so:

import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno

# This is actually called by the connection handler.
def closeme():
    time.sleep(1)
    print 'Closing socket...'
    listener.close()
    os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)

oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)

listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
    listener.accept()
except socket.error, e:
    if e.args[0] != errno.EINTR:
        raise
# Cleanup here...
print 'Done...'

The only other way I've thought about is reaching deep into the connection (listener._listener._socket) and setting the non-blocking option...but that probably has some side effects and is generally really scary.

Does anyone have a more elegant (and perhaps even correct!) way of accomplishing this? It needs to be portable to OS X, Linux and BSD, but Windows portability etc is not necessary.

Clarification: Thanks all! As usual, ambiguities in my original question are revealed :)

  • I need to perform cleanup after I have cancelled the listening, and I don't always want to actually exit that process.
  • I need to be able to access this process from other processes not spawned from the same parent, which makes Queues unwieldy
  • The reasons for threads are that:
    • They access a shared state. Actually more or less a common in-memory database, so I suppose it could be done differently.
    • I must be able to have several connections accepted at the same time, but the actual threads are blocked waiting for something most of the time. Each accepted connection spawns a new thread, so that I/O on one client does not block all the others.

Regarding threads vs. processes, I use threads for making my blocking ops non-blocking and processes to enable multiprocessing.

A: 

Probably not ideal, but you can release the block by sending the socket some data from the signal handler or the thread that is terminating the process.

EDIT: Another way to implement this might be to use the Connection Queues, since they seem to support timeouts (apologies, I misread your code in my first read).

codelogic
As it is not a receive operation I'm trying to cancel, I can't really just send data to the connection (there is none), and there is also a race condition there. As for the second suggestion, that would work if I were actually polling for data, but I'm not; I'm waiting for a new connection. Or did I misunderstand you?
Henrik Gustafsson
Since the Listener is accepting connections on the specified socket, shouldn't connecting to it from another thread release accept() (using multiprocessing.connection.Client)? Apologies for the ambiguity in the 2nd part of my response, I will correct it.
codelogic
Queues are not discoverable from external tools etc. Connecting to the socket will release the accept(), but would create a race, I think.
Henrik Gustafsson
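The "connect to your own socket" idea from the comments above could look roughly like the following. This is only a sketch: `WakeableListener` and its methods are hypothetical names, it uses the standard multiprocessing.connection module rather than pyprocessing, and it sidesteps the race by treating every connection accepted after the stop flag is set as a wake-up, so a real client that connects at that exact moment is simply closed.

```python
import threading
from multiprocessing.connection import Client, Listener


class WakeableListener(object):
    """Hypothetical wrapper: accept() can be released by calling stop()."""

    def __init__(self, address, family='AF_UNIX'):
        self._address = address
        self._family = family
        self._listener = Listener(address, family)
        self._stopping = threading.Event()

    def accept(self):
        """Return a Connection, or None once stop() has been requested."""
        conn = self._listener.accept()
        if self._stopping.is_set():
            # Either the dummy wake-up connection or a late client.
            conn.close()
            return None
        return conn

    def stop(self):
        # Set the flag first, then connect so a blocked accept() returns.
        self._stopping.set()
        Client(self._address, self._family).close()

    def close(self):
        self._listener.close()
```

The accepting thread gets None back and can run its cleanup in its own context, without signal handlers and without exiting the process.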
+1  A: 

I'm new to the multiprocessing module, but it seems to me that mixing the processing module and the threading module is counter-intuitive. Aren't they targeted at solving the same problem?

Anyway, how about wrapping your listen functions into a process itself? I'm not clear how this affects the rest of your code, but this may be a cleaner alternative.

from multiprocessing import Process
from multiprocessing.connection import Listener


class ListenForConn(Process):

    def run(self):
        listener = Listener('/tmp/asdf', 'AF_UNIX')
        listener.accept()

        # do your other handling here


listen_process = ListenForConn()
listen_process.start()

print listen_process.is_alive()

listen_process.terminate()
listen_process.join()

print listen_process.is_alive()
print 'No more listen process.'
monkut
That is more or less what I do, but you send SIGTERM instead of SIGPIPE, and you can't do the cleanup in the context of run(), but rather in a signal handler.
Henrik Gustafsson
Thanks for the clarification, nothing comes to mind at the moment, if I think of something I'll update my answer. ;)
monkut
Alright, nice effort though :)
Henrik Gustafsson
+1  A: 

Isn't that what select is for?

Only call accept on the socket if select indicates that it will not block.

select takes a timeout, so you can break out occasionally to check whether it's time to shut down.
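A minimal sketch of that loop, assuming a plain listening socket rather than a Listener object (which does not expose its socket publicly). The names `accept_until_stopped`, `make_unix_listener`, `handle` and `poll_interval` are made up for illustration.

```python
import select
import socket


def make_unix_listener(path, backlog=5):
    """Create a listening AF_UNIX stream socket bound to path."""
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.bind(path)
    sock.listen(backlog)
    return sock


def accept_until_stopped(server_sock, stop_event, handle, poll_interval=0.1):
    """Accept connections until stop_event is set.

    handle(conn) is called for each accepted connection.
    Returns the number of connections accepted.
    """
    accepted = 0
    while not stop_event.is_set():
        # Wait at most poll_interval seconds for a pending connection.
        readable, _, _ = select.select([server_sock], [], [], poll_interval)
        if server_sock in readable:
            conn, _ = server_sock.accept()
            accepted += 1
            handle(conn)
    return accepted
```

Here `stop_event` is expected to be a threading.Event set by whichever thread decides to shut down. The loop notices it within about `poll_interval` seconds, and cleanup can then happen in the accepting thread itself.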

Not a bad idea per se, but the Listener object does not expose the underlying socket, and I would rather not violate the Law of Demeter in such a big way. As it turns out, however, that is exactly what I have to do :)
Henrik Gustafsson
A: 

I thought I could avoid it, but it seems I have to do something like this:

from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()

import select

l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
  print "Accepting..."
  c = l.accept()
  # ...

I am aware that this breaks the Law of Demeter and introduces some evil monkey-patching, but it seems this would be the most easy-to-port way of accomplishing this. If anyone has a more elegant solution I would be happy to hear it :)

Henrik Gustafsson
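Note that a select() call without a timeout still blocks until a client shows up. One common way around that (often called the self-pipe trick, and not something from the answer above) is to select on a second descriptor that another thread can write to. A sketch under that assumption, using a plain socket and a hypothetical `StoppableAcceptor` class; with a Listener you would still need to reach the private socket as shown above.

```python
import os
import select


class StoppableAcceptor(object):
    """Hypothetical helper: accept() returns None after stop() is called."""

    def __init__(self, server_sock):
        self._sock = server_sock
        # Pipe used only to wake up select(); no payload is ever read.
        self._wake_r, self._wake_w = os.pipe()

    def stop(self):
        """May be called from any thread to release a blocked accept()."""
        os.write(self._wake_w, b'x')

    def accept(self):
        """Block until a client connects or stop() has been called."""
        readable, _, _ = select.select([self._sock, self._wake_r], [], [])
        if self._wake_r in readable:
            # The wake-up byte is left in the pipe on purpose, so every
            # later accept() call also returns None.
            return None
        conn, _ = self._sock.accept()
        return conn

    def close(self):
        os.close(self._wake_r)
        os.close(self._wake_w)
        self._sock.close()
```

No signal is involved, so this should behave the same on OS X, Linux and BSD, and the accepting thread can do its cleanup once accept() returns None.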