I've learned. I'll share.

August 12, 2009

More ways to do Reactive Programming in Python

In my last post, I covered a little bit of Rx and how you could have invented it. But you might invent a different way of doing the same thing. And since most languages don't have anything like LINQ, you might be interested in ways to do this in your programming language that don't require monads.

Let's explore some other ways to do Reactive Programming (Rx).

What is Rx?

Just to remind you what we're trying to accomplish, Rx builds event handlers. The LINQ version of Rx works by making an event look like a query (or stream).

It makes sense if you think about it. An event is a stream of event occurrences. A list, enumerator, or iterator is also a stream: a stream of values. So if you squint hard enough, you see that events and enumerators are sort of the same thing. In fact, lists, streams, enumerators, events, channels, pipes, sockets, file handles, and actors sending you messages are all pretty much the same thing: they are all producers of values.
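To make the "producers of values" idea concrete, here's a minimal sketch (not code from the post) in which one consumer callback is fed first by pulling values from a list and then by having a tiny event push the same values into it. `TinyEvent` and `make_collector` are hypothetical names made up for this illustration.

```python
# Sketch: one consumer, two kinds of producer (pull from a list, push from an event).
# TinyEvent and make_collector are hypothetical names for illustration only.

class TinyEvent(object):
    """A push-based producer: calls its handlers whenever it fires."""
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val):
        for handler in self.handlers:
            handler(val)

def make_collector():
    """Return a consumer callback and the list it appends values to."""
    seen = []
    return seen.append, seen

# Pull: our own loop asks the list for each value and hands it to the consumer.
consume, from_list = make_collector()
for val in [1, 2, 3]:
    consume(val)

# Push: the event hands each value to the consumer when it fires.
consume, from_event = make_collector()
clicks = TinyEvent()
clicks.handle(consume)
clicks.fire(1)
clicks.fire(2)
clicks.fire(3)

print(from_list == from_event)  # True: both producers delivered 1, 2, 3
```

The consumer can't tell the difference; the only thing that changes is who drives the loop.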

Consumers and Receivers

Now what do you do with producers of values? You consume them, of course! Usually with something that looks like this (in Python):
total = 0
for val in vals:
    total += val
    print(total)
What we've created here is a consumer of vals. We can write it this way, as ordinary code, because vals is very flexible: it's anything that's iterable/enumerable. But what if instead of forcing the producer to be flexible, we forced the consumer to be flexible? What if we could write the consumer like this:
total = 0
while True:
    total += receive
    print(total)
Hmm... it sort of looks like the opposite of an iterator/generator/enumerator block. A mathematician might say something about "duals" at this point, but I'm not a mathematician, so let's just go ahead and try to implement it. In fact, we'll use Python generators to do just that. We'll call this a "receiver" and we'll spell "receive" as "yield receive":
class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)

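receive = object()  # unique sentinel: "yield receive" means "give me the next value"

def receiver(gen_rcvr):
    # decorator: create the generator and run it up to its first "yield receive"
    def gen_and_start_rcvr(*args, **kargs):
        rcvr = gen_rcvr(*args, **kargs)
        rcvr.send(None)
        return rcvr
    return gen_and_start_rcvr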

@receiver
def sum_r(title):
    total = 0
    while True:
        total += yield receive
        print("%s: %d" % (title, total))

@receiver
def count_r(title):
    count = 0
    while True:
        yield receive
        count += 1
        print("%s: %d" % (title, count))


num_key  = Event()
sum_nums = sum_r("total nums")
num_key.handle(sum_nums.send)

num_key.fire(1) #prints "total nums: 1"
num_key.fire(2) #prints "total nums: 3" 
num_key.fire(3) #prints "total nums: 6"
It actually works! And because our consumer is very flexible, any producer, like an event, can use it. In fact, it's just a fancy event callback, just like everything else in Rx land.
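To see that a receiver really is just a callback, here's a small sketch that registers the `sum_r` receiver and a hand-written closure on the same event and compares what they record. It assumes Python 2.6+ or 3 (it primes the generator with `next()`), `make_sum_handler` is a hypothetical name, and outputs go into lists instead of being printed so they can be compared.

```python
# Sketch: a generator receiver and a plain closure are both just event callbacks.
# make_sum_handler is a hypothetical name; results are collected, not printed.

receive = object()  # sentinel, as in the post

class Event(object):
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)

    def fire(self, val=None):
        for handler in self.handlers:
            handler(val)

def receiver(gen_rcvr):
    def gen_and_start_rcvr(*args, **kargs):
        rcvr = gen_rcvr(*args, **kargs)
        next(rcvr)  # run to the first "yield receive"
        return rcvr
    return gen_and_start_rcvr

@receiver
def sum_r(title, out):
    total = 0
    while True:
        total += yield receive
        out.append("%s: %d" % (title, total))

def make_sum_handler(title, out):
    state = [0]  # running total, boxed so the inner function can update it
    def handler(val):
        state[0] += val
        out.append("%s: %d" % (title, state[0]))
    return handler

gen_lines, closure_lines = [], []
num_key = Event()
num_key.handle(sum_r("total nums", gen_lines).send)
num_key.handle(make_sum_handler("total nums", closure_lines))

for n in (1, 2, 3):
    num_key.fire(n)

print(gen_lines)      # ['total nums: 1', 'total nums: 3', 'total nums: 6']
print(closure_lines)  # the same three lines
```

The generator version keeps its state in ordinary local variables, which is what makes it pleasant to write; the closure has to box its state by hand.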

Remitters

But if we take this one step further and make a receiver wrap an event, we can make a receiver that's also a producer. We'll call it a "remitter", which is sort of like a receiver and an emitter.
import time

class Remitter:
    def __init__(self, receiver_from_event_out):
        self.receiverFromEventOut = receiver_from_event_out

    def __rrshift__(self, event_in):
        event_out = Event()
        rcvr      = self.receiverFromEventOut(event_out)
        event_in.handle(rcvr.send)
        return event_out

def remitter(gen_rcvr):
    def gen_remitter(*args, **kargs):
        def receiver_from_event_out(event_out):
            rcvr = gen_rcvr(event_out, *args, **kargs)
            rcvr.send(None)
            return rcvr
        return Remitter(receiver_from_event_out)
    return gen_remitter

@remitter
def double_detect_r(double_click_event, threshold):
    last_click_time = 0
    while True:
        (yield)
        current_click_time = time.time()
        if (current_click_time - last_click_time) < threshold:
            double_click_event.fire()
        last_click_time = current_click_time

@remitter
def print_r(_, message):
    while True:
        (yield)
        print(message)

mouse_click  = Event()

mouse_click >> print_r("left")
mouse_click >> double_detect_r(.01) >> print_r("double left")

mouse_click.fire() #prints "left"
time.sleep(.02)
mouse_click.fire() #prints "left"
mouse_click.fire() #prints "left" and "double left"
Great. But it is kind of annoying passing in the event like that. What if we had the remitter yield values out and yield values in?

Remitters that yield out and in

We could do that using little state machines built from Python generators: "yield receive" will mean "receive", and "yield" of anything else will mean "emit".
import time
from collections import defaultdict

class Remitter:
  def __init__(self, ritr):
      self.ritr     = ritr
      self.eventOut = Event()

  def send(self, val_in):
      ritr      = self.ritr
      event_out = self.eventOut

      while True:
          val_out = ritr.send(val_in)
          if val_out is receive:
              break
          else:
              event_out.fire(val_out)            

  def handle(self, handler):
      self.eventOut.handle(handler)

  def handlein(self, *events):
      for event in events:
          event.handle(self.send)

  def __rrshift__(self, event_in):
      # accept a single event or a list/tuple of events to merge
      if isinstance(event_in, (list, tuple)):
          self.handlein(*event_in)
      else:
          self.handlein(event_in)
      return self


def remitter(gen_rcvr):
    def gen_remitter(*args, **kargs):
        ritr = gen_rcvr(*args, **kargs)
        ritr.send(None)
        return Remitter(ritr)
    return gen_remitter


@remitter
def double_detect_r(threshold):
    last_click_time = 0
    while True:
        yield receive
        current_click_time = time.time()
        if (current_click_time - last_click_time) < threshold:
            yield
        last_click_time = current_click_time

@remitter
def map_r(f, *args, **kargs):
    while True:
       val = yield receive
       yield f(val, *args, **kargs)

@remitter
def print_r(format):
    while True:
        val = yield receive
        print(format % val)

def label_r(label):
    return map_r(lambda val : (label, val))
        
@remitter
def label_count_r():
    count_by_label = defaultdict(int)
    while True:
        (label, val) = yield receive
        count_by_label[label] += 1
        yield count_by_label.copy()

def fix_click_counts(count_by_label, single_label, double_label):
    # each double click was also counted as two single clicks, so take those back out
    count_by_label[single_label] -= count_by_label[double_label] * 2
    return count_by_label.copy()

def print_label_counts(count_by_label, *labels):
    # sort by label so the output order is stable ("double" before "single")
    print(", ".join("%d %s" % (count, label) for (label, count) in sorted(count_by_label.items())))


mouse_clicks = Event()

([mouse_clicks >> label_r("single"),
  mouse_clicks >> double_detect_r(.01) >> label_r("double")] 
 >> label_count_r() >> map_r(fix_click_counts, "single", "double") >> map_r(print_label_counts))
 
#prints
#0 double, 1 single
#0 double, 2 single
#0 double, 3 single
#1 double, 1 single
mouse_clicks.fire() 
time.sleep(.02)
mouse_clicks.fire() 
mouse_clicks.fire()
Sweet. That looks pretty nice. But it relies on the fact that Python allows you to yield values into a generator. What if we have a programming language that only allows yielding values out (like any enumerator)?

Remitters that yield in by yielding out

We'll introduce a simple hack to work around that. We'll yield out a mutable "receive" value that "receives" in the value for us.
import time

class Receive:
    def __init__(self, val = None):
        self.d = val

class Remitter:
  def __init__(self, receive, ritr):
      self.receive  = receive
      self.ritr     = ritr
      self.eventOut = Event()

  def send(self, val_in):
      self.receive.d = val_in

      ritr      = self.ritr
      event_out = self.eventOut
      while True:
          val_out = next(ritr)
          if isinstance(val_out, Receive):
              break
          else:
              event_out.fire(val_out)

  def handle(self, handler):
      self.eventOut.handle(handler)

  def handlein(self, *events):
      for event in events:
          event.handle(self.send)

  def __rrshift__(self, event_in):
      # accept a single event or a list/tuple of events to merge
      if isinstance(event_in, (list, tuple)):
          self.handlein(*event_in)
      else:
          self.handlein(event_in)
      return self

def remitter(gen_rcvr):
    def gen_remitter(*args, **kargs):
        receive = Receive()
        ritr = gen_rcvr(receive, *args, **kargs)
        next(ritr)  # run to the first "yield receive" without using send()
        return Remitter(receive, ritr)
    return gen_remitter

@remitter
def double_detect_r(receive, threshold):
    last_click_time = 0
    while True:
        yield receive
        current_click_time = time.time()
        gap                = current_click_time - last_click_time
        if gap < threshold:
            yield gap
        last_click_time = current_click_time

@remitter
def average_r(receive):
    total   = 0.0
    count   = 0
    while True:
        yield receive
        total += receive.d
        count += 1
        yield total/count

@remitter
def print_r(receive, format):
    while True:
        yield receive
        print(format % receive.d)
    
mouse_clicks = Event()
mouse_clicks >> double_detect_r(.05) >> average_r() >> print_r("double click; average gap is %s seconds")
        
mouse_clicks.fire() 
time.sleep(.1)
mouse_clicks.fire() 
time.sleep(.01)
mouse_clicks.fire() #prints "double click; average gap is 0.01... seconds"
time.sleep(.02) 
mouse_clicks.fire() #prints "double click; average gap is 0.015... seconds"
It works! And it should work in any language with iterator blocks. You could even use this in C# instead of LINQ Rx, but then you'll have to type "yield return receive" :(.
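Here's a stripped-down sketch of the same trick with the event plumbing removed, so it's easier to see that the driver only ever calls `next()` on the generator and never `send()`. `Box`, `drive`, and `above_r` are hypothetical names for this illustration, and the driver spots a receive point by identity (`out is box`) rather than with the `isinstance` check used above.

```python
# Sketch: "yield in by yielding out", driven only with next(), never send().
# Box, drive, and above_r are hypothetical names for illustration only.

class Box(object):
    """Mutable cell the driver writes into before resuming the generator."""
    def __init__(self):
        self.d = None

def above_r(box, limit):
    # Yielding the box means "ready for the next input";
    # yielding anything else means "emit this value".
    while True:
        yield box
        if box.d > limit:
            yield box.d

def drive(gen, box, inputs):
    """Feed inputs through gen and return everything it emitted."""
    emitted = []
    next(gen)  # run to the first "yield box"
    for val in inputs:
        box.d = val  # "receive" by writing into the shared box
        while True:
            out = next(gen)
            if out is box:  # back at a receive point: wait for more input
                break
            emitted.append(out)
    return emitted

box = Box()
emitted = drive(above_r(box, 2), box, [1, 3, 2, 5])
print(emitted)  # [3, 5]
```

Nothing here needs a two-way generator, which is why the same shape should carry over to iterator blocks in other languages.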

Conclusion

Rx is all about making flexible consumers of values, which basically amounts to making event callbacks. LINQ Rx does so with monads, but that's not the only way. Here, we have shown how to turn a generator or iterator block inside out so that it consumes values rather than produces them. Using these is an alternative to LINQ Rx that might be more appropriate for your programming language. There are lots of other things to work out, like unhandling an event, error handling, catching the end of a stream, etc. But this is a pretty good, simple foundation to show that the essence of reactive programming is making it easy to write flexible value consumers (basically event handlers). In both Rx and the code above, we've done so by making a little DSL in the host language.
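As one possible answer to the "unhandling" question, here's a sketch (an assumption about design, not something the code above does) in which `handle` returns a function that removes the handler again:

```python
# Sketch: handle() returns an "unhandle" function.
# This design is an assumption for illustration, not part of the post's Event.

class Event(object):
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)
        def unhandle():
            if handler in self.handlers:
                self.handlers.remove(handler)
        return unhandle

    def fire(self, val=None):
        # iterate over a copy so a handler may unhandle itself while firing
        for handler in list(self.handlers):
            handler(val)

seen = []
clicks = Event()
stop = clicks.handle(seen.append)
clicks.fire(1)
stop()
clicks.fire(2)
print(seen)  # [1]
```

For a chain of remitters, each `>>` would need to hand back its unhandle function too, which is where it starts to get more interesting.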

Next time...

There are still other ways of making flexible consumers. If we had continuations or used CPS, we could just use the current continuation as our consumer. So Scheme and Ruby ought to have easy solutions to this problem. We can do a similar thing with macros in any Lisp that doesn't have continuations, like Clojure. In fact, I'd like to explore how to do Rx in Clojure next time. And at some point, we need to figure out how concurrency fits into all of this.
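Python has no first-class continuations, but a rough CPS-style imitation gives a feel for the idea: each handler consumes one value and returns the handler for the next one, and the event simply keeps whatever it is handed back. `ContEvent` and `sum_k` are hypothetical names, and this is only a sketch of the idea, not a claim about how it would look in Scheme, Ruby, or Clojure.

```python
# Sketch: a consumer in continuation-passing style.
# Each handler takes one value and returns the handler for the next value.
# ContEvent and sum_k are hypothetical names for illustration only.

class ContEvent(object):
    def __init__(self, handler):
        self.handler = handler  # the consumer's current continuation

    def fire(self, val):
        # run the current continuation and keep the one it returns
        self.handler = self.handler(val)

def sum_k(title, out, total=0):
    def k(val):
        new_total = total + val
        out.append("%s: %d" % (title, new_total))
        return sum_k(title, out, new_total)
    return k

lines = []
num_key = ContEvent(sum_k("total nums", lines))
for n in (1, 2, 3):
    num_key.fire(n)
print(lines)  # ['total nums: 1', 'total nums: 3', 'total nums: 6']
```

The running total lives in the chain of closures rather than in a suspended generator, but the consumer is still "the rest of the computation, waiting for the next value".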

P.S.

While I was researching all of this stuff, I was surprised to find that my friend, Wes Dyer, is right at the heart of it. You can see a video of him here. That was a surprise because I've never talked with him about this. In fact, I've only heard from him once in the last year because he's "gone dark". I'm sure his work on Rx has something to do with that :). I just want to make it clear that all of my thoughts are mine alone, and not based on any communication with him. Don't blame him for my crazy stuff :).
