Source code for frpy.unary

import time
import asyncio
from typing import Callable, TypeVar
from typing import List, Dict, Deque, AsyncIterator
from collections import deque
from .core import Stream, combine

T = TypeVar('T')
S = TypeVar('S')


def fmap(fn: Callable[[T], S], s: Stream[T]) -> Stream[S]:
    """
    Transform every event of a stream with ``fn``.

    This is the ``map`` operator of ``Stream`` seen as a functor.

    >>> # plus 3 to the varying value
    >>> src = Stream(None)
    >>> s = fmap(lambda x: x + 3, src)
    >>> s.hook = print
    >>> src(1)
    4
    >>> src(9)
    12

    Parameters
    ----------
    fn : Callable[[T], S]
        function applied to each single event
    s : Stream[T]
        source stream

    Returns
    -------
    Stream[S]
        stream built by ``combine`` from the mapped values
    """
    def apply_fn(deps, this, src, value):
        # Only the event payload is of interest; the other arguments
        # supplied by ``combine`` are ignored.
        return fn(value)

    sources = [s]
    return combine(apply_fn, sources)
def where(fn: Callable[[T], bool], s: Stream[T]) -> Stream[T]:
    """
    Keep only the events for which ``fn`` is truthy.

    >>> # preserve only even number
    >>> src = Stream(None)
    >>> s = where(lambda x: x % 2 == 0, src)
    >>> s.hook = print
    >>> src(17)
    >>> src(4)
    4
    >>> src(10)
    10
    >>> src(5)

    Parameters
    ----------
    fn : Callable[[T], bool]
        predicate; the event is preserved when it returns a truthy value
    s : Stream[T]
        source stream

    Returns
    -------
    Stream[T]
    """
    def keep_matching(deps, this, src, value):
        # Rejected events yield None; as the example above shows, nothing
        # is emitted for them.
        return value if fn(value) else None

    return combine(keep_matching, [s])
def scan(fn: Callable[[S, T], S], init: S, s: Stream[T]) -> Stream[S]:
    """
    Stream of running accumulations.

    Each output event is produced by ``fn`` from the previous accumulation
    and the newly arrived event.

    >>> # add all numbers
    >>> src = Stream(None)
    >>> s = scan(lambda acc, x: acc + x, 0, src)
    >>> s.hook = print
    >>> src(12)
    12
    >>> src(30)
    42

    Parameters
    ----------
    fn : Callable[[S, T], S]
        accumulate function, taking the previous accumulation and the
        arrived event and returning the new accumulation
    init : S
        initial value of the accumulation; if None, the first event is
        used as the initial accumulation
    s : Stream[T]
        source stream

    Returns
    -------
    Stream[S]
    """
    def accumulate(deps, this, src, value):
        # ``this()`` reads the current value of the result stream, i.e.
        # the previous accumulation once one exists.
        if this() is not None:
            return fn(this(), value)
        # No accumulation yet: seed from ``init`` when one was given,
        # otherwise the first event itself becomes the accumulation.
        if init is None:
            return value
        return fn(init, value)

    return combine(accumulate, [s])
def window(n: int, s: Stream[T]) -> Stream[List[T]]:
    """
    Convert a stream into a stream of sliding windows over its events.

    Until ``n`` events have arrived the window simply grows; afterwards
    the oldest event is dropped for each new one.

    >>> # sliding window of width 3
    >>> src = Stream(None)
    >>> s = window(3, src)
    >>> s.hook = print
    >>> src(1)
    [1]
    >>> src(2)
    [1, 2]
    >>> src(3)
    [1, 2, 3]
    >>> src(4)
    [2, 3, 4]
    >>> src(5)
    [3, 4, 5]

    Parameters
    ----------
    n : int
        width of the sliding window
    s : Stream[T]
        source stream

    Returns
    -------
    Stream[List[T]]
    """
    # A bounded deque evicts the oldest element by itself once it is full.
    recent: Deque[T] = deque(maxlen=n)

    def snapshot(deps, this, src, value):
        recent.append(value)
        # Emit a fresh list so later appends do not alter emitted windows.
        return [*recent]

    return combine(snapshot, [s])
def diff(fn: Callable[[T, T], S], init: T, s: Stream[T]) -> Stream[S]:
    """
    Stream of events computed from each pair of neighbouring events.

    >>> # later minus previous
    >>> src = Stream(None)
    >>> s = diff(lambda x, y: y - x, 0, src)
    >>> s.hook = print
    >>> src(3)
    3
    >>> src(5)
    2
    >>> src(11)
    6

    Parameters
    ----------
    fn : Callable[[T, T], S]
        called as ``fn(previous, current)`` to produce the output event
    init : T
        value used as "previous" for the first event; if None, the event
        is passed through unchanged as long as the result stream holds
        no value yet
    s : Stream[T]
        source stream

    Returns
    -------
    Stream[S]
    """
    previous = init

    def compare(deps, this, src, value):
        nonlocal previous
        # Remember the arriving event before deciding what to emit, so the
        # next call always sees it as its predecessor.
        last, previous = previous, value
        if init is None and this() is None:
            # No seed and nothing emitted so far: pass the event through.
            return value
        return fn(last, value)

    return combine(compare, [s])
def changed(eq_fn: Callable[[T, T], bool], s: Stream[T]) -> Stream[T]:
    """
    Output an event only when it differs from the one before it.

    >>> # detect sharp increasing
    >>> src = Stream(None)
    >>> s = changed(lambda x, y: y - x <= 1, src)
    >>> s.hook = print
    >>> src(12)
    12
    >>> src(13)
    >>> src(14)
    >>> src(17)
    17
    >>> src(18)

    Parameters
    ----------
    eq_fn : Callable[[T, T], bool]
        called as ``eq_fn(previous, current)``; a truthy result means the
        two events count as equal and the current one is suppressed
    s : Stream[T]
        source stream

    Returns
    -------
    Stream[T]
    """
    previous = None

    def emit_if_changed(deps, this, src, value):
        nonlocal previous
        # The comparison is always against the immediately preceding
        # event, whether or not that one was emitted.
        last, previous = previous, value
        if last is not None and eq_fn(last, value):
            return None
        # Either there is no predecessor yet or the event differs from it.
        return value

    return combine(emit_if_changed, [s])
def trace(key_fn: Callable[[T], S],
          stale: float,
          s: Stream[T]) -> Stream[Stream[T]]:
    """
    Trace events sharing the same key.

    Turns a mono stream into a stream of streams. A new tracing sub stream
    is created, fed and emitted when the first event with a given key
    arrives; later events with that key are fed to the existing sub stream
    and nothing is emitted on the outer stream.

    >>> # create sub streams for different ranges of values
    >>> # every 10 create a stream, 1..10, 11..20 are two sub streams
    >>> src = Stream(None)
    >>> s = trace(lambda x: x // 10, 100, src)
    >>> footprints = []
    >>> def update_footprints(sub):
    ...     i = len(footprints)
    ...     footprints.append([sub()])
    ...     sub.hook = footprints[i].append
    >>> s.hook = update_footprints
    >>> src(1)
    >>> footprints
    [[1]]
    >>> src(21)
    >>> footprints
    [[1], [21]]
    >>> src(2)
    >>> footprints
    [[1, 2], [21]]
    >>> src(15)
    >>> footprints
    [[1, 2], [21], [15]]
    >>> src(11)
    >>> footprints
    [[1, 2], [21], [15, 11]]
    >>> src(7)
    >>> footprints
    [[1, 2, 7], [21], [15, 11]]

    Parameters
    ----------
    key_fn : Callable[[T], S]
        function to generate the (hashable) key from an event
    stale : float
        seconds without events after which a sub stream is trimmed;
        measured in real elapsed time, regardless of the clock attached
        to the stream
    s : Stream[T]
        source stream

    Returns
    -------
    Stream[Stream[T]]
    """
    substreams: Dict[S, Stream[T]] = {}
    last_seen: Dict[S, float] = {}

    def g(deps, this, src, value):
        key = key_fn(value)
        # One monotonic reading per event: staleness is an elapsed-time
        # measurement, so it must not be affected by wall-clock changes,
        # and every key is judged against the same instant.
        now = time.monotonic()

        # Trim stale sub streams. This happens on every event, so a sub
        # stream whose own key went stale is replaced by a fresh one below.
        expired = [k for k, seen in last_seen.items() if now - seen > stale]
        for k in expired:
            del substreams[k]
            del last_seen[k]

        last_seen[key] = now
        if key not in substreams:
            sub = Stream(deps[0].clock)
            substreams[key] = sub
            sub(value)
            # Only a newly created sub stream is emitted on the outer stream.
            return sub
        substreams[key](value)
        return None

    return combine(g, [s])
def flatten(ss: Stream[Stream[T]]) -> Stream[T]:
    """
    Redirect every event of a stream of streams into one flat stream.

    >>> # flatten a stream of streams
    >>> s1 = Stream(None)
    >>> s2 = Stream(None)
    >>> s3 = Stream(None)
    >>> ss = Stream(None)
    >>> s = flatten(ss)
    >>> footprint = []
    >>> s.hook = footprint.append
    >>> ss(s1)
    >>> s1(1)
    >>> s1(2)
    >>> ss(s2)
    >>> s1(3)
    >>> s2(11)
    >>> s2(12)
    >>> ss(s3)
    >>> s3(10)
    >>> s1(4)
    >>> s2(13)
    >>> footprint
    [1, 2, 3, 11, 12, 10, 4, 13]

    Parameters
    ----------
    ss : Stream[Stream[T]]
        source stream, whose events are themselves streams

    Returns
    -------
    Stream[T]
        a new stream sharing the clock of ``ss``
    """
    flat: Stream[T] = Stream(ss.clock)

    def subscribe(sub):
        # One forwarding listener per sub stream; it is attached when the
        # sub stream arrives on ``ss``, so only events after that point
        # are forwarded.
        def forward(_, event):
            return flat(event)

        sub.listeners.append(forward)

    each(subscribe, ss)
    return flat
def once(fn: Callable[[T], None], s: Stream[T]) -> None:
    """
    Run an action on the next event only.

    With this, a clock can act as a message queue.

    >>> s = Stream(None)
    >>> once(print, s)
    >>> s(1)
    1
    >>> s(3)
    >>> s(6)

    Parameters
    ----------
    fn : Callable[[T], None]
        the action
    s : Stream[T]
        source stream

    Returns
    -------
    None
    """
    # ``fn`` is wrapped in a fresh function because an attribute cannot be
    # set on a built-in such as ``print``.
    def run_once(_, value):
        fn(value)

    # NOTE(review): the ``stale`` flag is interpreted outside this block;
    # presumably the stream discards listeners marked stale after calling
    # them, which is what the example above exhibits.
    run_once.stale = True
    s.listeners.append(run_once)
def each(fn: Callable[[T], None], s: Stream[T]) -> None:
    """
    Perform an impure action for every event of a stream.

    >>> # for each to print
    >>> s = Stream(None)
    >>> each(print, s)
    >>> s(1)
    1
    >>> s(5)
    5
    >>> s(11)
    11

    Parameters
    ----------
    fn : Callable[[T], None]
        the action; its return value is ignored
    s : Stream[T]
        source stream

    Returns
    -------
    None
    """
    def perform(deps, this, src, value):
        fn(value)
        # Explicitly produce nothing: this operator exists for its side
        # effect only.
        return None

    # The stream created by ``combine`` is intentionally discarded.
    combine(perform, [s])
def fmap_async(fn: Callable[[AsyncIterator[T]], AsyncIterator[T]],
               s: Stream[T]) -> Stream[T]:
    """
    Lift an async generator transformer ``fn`` to a stream transformer.

    A running AbstractEventLoop is necessary. The most convenient way is
    to use the clock offered by core.clock

    >>> from frpy.api import Stream, clock
    >>> async def transform(s):
    ...     async for e in s:
    ...         if e % 2 != 0:
    ...             yield e + 1
    >>> clk, tick = clock()
    >>> s = Stream(clk)
    >>> s1 = fmap_async(transform, s)
    >>> footprint = []
    >>> s1.hook = footprint.append
    >>> import threading
    >>> loop = asyncio.get_event_loop()
    >>> t = threading.Thread(target=tick, kwargs={'duration': 0.1, 'loop': loop})  # Note
    >>> t.start()
    >>> s(1)
    >>> s(10)
    >>> s(25)
    >>> s(131)
    >>> s(18)
    >>> t.join()
    >>> footprint
    [2, 26, 132]

    *Note: terminating async routines right after events handled is very
    hard, here just use a reasonable timeout*

    Parameters
    ----------
    fn : Callable[[AsyncIterator[T]], AsyncIterator[T]]
        function from async generator to async generator
    s : Stream[T]
        source stream

    Returns
    -------
    Stream[T]
        result stream, sharing the clock of ``s``
    """
    out: Stream[T] = Stream(s.clock)
    # Hand-over point between the synchronous listener and the coroutine.
    pending: asyncio.Queue = asyncio.Queue()

    def enqueue(src, value):
        pending.put_nowait(value)

    async def incoming():
        # Endless async view of the source stream's events.
        while True:
            event = await pending.get()
            yield event

    async def pump():
        # Push whatever the transformer yields into the result stream.
        async for result in fn(incoming()):
            out(result)

    s.listeners.append(enqueue)
    # Schedule the pump; it never finishes on its own because
    # ``incoming`` never ends.
    asyncio.ensure_future(pump())
    return out