Source code for frpy.core

from typing import TypeVar, Generic, List, Callable, Tuple, Any
import asyncio
import time
import math

# Type of the value carried by a Stream (see ``Stream(Generic[T])``).
T = TypeVar('T')
# Additional type variable; not referenced in the visible part of this module.
S = TypeVar('S')


def _once(f: Callable, cond=lambda *_: True):
    ''' make the function as stale after it's called extractly once '''
    def g(*args, **kw):
        if cond(*args, **kw):
            f(*args, **kw)
            g.stale = True

    return g


class Stream(Generic[T]):
    """
    Stream: The elemental class of frp

    Mainly two operations will be used:

    * ``s()`` to get the current value
    * ``s(value)`` to push an event

    >>> # basic query/push operations
    >>> from frpy.api import Stream
    >>> s = Stream(None)
    >>> s.hook = print
    >>> s(42)
    42
    >>> s()
    42
    >>> s.listeners.append(lambda _, x: print(x + 1))
    >>> s(10)
    10
    11
    """

    def __init__(self, clock, hook=None):
        """
        Parameters
        ----------
        clock : Stream or None
            origin stream used to schedule updates; ``None`` (or the stream
            itself) makes pushes take effect immediately
        hook : Callable, optional
            function called with the new value each time a value is set
        """
        self.value: T = None
        # callbacks invoked as ``listener(stream, value)`` on every update
        self.listeners: List[Callable[[Stream[T], T], None]] = []
        self.clock = clock
        self.hook = hook  # function to be called when setting value

    def __call__(self, value=None):
        """
        Get the current value when called without a parameter, otherwise set.

        ``None`` doubles as the "no argument" marker, so ``s(None)`` is a
        read and never pushes an event.

        Unless it's the origin stream (clock), when the value is set,
        listeners will not be called directly but scheduled as a one-time
        listener appended to the listener list of the clock. This approach
        provides a breadth-first order of triggering and ensures that:

        1. Streams with a farther distance to the origin stream will always
           get the value LATER so that event ordering will be intuitive.
        2. All events triggered by one tick will happen in THAT tick prior
           to the subsequent tick so that concurrency issues will not
           bother.

        Returns
        -------
        The stored value when reading; ``None`` when pushing.
        """
        if value is None:
            return self.value

        def update(*_):
            # Arguments are ignored: when scheduled on the clock this is
            # invoked as a listener with (clock, clock_value).
            if self.hook is not None:
                self.hook(value)
            self.value = value
            # Drop listeners flagged as stale (one-shot listeners that
            # have already fired) before dispatching.
            self.listeners = [
                f for f in self.listeners if not getattr(f, 'stale', False)
            ]
            # Iterate the live list on purpose: listeners appended while
            # dispatching are picked up within the same pass.
            for f in self.listeners:
                f(self, value)

        if self.clock is None or self.clock is self:
            # Orphan stream or the origin clock itself: apply right away.
            update()
        else:
            # Defer to the clock's dispatch pass as a one-shot listener.
            self.clock.listeners.append(_once(update))
def combine(fn: Callable[[List[Stream[Any]], Stream[T], Stream[Any], T], None],
            deps: List[Stream[Any]]) -> Stream[T]:
    """
    Combine several upstream streams into a new one

    For examples please check the source code in unary, multiary,
    timely, etc.

    Parameters
    ----------
    fn : Callable
        combining function, called as ``fn(deps, self, src, value)``:

        - deps: the dependent streams
        - self: the returned stream
        - src: the stream which triggers the update
        - value: the value pushed by ``src``

        Whatever ``fn`` returns is pushed into the returned stream;
        returning ``None`` pushes nothing.
    deps : List[Stream[Any]]
        dependent streams (may be empty, yielding an orphan stream that
        is never triggered)

    Returns
    -------
    Stream[T]
    """
    s: Stream[T] = Stream(None)

    # Propagate the upstream clock if exactly one distinct clock is defined
    # among the dependencies, otherwise the stream is an orphan.  Only
    # dependencies that actually have a clock are considered, so the result
    # does not depend on the order of ``deps``.
    clocks = [dep.clock for dep in deps if dep.clock is not None]
    if clocks and all(c is clocks[0] for c in clocks[1:]):
        s.clock = clocks[0]

    def notify(src, value):
        s(fn(deps, s, src, value))

    for dep in deps:
        # Seed the new stream from dependencies that already hold a value.
        current = dep()
        if current is not None:
            notify(dep, current)
        dep.listeners.append(notify)
    return s
def clock() -> Tuple[Stream[float], Callable[..., None]]:
    """
    Create a clock stream producing the real world time, driven by an
    async loop.

    ``clock()`` itself takes no parameters; the options below belong to
    the returned ``run`` function.

    Returns
    -------
    Tuple[Stream[float], Callable[..., None]]
        A clock stream and a function ``run(time_res=0, duration=math.inf,
        loop=None)`` that starts the clock and blocks until ``duration``
        seconds have elapsed (forever by default):

        - time_res : float, optional
          seconds to sleep between two ticks; raise it above 0 only if
          ticking as fast as possible is a performance issue
        - duration : float, optional
          seconds after which the clock stops ticking
        - loop : asyncio.AbstractEventLoop, optional
          event loop to run on; defaults to the current event loop, and a
          new one is created and installed if none is available
    """
    clk: Stream[float] = Stream(None)
    clk.clock = clk  # the clock is its own origin, so pushes apply at once

    async def feed_clock(time_res, duration, loop):
        start = time.time()
        while True:
            if time.time() - start > duration:
                await loop.shutdown_asyncgens()
                break
            clk(time.time())
            await asyncio.sleep(time_res)

    def run(time_res=0,
            duration=math.inf,
            loop: asyncio.AbstractEventLoop = None):
        if loop is None:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # No current event loop (e.g. a non-main thread, or newer
                # Python versions that no longer create one implicitly).
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
        loop.run_until_complete(feed_clock(time_res, duration, loop))

    return clk, run