As I wrote previously, I'm a fan of message-passing concurrency. But to use it in python, I had to write my own infrastructure. It's far from perfect, but it's working for me. I've simplified it a bit and prepared some code to share with you.
The basic idea of my framework is to be and simple and easy to use as possible. There's no Erlang-style pattern matching or process linking. There isn't even any built-in error handling. But, there's a lot of support for the typical "react loop" style of message handling.
The architecture is very simple. There's one thread per actor. An actor is just an object you can send a message to. The actor receives the messages in the order they were sent. You always know the sender of the message; it's built right in. There's also "Bridge" that allows you to interact with actors from non-actor code.
The only tricky thing you'll see is a "handles" decorator. This is used to help you identify how you want an actor to handle a given message type. It helps you avoid re-creating a message loop and dispatcher yourself for every actor.
I've included a little example of how to use the framework. It obviously quite simple, but keep in mind that I used little more for the foundation of a rather complex file synchronization application. A little bit goes a long way.
So, here it is. Cut, paste, and run.
from threading import Thread, Event
from Queue import Queue, Empty
class IActor:
pass
# send(message, sender)
class HandlerRegistry(dict):
# should be used as a decorator
def __call__(self, typ):
def register(func):
self[typ] = func
return func
return register
OtherMessage = "Other Message"
class Stop(Exception):
def __repr__(self):
return "Stop()"
class Stopped:
def __repr__(self):
return "Stopped()"
class ActorNotStartedError(Exception):
def __init__(self):
Exception.__init__(self, "actor not started")
class Actor(IActor):
@classmethod
def spawn(cls, *args, **kargs):
self = cls(*args, **kargs)
self.mailbox = Mailbox()
start_thread(target = self.act, as_daemon = True)
return self
def send(self, message, sender):
if self.mailbox is None:
raise ActorNotStartedError()
else:
self.mailbox.send(message, sender)
def receive(self):
if self.mailbox is None:
raise ActorNotStartedError()
else:
return self.mailbox.receive()
# override if necessary
def act(self):
self.handleMessages()
handles = HandlerRegistry()
@classmethod
def makeHandles(*classes):
return HandlerRegistry((typ, handler) for cls in classes for (typ, handler) in cls.handles.iteritems())
def handleMessages(self):
try:
while True:
message, sender = self.receive()
self.handleMessageWithRegistry(message, sender)
except Stop:
pass
def handleMessageWithRegistry(self, message, sender):
registry = self.__class__.handles
handler = registry.get(message.__class__) or registry.get(OtherMessage)
if handler is not None:
handler(self, message, sender)
@handles(OtherMessage)
def onOther(self, message, sender):
pass
@handles(Stop)
def onStop(self, message, sender):
sender.send(Stopped(), self)
raise message
def start_thread(target, as_daemon, name = None):
thread = Thread(target = target)
if name:
thread.setName(name)
thread.setDaemon(as_daemon)
thread.start()
return thread
class Mailbox:
def __init__(self):
self.mailbox = Queue()
def send(self, message, sender):
self.mailbox.put((message, sender), block = False)
def receive(self, timeout = None):
return self.mailbox.get(block = True, timeout = timeout)
class Bridge(IActor):
def __init__(self):
self.mailbox = Mailbox()
def send(self, message, sender):
self.mailbox.send(message, sender)
def call(self, target, request, timeout, default = None):
self.sendRequest(target, request)
return self.receiveResponse(timeout, default)
# targeted_requests can be an iterator
def multiCall(self, targeted_requests, timeout, default = None):
count = 0
for target, request in targeted_requests:
self.sendRequest(target, request)
count += 1
for _ in xrange(count):
yield self.receiveResponse(timeout, default)
def stop(self, actors, timeout):
stop = Stop()
return list(self.multiCall(((actor, stop) for actor in actors), timeout, default = None))
def sendRequest(self, target, request):
target.send(request, self)
def receiveResponse(self, timeout, default):
try:
message, sender = self.mailbox.receive(timeout = timeout)
return message
except Empty:
return default
if __name__ == "__main__":
import time
class GetInventory:
pass
class Task:
def __init__(self, input, destination):
self.input = input
self.destination = destination
class Worker(Actor):
handles = Actor.makeHandles()
def __init__(self, skill):
self.skill = skill
@handles(Task)
def onTask(self, task, sender):
output = self.skill(task.input)
task.destination.send(output, self)
class Warehouse(Actor):
handles = Actor.makeHandles()
def __init__(self):
self.inventory = []
@handles(GetInventory)
def onGetInventory(self, message, sender):
# copy the inventory to avoid anyone mutating it
sender.send(list(self.inventory), self)
@handles(OtherMessage)
def onTaskResult(self, result, sender):
self.inventory.append(result)
worker = Worker.spawn(lambda x : x * 2)
positives = Warehouse.spawn()
negatives = Warehouse.spawn()
bridge = Bridge()
for val in [1, 2, 3, -2, -4, -6]:
warehouse = positives if val >= 0 else negatives
worker.send(Task(val, warehouse), sender = None)
print bridge.call(positives, GetInventory(), 1.0) #should be [ 2, 4, 6]
print bridge.call(negatives, GetInventory(), 1.0) #should be [-4, -8, -12]
print bridge.stop([worker, positives, negatives], 1.0) #should be [Stopped(), Stopped(), Stopped()]
class Start:
def __init__(self, target):
self.target = target
class Ping:
def __repr__(self):
return "Ping()"
class Pong:
def __repr__(self):
return "Pong()"
class Pinger(Actor):
handles = Actor.makeHandles()
@handles(Start)
def onStart(self, start, sender):
start.target.send(Ping(), self)
@handles(Pong)
def onPong(self, pong, sender):
print "-",
sender.send(Ping(), self)
class Ponger(Actor):
handles = Actor.makeHandles()
@handles(Ping)
def onPing(self, ping, sender):
print "+",
sender.send(Pong(), self)
# should print lots of +-+-+-
pinger = Pinger.spawn()
ponger = Ponger.spawn()
pinger.send(Start(ponger), sender = None)
time.sleep(0.1)
bridge.stop([pinger, ponger], 1.0)