frpy package¶
Submodules¶
frpy.api module¶
frpy.core module¶
-
class
frpy.core.Stream(clock, hook=None)[source]¶ Bases:
typing.GenericStream: The elemental class of frp
mainly two operations will be used:
- s() to get current value
- s(value) to push an event
>>> # basic query/push operations >>> from frpy.api import Stream >>> s = Stream(None) >>> s.hook = print >>> s(42) 42 >>> s() 42 >>> s.listeners.append(lambda _, x: print(x + 1)) >>> s(10) 10 11
-
frpy.core.clock() → Tuple[frpy.core.Stream[float], Callable[None]][source]¶ create a clock stream producing the real world time, using an infinite async loop
Parameters: - loop (asyncio.AbstractEventLoop, optional) – async event loop
- time_res (int, optional) – seconds to sleep before next tick, set it to positive decimals if only performance issue exists
Returns: A clock stream and a function to start the clock (run forever)
Return type: Tuple[Stream[float], Callable[[], None]]
-
frpy.core.combine(fn: Callable[[List[frpy.core.Stream[Any]], frpy.core.Stream[T], frpy.core.Stream[Any], T], None], deps: List[frpy.core.Stream[Any]]) → frpy.core.Stream[T][source]¶ Combine several upstream streams into a new one
For examples please check the source code in unary, multiary, timely, etc.
Parameters: fn (Callable[[List[Stream[Any]], Stream[T], Stream[Any]], None]) –
- combining function: (dependents, self, src) -> None
- dependents
- self: The returned stream
- src: The stream who triggers the updating
deps (List[Stream[Any]]) – dependent streams
Returns: Return type: Stream[T]
frpy.fp module¶
-
frpy.fp.const(value: T) → Callable[..., T][source]¶ a function ignore input and output const value
>>> f = const(123) >>> f() 123 >>> f('aaa', 'bbb') 123
Parameters: value (T) – value to return Returns: the function to return value Return type: Callable[.., T]
-
frpy.fp.pipe(*funcs)[source]¶ >>> from frpy.fp import pipe >>> add1 = lambda x: x + 1 >>> add2 = lambda x: x + 2 >>> pipe(add1, add2)(2) 5
-
frpy.fp.soft(fn)[source]¶ a function ignore input and call wrapped function without args, (a “soft” version of the original function)
>>> s = iter(range(0, 10)) >>> g = lambda: next(s) >>> f = soft(g) >>> f('whatever', 456) 0 >>> f('xxx') 1
Parameters: fn (Callable[[], T]) – function to return value Returns: the function to call and return value Return type: Callable[.., T]
frpy.multiary module¶
-
frpy.multiary.merge(ss: List[frpy.core.Stream[Any]], topics: List[Any] = None) → frpy.core.Stream[Any][source]¶ merge multiple streams
>>> t = [] >>> s1 = Stream(None) >>> s2 = Stream(None) >>> ms = merge([s1, s2]) >>> ms.hook = t.append >>> s1(1) >>> s1(5) >>> s2(7) >>> s1(1) >>> s2(8) >>> t [1, 5, 7, 1, 8]
Parameters: - ss (List[Stream[Any]]) – streams to be merged
- topics (List[Any], optional) – if provided, events becomes tuples like (topic, original event), the i-th topic is applied to the i-th stream
Returns: Return type: Stream[Any]
frpy.producer module¶
-
frpy.producer.repeat(interval: float, clock: frpy.core.Stream[T]) → frpy.core.Stream[T][source]¶ repeatedly inject unix timestamp
>>> clk = Stream(None) >>> clk.clock = clk >>> s = repeat(3, clk) >>> s.hook = print >>> clk(0) 0 >>> clk(1) >>> clk(2) >>> clk(3) 3 >>> clk(4) >>> clk(5) >>> clk(6) 6
Parameters: - interval (float) – interval between events
- clock (Stream[T]) – clock stream of world
Returns: every interval time units, inject clock event
Return type: Stream[T]
-
frpy.producer.sequence(interval: float, it: Iterator[S], clock: frpy.core.Stream[T]) → frpy.core.Stream[S][source]¶ inject next item in iterator
>>> clk = Stream(None) >>> clk.clock = clk >>> s = sequence(3, iter(range(5, 10, 2)), clk) >>> s.hook = print >>> clk(0) 5 >>> clk(1) >>> clk(2) >>> clk(3) 7 >>> clk(4) >>> clk(5) >>> clk(6) 9
Parameters: - interval (float) – interval between events
- it (Iterator[S]) – the iterator to generate values
- clock (Stream[T]) – clock stream of world
Returns: every interval time units, bumps an event
Return type: Stream[S]
frpy.timely module¶
-
frpy.timely.delay(t: R, s: frpy.core.Stream[T]) → frpy.core.Stream[T][source]¶ delay every events by specific time when delayed t is 0, delay every events by a tick
>>> clk = Stream(None) >>> clk.clock = clk >>> src = Stream(clk) >>> s = delay(2, src) >>> s.hook = print >>> src(0) >>> clk(0) >>> src(1) >>> clk(1) >>> src(2) >>> clk(2) 0 >>> clk(3) 1 >>> clk(4) 2
Parameters: - t (float) – delayed seconds
- s (Stream[T]) – source stream
Returns: Return type: Stream[T]
-
frpy.timely.timeout(t: R, responds: frpy.core.Stream[T], s: frpy.core.Stream[S]) → frpy.core.Stream[R][source]¶ inject a timeout event if t seconds pass after the last event of the source stream and no event from the responding stream has arrived, the source stream and the responding stream can be the same
>>> clk = Stream(None) >>> clk.clock = clk >>> requests = Stream(clk) >>> responds = Stream(clk) >>> s = timeout(2, responds, requests) >>> s.hook = print >>> requests(0) >>> clk(0) >>> responds(0) >>> requests(0) >>> clk(1) >>> clk(2) >>> clk(3) >>> clk(4) 4 >>> clk(5) >>> responds(0)
Parameters: - t (float) – limit of elapsed time
- responds (Stream[T]) – responding stream to s
- s (Stream[S]) – source stream (request stream)
Returns: timestamps of timeout
Return type: Stream[float]
frpy.unary module¶
-
frpy.unary.changed(eq_fn: Callable[[T, T], bool], s: frpy.core.Stream[T]) → frpy.core.Stream[T][source]¶ output event when new event is different from the last
>>> # detect sharp increasing >>> src = Stream(None) >>> s = changed(lambda x, y: y - x <= 1, src) >>> s.hook = print >>> src(12) 12 >>> src(13) >>> src(14) >>> src(17) 17 >>> src(18)
Parameters: - eq_fn (Callable[[T, T], bool]) – function to determine whether 2 evengs are equal
- s (Stream[T]) – source stream
Returns: Return type: Stream[T]
-
frpy.unary.diff(fn: Callable[[T, T], S], init: T, s: frpy.core.Stream[T]) → frpy.core.Stream[S][source]¶ stream of events generated using neighbouring events
>>> # later minus previous >>> src = Stream(None) >>> s = diff(lambda x, y: y - x, 0, src) >>> s.hook = print >>> src(3) 3 >>> src(5) 2 >>> src(11) 6
Parameters: - fn (Callable[[T, T], S]) – [description]
- init (T) – [description]
- s (Stream[T]) – [description]
Returns: [description]
Return type: Stream[S]
-
frpy.unary.each(fn: Callable[T, None], s: frpy.core.Stream[T]) → None[source]¶ for each event perform an unpure action
>>> # for each to print >>> s = Stream(None) >>> each(print, s) >>> s(1) 1 >>> s(5) 5 >>> s(11) 11
Parameters: - fn (Callable[[T], None]) – the action
- s (Stream[T]) – source stream
Returns: Return type: None
-
frpy.unary.flatten(ss: frpy.core.Stream[frpy.core.Stream[T]]) → frpy.core.Stream[T][source]¶ redirect every event in stream of streams to a flattened stream
>>> # flatten a stream of streams >>> s1 = Stream(None) >>> s2 = Stream(None) >>> s3 = Stream(None) >>> ss = Stream(None) >>> s = flatten(ss) >>> footprint = [] >>> s.hook = footprint.append >>> ss(s1) >>> s1(1) >>> s1(2) >>> ss(s2) >>> s1(3) >>> s2(11) >>> s2(12) >>> ss(s3) >>> s3(10) >>> s1(4) >>> s2(13) >>> footprint [1, 2, 3, 11, 12, 10, 4, 13]
Parameters: ss (Stream[Stream[T]]) – source stream Returns: Return type: Stream[T]
-
frpy.unary.fmap(fn: Callable[T, S], s: frpy.core.Stream[T]) → frpy.core.Stream[S][source]¶ apply function to every event of a stream map operator for Stream the functor
>>> # plus 3 to the varying value >>> src = Stream(None) >>> s = fmap(lambda x: x + 3, src) >>> s.hook = print >>> src(1) 4 >>> src(9) 12
Parameters: - fn (Callable[[T], S]) – function to process a single event
- s (Stream[T]) – source stream
Returns: Return type: Stream[S]
-
frpy.unary.fmap_async(fn: Callable[AsyncIterator[T], AsyncIterator[T]], s: frpy.core.Stream[T]) → frpy.core.Stream[T][source]¶ map async generator transformer fn to stream transformer
A running AbstractEventLoop is necessary. The most convenient way is to use the clock offered by core.clock
>>> from frpy.api import Stream, clock >>> async def transform(s): ... async for e in s: ... if e % 2 != 0: ... yield e + 1 >>> clk, tick = clock() >>> s = Stream(clk) >>> s1 = fmap_async(transform, s) >>> footprint = [] >>> s1.hook = footprint.append >>> import threading >>> loop = asyncio.get_event_loop() >>> t = threading.Thread(target=tick, kwargs={'duration': 0.1, 'loop': loop}) # Note >>> t.start() >>> s(1) >>> s(10) >>> s(25) >>> s(131) >>> s(18) >>> t.join() >>> footprint [2, 26, 132]
Note: terminating async routines right after events handled is very hard, here just use a reasonable timeout
Parameters: - fn (Callable[[AsyncIterator[T]], AsyncIterator[T]]) – function from async generator to async generator
- s (Stream[T]) – source stream
Returns: result stream
Return type: Stream[T]
-
frpy.unary.once(fn: Callable[T, None], s: frpy.core.Stream[T]) → None[source]¶ run once on tick, clock can act as msg queue with this
>>> s = Stream(None) >>> once(print, s) >>> s(1) 1 >>> s(3) >>> s(6)
Parameters: - fn (Callable[[T], None]) – the action
- s (Stream[T]) – source stream
Returns: Return type: None
-
frpy.unary.scan(fn: Callable[[S, T], S], init: S, s: frpy.core.Stream[T]) → frpy.core.Stream[S][source]¶ streams whose event is an accumulation created from the arrived event and the previous accumulation
>>> # add all numbers >>> src = Stream(None) >>> s = scan(lambda acc, x: acc + x, 0, src) >>> s.hook = print >>> src(12) 12 >>> src(30) 42
Parameters: - fn (Callable[[S, T], S]) – accumulate function, from previous accumulation and arrived event to result event
- init (S) – initial value of the accumulation, if None, use first event as init
- s (Stream[T]) – source stream
Returns: Return type: Stream[S]
-
frpy.unary.skip(n: int, s: frpy.core.Stream[T]) → frpy.core.Stream[T][source]¶ Skip first n events
>>> s = Stream(None) >>> s2 = skip(2, s) >>> s2.hook = print >>> s(1) >>> s(1) >>> s(2) 2 >>> s(3) 3
Parameters: - n (int) – the number of events to skip
- s (Stream[T]) – source stream
Returns: Return type: Stream[T]
-
frpy.unary.trace(key_fn: Callable[T, S], stale: float, s: frpy.core.Stream[T]) → frpy.core.Stream[frpy.core.Stream[T]][source]¶ trace events with the same keys, from a mono stream to a stream of streams, each is a tracing stream generating when the first event with the key arrives
>>> # create sub streams for different ranges of values >>> # every 10 create a stream, 1..10, 11..20 are two sub streams >>> src = Stream(None) >>> s = trace(lambda x: x // 10, 100, src) >>> footprints = [] >>> def update_footprints(sub): ... i = len(footprints) ... footprints.append([sub()]) ... sub.hook = footprints[i].append >>> s.hook = update_footprints >>> src(1) >>> footprints [[1]] >>> src(21) >>> footprints [[1], [21]] >>> src(2) >>> footprints [[1, 2], [21]] >>> src(15) >>> footprints [[1, 2], [21], [15]] >>> src(11) >>> footprints [[1, 2], [21], [15, 11]] >>> src(7) >>> footprints [[1, 2, 7], [21], [15, 11]]
Parameters: - key_fn (Callable[[T], S]) – function to generate key from the event
- stale (float) – the seconds before a stream gets trimmed if no events arrived, works with real time no matter which lock is provided
- s (Stream[T]) – source stream
Returns: Return type: Stream[Stream[T]]
-
frpy.unary.where(fn: Callable[T, bool], s: frpy.core.Stream[T]) → frpy.core.Stream[T][source]¶ preserve events making fn to be true
>>> # preserve only even number >>> src = Stream(None) >>> s = where(lambda x: x % 2 == 0, src) >>> s.hook = print >>> src(17) >>> src(4) 4 >>> src(10) 10 >>> src(5)
Parameters: - fn (Callable[[T], bool]) – when fn is true then preserve the event
- s (Stream[T]) – source stream
Returns: Return type: Stream[T]
-
frpy.unary.window(n: int, s: frpy.core.Stream[T]) → frpy.core.Stream[List[T]][source]¶ convert to stream of sliding windows of events
>>> # sliding window of width 3 >>> src = Stream(None) >>> s = window(3, src) >>> s.hook = print >>> src(1) [1] >>> src(2) [1, 2] >>> src(3) [1, 2, 3] >>> src(4) [2, 3, 4] >>> src(5) [3, 4, 5]
Parameters: - n (int) – width of the sliding window
- s (Stream[T]) – source stream
Returns: Return type: [Stream[List[T]]]