I've learned. I'll share.

June 27, 2008

Message Passing Concurrency (Actor Model) in Python

As I wrote previously, I'm a fan of message-passing concurrency. But to use it in Python, I had to write my own infrastructure. It's far from perfect, but it's working for me. I've simplified it a bit and prepared some code to share with you.

The basic idea of my framework is to be as simple and easy to use as possible. There's no Erlang-style pattern matching or process linking. There isn't even any built-in error handling. But there's a lot of support for the typical "react loop" style of message handling.

The architecture is very simple. There's one thread per actor. An actor is just an object you can send a message to. The actor receives messages in the order they were sent. You always know the sender of a message; it's built right in. There's also a "Bridge" that allows you to interact with actors from non-actor code.

The only tricky thing you'll see is a "handles" decorator. It's used to declare how you want an actor to handle a given message type, and it saves you from re-creating a message loop and dispatcher yourself for every actor.
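To give you the flavor: a subclass makes its own registry with Actor.makeHandles() and then tags handler methods with @handles. (The Greeting and Greeter names below are made up purely for illustration; a real, runnable example comes after the full listing.)

class Greeting:
    def __init__(self, name):
        self.name = name

class Greeter(Actor):
    handles = Actor.makeHandles()

    @handles(Greeting)
    def onGreeting(self, greeting, sender):
        # the built-in message loop calls this for every Greeting in the mailbox
        print "Hello, %s!" % greeting.name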

I've included a little example of how to use the framework. It's obviously quite simple, but keep in mind that I used little more than this as the foundation of a rather complex file synchronization application. A little bit goes a long way.

So, here it is. Cut, paste, and run.

from threading import Thread, Event
from Queue     import Queue, Empty

class IActor:
    pass
    # send(message, sender)

class HandlerRegistry(dict):
    # should be used as a decorator
    def __call__(self, typ):
        def register(func):
            self[typ] = func
            return func
        return register

# key used to register a catch-all handler for message types that have no handler of their own
OtherMessage = "Other Message"

class Stop(Exception):
    def __repr__(self):
        return "Stop()"

class Stopped:
    def __repr__(self):
        return "Stopped()"

class ActorNotStartedError(Exception):
    def __init__(self):
        Exception.__init__(self, "actor not started")

class Actor(IActor):
    mailbox = None  # created by spawn(); send/receive raise ActorNotStartedError until then

    @classmethod
    def spawn(cls, *args, **kargs):
        self = cls(*args, **kargs)
        self.mailbox = Mailbox()
        start_thread(target = self.act, as_daemon = True)
        return self

    def send(self, message, sender):
        if self.mailbox is None:
            raise ActorNotStartedError()
        else:
            self.mailbox.send(message, sender)

    def receive(self):
        if self.mailbox is None:
            raise ActorNotStartedError()
        else:
            return self.mailbox.receive()

    # override if necessary
    def act(self):
        self.handleMessages()

    # registry mapping message type -> handler method; subclasses build their own via makeHandles()
    handles = HandlerRegistry()

    # called as SubClass.handles = Actor.makeHandles(); returns a fresh registry
    # pre-populated with the handlers already registered on Actor
    @classmethod
    def makeHandles(*classes):
        return HandlerRegistry((typ, handler) for cls in classes for (typ, handler) in cls.handles.iteritems())

    def handleMessages(self):
        try:
            while True:
                message, sender = self.receive()
                self.handleMessageWithRegistry(message, sender)
        except Stop:
            pass

    def handleMessageWithRegistry(self, message, sender):
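        # look up a handler by the message's class; fall back to the catch-all OtherMessage handler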
        registry = self.__class__.handles
        handler  = registry.get(message.__class__) or registry.get(OtherMessage)
        if handler is not None:
            handler(self, message, sender)

    @handles(OtherMessage)
    def onOther(self, message, sender):
        pass

    @handles(Stop)
    def onStop(self, message, sender):
        sender.send(Stopped(), self)
        raise message

def start_thread(target, as_daemon, name = None):
    thread = Thread(target = target)
    if name:
        thread.setName(name)
    thread.setDaemon(as_daemon) 
    thread.start()
    return thread

class Mailbox:
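    # a thread-safe queue of (message, sender) pairs; receive() blocks until a message arrives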
    def __init__(self):
        self.mailbox = Queue()

    def send(self, message, sender):
        self.mailbox.put((message, sender), block = False)

    def receive(self, timeout = None):
        return self.mailbox.get(block = True, timeout = timeout)

class Bridge(IActor):
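    # lets ordinary, non-actor code (like the main thread) send requests to actors and wait for replies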
    def __init__(self):
        self.mailbox = Mailbox()

    def send(self, message, sender):
        self.mailbox.send(message, sender)

    def call(self, target, request, timeout, default = None):
        self.sendRequest(target, request)
        return self.receiveResponse(timeout, default)

    # targeted_requests can be an iterator
    def multiCall(self, targeted_requests, timeout, default = None):
        count = 0
        for target, request in targeted_requests:
            self.sendRequest(target, request)
            count += 1
            
        for _ in xrange(count):
            yield self.receiveResponse(timeout, default)

    def stop(self, actors, timeout):
        stop = Stop()
        return list(self.multiCall(((actor, stop) for actor in actors), timeout, default = None))

    def sendRequest(self, target, request):
        target.send(request, self)

    def receiveResponse(self, timeout, default):
        try:
            message, sender = self.mailbox.receive(timeout = timeout)
            return message
        except Empty:
            return default
        

if __name__ == "__main__":
    import time

    class GetInventory:
        pass

    class Task:
        def __init__(self, input, destination):
            self.input       = input
            self.destination = destination

    class Worker(Actor):
        handles = Actor.makeHandles()

        def __init__(self, skill):
            self.skill = skill

        @handles(Task)
        def onTask(self, task, sender):
            output = self.skill(task.input)
            task.destination.send(output, self)

    class Warehouse(Actor):
        handles = Actor.makeHandles()

        def __init__(self):
            self.inventory = []

        @handles(GetInventory)
        def onGetInventory(self, message, sender):
            # copy the inventory to avoid anyone mutating it
            sender.send(list(self.inventory), self)
            
        @handles(OtherMessage)
        def onTaskResult(self, result, sender):
            self.inventory.append(result)

    worker    = Worker.spawn(lambda x : x * 2)
    positives = Warehouse.spawn()
    negatives = Warehouse.spawn()
    bridge    = Bridge()

    for val in [1, 2, 3, -2, -4, -6]:
        warehouse = positives if val >= 0 else negatives
        worker.send(Task(val, warehouse), sender = None)

    print bridge.call(positives, GetInventory(), 1.0) #should be [ 2,  4,   6]
    print bridge.call(negatives, GetInventory(), 1.0) #should be [-4, -8, -12]
    print bridge.stop([worker, positives, negatives], 1.0) #should be [Stopped(), Stopped(), Stopped()]

    
    class Start:
        def __init__(self, target):
            self.target = target

    class Ping:
        def __repr__(self):
            return "Ping()"

    class Pong:
        def __repr__(self):
            return "Pong()"

    class Pinger(Actor):
        handles = Actor.makeHandles()

        @handles(Start)
        def onStart(self, start, sender):
            start.target.send(Ping(), self)

        @handles(Pong)
        def onPong(self, pong, sender):
            print "-",
            sender.send(Ping(), self)
            
    class Ponger(Actor):
        handles = Actor.makeHandles()

        @handles(Ping)
        def onPing(self, ping, sender):
            print "+",
            sender.send(Pong(), self)

    # should print lots of +-+-+-
    pinger = Pinger.spawn()
    ponger = Ponger.spawn()
    pinger.send(Start(ponger), sender = None)
    time.sleep(0.1)
    bridge.stop([pinger, ponger], 1.0)

My Experience with Message Passing Concurrency

I'm working on a peer-to-peer file synchronization program. It's really concurrent and distributed, and it's forced me to learn a thing or two about the concurrency models that we use as programmers. Over the past few years, I've tried both the shared-state model common to C, C++, Java, C#, Python, etc., and the message-passing model found in Erlang and Scala. In my experience, the message-passing model (aka the Actor Model) is far superior to the shared-state model for writing distributed applications. After having used both extensively, I'd go so far as to say that for distributed programming, shared-state concurrency is upside down and backwards.

Here's my informal proof that message-passing concurrency is necessary in a distributed system:

  1. Since the system is distributed, real shared state is impossible.
  2. The only way for the distributed components of an application to communicate is by sending a message from one to another.
  3. Everything else is an abstraction on top of message-passing.

HTTP is an abstraction on top of message-passing. AJAX is an abstraction on top of HTTP on top of message-passing. XML-RPC is an abstraction on top of HTTP on top of message-passing. SOAP is an abstraction on top of an abstraction on top of an abstraction on top of HTTP on top of message-passing. Any RPC mechanism is an abstraction on top of message-passing. SQL queries to a remote database are an abstraction on top of message-passing. CORBA is a nasty, tangled mess on top of a foundation of message-passing.

See a pattern?

Not only are all of these abstractions, but they are leaky abstractions. Just about all RPC (Remote Procedure Call) frameworks try to pretend that remote objects are local. But the facade is impossible to keep from leaking. Calls to a remote object might fail or take arbitrarily long. If you want to make the same call to two different remote nodes, those calls must be made synchronously and sequentially; in order to run them in parallel on the remote nodes, they must be called in parallel on the local node. If a call to a remote node triggers a call back to the local node, which may trigger a call to a third node, you end up with a huge spaghetti mess of calls and threads.

I've been down that road. It wasn't pretty. There is a better way.
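For example, with the little framework above, fanning the same request out to several actors is explicit, and the waiting happens on your terms. Here's a rough sketch (it assumes bridge is a Bridge and warehouse_a / warehouse_b are spawned Warehouse actors, along the lines of the example in the code above):

# RPC style: the second call can't even start until the first one returns (or times out)
#   inventory_a = warehouse_a.getInventory()
#   inventory_b = warehouse_b.getInventory()

# message-passing style: both requests go out immediately,
# then the replies are collected as they arrive, each with a timeout and a default
requests    = [(warehouse_a, GetInventory()), (warehouse_b, GetInventory())]
inventories = list(bridge.multiCall(requests, timeout = 1.0, default = None))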

I think AJAX has opened our eyes a bit. It's a lot closer to message-passing than it is to RPC. In a REST architecture, you "send" messages by POSTing to a URL and you "receive" messages by GETing a URL. All messages are clearly local copies that were serialized and deserialized from the remote data. There isn't any leaky abstraction of data or classes pretending to be in two places at once. Data, timeouts, and failures are in your face and you have to deal with them. So, AJAX is a lot less leaky.
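If you squint, that POST/GET pair is just send and receive. A minimal sketch of the idea (the URLs are invented, serialized_message stands for whatever bytes you've serialized, and urllib2 is just the stock Python 2 HTTP client):

import urllib2

# "send": POST a serialized message to the remote peer
urllib2.urlopen("http://example.com/inbox", data = serialized_message)

# "receive": GET whatever the peer has for us; what comes back is plainly a local copy
reply = urllib2.urlopen("http://example.com/outbox").read()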

But AJAX is far from perfect. I can't help but think of Greenspun's Tenth Rule of Programming with a twist for distributed programming: "Any sufficiently complicated distributed platform contains an ad hoc, informally-specified, bug-ridden, slow implementation of half of a message-passing system (Erlang)". Once you get message-passing in your head, you can't help but think of AJAX in that way.

I've been down a better road. It was much more pleasant.

About six months ago, I rewrote major portions of our application with message-passing concurrency. It took a long time. It was tricky. It hurt my head. But it worked. It's a success. It's capable of things that the shared-state system could never do.

Having done it, I can emphatically say that if I could start all over, I would go with message-passing. Most of the work was building the infrastructure and wrapping my head around a new way of thinking. But now that I've done both of those, I've paid the costs and I'm reaping the rewards. We're moving the application in directions that would have been nearly impossible with the old concurrency model.

In my experience, message-passing concurrency is the best way to write distributed applications. But it isn't an easy road to go down. Support for it just isn't there in most programming languages and environments. So far, Erlang has been the pioneer. I would humbly agree that the way I've implemented message-passing in Python, compared to Erlang, looks like an ad-hoc, bug-ridden half-implementation. But for various reasons, I can't use Erlang. I'm hoping for someone to create Erlang++ or E# or Eython or something that combines the concurrency model of Erlang with a modern programming language. Until then, I'll just keep on cobbling together what I can onto the programming language I happen to be using.

More on that in another article.
