I've learned. I'll share.

August 14, 2009

The Code for Reactive Programming in Python

I packaged some Python code that you can run for each of the steps I've shown in my articles about Reactive Programming: "how you could have invented it" and "more ways to do it". I apologize that it's not in a more usable form. Go ahead and copy and paste it wherever you like. If you put it online somewhere more convenient, just put a link in the comments. The control of which example runs is at the very end: just uncomment the line for the example you want to run (and comment out the others). And here it is:

import time

## simplified event stuff 
class Event:
    def __init__(self):
        self.handlers = []

    def handle(self, handler):
        self.handlers.append(handler)
        return self #so += will work

    def fire(self, val = None):
        for handler in self.handlers:
            handler(val)

def echo(val):
    """Print *val* and return it unchanged, so it can be used as a handler."""
    # The parenthesised single-argument form is valid both as the Python 2
    # print statement and as the Python 3 print function.
    print(val)
    return val

def simple_click_event_example():
    """Subscribe echo to a fresh event and fire it once."""
    # handle() returns the event itself, so subscribing and firing chain.
    Event().handle(echo).fire("left")  # prints "left"

def click_event_manipulation_example():
    """Derive a "double left click" event from a raw click event."""

    def left_double_click_from_click(click, threshold):
        """Return an event that fires "double left" whenever two "left"
        values fired on *click* are less than *threshold* seconds apart."""
        double_left = Event()
        # One-element list so the nested handler can update the timestamp
        # in place (it cannot rebind a plain local of the outer function).
        previous = [0]

        def on_click(button):
            if button != "left":
                return  # other buttons neither fire nor reset the timer
            now = time.time()
            if (now - previous[0]) < threshold:
                double_left.fire("double left")
            previous[0] = now

        click.handle(on_click)
        return double_left

    raw_clicks = Event()
    double_clicks = left_double_click_from_click(raw_clicks, .01)
    double_clicks.handle(echo)

    raw_clicks.fire("left")
    time.sleep(.02)           # longer than the threshold: no pairing
    raw_clicks.fire("left")
    raw_clicks.fire("right")
    raw_clicks.fire("left")   # prints "double left"
    
class EventFireRecord:
    """Value/timestamp pair describing one firing of an event."""

    def __init__(self, value, time):
        # NOTE: inside this method the ``time`` parameter shadows the
        # ``time`` module.
        self.value, self.time = value, time

def click_event_maniuplation_refactored_example():
    """Build the double-left-click event from two reusable combinators."""

    def doubleize_event(evt, threshold, combine):
        """Return an event fired with combine(previous, current) whenever two
        consecutive firings of *evt* are under *threshold* seconds apart."""
        paired = Event()
        previous = EventFireRecord(None, 0)

        def on_fire(value):
            now = time.time()
            if (now - previous.time) < threshold:
                paired.fire(combine(previous.value, value))
            # Remember this firing for the next comparison.
            previous.value = value
            previous.time = now

        evt.handle(on_fire)
        return paired

    def filter_event(evt, predicate):
        """Return an event that re-fires only values accepted by *predicate*."""
        passed = Event()

        def on_fire(value):
            if not predicate(value):
                return
            passed.fire(value)

        evt.handle(on_fire)
        return passed

    def is_left(value):
        return value == "left"

    def as_double_left(click1, click2):
        return "double left"

    click = Event()
    dlclick = doubleize_event(filter_event(click, is_left), .01, as_double_left)
    dlclick.handle(echo)

    click.fire("left")
    time.sleep(.02)       # longer than the threshold: no pairing
    click.fire("left")
    click.fire("right")   # filtered out before the doubleizer sees it
    click.fire("left")    # prints "double left"


def click_event_handle_with_result_example():
    """Express filter/doubleize as handlers whose return value is re-emitted."""

    def handle_with_result(evt, handler_with_result):
        """Return an event that fires every non-None result of
        handler_with_result(value) for the values fired on *evt*."""
        evt_out = Event()

        def handler(value):
            result = handler_with_result(value)
            if result is not None:
                evt_out.fire(result)

        evt.handle(handler)
        return evt_out

    def doubleize_r(evt, threshold):
        """Emit (previous, current) when two firings of *evt* are less than
        *threshold* seconds apart."""
        last_fire = EventFireRecord(None, 0)

        def handler(value):
            fire_time = time.time()
            try:
                if (fire_time - last_fire.time) < threshold:
                    return (last_fire.value, value)
            finally:
                # Runs after the return value has been built, so the pair
                # above still holds the previous firing's value.
                last_fire.__init__(value, fire_time)

        return handle_with_result(evt, handler)

    def filter_r(evt, predicate):
        """Emit only the values of *evt* accepted by *predicate*."""
        def handler(value):
            if predicate(value):
                return value

        return handle_with_result(evt, handler)


    clicks   = Event()
    # The source event is ``clicks``; the original referenced an undefined
    # name ``click`` here, which raised NameError.
    dlclicks = doubleize_r(filter_r(clicks, lambda value : value == "left"), .01)
    dlclicks.handle(echo)

    clicks.fire("left")
    time.sleep(.02)
    clicks.fire("left")
    clicks.fire("right")
    clicks.fire("left") #prints ('left', 'left')


def click_event_choosing_by_returning_event_example():
    """Let a handler choose what is emitted by returning a value, a list of
    values, or another event.

    This function only wires the pipeline together; it subscribes no
    printer and fires nothing.
    """

    def handle_with_result(evt, handler_with_result):
        """Return an event driven by the results of handler_with_result:
        None is ignored, an Event is forwarded, a list is emitted item by
        item, and anything else is emitted as-is."""
        evt_out = Event()

        def handler(value):
            result = handler_with_result(value)
            if result is None:
                return  # nothing to emit
            if isinstance(result, Event):
                # Forward whatever the returned event fires from now on.
                result.handle(evt_out.fire)
                return
            outputs = result if isinstance(result, list) else [result]
            for value_out in outputs:
                evt_out.fire(value_out)

        evt.handle(handler)
        return evt_out

    def filter_r(evt, predicate):
        def handler(value):
            return value if predicate(value) else None

        return handle_with_result(evt, handler)

    def value_filter_r(evt, value):
        def equals_value(val):
            return val == value

        return filter_r(evt, equals_value)

    def click_choose_r(keys, clicks):
        def key_handler(char):
            #TODO: unsubscribe from event after either "l" or "r"
            if char in ("l", "r"):
                button = {"l": "left", "r": "right"}[char]
                return value_filter_r(clicks, button)
            if char == "f":
                return ["fake", "fake"]

        return handle_with_result(keys, key_handler)

    keys = Event()
    clicks = Event()
    chosen_clicks = click_choose_r(keys, clicks)

def click_event_looks_like_streams_example():
    """Chain handlers onto events with ``>>`` so pipelines read like streams."""

    class MockEvent:
        """Stand-in event that immediately replays fixed values to a handler."""

        def __init__(self, *vals):
            self.vals = vals

        def handle(self, handler):
            for val in self.vals:
                handler(val)

    class Event:
        """Local event class (shadows the module-level Event inside this
        function) extended with bind/unit."""

        def __init__(self):
            self.handlers = []

        def handle(self, handler):
            self.handlers.append(handler)
            return self #so += will work

        def fire(self, val = None):
            for handler in self.handlers:
                handler(val)

        @classmethod
        def unit(cls, val):
            """Wrap *val* as something with handle(): an Event is returned
            unchanged, a list replays its items, any other value replays
            itself once."""
            if isinstance(val, cls):
                return val
            if isinstance(val, list):
                return MockEvent(*val)
            return MockEvent(val)

        def bind(self, handler_with_result):
            """Return a new event fed by the non-None results of
            handler_with_result for every value fired on this event."""
            downstream = Event()

            def on_fire(value):
                result = handler_with_result(value)
                if result is None:
                    return
                Event.unit(result).handle(downstream.fire)

            self.handle(on_fire)
            return downstream

        __rshift__ = bind

    def doublize_r(threshold, combine):
        last_fire = EventFireRecord(None, 0)

        def handler(value):
            now = time.time()
            try:
                if (now - last_fire.time) < threshold:
                    return combine(last_fire.value, value)
            finally:
                # Record this firing whether or not a pair was produced.
                last_fire.__init__(value, now)

        return handler

    def filter_r(predicate):
        def handler(value):
            return value if predicate(value) else None

        return handler

    def value_filter_r(value):
        def equals_value(val):
            return val == value

        return filter_r(equals_value)

    def click_choose_r(**clicks_by_char):
        # The dict's own lookup is the handler: unknown keys give None.
        return clicks_by_char.get

    def as_double_left(l1, l2):
        return "double left"

    clicks = Event()
    keys = Event()
    dlclicks = clicks >> value_filter_r("left") >> doublize_r(.01, as_double_left)
    keys >> click_choose_r(d = dlclicks, f = ["fake", "fake"]) >> echo

    clicks.fire("left")
    clicks.fire("left")    # a double click, but nothing listens to it yet
    keys.fire("f")         # prints "fake" and then "fake" again
    keys.fire("d")         # from now on double clicks are forwarded to echo
    clicks.fire("right")
    clicks.fire("right")
    time.sleep(.02)
    clicks.fire("left")
    clicks.fire("left")    # prints "double left"


## basic consumer (receiver) using generators

# Unique sentinel yielded by the generator-based receivers below when they
# are waiting for their next input value. remitter_example_yield_out compares
# yielded values against it by identity to tell "waiting" from real output.
receive = object()

def receiver_example():
    """Use primed generators as event handlers (receivers)."""

    def receiver(gen_rcvr):
        """Decorator: calling the decorated generator function returns a
        generator already advanced to its first yield, ready for send()."""
        def gen_and_start_rcvr(*args, **kargs):
            rcvr = gen_rcvr(*args, **kargs)
            rcvr.send(None)  # prime: run up to the first ``yield receive``
            return rcvr
        return gen_and_start_rcvr

    @receiver
    def sum_r(title):
        """Print a running total of every value sent in."""
        total = 0
        while True:
            total += yield receive
            # Single-argument print(...) is valid on Python 2 and Python 3.
            print("%s: %d" % (title, total))

    @receiver
    def count_r(title):
        """Print how many values have been sent in (values are ignored)."""
        count = 0
        while True:
            yield receive
            count += 1
            print("%s: %d" % (title, count))

    num_key      = Event()
    sum_nums     = sum_r("total nums")
    num_key.handle(sum_nums.send)

    num_key.fire(1) #prints "total nums: 1"
    num_key.fire(2) #prints "total nums: 3"
    num_key.fire(3) #prints "total nums: 6"

## make retiterators that can also output values via an event fire
def remitter_example():
    """Remitters: generator receivers that emit results by firing an event."""

    class Remitter:
        """Pipeline stage connected to its input with ``event >> remitter``."""

        def __init__(self, receiver_from_event_out):
            self.receiverFromEventOut = receiver_from_event_out

        def __rrshift__(self, event_in):
            """Handle ``event_in >> self``.

            Creates the output event, starts the receiver generator with it,
            subscribes the generator's send() to *event_in* and returns the
            output event so further stages can be chained.
            """
            event_out = Event()
            rcvr      = self.receiverFromEventOut(event_out)
            event_in.handle(rcvr.send)
            return event_out

    def remitter(gen_rcvr):
        """Decorator: turn a generator function taking (event_out, *args)
        into a factory of Remitters taking only *args."""
        def gen_remitter(*args, **kargs):
            def receiver_from_event_out(event_out):
                rcvr = gen_rcvr(event_out, *args, **kargs)
                rcvr.send(None)  # prime up to the first ``yield receive``
                return rcvr
            return Remitter(receiver_from_event_out)
        return gen_remitter

    @remitter
    def double_detect_r(double_click_event, threshold):
        """Fire double_click_event when two inputs arrive less than
        *threshold* seconds apart."""
        last_click_time = 0
        while True:
            yield receive
            current_click_time = time.time()
            if (current_click_time - last_click_time) < threshold:
                double_click_event.fire()
            last_click_time = current_click_time

    @remitter
    def print_r(_, message):
        """Print *message* for every input; the input value is ignored."""
        while True:
            yield receive
            # Single-argument print(...) is valid on Python 2 and Python 3.
            print(message)

    mouse_click  = Event()

    mouse_click >> print_r("left")
    mouse_click >> double_detect_r(.01) >> print_r("double left")

    mouse_click.fire() #prints "left"
    time.sleep(.02)
    mouse_click.fire() #prints "left"
    mouse_click.fire() #prints "left" and "double left"

## make retiterators out of generators that can send and receive

def remitter_example_yield_out():
    """Remitters whose generator both receives (``yield receive``) and emits
    (``yield value``) through the same generator."""
    from collections import defaultdict

    class Remitter:
        """Wrap a primed generator so it can consume events and emit its own."""

        def __init__(self, ritr):
            self.ritr     = ritr
            self.eventOut = Event()

        def send(self, val_in):
            """Feed *val_in* to the generator and fire every value it yields
            until it yields the ``receive`` sentinel again."""
            ritr      = self.ritr
            event_out = self.eventOut

            while True:
                # val_in is re-sent on every resume; only the generator's
                # ``yield receive`` expressions make use of it.
                val_out = ritr.send(val_in)
                if val_out is receive:
                    break
                event_out.fire(val_out)

        def handle(self, handler):
            self.eventOut.handle(handler)

        def handlein(self, *events):
            """Subscribe this remitter's send() to each of *events*."""
            for event in events:
                event.handle(self.send)

        def __rrshift__(self, event_in):
            """Handle ``source >> self`` for one source or an iterable of
            sources; returns self so stages can be chained."""
            # Only "not iterable" means "single source"; any other error
            # (e.g. a source without handle()) must propagate instead of
            # being swallowed by a bare except.
            try:
                sources = tuple(event_in)
            except TypeError:
                sources = (event_in,)
            self.handlein(*sources)
            return self

        def __rshift__(self, downstream):
            """Handle ``self >> downstream`` by delegating to downstream."""
            # Python 3 (and new-style classes) never try the reflected
            # method when both operands have the same type, so
            # Remitter >> Remitter needs an explicit forward operator.
            connect = getattr(downstream, "__rrshift__", None)
            if connect is None:
                return NotImplemented
            return connect(self)


    def remitter(gen_rcvr):
        """Decorator: calling the decorated generator function returns a
        Remitter around an already-primed generator."""
        def gen_remitter(*args, **kargs):
            ritr = gen_rcvr(*args, **kargs)
            ritr.send(None)  # prime up to the first ``yield receive``
            return Remitter(ritr)
        return gen_remitter


    @remitter
    def double_detect_r(threshold):
        """Emit None when two inputs arrive under *threshold* seconds apart."""
        last_click_time = 0
        while True:
            yield receive
            current_click_time = time.time()
            if (current_click_time - last_click_time) < threshold:
                yield
            last_click_time = current_click_time

    @remitter
    def map_r(f, *args, **kargs):
        """Emit f(val, *args, **kargs) for every input val."""
        while True:
            val = yield receive
            yield f(val, *args, **kargs)

    @remitter
    def print_r(fmt):
        """Print ``fmt % val`` for every input val."""
        while True:
            val = yield receive
            # The original formatted an undefined name ``message`` here.
            print(fmt % val)

    def label_r(label):
        return map_r(lambda val : (label, val))

    @remitter
    def label_count_r():
        """Count (label, val) inputs per label; emit a copy of the counts."""
        count_by_label = defaultdict(int)
        while True:
            (label, val) = yield receive
            count_by_label[label] += 1
            yield count_by_label.copy()

    def fix_click_counts(count_by_label, single_label, double_label):
        count_by_label[single_label] -= (count_by_label[double_label] * 2) #every double left "cancels" a single click
        return count_by_label.copy()

    def print_label_counts(count_by_label, *labels):
        # sorted() gives a deterministic, alphabetical label order (the
        # order shown in the expected output below); items() exists on
        # both Python 2 and Python 3.
        print(", ".join("%d %s" % (count, label)
                        for (label, count) in sorted(count_by_label.items())))


    mouse_clicks = Event()

    ([mouse_clicks >> label_r("single"),
      mouse_clicks >> double_detect_r(.01) >> label_r("double")]
     >> label_count_r() >> map_r(fix_click_counts, "single", "double") >> map_r(print_label_counts))

    #prints
    #0 double, 1 single
    #0 double, 2 single
    #0 double, 3 single
    #1 double, 1 single
    mouse_clicks.fire()
    time.sleep(.02)
    mouse_clicks.fire()
    mouse_clicks.fire()


def remitter_without_yield_in_hack_example():
    """Remitters that read their input from a shared Receive box instead of
    from the value of the ``yield`` expression."""

    class Receive:
        """Mutable box; ``d`` holds the most recent input value."""
        def __init__(self, val = None):
            self.d = val

    class Remitter:
        """Wrap a primed generator together with its Receive box."""

        def __init__(self, receive, ritr):
            self.receive  = receive
            self.ritr     = ritr
            self.eventOut = Event()

        def send(self, val_in):
            """Store *val_in* in the box, then advance the generator, firing
            each value it yields until it yields a Receive again."""
            self.receive.d = val_in

            ritr      = self.ritr
            event_out = self.eventOut
            while True:
                # Built-in next() works on Python 2.6+ and Python 3, unlike
                # the Python 2 only ``ritr.next()`` method.
                val_out = next(ritr)
                if isinstance(val_out, Receive):
                    break
                event_out.fire(val_out)

        def handle(self, handler):
            self.eventOut.handle(handler)

        def handlein(self, *events):
            """Subscribe this remitter's send() to each of *events*."""
            for event in events:
                event.handle(self.send)

        def __rrshift__(self, event_in):
            """Handle ``source >> self`` for one source or an iterable of
            sources; returns self so stages can be chained."""
            # Only "not iterable" means "single source"; other errors must
            # propagate instead of being swallowed by a bare except.
            try:
                sources = tuple(event_in)
            except TypeError:
                sources = (event_in,)
            self.handlein(*sources)
            return self

        def __rshift__(self, downstream):
            """Handle ``self >> downstream`` by delegating to downstream."""
            # Python 3 (and new-style classes) never try the reflected
            # method when both operands have the same type, so
            # Remitter >> Remitter needs an explicit forward operator.
            connect = getattr(downstream, "__rrshift__", None)
            if connect is None:
                return NotImplemented
            return connect(self)

    def remitter(gen_rcvr):
        """Decorator: the decorated generator function receives a fresh
        Receive box as its first argument and is returned primed, wrapped
        in a Remitter."""
        def gen_remitter(*args, **kargs):
            receive = Receive()
            ritr = gen_rcvr(receive, *args, **kargs)
            ritr.send(None)  # prime up to the first ``yield receive``
            return Remitter(receive, ritr)
        return gen_remitter

    @remitter
    def double_detect_r(receive, threshold):
        """Emit the gap in seconds between two inputs when it is below
        *threshold*."""
        last_click_time = 0
        while True:
            yield receive
            current_click_time = time.time()
            gap                = current_click_time - last_click_time
            if gap < threshold:
                yield gap
            last_click_time = current_click_time

    @remitter
    def average_r(receive):
        """Emit the running mean of all inputs seen so far."""
        total   = 0.0
        count   = 0
        while True:
            yield receive
            total += receive.d
            count += 1
            yield total/count

    @remitter
    def print_r(receive, fmt):
        """Print ``fmt % input`` for every input."""
        while True:
            yield receive
            # Single-argument print(...) is valid on Python 2 and Python 3.
            print(fmt % (receive.d))

    mouse_clicks = Event()
    mouse_clicks >> double_detect_r(.05) >> average_r() >> print_r("double left; average gap is %s seconds")

    mouse_clicks.fire()
    time.sleep(.1)
    mouse_clicks.fire()
    time.sleep(.01)
    mouse_clicks.fire() #prints "double left; average gap is 0.01... seconds"
    time.sleep(.02)
    mouse_clicks.fire() #prints "double left; average gap is 0.015... seconds"

if __name__ == "__main__":
    # Example selector: leave exactly one call uncommented to choose which
    # example runs when this file is executed as a script.
    #simple_click_event_example()
    #click_event_manipulation_example()
    #click_event_maniuplation_refactored_example()
    #click_event_handle_with_result_example()
    #click_event_choosing_by_returning_event_example()
    #click_event_looks_like_streams_example()
    #receiver_example()
    #remitter_example()
    #remitter_example_yield_out()
    remitter_without_yield_in_hack_example()

1 comment:

  1. This would be a good gist (http://gist.github.com)

    ReplyDelete

Blog Archive

Google Analytics