I've learned. I'll share.

August 26, 2009

SQL is now Turing complete!

David Fetter at OSCON 2009 just made a stunning claim: with CTEs (common table expressions) and windowing, SQL is Turing complete. He even offers a proof and an amazing example (although I'm not sure if the example requires Turing completeness, it's still amazing).

This could be a big deal. Why? It means that, in principle, you can express any computation in an SQL query. While drawing the Mandelbrot set is a nice demo, the first "killer app" will be tree traversal. Lots of relational tables end up looking like trees and are a royal pain to deal with in normal SQL. After that, only our imagination (and performance) is the limit. You can theoretically create any data view that runs in the DB all in one shot, without any of the troubles of stored procedures.
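To make the tree traversal idea concrete, here is a minimal sketch of a recursive CTE walking a parent/child table. It runs against SQLite through Python's sqlite3 module purely so that it is self-contained; that choice, the `category` table, and its columns are all assumptions for illustration and not anything from the talk.

```python
import sqlite3

# Hypothetical table: each row points at its parent, forming a tree.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE category (id INTEGER PRIMARY KEY, parent_id INTEGER, name TEXT)"
)
conn.executemany(
    "INSERT INTO category VALUES (?, ?, ?)",
    [
        (1, None, "root"),
        (2, 1, "languages"),
        (3, 1, "databases"),
        (4, 2, "python"),
        (5, 3, "postgres"),
    ],
)

# The recursive CTE starts at the root row, then repeatedly joins children
# onto the rows found so far, carrying the depth and the full path along.
TREE_QUERY = """
WITH RECURSIVE tree(id, name, depth, path) AS (
    SELECT id, name, 0, name
    FROM category
    WHERE parent_id IS NULL
  UNION ALL
    SELECT c.id, c.name, t.depth + 1, t.path || '/' || c.name
    FROM category AS c
    JOIN tree AS t ON c.parent_id = t.id
)
SELECT depth, path FROM tree ORDER BY path
"""

rows = conn.execute(TREE_QUERY).fetchall()
for depth, path in rows:
    print("  " * depth + path)
```

The whole traversal happens in one query, with no stored procedure and no loop in the client code.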

I see a similarity between Turing-complete SQL in the DB and JavaScript in the browser. For years, we used JavaScript to do silly stuff, but then a few bright minds created amazing tools like Gmail and Google Maps and our view of JavaScript was forever changed. Now JavaScript is everywhere and there's a race to make the best development frameworks and the fastest engines.

Might the same thing happen with Turing-complete SQL? Is the race on to be the first programming language to compile down to Turing-complete SQL?

Honestly, if I were a betting man, I'd say it won't come to anything significant. But I'll also guess that the lure of compiling down to SQL will eventually capture someone, somewhere. It's only a matter of time. In fact, if you're a programming language geek, the sirens are probably calling to you right now :). When it does happen, I expect the first language to be a functional one, especially one with a small core, like a variant of Lisp. Once we get Clojure in Clojure, maybe it will be a candidate. Or maybe C# will end up with LINQ-to-Turing-complete-SQL (SQLINQ?).

What do you think?

August 14, 2009

The Code for Reactive Programming in Python

I packaged some Python code that you can run for each of the steps I've shown in my articles about Reactive Programming: how you could have invented it, and more ways to do it. I apologize that it's not in a more usable form. Go ahead and copy and paste to wherever you like. If you put it online somewhere more convenient, just put a link in the comments. I put the control of which example runs at the very end. Just uncomment the appropriate line for the example you want to run. And, here it is:

import time

## simplified event stuff 
class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)
        return self #so += will work

    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)

def echo(val):
    print val
    return val

def simple_click_event_example():
    click = Event()
    click.handle(echo)
    click.fire("left") #prints "left"

def click_event_manipulation_example():
    def left_double_click_from_click(click, threshold):
        dlclick = Event()
        last_lclick_time = [0] #closure hack

        def click_handler(click_value):
            if click_value == "left":
                lclick_time = time.time()
                if (lclick_time - last_lclick_time[0]) < threshold:
                    dlclick.fire("double left")
                last_lclick_time[0] = lclick_time
        
        click.handle(click_handler)
        return dlclick


    click   = Event()
    dlclick = left_double_click_from_click(click, .01)
    dlclick.handle(echo)

    click.fire("left")
    time.sleep(.02)
    click.fire("left")
    click.fire("right")
    click.fire("left") #prints "double left"
    
class EventFireRecord:
    def __init__(self, value, time):
        self.value = value
        self.time  = time

def click_event_maniuplation_refactored_example():
    def doubleize_event(evt, threshold, combine):
        double_evt = Event()

        last_fire  = EventFireRecord(None, 0)
        def evt_handler(value):
            fire_time = time.time()
            if (fire_time - last_fire.time) < threshold:
                double_evt.fire(combine(last_fire.value, value))
            last_fire.__init__(value, fire_time)
        
        evt.handle(evt_handler)
        return double_evt

    def filter_event(evt, predicate):
        filtered_evt = Event()

        def evt_handler(value):
            if predicate(value):
                filtered_evt.fire(value)

        evt.handle(evt_handler)
        return filtered_evt

    click  = Event()
    lclick = filter_event(click, lambda value : value == "left")
    dlclick = doubleize_event(lclick, .01, lambda click1, click2 : "double left")
    dlclick.handle(echo)

    click.fire("left")
    time.sleep(.02)
    click.fire("left")
    click.fire("right")
    click.fire("left") #prints "double left"


def click_event_handle_with_result_example():
    def handle_with_result(evt, handler_with_result):
        evt_out = Event()

        def handler(value):
            result = handler_with_result(value)
            if result is not None:
                evt_out.fire(result)

        evt.handle(handler)
        return evt_out

    def doubleize_r(evt, threshold):
        last_fire = EventFireRecord(None, 0)

        def handler(value):
            fire_time = time.time()
            try:
                if (fire_time - last_fire.time) < threshold:
                    return (last_fire.value, value)
            finally:
                last_fire.__init__(value, fire_time)

        return handle_with_result(evt, handler)

    def filter_r(evt, predicate):
        def handler(value):
            if predicate(value):
                return value

        return handle_with_result(evt, handler)


    clicks   = Event()
    dlclicks = doubleize_r(filter_r(clicks, lambda value : value == "left"), .01)
    dlclicks.handle(echo)

    clicks.fire("left")
    time.sleep(.02)
    clicks.fire("left")
    clicks.fire("right")
    clicks.fire("left") #prints ("left", "left")


def click_event_choosing_by_returning_event_example():
    def handle_with_result(evt, handler_with_result):
        evt_out = Event()

        def handler(value):
            result = handler_with_result(value)
            if result is None:
                pass #ignore
            elif isinstance(result, Event):
                result.handle(evt_out.fire)
            elif isinstance(result, list):
                for value_out in result:
                    evt_out.fire(value_out)
            else:
                evt_out.fire(result)
                    

        evt.handle(handler)
        return evt_out

    def filter_r(evt, predicate):
        def handler(value):
            if predicate(value):
                return value

        return handle_with_result(evt, handler)

    def value_filter_r(evt, value):
        return filter_r(evt, lambda val : val == value)

    def click_choose_r(keys, clicks):
        def key_handler(char):
            #TODO: unsubscribe from event after either "l" or "r"
            if char == "l":
                return value_filter_r(clicks, "left")
            elif char == "r":
                return value_filter_r(clicks, "right")
            elif char == "f":
                return ["fake", "fake"]

        return handle_with_result(keys, key_handler)

    keys   = Event()
    clicks = Event()
    chosen_clicks = click_choose_r(keys, clicks)

def click_event_looks_like_streams_example():
    class Event:
        def __init__(self):
            self.handlers = []

        def handle(self, handler):
            self.handlers.append(handler)
            return self #so += will work

        def fire(self, val = None):
            for handler in self.handlers:
                handler(val)

        def bind(evt, handler_with_result):
            evt_out = Event()

            def handler(value):
                result = handler_with_result(value)
                if result is not None:
                    Event.unit(result).handle(evt_out.fire)

            evt.handle(handler)
            return evt_out

        @classmethod
        def unit(cls, val):
            if isinstance(val, cls):
                return val
            elif isinstance(val, list):
                return MockEvent(*val)
            else:
                return MockEvent(val)

        __rshift__ = bind

    class MockEvent:
        def __init__(self, *vals):
            self.vals = vals

        def handle(self, handler):
            for val in self.vals:
                handler(val)  

    def doublize_r(threshold, combine):
        last_fire = EventFireRecord(None, 0)

        def handler(value):
            fire_time = time.time()
            try:
                if (fire_time - last_fire.time) < threshold:
                    return combine(last_fire.value, value)
            finally:
                last_fire.__init__(value, fire_time)

        return handler

    def filter_r(predicate):
        def handler(value):
            if predicate(value):
                return value

        return handler

    def value_filter_r(value):
        return filter_r(lambda val : val == value)

    def click_choose_r(**clicks_by_char):
        def key_handler(char):
            return clicks_by_char.get(char)

        return key_handler


    clicks   = Event()
    keys     = Event()
    dlclicks = clicks >> value_filter_r("left") >> doublize_r(.01, lambda l1, l2: "double left")
    keys >> click_choose_r(d = dlclicks, f = ["fake", "fake"]) >> echo

    clicks.fire("left")
    clicks.fire("left")
    keys.fire("f") #prints "fake" and then "fake" again
    keys.fire("d")
    clicks.fire("right")
    clicks.fire("right")
    time.sleep(.02)
    clicks.fire("left")
    clicks.fire("left") #prints "double left"


## basic consumer (receiver) using generators

receive = object()

def receiver_example():
    def receiver(gen_rcvr):
        def gen_and_start_rcvr(*args, **kargs):
            rcvr = gen_rcvr(*args, **kargs)
            rcvr.send(None)
            return rcvr
        return gen_and_start_rcvr

    @receiver
    def sum_r(title):
        total = 0
        while True:
            total += yield receive
            print "%s: %d" % (title, total)

    @receiver
    def count_r(title):
        count = 0
        while True:
            yield receive
            count += 1
            print "%s: %d" % (title, count)

    num_key      = Event()
    sum_nums     = sum_r("total nums")
    num_key.handle(sum_nums.send)

    num_key.fire(1) #prints "total nums: 1"
    num_key.fire(2) #prints "total nums: 3" 
    num_key.fire(3) #prints "total nums: 6"

## make remitters that can also output values via an event fire
def remitter_example():
    class Remitter:
        def __init__(self, receiver_from_event_out):
            self.receiverFromEventOut = receiver_from_event_out

        def __rrshift__(self, event_in):
            event_out = Event()
            rcvr      = self.receiverFromEventOut(event_out)
            event_in.handle(rcvr.send)
            return event_out

    def remitter(gen_rcvr):
        def gen_remitter(*args, **kargs):
            def receiver_from_event_out(event_out):
                rcvr = gen_rcvr(event_out, *args, **kargs)
                rcvr.send(None)
                return rcvr
            return Remitter(receiver_from_event_out)
        return gen_remitter

    @remitter
    def double_detect_r(double_click_event, threshold):
        last_click_time = 0
        while True:
            yield receive
            current_click_time = time.time()
            if (current_click_time - last_click_time) < threshold:
                double_click_event.fire()
            last_click_time = current_click_time

    @remitter
    def print_r(_, message):
        while True:
           val = yield receive
           print message

    mouse_click  = Event()

    mouse_click >> print_r("left")
    mouse_click >> double_detect_r(.01) >> print_r("double left")

    mouse_click.fire() #prints "left"
    time.sleep(.02)
    mouse_click.fire() #prints "left"
    mouse_click.fire() #prints "left" and "double left"

## make remitters out of generators that can send and receive

def remitter_example_yield_out():
    from collections import defaultdict

    class Remitter:
      def __init__(self, ritr):
          self.ritr     = ritr
          self.eventOut = Event()

      def send(self, val_in):
          ritr      = self.ritr
          event_out = self.eventOut

          while True:
              val_out = ritr.send(val_in)
              if val_out is receive:
                  break
              else:
                  event_out.fire(val_out)            

      def handle(self, handler):
          self.eventOut.handle(handler)

      def handlein(self, *events):
          for event in events:
              event.handle(self.send)

      def __rrshift__(self, event_in):
          try:
              self.handlein(*event_in)
          except:
              self.handlein(event_in)
          return self


    def remitter(gen_rcvr):
        def gen_remitter(*args, **kargs):
            ritr = gen_rcvr(*args, **kargs)
            ritr.send(None)
            return Remitter(ritr)
        return gen_remitter


    @remitter
    def double_detect_r(threshold):
        last_click_time = 0
        while True:
            yield receive
            current_click_time = time.time()
            if (current_click_time - last_click_time) < threshold:
                yield
            last_click_time = current_click_time

    @remitter
    def map_r(f, *args, **kargs):
        while True:
           val = yield receive
           yield f(val, *args, **kargs)

    @remitter
    def print_r(format):
        while True:
           val = yield receive
           print format % val

    def label_r(label):
        return map_r(lambda val : (label, val))

    @remitter
    def label_count_r():
        count_by_label = defaultdict(int)
        while True:
            (label, val) = yield receive
            count_by_label[label] += 1
            yield count_by_label.copy()

    def fix_click_counts(count_by_label, single_label, double_label):
        count_by_label[single_label] -= (count_by_label[double_label] * 2) #every double left "cancels" a single click
        return count_by_label.copy()

    def print_label_counts(count_by_label, *labels):
        print ", ".join("%d %s" % (count, label) for (label, count) in count_by_label.iteritems())


    mouse_clicks = Event()

    ([mouse_clicks >> label_r("single"),
      mouse_clicks >> double_detect_r(.01) >> label_r("double")] 
     >> label_count_r() >> map_r(fix_click_counts, "single", "double") >> map_r(print_label_counts))

    #prints
    #0 double, 1 single
    #0 double, 2 single
    #0 double, 3 single
    #1 double, 1 single
    mouse_clicks.fire() 
    time.sleep(.02)
    mouse_clicks.fire() 
    mouse_clicks.fire()


def remitter_without_yield_in_hack_example():
    class Receive:
        def __init__(self, val = None):
            self.d = val

    class Remitter:
      def __init__(self, receive, ritr):
          self.receive  = receive
          self.ritr     = ritr
          self.eventOut = Event()

      def send(self, val_in):
          self.receive.d = val_in

          ritr      = self.ritr
          event_out = self.eventOut
          while True:
              val_out = ritr.next()
              if isinstance(val_out, Receive):
                  break
              else:
                  event_out.fire(val_out)

      def handle(self, handler):
          self.eventOut.handle(handler)

      def handlein(self, *events):
          for event in events:
              event.handle(self.send)

      def __rrshift__(self, event_in):
          try:
              self.handlein(*event_in)
          except:
              self.handlein(event_in)
          return self

    def remitter(gen_rcvr):
        def gen_remitter(*args, **kargs):
            receive = Receive()
            ritr = gen_rcvr(receive, *args, **kargs)
            ritr.send(None)
            return Remitter(receive, ritr)
        return gen_remitter

    @remitter
    def double_detect_r(receive, threshold):
        last_click_time = 0
        while True:
            yield receive
            current_click_time = time.time()
            gap                = current_click_time - last_click_time
            if gap < threshold:
                yield gap
            last_click_time = current_click_time

    @remitter
    def average_r(receive):
        total   = 0.0
        count   = 0
        while True:
            yield receive
            total += receive.d
            count += 1
            yield total/count

    @remitter
    def print_r(receive, format):
        while True:
            yield receive
            print format % (receive.d)

    mouse_clicks = Event()
    mouse_clicks >> double_detect_r(.05) >> average_r() >> print_r("double left; average gap is %s seconds")

    mouse_clicks.fire() 
    time.sleep(.1)
    mouse_clicks.fire() 
    time.sleep(.01)
    mouse_clicks.fire() #prints "double left; average gap is 0.01... seconds"
    time.sleep(.02) 
    mouse_clicks.fire() #prints "double left; average gap is 0.015... seconds"

if __name__ == "__main__":
    #simple_click_event_example()
    #click_event_manipulation_example()
    #click_event_maniuplation_refactored_example()
    #click_event_handle_with_result_example()
    #click_event_choosing_by_returning_event_example()
    #click_event_looks_like_streams_example()
    #remitter_example()
    #remitter_example_yield_out()
    remitter_without_yield_in_hack_example()

August 12, 2009

More ways to do Reactive Programming in Python

In my last post, I covered a little bit of Rx and how you could have invented it. But you might invent a different way of doing the same thing. And since most languages don't have anything like LINQ, you might be interested in ways to do things in your programming language that don't require monads.

Let's explore some other ways to do Reactive Programming (Rx).

What is Rx?

Just to remind you what we're trying to accomplish, Rx builds event handlers. The LINQ version of Rx works by making an event look like a query (or stream).

It makes sense if you think about it. An event is a stream, of event occurrences. A list or enumerator or iterator is also a stream, of values. So if you squint hard enough, you see that events and enumerators are sort of the same thing. In fact, lists, streams, enumerators, events, channels, pipes, sockets, file handles, and actors sending you messages are all pretty much the same thing: they are all producers of values.
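As a rough illustration of that claim, here is a small Python 3 sketch in which a list, a generator, and an event all feed the same consumer. The `countdown` generator and the `seen` list are made up for this example; the `Event` class has the same shape as the one used throughout these posts.

```python
class Event:
    """Minimal event: handlers are called with each fired value."""
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)


def countdown(n):
    """A generator is a producer too: it yields n, n-1, ..., 1."""
    while n > 0:
        yield n
        n -= 1


seen = []

# Pull style: the consumer drives and asks each producer for its values.
for val in [1, 2, 3]:
    seen.append(val)
for val in countdown(2):
    seen.append(val)

# Push style: the producer drives and hands values to the same consumer.
key = Event()
key.handle(seen.append)
key.fire(7)
key.fire(8)

print(seen)  # [1, 2, 3, 2, 1, 7, 8]
```

The only real difference between the two styles is who is in control of the loop, the consumer or the producer.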

Consumers and Receivers

Now what do you do with producers of values? You consume them, of course! Usually with something that looks like this (in python):
sum = 0
for val in vals:
   sum += val
   print sum
What we've created here is a consumer of vals. We can write it this way, as ordinary code, because vals is very flexible: it's anything that's iterable/enumerable. But what if instead of forcing the producer to be flexible, we forced the consumer to be flexible? What if we could write the consumer like this:
total = 0
while True:
  total += receive
  print total
Hmm... it sort of looks like the opposite of an iterator/generator/enumerator block. A mathematician might say something about "duals" at this point, but I'm not a mathematician, so let's just go ahead and try and implement it. In fact, we'll use Python generators to do just that. We'll call this a "receiver" and we'll spell "receive" as "yield receive":
class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)

receive = object()

def receiver(gen_rcvr):
    def gen_and_start_rcvr(*args, **kargs):
        rcvr = gen_rcvr(*args, **kargs)
        rcvr.send(None)
        return rcvr
    return gen_and_start_rcvr

@receiver
def sum_r(title):
    total = 0
    while True:
        total += yield receive
        print "%s: %d" % (title, total)

@receiver
def count_r(title):
    count = 0
    while True:
        yield receive
        count += 1
        print "%s: %d" % (title, count)


num_key  = Event()
sum_nums = sum_r("total nums")
num_key.handle(sum_nums.send)

num_key.fire(1) #prints "total nums: 1"
num_key.fire(2) #prints "total nums: 3" 
num_key.fire(3) #prints "total nums: 6"
It actually works! And because our consumer is very flexible, any producer, like an event, can use it. In fact, it's just a fancy event callback, just like everything else in Rx land.
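The code in this post is Python 2. For anyone following along in Python 3, here is a hedged sketch of the same receiver idea. To keep it easy to check, it collects the running totals in a list (the `totals` argument is my own addition) instead of printing them, and it uses a bare `yield` rather than `yield receive`.

```python
class Event:
    """Minimal event: handlers are called with each fired value."""
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)


def receiver(gen_func):
    """Decorator: create the generator and advance it to its first yield."""
    def start(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        next(gen)  # same effect as gen.send(None)
        return gen
    return start


@receiver
def sum_r(totals):
    total = 0
    while True:
        total += (yield)      # the value passed to send() appears here
        totals.append(total)  # record the running total


totals = []
num_key = Event()
num_key.handle(sum_r(totals).send)  # the bound method keeps the generator alive

for n in (1, 2, 3):
    num_key.fire(n)

print(totals)  # [1, 3, 6]
```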

Remitters

But if we take this one step further and make a receiver wrap an event, we can make a receiver that's also a producer. We'll call it a "remitter", which is sort of like a receiver and an emitter.
import time

class Remitter:
    def __init__(self, receiver_from_event_out):
        self.receiverFromEventOut = receiver_from_event_out

    def __rrshift__(self, event_in):
        event_out = Event()
        rcvr      = self.receiverFromEventOut(event_out)
        event_in.handle(rcvr.send)
        return event_out

def remitter(gen_rcvr):
    def gen_remitter(*args, **kargs):
        def receiver_from_event_out(event_out):
            rcvr = gen_rcvr(event_out, *args, **kargs)
            rcvr.send(None)
            return rcvr
        return Remitter(receiver_from_event_out)
    return gen_remitter

@remitter
def double_detect_r(double_click_event, threshold):
    last_click_time = 0
    while True:
        (yield)
        current_click_time = time.time()
        if (current_click_time - last_click_time) < threshold:
            double_click_event.fire()
        last_click_time = current_click_time

@remitter
def print_r(_, message):
    while True:
       val = (yield)
       print message

mouse_click  = Event()

mouse_click >> print_r("left")
mouse_click >> double_detect_r(.01) >> print_r("double left")

mouse_click.fire() #prints "left"
time.sleep(.02)
mouse_click.fire() #prints "left"
mouse_click.fire() #prints "left" and "double left"
Great. But it is kind of annoying passing in the event like that. What if we had the remitter yield values out and yield values in?

Remitters that yield out and in

We could do that using little state machines built from python generators. "yield receive" will mean receive and "yield" of anything else will mean "emit".
import time
from collections import defaultdict

class Remitter:
  def __init__(self, ritr):
      self.ritr     = ritr
      self.eventOut = Event()

  def send(self, val_in):
      ritr      = self.ritr
      event_out = self.eventOut

      while True:
          val_out = ritr.send(val_in)
          if val_out is receive:
              break
          else:
              event_out.fire(val_out)            

  def handle(self, handler):
      self.eventOut.handle(handler)

  def handlein(self, *events):
      for event in events:
          event.handle(self.send)

  def __rrshift__(self, event_in):
      try:
          self.handlein(*event_in)
      except:
          self.handlein(event_in)
      return self


def remitter(gen_rcvr):
    def gen_remitter(*args, **kargs):
        ritr = gen_rcvr(*args, **kargs)
        ritr.send(None)
        return Remitter(ritr)
    return gen_remitter


@remitter
def double_detect_r(threshold):
    last_click_time = 0
    while True:
        yield receive
        current_click_time = time.time()
        if (current_click_time - last_click_time) < threshold:
            yield
        last_click_time = current_click_time

@remitter
def map_r(f, *args, **kargs):
    while True:
       val = yield receive
       yield f(val, *args, **kargs)

@remitter
def print_r(format):
    while True:
       val = yield receive
       print format % val

def label_r(label):
    return map_r(lambda val : (label, val))
        
@remitter
def label_count_r():
    count_by_label = defaultdict(int)
    while True:
        (label, val) = yield receive
        count_by_label[label] += 1
        yield count_by_label.copy()

def fix_click_counts(count_by_label, single_label, double_label):
    count_by_label[single_label] -= (count_by_label[double_label] * 2) #every double click "cancels" a single click
    return count_by_label.copy()

def print_label_counts(count_by_label, *labels):
    print ", ".join("%d %s" % (count, label) for (label, count) in count_by_label.iteritems())


mouse_clicks = Event()

([mouse_clicks >> label_r("single"),
  mouse_clicks >> double_detect_r(.01) >> label_r("double")] 
 >> label_count_r() >> map_r(fix_click_counts, "single", "double") >> map_r(print_label_counts))
 
#prints
#0 double, 1 single
#0 double, 2 single
#0 double, 3 single
#1 double, 1 single
mouse_clicks.fire() 
time.sleep(.02)
mouse_clicks.fire() 
mouse_clicks.fire()
Sweet. That looks pretty nice. But, it relies on the fact that Python allows you to yield values in to a generator. What if we have a programming language that only allows yielding values out (like any enumerator)?
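If yielding values in is new to you, this small Python 3 sketch shows just that mechanism on its own. The `echo_gen` generator is a made-up example and not part of the remitter code.

```python
def echo_gen():
    received = None
    while True:
        # `yield X` hands X out to the caller. The value of the yield
        # expression is whatever the caller passes to the next send().
        received = yield ("got", received)


gen = echo_gen()
first = gen.send(None)   # priming call: runs the generator up to its first yield
second = gen.send("a")
third = gen.send("b")
print(first, second, third)

# A generator that has not reached a yield has nowhere to receive a value,
# so sending anything other than None to a fresh generator is an error.
fresh = echo_gen()
try:
    fresh.send("too early")
    raised = False
except TypeError:
    raised = True
print(raised)
```

This is why the `remitter` decorator calls `send(None)` once before handing the generator to anyone else.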

Remitters that yield in by yielding out

We'll introduce a simple hack to work around that. We'll yield out a mutable "receive" value that "receives" in the value for us.
class Receive:
    def __init__(self, val = None):
        self.d = val

class Remitter:
  def __init__(self, receive, ritr):
      self.receive  = receive
      self.ritr     = ritr
      self.eventOut = Event()

  def send(self, val_in):
      self.receive.d = val_in

      ritr      = self.ritr
      event_out = self.eventOut
      while True:
          val_out = ritr.next()
          if isinstance(val_out, Receive):
              break
          else:
              event_out.fire(val_out)

  def handle(self, handler):
      self.eventOut.handle(handler)

  def handlein(self, *events):
      for event in events:
          event.handle(self.send)

  def __rrshift__(self, event_in):
      try:
          self.handlein(*event_in)
      except:
          self.handlein(event_in)
      return self

def remitter(gen_rcvr):
    def gen_remitter(*args, **kargs):
        receive = Receive()
        ritr = gen_rcvr(receive, *args, **kargs)
        ritr.send(None)
        return Remitter(receive, ritr)
    return gen_remitter

@remitter
def double_detect_r(receive, threshold):
    last_click_time = 0
    while True:
        yield receive
        current_click_time = time.time()
        gap                = current_click_time - last_click_time
        if gap < threshold:
            yield gap
        last_click_time = current_click_time

@remitter
def average_r(receive):
    total   = 0.0
    count   = 0
    while True:
        yield receive
        total += receive.d
        count += 1
        yield total/count

@remitter
def print_r(receive, format):
    while True:
        yield receive
        print format % (receive.d)
    
mouse_clicks = Event()
mouse_clicks >> double_detect_r(.05) >> average_r() >> print_r("double click; average gap is %s seconds")
        
mouse_clicks.fire() 
time.sleep(.1)
mouse_clicks.fire() 
time.sleep(.01)
mouse_clicks.fire() #prints "double click; average gap is 0.01... seconds"
time.sleep(.02) 
mouse_clicks.fire() #prints "double click; average gap is 0.015... seconds"
It works! And it should work in any language with iterator blocks. You could even use this in C# instead of using LINQ Rx, but then you'll have to type "yield return receive" :(.
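Here is a compact Python 3 sketch of the same hack that avoids timing, so the result is deterministic. It only ever calls `next()` on the generator and never `send()`, which is the whole point. The `drive` helper and the `running_average` generator are names I made up for this sketch; they play the roles of `Remitter` and `average_r`.

```python
class Event:
    """Minimal event: handlers are called with each fired value."""
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)


class Receive:
    """Mutable mailbox: the driver writes `d`, the generator reads it."""
    def __init__(self):
        self.d = None


def drive(gen_func, event_in, *args):
    """Wire event_in to a generator that only ever yields values out."""
    receive = Receive()
    gen = gen_func(receive, *args)
    next(gen)  # run up to the first `yield receive`
    event_out = Event()

    def send(val_in):
        receive.d = val_in
        while True:
            val_out = next(gen)
            if val_out is receive:  # the generator is asking for more input
                break
            event_out.fire(val_out)  # anything else is an emitted value

    event_in.handle(send)
    return event_out


def running_average(receive):
    total, count = 0.0, 0
    while True:
        yield receive        # "receive": wait for the next input
        total += receive.d
        count += 1
        yield total / count  # "emit": push a value downstream


numbers = Event()
averages = []
drive(running_average, numbers).handle(averages.append)
for n in (2, 4, 9):
    numbers.fire(n)

print(averages)  # [2.0, 3.0, 5.0]
```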

Conclusion

Rx is all about making flexible consumers of values, which basically amounts to making event callbacks. LINQ Rx does so with monads, but that's not the only way. Here, we have shown how we can turn a generator or iterator block inside out and make it consume values rather than produce values. Using these is an alternative to LINQ Rx that might be more appropriate for your programming language. There are lots of other things to work out, like unhandling an event, error handling, catching the end of a stream, etc. But this is a pretty good, simple foundation to show that the essence of reactive programming is making it easy to make flexible value consumers (basically event handlers). In both Rx and the code above, we've done so by making a little DSL in the host language.

Next time...

There are still other ways of making flexible consumers. If we had continuations or used CPS we could just use the current continuation as our consumer. So, Scheme and Ruby ought to have easy solutions to this problem. We can do a similar thing with macros in any Lisp language that doesn't have continuations, like Clojure. In fact, I'd like to explore how to do Rx in Clojure next time. And at some point, we need to figure out how concurrency fits into all of this.

P.S.

While I was researching all of this stuff, I was surprised to find that my friend, Wes Dyer, is right at the heart of it. You can see a video of him here. That was a surprise because I've never talked with him about this. In fact, I've only heard from him once in the last year because he's "gone dark". I'm sure his work on Rx has something to do with that :). I just want to make it clear that all of my thoughts are mine alone, and not based on any communication with him. Don't blame him for my crazy stuff :).

Rx Simplified (Reactive Programming in Python)

Lately, there's been interest in "reactive programming", especially with Rx. What is Rx? I've seen descriptions like "reactive programming with LINQ", "the dual of the enumerator/iterator" and even "a variation of the continuation monad". Oh right...uh...monad? dual? what's going on?

If you like things like "monads", "duals", and category theory, go watch this video, especially until the end. It's pretty funny.

But if those things make your eyes glaze over and you're left wondering what Rx really is, I want to give you a simple explanation of what Rx is all about. In fact, I'll show how you could have invented it yourself. We'll do so with simple event-based code written in Python.

Step 1: write simple event handlers

Imagine we have a mouse click event that fires either "left" or "right", and we want to make a new event that fires "double left" when there's a double left click. We might write something like this (including a simple Event class).

import time

class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)

def echo(val):
    print val
    return val

def left_double_click_from_click(click, threshold):
    dlclick = Event()
    last_lclick_time = [0] #closure hack

    def click_handler(click_value):
        if click_value == "left":
            lclick_time = time.time()
            if (lclick_time - last_lclick_time[0]) < threshold:
                dlclick.fire("double left")
            last_lclick_time[0] = lclick_time

    click.handle(click_handler)
    return dlclick


click   = Event()
dlclick = left_double_click_from_click(click, .01)
dlclick.handle(echo)

click.fire("left")
time.sleep(.02)
click.fire("left")
click.fire("right")
click.fire("left") #prints "double left"

Step 2: refactor event handlers

It works and it's pretty simple. But, we could refactor quite a bit. If we do so, we might write something like this (notice I like the suffix "_r" for "reactive"):

class EventFireRecord:
    def __init__(self, value, time):
        self.value = value
        self.time  = time

def doubleize_r(evt, threshold, combine):
    double_evt = Event()

    last_fire  = EventFireRecord(None, 0)
    def evt_handler(value):
        fire_time = time.time()
        if (fire_time - last_fire.time) < threshold:
            double_evt.fire(combine(last_fire.value, value))
        last_fire.__init__(value, fire_time)

    evt.handle(evt_handler)
    return double_evt

def filter_r(evt, predicate):
    filtered_evt = Event()

    def evt_handler(value):
        if predicate(value):
            filtered_evt.fire(value)

    evt.handle(evt_handler)
    return filtered_evt

click   = Event()
dlclick = doubleize_r(filter_r(click, lambda value : value == "left"), .01, lambda l1, l2: "double left")
dlclick.handle(echo)

click.fire("left")
time.sleep(.02)
click.fire("left")
click.fire("right")
click.fire("left") #prints "double left"

That looks better and is more generic. The logic of "double click" is now contained all on one line. But we could do even better. Notice that we repeat ourselves a little with filter_r and doubleize_r. The pattern looks like this:
evt_out = Event()

def handler(value):
   ...
   evt_out.fire(value)
   ...

evt_in.handle(handler)
return evt_out
What if we pull out that common pattern? We could write a special handler that returns a value instead of firing it, plus a special "handle_with_result" function that does the wiring, like this:
def handler(value):
    ...
    return value

evt_out = handle_with_result(evt_in, handler)

Step 3: make a higher-level "handle" function

If we do that, our code might look like this:
def handle_with_result(evt, handler_with_result):
    evt_out = Event()

    def handler(value):
        result = handler_with_result(value)
        if result is not None:
            evt_out.fire(result)

    evt.handle(handler)
    return evt_out

def doubleize_event(evt, threshold, combine):
    last_fire = EventFireRecord(None, 0)

    def handler(value):
        fire_time = time.time()
        try:
            if (fire_time - last_fire.time) < threshold:
                return combine(last_fire.value, value)
        finally:
            last_fire.__init__(value, fire_time)

    return handle_with_result(evt, handler)

def filter_event(evt, predicate):
    def handler(value):
        if predicate(value):
            return value

    return handle_with_result(evt, handler)


click  = Event()
dlclick = doubleize_event(filter_event(click, lambda value : value == "left"), .01, lambda l1, l2 : "double left")
dlclick.handle(echo)

click.fire("left")
time.sleep(.02)
click.fire("left")
click.fire("right")
click.fire("left") #prints "double left"

It works, and our code looks better than ever. handle_with_result is very useful.
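
To see how far handle_with_result stretches, here is a minimal sketch of two more operators built on it. The names map_event and count_event are hypothetical helpers of mine, not part of the code above, and Event and handle_with_result are repeated only so the snippet runs on its own.

```python
class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)

def handle_with_result(evt, handler_with_result):
    evt_out = Event()

    def handler(value):
        result = handler_with_result(value)
        if result is not None:
            evt_out.fire(result)

    evt.handle(handler)
    return evt_out

# Hypothetical helper: fire the transformed value for every input value.
def map_event(evt, transform):
    return handle_with_result(evt, transform)

# Hypothetical helper: fire a running count of how many times evt has fired.
def count_event(evt):
    count = [0]  # closure hack, same trick as in step 1

    def handler(value):
        count[0] += 1
        return count[0]

    return handle_with_result(evt, handler)

received = []
click = Event()
map_event(click, lambda value: value.upper()).handle(received.append)
count_event(click).handle(received.append)

click.fire("left")
click.fire("right")
print(received)
```

One caveat with this design: because None means "fire nothing", a transform that legitimately returns None is silently dropped.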

But we are now missing something: what if we want to return multiple values? Or do something more interesting, like listen to a keyboard event and return left clicks if the user types "l", right clicks if they type "r", and two "fake" clicks if they type "f"? We'd like to write something like this:

def choose_clicks(keys, clicks):
    def key_handler(char):
        if char == "l":
            return filter_event(clicks, lambda value : value == "left")
        elif char == "r":
            return filter_event(clicks, lambda value : value == "right")
        elif char == "f":
            return ["fake", "fake"]

    return handle_with_result(keys, key_handler)
If we change handle_with_result to handle this, we might get something like this:

def handle_with_result(evt, handler_with_result):
    evt_out = Event()

    def handler(value):
        result = handler_with_result(value)
        if result is None:
            pass #ignore
        elif isinstance(result, Event):
            result.handle(evt_out.fire)
        elif isinstance(result, list):
            for value_out in result:
                evt_out.fire(value_out)
        else:
            evt_out.fire(result)


    evt.handle(handler)
    return evt_out

def filter_r(evt, predicate):
    def handler(value):
        if predicate(value):
            return value

    return handle_with_result(evt, handler)

def value_filter_r(evt, value):
    return filter_r(evt, lambda val : val == value)

def choose_clicks(keys, clicks):
    def key_handler(char):
        #TODO: unsubscribe from event after either "l" or "r"
        if char == "l":
            return value_filter_r(clicks, "left")
        elif char == "r":
            return value_filter_r(clicks, "right")
        elif char == "f":
            return ["fake", "fake"]

    return handle_with_result(keys, key_handler)

keys   = Event()
clicks = Event()
chosen_clicks = choose_clicks(keys, clicks)

chosen_clicks.handle(echo)
clicks.fire("left")
keys.fire("a")
clicks.fire("right")
keys.fire("l")
clicks.fire("left") #prints "left"
clicks.fire("right")
clicks.fire("left") #prints "left"
keys.fire("f") #prints "fake" and then "fake" again

Great. Now if we just add a little bit of syntax sugar to this, we can make events look like streams:

Step 4: add some syntax sugar

class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)
        return self #so calls to handle can be chained

    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)

    def bind(evt, handler_with_result):
        evt_out = Event()

        def handler(value):
            result = handler_with_result(value)
            if result is not None:
                Event.unit(result).handle(evt_out.fire)

        evt.handle(handler)
        return evt_out

    @classmethod
    def unit(cls, val):
        if isinstance(val, cls):
            return val
        elif isinstance(val, list):
            return MockEvent(*val)
        else:
            return MockEvent(val)

    __rshift__ = bind

class MockEvent:
    def __init__(self, *vals):
        self.vals = vals

    def handle(self, handler):
        for val in self.vals:
            handler(val)  

def doublize_r(threshold, combine):
    last_fire = EventFireRecord(None, 0)

    def handler(value):
        fire_time = time.time()
        try:
            if (fire_time - last_fire.time) < threshold:
                return combine(last_fire.value, value)
        finally:
            last_fire.__init__(value, fire_time)

    return handler

def filter_r(predicate):
    def handler(value):
        if predicate(value):
            return value

    return handler

def value_filter_r(value):
    return filter_r(lambda val : val == value)

def click_choose_r(**clicks_by_char):
    def key_handler(char):
        return clicks_by_char.get(char)

    return key_handler


clicks   = Event()
keys     = Event()
dlclicks = clicks >> value_filter_r("left") >> doublize_r(.01, lambda l1, l2: "double click")
keys >> click_choose_r(d = dlclicks, f = ["fake", "fake"]) >> echo

clicks.fire("left")
clicks.fire("left")
keys.fire("f") #prints "fake" and then "fake" again
keys.fire("d")
clicks.fire("right")
clicks.fire("right")
time.sleep(.02)
clicks.fire("left")
clicks.fire("left") #prints "double click"

So what have we made?

Wonderful. We've made events look like streams by making a slick way of creating event handlers. In fact, if you look closely at what I did in that last step, you'll notice that I renamed "handle_with_result" to "bind" and moved some code into a method called "unit". That's all it takes to turn Event into a monad, which is exactly what Rx does. Congratulations: we just reinvented monads and Rx simply by refactoring our event handler code. In the process, we've discovered what Rx really is: a fancy way of writing event handlers, specifically event handlers that fire events that trigger other event handlers that fire events, and so on in a big chain that looks like a query. So when your eyes glaze over about "duals" and "monads" and "reactive programming", just say to yourself: I'm making a fancy event handler. Because in the end, that's all you're really doing.
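
To make the "chain that looks like a query" concrete, here is a small sketch that reuses the step 4 Event and MockEvent (repeated so it runs on its own, with bind's first argument renamed to self). The stream of numbers and the lambdas are made up for illustration. The point is that one bind covers filtering (return None), mapping (return a value), and one-to-many (return a list).

```python
class MockEvent:
    """Replays a fixed set of values to a handler as soon as it subscribes."""
    def __init__(self, *vals):
        self.vals = vals

    def handle(self, handler):
        for val in self.vals:
            handler(val)

class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)
        return self

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)

    def bind(self, handler_with_result):
        evt_out = Event()

        def handler(value):
            result = handler_with_result(value)
            if result is not None:
                Event.unit(result).handle(evt_out.fire)

        self.handle(handler)
        return evt_out

    @classmethod
    def unit(cls, val):
        if isinstance(val, cls):
            return val
        elif isinstance(val, list):
            return MockEvent(*val)
        else:
            return MockEvent(val)

    __rshift__ = bind

numbers = Event()
seen = []

# Each >> adds one more handler to the chain.
(numbers
    >> (lambda n: n if n % 2 == 0 else None)  # filter: keep even numbers
    >> (lambda n: n * 10)                     # map: scale each value
    >> (lambda n: [n, n + 1])                 # one-to-many: two values out
    >> seen.append)                           # sink: returns None, so the chain ends here

numbers.fire(1)  # dropped by the filter
numbers.fire(2)
numbers.fire(4)
print(seen)
```

Note that the parentheses around each lambda are required, since a lambda body would otherwise swallow the rest of the chain.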

In fact, if you want to do so in Python, now you have a basic implementation to start with! Of course, this is just a toy implementation. It lacks error handling, unsubscribing, end-of-stream, and concurrency. But it ain't bad for just 50 lines of code. And it lets you see the essence of Rx fairly easily.
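
As one example of filling those gaps, here is a rough sketch of how unsubscribing might be added. It is an assumption on my part, not what Rx does internally: handle returns a function that removes the handler (instead of returning self as in step 4), and handle_once is a hypothetical helper built on top of it.

```python
class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

        def unsubscribe():
            # Safe to call more than once.
            if handler in self.handlers:
                self.handlers.remove(handler)

        return unsubscribe

    def fire(self, val=None):
        # Iterate over a copy so a handler may unsubscribe while we fire.
        for handler in list(self.handlers):
            handler(val)

# Hypothetical helper: run the handler for the next value only.
def handle_once(evt, handler):
    def wrapper(value):
        stop()  # detach first, so a second value never reaches us
        handler(value)

    stop = evt.handle(wrapper)
    return stop

received = []
first_only = []
clicks = Event()
stop = clicks.handle(received.append)
handle_once(clicks, first_only.append)

clicks.fire("left")   # both handlers see this
stop()
clicks.fire("right")  # nobody is listening any more
print(received, first_only)
```

Something like handle_once is what the TODO in choose_clicks would need in order to stop listening after "l" or "r".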

Oh, and what's the big deal with monads, you ask? Nothing much. It's just that if you provide "bind" and "unit" ("bind" is called "SelectMany" in LINQ; "Select" is the closely related map operation, which you can build from the two), LINQ gives you some nice syntax sugar that makes your event handler look like a query. It's really pretty slick, especially now that they've added "extension methods".

And next time...

In future posts, I'll explore different ways of making slick event handlers, but without monads. And hopefully we'll get to make this handle concurrency, which is what asynchronous programming is all about. In fact, I expect we'll start to see a serious blurring of lines between Rx, message passing, and data flow programming.

For now, when you start working with Rx, just remember: I'm making a big, fancy event handler. An "Observable" is just like an event and an "Observer" is just like an event handler.
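
If it helps, that correspondence can be sketched as a thin adapter over the toy Event class. The Observable and Observer classes below are my own illustrative stand-ins with Python-style names, not the real Rx types, and they only model the "next value" notification.

```python
class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)

# Illustrative stand-in: an observer is an event handler with a named method.
class Observer:
    def __init__(self, on_next):
        self.on_next = on_next

# Illustrative stand-in: an observable is an event you can subscribe to.
class Observable:
    def __init__(self, event):
        self._event = event

    def subscribe(self, observer):
        # Subscribing is just registering the observer's method as a handler.
        self._event.handle(observer.on_next)

seen = []
clicks = Event()
Observable(clicks).subscribe(Observer(seen.append))

clicks.fire("left")
print(seen)
```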

P.S.

What's with the lame names? They started off with cool names like "Reactive" and "Rx" and then gave us Observable, Observer, Subscribe, OnNext, OnDone, and OnError. Yuck. Think what an opportunity they missed! We could have had names like Emitter, Reactor, Chain, Emit, Extinguish, and Explode. Judge for yourself:
observable.Subscribe(observer)
observer.OnNext(val)
observer.OnDone()
observer.OnError(e)
or
emitter.chain(reactor)
reactor.emit("foo")
reactor.extinguish()
reactor.explode(e)
