I've learned. I'll share.

August 12, 2009

Rx Simplified (Reactive Programming in Python)

Lately, there's been interest in "reactive programming", especially with Rx. What is Rx? I've seen descriptions like "reactive programming with LINQ", "the dual of the enumerator/iterator" and even "a variation of the continuation monad". Oh right...uh...monad? dual? what's going on?

If you like things like "monads", "duals", and category theory, go watch this video, all the way to the end. It's pretty funny.

But if those things make your eyes glaze over and you're left wondering what Rx really is, I want to give you a simple explanation of what Rx is all about. In fact, I'll show how you could have invented it yourself. We'll do so with simple event-based code written in Python.

Step 1: write simple event handlers

Imagine we have a mouse click event that fires either "left" or "right", and we want to make a new event that fires "double left" when there's a double left click. We might write something like this (including a simple Event class).

import time

class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)

def echo(val):
    print(val)
    return val

def left_double_click_from_click(click, threshold):
    dlclick = Event()
    last_lclick_time = [0] # closure hack: a mutable list lets click_handler update the value

    def click_handler(click_value):
        if click_value == "left":
            lclick_time = time.time()
            if (lclick_time - last_lclick_time[0]) < threshold:
                dlclick.fire("double left")
            last_lclick_time[0] = lclick_time

    click.handle(click_handler)
    return dlclick


click   = Event()
dlclick = left_double_click_from_click(click, .01)
dlclick.handle(echo)

click.fire("left")
time.sleep(.02)
click.fire("left")
click.fire("right")
click.fire("left") # prints "double left"
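Because the example depends on time.sleep and real timestamps, its output is timing-dependent. One way to make it deterministic is to pass the clock in instead of calling time.time directly. This is my own assumption, not something the post does: the `clock` parameter and the `FakeClock` helper are hypothetical, and the double-click logic is otherwise unchanged.

```python
import time


class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)


class FakeClock:
    # Hypothetical test helper: a callable that returns a time we control.
    def __init__(self, now):
        self.now = now

    def __call__(self):
        return self.now


def left_double_click_from_click(click, threshold, clock=time.time):
    # Same logic as above, except the clock is a parameter (default: time.time).
    dlclick = Event()
    last_lclick_time = [0]  # mutable cell the closure can update

    def click_handler(click_value):
        if click_value == "left":
            lclick_time = clock()
            if (lclick_time - last_lclick_time[0]) < threshold:
                dlclick.fire("double left")
            last_lclick_time[0] = lclick_time

    click.handle(click_handler)
    return dlclick


clock = FakeClock(100.0)
click = Event()
seen = []
left_double_click_from_click(click, 0.01, clock).handle(seen.append)

click.fire("left")   # first left click at t = 100.00
clock.now += 0.02    # stands in for time.sleep(.02)
click.fire("left")   # 0.02 s later: too slow for a double click
click.fire("right")  # ignored
click.fire("left")   # same instant as the previous left click
print(seen)  # ['double left']
```

The fake clock starts at 100.0 rather than 0. That way the very first click is not mistaken for a double click against the initial `[0]` timestamp.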

Step 2: refactor event handlers

It works and it's pretty simple. But we could refactor quite a bit. If we do, we might write something like this (note that I like the suffix "_r" for "reactive"):

class EventFireRecord:
    def __init__(self, value, time):
        self.value = value
        self.time  = time

def doubleize_r(evt, threshold, combine):
    double_evt = Event()

    last_fire  = EventFireRecord(None, 0)
    def evt_handler(value):
        fire_time = time.time()
        if (fire_time - last_fire.time) < threshold:
            double_evt.fire(combine(last_fire.value, value))
        last_fire.__init__(value, fire_time)

    evt.handle(evt_handler)
    return double_evt

def filter_r(evt, predicate):
    filtered_evt = Event()

    def evt_handler(value):
        if predicate(value):
            filtered_evt.fire(value)

    evt.handle(evt_handler)
    return filtered_evt

click   = Event()
dlclick = doubleize_r(filter_r(click, lambda value : value == "left"), .01, lambda l1, l2: "double left")
dlclick.handle(echo)

click.fire("left")
time.sleep(.02)
click.fire("left")
click.fire("right")
click.fire("left") #prints "double left"

That looks better and is more generic. The logic of "double click" is now all on one line. But we could do even better. Notice that we repeat ourselves a little in filter_r and doubleize_r. The pattern looks like this:
evt_out = Event()

def handler(value):
    ...
    evt_out.fire(value)
    ...

evt_in.handle(handler)
return evt_out
What if we refactor to pull out that common pattern? We could write a special handler that just returns a value, plus a special "handle_with_result" function that creates the output event and fires whatever the handler returns:
def handler(value):
    ...
    return value

evt_out = handle_with_result(evt_in, handler)

Step 3: make a higher-level "handle" function

If we do that, our code might look like this:
def handle_with_result(evt, handler_with_result):
    evt_out = Event()

    def handler(value):
        result = handler_with_result(value)
        if result is not None:
            evt_out.fire(result)

    evt.handle(handler)
    return evt_out

def doubleize_event(evt, threshold, combine):
    last_fire = EventFireRecord(None, 0)

    def handler(value):
        fire_time = time.time()
        try:
            if (fire_time - last_fire.time) < threshold:
                return combine(last_fire.value, value)
        finally:
            last_fire.__init__(value, fire_time)

    return handle_with_result(evt, handler)

def filter_event(evt, predicate):
    def handler(value):
        if predicate(value):
            return value

    return handle_with_result(evt, handler)


click  = Event()
dlclick = doubleize_event(filter_event(click, lambda value : value == "left"), .01, lambda l1, l2 : "double left")
dlclick.handle(echo)

click.fire("left")
time.sleep(.02)
click.fire("left")
click.fire("right")
click.fire("left") #prints "double left"

It works, and our code looks better than ever. handle_with_result is very useful.
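As a quick illustration of how useful it is, a new combinator now takes only a line or two. Here is a hypothetical map_event that transforms every value. The name is mine and comes from neither the post nor Rx. The block repeats Event and the step 3 handle_with_result so it runs on its own:

```python
class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)


def handle_with_result(evt, handler_with_result):
    # Step 3 version: fire whatever the handler returns, unless it is None.
    evt_out = Event()

    def handler(value):
        result = handler_with_result(value)
        if result is not None:
            evt_out.fire(result)

    evt.handle(handler)
    return evt_out


def map_event(evt, transform):
    # Hypothetical combinator: the transform itself is the result-returning handler.
    return handle_with_result(evt, transform)


clicks = Event()
shouted = []
map_event(clicks, lambda value: value.upper()).handle(shouted.append)

clicks.fire("left")
clicks.fire("right")
print(shouted)  # ['LEFT', 'RIGHT']
```

Because handle_with_result treats None as "nothing to fire", a transform that returns None silently drops that value. filter_event relies on exactly that behavior.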

But we're now missing something: what if we want to return multiple values? Or do something more interesting? For example, we might listen to a keyboard event and produce left clicks when the user types "l", right clicks when they type "r", and two "fake" clicks when they type "f". We'd like to write something like this:

def choose_clicks(keys, clicks):
    def key_handler(char):
        if char == "l":
            return filter_event(clicks, lambda click: click == "left")
        elif char == "r":
            return filter_event(clicks, lambda click: click == "right")
        elif char == "f":
            return ["fake", "fake"]

    return handle_with_result(keys, key_handler)
If we extend handle_with_result to accept an Event or a list as a result, we might get something like this:

def handle_with_result(evt, handler_with_result):
    evt_out = Event()

    def handler(value):
        result = handler_with_result(value)
        if result is None:
            pass #ignore
        elif isinstance(result, Event):
            result.handle(evt_out.fire)
        elif isinstance(result, list):
            for value_out in result:
                evt_out.fire(value_out)
        else:
            evt_out.fire(result)


    evt.handle(handler)
    return evt_out

def filter_r(evt, predicate):
    def handler(value):
        if predicate(value):
            return value

    return handle_with_result(evt, handler)

def value_filter_r(evt, value):
    return filter_r(evt, lambda val : val == value)

def choose_clicks(keys, clicks):
    def key_handler(char):
        #TODO: unsubscribe from event after either "l" or "r"
        if char == "l":
            return value_filter_r(clicks, "left")
        elif char == "r":
            return value_filter_r(clicks, "right")
        elif char == "f":
            return ["fake", "fake"]

    return handle_with_result(keys, key_handler)

keys   = Event()
clicks = Event()
chosen_clicks = choose_clicks(keys, clicks)

chosen_clicks.handle(echo)
clicks.fire("left")
keys.fire("a")
clicks.fire("right")
keys.fire("l")
clicks.fire("left") # prints "left"
clicks.fire("right")
clicks.fire("left") # prints "left"
keys.fire("f") #prints "fake" and then "fake" again
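The extended handle_with_result now interprets a result by its type, and a few consequences of that are easy to miss. Returning [] fires nothing and [value] fires value once, so filtering can be written with lists alone. A handler that wants to fire a list as a single value has to wrap it in a second list. Here is a self-contained check, using made-up evens and pairs examples:

```python
class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)


def handle_with_result(evt, handler_with_result):
    # Same branching as the extended version above.
    evt_out = Event()

    def handler(value):
        result = handler_with_result(value)
        if result is None:
            pass  # nothing to fire
        elif isinstance(result, Event):
            result.handle(evt_out.fire)  # forward everything that event fires
        elif isinstance(result, list):
            for value_out in result:
                evt_out.fire(value_out)  # one fire per list element
        else:
            evt_out.fire(result)

    evt.handle(handler)
    return evt_out


numbers = Event()
# A filter written with lists only: [] fires nothing, [n] fires n once.
evens = handle_with_result(numbers, lambda n: [n] if n % 2 == 0 else [])
# To fire a list as one value, wrap it in another list.
pairs = handle_with_result(numbers, lambda n: [[n, n]])

seen_evens, seen_pairs = [], []
evens.handle(seen_evens.append)
pairs.handle(seen_pairs.append)

for n in [1, 2, 3, 4]:
    numbers.fire(n)

print(seen_evens)  # [2, 4]
print(seen_pairs)  # [[1, 1], [2, 2], [3, 3], [4, 4]]
```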

Great. Now if we just add a little bit of syntax sugar to this, we can make events look like streams:

Step 4: add some syntax sugar

class Event:
    def __init__(self):
        self.handlers = []

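    def handle(self, handler):
        self.handlers.append(handler)
        return self # so += will work (via __iadd__)

    __iadd__ = handle # evt += handler subscribes handler and leaves evt bound to the same Event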

    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)

    def bind(evt, handler_with_result):
        evt_out = Event()

        def handler(value):
            result = handler_with_result(value)
            if result is not None:
                Event.unit(result).handle(evt_out.fire)

        evt.handle(handler)
        return evt_out

    @classmethod
    def unit(cls, val):
        if isinstance(val, cls):
            return val
        elif isinstance(val, list):
            return MockEvent(*val)
        else:
            return MockEvent(val)

    __rshift__ = bind

class MockEvent:
    def __init__(self, *vals):
        self.vals = vals

    def handle(self, handler):
        for val in self.vals:
            handler(val)  

def doublize_r(threshold, combine):
    last_fire = EventFireRecord(None, 0)

    def handler(value):
        fire_time = time.time()
        try:
            if (fire_time - last_fire.time) < threshold:
                return combine(last_fire.value, value)
        finally:
            last_fire.__init__(value, fire_time)

    return handler

def filter_r(predicate):
    def handler(value):
        if predicate(value):
            return value

    return handler

def value_filter_r(value):
    return filter_r(lambda val : val == value)

def click_choose_r(**clicks_by_char):
    def key_handler(char):
        return clicks_by_char.get(char)

    return key_handler


clicks   = Event()
keys     = Event()
dlclicks = clicks >> value_filter_r("left") >> doublize_r(.01, lambda l1, l2: "double click")
keys >> click_choose_r(d = dlclicks, f = ["fake", "fake"]) >> echo

clicks.fire("left")
clicks.fire("left")
keys.fire("f") #prints "fake" and then "fake" again
keys.fire("d")
clicks.fire("right")
clicks.fire("right")
time.sleep(.02)
clicks.fire("left")
clicks.fire("left") # prints "double click"
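If the >> chains look like magic, it may help to spell out what Python does with them. `__rshift__ = bind` makes `a >> f` call `Event.bind(a, f)`. Because >> is left-associative, `a >> f >> g` means `(a >> f) >> g`, which is just two nested bind calls. This sketch repeats the Step 4 Event and MockEvent so it can run on its own. only_left and twice are hypothetical helpers:

```python
class MockEvent:
    # As in the post: replays fixed values to a handler immediately.
    def __init__(self, *vals):
        self.vals = vals

    def handle(self, handler):
        for val in self.vals:
            handler(val)


class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)
        return self

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)

    def bind(evt, handler_with_result):
        evt_out = Event()

        def handler(value):
            result = handler_with_result(value)
            if result is not None:
                Event.unit(result).handle(evt_out.fire)

        evt.handle(handler)
        return evt_out

    @classmethod
    def unit(cls, val):
        if isinstance(val, cls):
            return val
        elif isinstance(val, list):
            return MockEvent(*val)
        return MockEvent(val)

    __rshift__ = bind  # a >> f calls Event.bind(a, f)


def only_left(value):  # hypothetical helper
    return value if value == "left" else None


def twice(value):  # hypothetical helper
    return [value, value]


clicks = Event()
via_operator, via_bind = [], []
# Left-associative: parsed as (clicks >> only_left) >> twice.
(clicks >> only_left >> twice).handle(via_operator.append)
Event.bind(Event.bind(clicks, only_left), twice).handle(via_bind.append)

for click in ["left", "right", "left"]:
    clicks.fire(click)

print(via_operator)  # ['left', 'left', 'left', 'left']
print(via_operator == via_bind)  # True
```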

So what have we made?

Wonderful. We've made events look like streams by making a slick way of creating event handlers. In fact, if you look closely at what I did in that last step, you'll notice that I renamed "handle_with_result" to "bind" and moved some code into a method called "unit". That's essentially all it takes to turn Event into a monad (the two operations also have to obey a few simple laws), which is exactly what Rx does. Congratulations: we just reinvented monads and Rx simply by refactoring our event handler code. In the process, we've discovered what Rx really is: a fancy way of writing event handlers. Specifically, these are event handlers that fire events that trigger other event handlers that fire events, and so on, in a big chain that looks like a query. So when your eyes glaze over at "duals" and "monads" and "reactive programming", just say to yourself: I'm making a fancy event handler. Because in the end, that's all you're really doing.
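For comparison, here is the same pair of operations for plain Python lists, the standard example of a monad. It's only an analogy for intuition. list_unit and list_bind are hypothetical names, and a list holds all its values up front, while an event delivers them over time. Notice that list_bind has the same shape as Event.bind: each value goes to a function that can produce zero, one, or many results, and those results are merged into one output:

```python
def list_unit(value):
    # unit: wrap one value in the simplest container, a one-element list.
    return [value]


def list_bind(values, func):
    # bind: give each value to func (which returns a list) and flatten the results.
    return [out for value in values for out in func(value)]


def keep_left(click):
    return [click] if click == "left" else []


def repeat(click):
    return [click, click]


clicks = ["left", "right", "left"]
doubled = list_bind(list_bind(clicks, keep_left), repeat)
print(doubled)  # ['left', 'left', 'left', 'left']

# Two of the monad laws, checked on sample inputs:
assert list_bind(list_unit("left"), repeat) == repeat("left")  # left identity
assert list_bind(clicks, list_unit) == clicks                  # right identity
```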

In fact, if you want to do this in Python, now you have a basic implementation to start with! Of course, this is just a toy implementation. It lacks error handling, unsubscribing, end-of-stream, and concurrency. But it ain't bad for just 50 lines of code, and it lets you see the essence of Rx fairly easily.
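To give a feel for what one of those missing pieces involves, here is a hypothetical way to add unsubscribing. A `subscribe` method returns a function that removes the handler again. Both the method name and the convention are mine, not the post's, and this is not Rx's API; it is only loosely similar to how Rx's Subscribe hands back an IDisposable:

```python
class Event:
    def __init__(self):
        self.handlers = []

    def fire(self, val=None):
        # Iterate over a copy so a handler may unsubscribe while we fire.
        for handler in list(self.handlers):
            handler(val)

    def subscribe(self, handler):
        # Hypothetical API: register handler, return a function that removes it.
        self.handlers.append(handler)

        def unsubscribe():
            if handler in self.handlers:
                self.handlers.remove(handler)

        return unsubscribe


clicks = Event()
seen = []
stop = clicks.subscribe(seen.append)
clicks.fire("left")
stop()
clicks.fire("right")  # no longer delivered
print(seen)  # ['left']
```

With something like this, the TODO in choose_clicks could keep the unsubscribe function for the previously chosen click stream and call it when a new key arrives.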

Oh, and what's the big deal with monads, you ask? Nothing much. It's just that if you provide "bind" and "unit" (called "select many" and "select" in LINQ, I think), LINQ gives you some nice syntax sugar that makes your event handler look like a query. It's really pretty slick, especially now that they've added "extension methods".
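Here is a rough sketch of how those pieces relate. Once an event has bind, a map-style operation can be written as bind plus unit: wrap each transformed value with unit and let bind forward it. In LINQ terms, SelectMany plays the role of bind, and Select is this map operation. The names select, Just, and unit below are hypothetical. This bind is also stricter than the post's: func must always return something with a handle method.

```python
class Just:
    # A minimal "unit" result: replays one value to each handler as soon as it's attached.
    def __init__(self, value):
        self.value = value

    def handle(self, handler):
        handler(self.value)


def unit(value):
    return Just(value)


class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)

    def bind(self, func):
        # func turns each value into something with .handle; forward what it fires.
        evt_out = Event()
        self.handle(lambda value: func(value).handle(evt_out.fire))
        return evt_out

    def select(self, transform):
        # map expressed with bind + unit.
        return self.bind(lambda value: unit(transform(value)))


clicks = Event()
labels = []
clicks.select(lambda c: "clicked " + c).handle(labels.append)

clicks.fire("left")
clicks.fire("right")
print(labels)  # ['clicked left', 'clicked right']
```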

And next time...

In future posts, I'll explore different ways of making slick event handlers, but without monads. And hopefully we'll get this to handle concurrency, which is what asynchronous programming is all about. In fact, I expect we'll start to see a serious blurring of lines between Rx, message passing, and data flow programming.

For now, when you start working with Rx, just remember: I'm making a big, fancy event handler. An "Observable" is just like an event, and an "Observer" is just like an event handler.
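Here is that analogy spelled out as a rough sketch. The class and method names below (Observable, Observer, subscribe, on_next, fire) are Python-style stand-ins I chose to mirror Rx's Subscribe and OnNext. They are not Rx's actual API. The Observable plays the part of Event, and the Observer is just an object whose on_next is the event handler:

```python
class Observable:
    # Plays the role of Event: keeps a list of observers and pushes values to them.
    def __init__(self):
        self.observers = []

    def subscribe(self, observer):
        # Like Event.handle, but takes an object instead of a bare function.
        self.observers.append(observer)

    def fire(self, value):
        for observer in self.observers:
            observer.on_next(value)


class Observer:
    # Plays the role of an event handler.
    def __init__(self):
        self.received = []

    def on_next(self, value):
        self.received.append(value)


clicks = Observable()
observer = Observer()
clicks.subscribe(observer)
clicks.fire("left")
clicks.fire("right")
print(observer.received)  # ['left', 'right']
```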

P.S.

What's with the lame names? They started off with cool names like "Reactive" and "Rx" and then gave us Observable, Observer, Subscribe, OnNext, OnDone, and OnError. Yuck. Think what an opportunity they missed! We could have had names like Emitter, Reactor, Chain, Emit, Extinguish, and Explode. Judge for yourself:
observable.Subscribe(observer)
observer.OnNext(val)
observer.OnDone()
observer.OnError(e)
or
emitter.chain(reactor)
reactor.emit("foo")
reactor.extinguish()
reactor.explode(e)

6 comments:

  1. I stuck this progression in a bitbucket mercurial repository here: http://bitbucket.org/loppear/rx-python/ I hope that's alright with you. A fun exercise and a fun video to copy&paste to.

  2. I didn't look long, but I'm confused why I could not get:

    click = Event()

    dlclick = click >> value_filter_r("left") >> doublize_r(
    .01,
    lambda l1, l2 : "double left"
    )

    to still pass the original test with the final code. No output.

  3. Luke,

    Sorry, I usually make a runnable version of the code, but I ended up splitting it into two articles (I just published the latest one). My last post is the runnable version of both put together. I hope it helps.

    Feel free to take the code, fix it up, improve it, post it somewhere, etc.

    I looked into your repo. I don't know how your test harness works exactly. It seems weird that the tests are just comments. But, I added the event firing code directly to main.py and ran main.py and got the correct output.

  4. Peter,

    Thanks for the new post. The tests are doctests http://docs.python.org/library/doctest.html although I prefer to use the Nose testrunner http://code.google.com/p/python-nose/ to collect and run tests - just run "nosetests" (or, without the setup.cfg, "nosetests --with-doctest") to see that the test passes. Not that important, just allowed me to keep the stdout-based feel of your examples without a more elaborate TestCase setup. (And of course, shortly after my second comment I realized the answer was in the question. "No output" indeed.)

    I also particularly like a) this refactoring style of teaching a programming idea and b) to see refactorings through the lens of a version controlled repo.

    Thanks again, I'll take some time to go through the runnable code soon.
    Luke

  5. > It's just that if you provide "bind" and "unit" (called "select many" and "select" in LINQ, I think)

    LINQ's Select is actually fmap.

  6. I guess that makes sense. For the list monad, "select" would make sense as a name for "map" (list fmap), and "select many" would make sense for "mapcat" (list bind).

