I've learned. I'll share.

June 27, 2008

Message Passing Concurrency (Actor Model) in Python

As I wrote previously, I'm a fan of message-passing concurrency. But to use it in Python, I had to write my own infrastructure. It's far from perfect, but it's working for me. I've simplified it a bit and prepared some code to share with you.

The basic idea of my framework is to be as simple and easy to use as possible. There's no Erlang-style pattern matching or process linking. There isn't even any built-in error handling. But there's a lot of support for the typical "react loop" style of message handling.

The architecture is very simple. There's one thread per actor. An actor is just an object you can send a message to. The actor receives the messages in the order they were sent. You always know the sender of the message; it's built right in. There's also a "Bridge" that allows you to interact with actors from non-actor code.
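
To make that architecture concrete before the full listing, here is a minimal sketch of the same idea. It is written for Python 3 (the listing below targets Python 2), and the names `EchoActor`, `ReplyBox`, and the `None` stop signal are hypothetical stand-ins for illustration, not part of the framework.

```python
import queue
import threading


class EchoActor:
    """Hypothetical actor: one thread draining one FIFO mailbox."""

    def __init__(self):
        self.mailbox = queue.Queue()
        self.seen = []
        self.thread = threading.Thread(target=self.run, daemon=True)
        self.thread.start()

    def send(self, message, sender):
        # Every message travels together with its sender.
        self.mailbox.put((message, sender))

    def run(self):
        while True:
            message, sender = self.mailbox.get()  # blocks until a message arrives
            if message is None:                   # hypothetical stop signal
                break
            self.seen.append(message)
            if sender is not None:
                sender.send(("echo", message), self)


class ReplyBox:
    """Hypothetical stand-in for the Bridge: lets non-actor code collect replies."""

    def __init__(self):
        self.mailbox = queue.Queue()

    def send(self, message, sender):
        self.mailbox.put((message, sender))

    def receive(self, timeout):
        message, _sender = self.mailbox.get(timeout=timeout)
        return message


actor = EchoActor()
box = ReplyBox()
for n in (1, 2, 3):
    actor.send(n, box)
replies = [box.receive(timeout=1.0) for _ in range(3)]
actor.send(None, None)          # ask the actor thread to exit
actor.thread.join(timeout=1.0)
print(replies)                  # [('echo', 1), ('echo', 2), ('echo', 3)]
```

Because a single thread drains the queue, the replies come back in the order the requests were sent.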

The only tricky thing you'll see is a "handles" decorator. This is used to help you identify how you want an actor to handle a given message type. It helps you avoid re-creating a message loop and dispatcher yourself for every actor.
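
In isolation, the decorator trick looks roughly like the sketch below. It is written for Python 3, and `Greeter`, `Greeting`, `OTHER`, and `dispatch` are hypothetical names chosen for this example. The registry class mirrors the one in the listing that follows.

```python
class HandlerRegistry(dict):
    """Maps a message type to the function that handles it."""

    def __call__(self, typ):
        # Calling the registry returns a decorator that records the function.
        def register(func):
            self[typ] = func
            return func
        return register


OTHER = "other"  # hypothetical fallback key for unregistered message types


class Greeting:
    def __init__(self, name):
        self.name = name


class Greeter:
    handles = HandlerRegistry()

    def dispatch(self, message):
        # Look up by the message's class, falling back to the catch-all handler.
        handler = self.handles.get(type(message)) or self.handles.get(OTHER)
        return handler(self, message)

    @handles(Greeting)
    def on_greeting(self, message):
        return "hello, " + message.name

    @handles(OTHER)
    def on_other(self, message):
        return "ignored %r" % (message,)


g = Greeter()
print(g.dispatch(Greeting("world")))  # hello, world
print(g.dispatch(42))                 # ignored 42
```

The registry stores plain functions, so the dispatcher passes `self` explicitly when it calls the handler.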

I've included a little example of how to use the framework. It's obviously quite simple, but keep in mind that I used little more than this as the foundation of a rather complex file synchronization application. A little bit goes a long way.

So, here it is. Cut, paste, and run.

from threading import Thread, Event
from Queue     import Queue, Empty

class IActor:
    pass
    # send(message, sender)

class HandlerRegistry(dict):
    # should be used as a decorator
    def __call__(self, typ):
        def register(func):
            self[typ] = func
            return func
        return register

OtherMessage = "Other Message"

class Stop(Exception):
    def __repr__(self):
        return "Stop()"

class Stopped:
    def __repr__(self):
        return "Stopped()"

class ActorNotStartedError(Exception):
    def __init__(self):
        Exception.__init__(self, "actor not started")

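class Actor(IActor):
    # set by spawn(); stays None for an actor that was constructed directly,
    # so send() and receive() can raise ActorNotStartedError
    mailbox = None
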
    @classmethod
    def spawn(cls, *args, **kargs):
        self = cls(*args, **kargs)
        self.mailbox = Mailbox()
        start_thread(target = self.act, as_daemon = True)
        return self

    def send(self, message, sender):
        if self.mailbox is None:
            raise ActorNotStartedError()
        else:
            self.mailbox.send(message, sender)

    def receive(self):
        if self.mailbox is None:
            raise ActorNotStartedError()
        else:
            return self.mailbox.receive()

    # override if necessary
    def act(self):
        self.handleMessages()



    handles = HandlerRegistry()

    @classmethod
    def makeHandles(*classes):
        return HandlerRegistry((typ, handler) for cls in classes for (typ, handler) in cls.handles.iteritems())

    def handleMessages(self):
        try:
            while True:
                message, sender = self.receive()
                self.handleMessageWithRegistry(message, sender)
        except Stop:
            pass

    def handleMessageWithRegistry(self, message, sender):
        registry = self.__class__.handles
        handler  = registry.get(message.__class__) or registry.get(OtherMessage)
        if handler is not None:
            handler(self, message, sender)

    @handles(OtherMessage)
    def onOther(self, message, sender):
        pass

    @handles(Stop)
    def onStop(self, message, sender):
        sender.send(Stopped(), self)
        raise message

def start_thread(target, as_daemon, name = None):
    thread = Thread(target = target)
    if name:
        thread.setName(name)
    thread.setDaemon(as_daemon) 
    thread.start()
    return thread

class Mailbox:
    def __init__(self):
        self.mailbox = Queue()

    def send(self, message, sender):
        self.mailbox.put((message, sender), block = False)

    def receive(self, timeout = None):
        return self.mailbox.get(block = True, timeout = timeout)

class Bridge(IActor):
    def __init__(self):
        self.mailbox = Mailbox()

    def send(self, message, sender):
        self.mailbox.send(message, sender)

    def call(self, target, request, timeout, default = None):
        self.sendRequest(target, request)
        return self.receiveResponse(timeout, default)

    # targeted_requests can be an iterator
    def multiCall(self, targeted_requests, timeout, default = None):
        count = 0
        for target, request in targeted_requests:
            self.sendRequest(target, request)
            count += 1
            
        for _ in xrange(count):
            yield self.receiveResponse(timeout, default)

    def stop(self, actors, timeout):
        stop = Stop()
        return list(self.multiCall(((actor, stop) for actor in actors), timeout, default = None))

    def sendRequest(self, target, request):
        target.send(request, self)

    def receiveResponse(self, timeout, default):
        try:
            message, sender = self.mailbox.receive(timeout = timeout)
            return message
        except Empty:
            return default
        

if __name__ == "__main__":
    import time

    class GetInventory:
        pass

    class Task:
        def __init__(self, input, destination):
            self.input       = input
            self.destination = destination

    class Worker(Actor):
        handles = Actor.makeHandles()

        def __init__(self, skill):
            self.skill = skill

        @handles(Task)
        def onTask(self, task, sender):
            output = self.skill(task.input)
            task.destination.send(output, self)

    class Warehouse(Actor):
        handles = Actor.makeHandles()

        def __init__(self):
            self.inventory = []

        @handles(GetInventory)
        def onGetInventory(self, message, sender):
            # copy the inventory to avoid anyone mutating it
            sender.send(list(self.inventory), self)
            
        @handles(OtherMessage)
        def onTaskResult(self, result, sender):
            self.inventory.append(result)

    worker    = Worker.spawn(lambda x : x * 2)
    positives = Warehouse.spawn()
    negatives = Warehouse.spawn()
    bridge    = Bridge()

    for val in [1, 2, 3, -2, -4, -6]:
        warehouse = positives if val >= 0 else negatives
        worker.send(Task(val, warehouse), sender = None)

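    # give the worker a moment to finish; otherwise the requests below
    # can reach the warehouses before the task results do
    time.sleep(0.1)
    print bridge.call(positives, GetInventory(), 1.0) #should be [ 2,  4,   6]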
    print bridge.call(negatives, GetInventory(), 1.0) #should be [-4, -8, -12]
    print bridge.stop([worker, positives, negatives], 1.0) #should be [Stopped(), Stopped(), Stopped()]

    
    class Start:
        def __init__(self, target):
            self.target = target

    class Ping:
        def __repr__(self):
            return "Ping()"

    class Pong:
        def __repr__(self):
            return "Pong()"

    class Pinger(Actor):
        handles = Actor.makeHandles()

        @handles(Start)
        def onStart(self, start, sender):
            start.target.send(Ping(), self)

        @handles(Pong)
        def onPong(self, pong, sender):
            print "-",
            sender.send(Ping(), self)
            
    class Ponger(Actor):
        handles = Actor.makeHandles()

        @handles(Ping)
        def onPing(self, ping, sender):
            print "+",
            sender.send(Pong(), self)

    # should print lots of +-+-+-
    pinger = Pinger.spawn()
    ponger = Ponger.spawn()
    pinger.send(Start(ponger), sender = None)
    time.sleep(0.1)
    bridge.stop([pinger, ponger], 1.0)

10 comments:

  1. Did you take a look at http://twistedmatrix.com/?

  2. Very nice and small and clean.

    A couple of thoughts:

    In many ways, Erlang's selective receive is one of the most critical parts of Erlang. Without it, application code gets pretty complex, trying to manage when messages can be handled and when they need to be deferred. It's a key part of blocking RPC-like functions, which are, in turn, a key part of Erlang's gen_servers.

    One other comment: Python threads are pretty expensive. Having one per actor means the number of actors you can have is pretty limited (which might be fine for your needs; and this is true even if they get garbage-collected, unless you pool them). In actuality, actors don't need a thread per actor, just a thread per message concurrently delivered, which can be a lot fewer. Of course, it's a lot messier too, though.

  3. I did look at Twisted. It promises all of the things I need for distributed programming, but it didn't deliver (for me). Anytime you pull a 3rd party dependency into your code, you have to weigh the cost against the benefit. With Twisted, the cost is pretty big because it's a big, complex thing and you can't really work with part of it without using all of it. The benefit wasn't there for me either, since many of their design choices didn't match my needs.

  4. Selective receive is very nice, especially when mixed with powerful pattern matching.
    I started to add it, but it got complex pretty quickly, and I thought "am I ever going to really need this?". So I left it out. There have been a few times when I wish I had it, but on the whole, I haven't needed it.

    The big difficulty is that to really get the power of selective receive you need to have pattern matching, but I haven't yet come up with an elegant way to do pattern matching in Python.
    This is one of the ways that Erlang is way beyond other programming languages.

  5. Yes, threads are expensive. One thread per actor doesn't scale very much. So, I've also created an experimental threadless actor library. I'll post about it soon.

    I'd like to try and combine threadless and threaded actors so that you have exactly as many threads as you need. The Scala guys wrote a good paper about it a few years ago, but no one seems to have done much with it yet.

  6. And what about Stackless? http://www.stackless.com/

  7. I copied the selective semantics that a couple of other Ruby Actor packages have: match the message against an array of objects: if the object is not a class, do an == match. If the object is a class, do an instanceof match. The message itself can have more elements than the pattern, which are ignored.

    Not the same as Erlang, which can, for example, allow you to require two elements of a message be the same, but so far seems to be working for folks. I figure at some point, we'll need a catch-all case a bit more like Erlang guards for situations where that kind of matching doesn't work.

    I've been looking at thread allocations for actors. Right now, I have two conflicting requirements that I'm trying to balance. On the one hand, one reason I want actors to execute in separate threads is that I want them to be able to block on I/O without blocking the rest of the world. Given that I'm not willing to restrict what an actor method can do, this pretty much requires giving them their own thread, at least for the duration of that call.

    On the flip side, not restricting the number of threads allows things to go bad quickly in some cases. Doug Lea did a paper on needing to restrict threads in Java for embarrassingly parallel things like naive fib. So far I haven't seen that problem in real code, so it's not on the top of my list (though it will probably fall out of some other work to make dramatis work with GUI libraries which require single-threaded access.)

    I'll be interested to see how your threadless stuff works and combines with threaded.

  8. The Scala guys have some really good papers about combining threadless and threaded actors: http://lamp.epfl.ch/~phaller/doc/haller07coord.pdf and http://lampwww.epfl.ch/~odersky/papers/jmlc06.pdf. Their approach is the one I'm going to take.

    Basically, you have threadless actors that "schedule" their actions rather than executing them directly. Then, you have a "controller" (threaded) actor that hands those actions out to worker (threaded) actors. If any worker actor blocks for more than a specific amount of time, you spawn another worker actor. This way, you only spawn as many threaded actors as necessary, but many blocking IO calls can continue to run in parallel.

  9. Your pattern matching technique sounds a lot like candygram: http://candygram.sourceforge.net/. Have you seen it? I think it's pretty good and probably as good as you can get in Python.

  10. Yes, I have seen Stackless. If they are successful, it would make Python MUCH better for doing things like continuations, monads, message-passing, etc. I'm very much hoping for their success. But it's still too immature to rely on for the application I'm writing.
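
The matching rule described in comment 7 (a class in the pattern means an instance check, anything else means an equality check, and extra message elements are ignored) can be sketched in Python 3 as below. This is only an illustration of that description, not code from any of the libraries mentioned. The names `matches` and `selective_receive` are hypothetical, and so is the choice to represent messages as tuples held in a plain list of pending messages.

```python
import inspect


def matches(pattern, message):
    """True if each pattern element matches the message element at the same position."""
    if len(message) < len(pattern):
        return False
    for expected, actual in zip(pattern, message):
        if inspect.isclass(expected):
            # A class in the pattern matches any instance of that class.
            if not isinstance(actual, expected):
                return False
        elif expected != actual:
            # Anything else must compare equal.
            return False
    return True  # extra trailing message elements are ignored


def selective_receive(pending, pattern):
    """Remove and return the first pending message that matches; the rest stay deferred."""
    for index, message in enumerate(pending):
        if matches(pattern, message):
            return pending.pop(index)
    return None


pending = [("log", "starting"), ("result", 42, "extra"), ("result", "oops")]
print(selective_receive(pending, ("result", int)))  # ('result', 42, 'extra')
print(pending)                                      # [('log', 'starting'), ('result', 'oops')]
```

Messages that do not match are left in place, which is the deferral behaviour that makes selective receive useful for blocking, RPC-style calls.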
