frpy package

Submodules

frpy.api module

frpy.core module

class frpy.core.Stream(clock, hook=None)

Bases: typing.Generic

Stream: the elementary class of FRP (functional reactive programming)

Two operations are used most often:

  • s() to get current value
  • s(value) to push an event
>>> # basic query/push operations
>>> from frpy.api import Stream
>>> s = Stream(None)
>>> s.hook = print
>>> s(42)
42
>>> s()
42
>>> s.listeners.append(lambda _, x: print(x + 1))
>>> s(10)
10
11
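
The call protocol shown above can be illustrated with a small stand-in class. This is only a sketch and not frpy's implementation: MiniStream is a hypothetical name, the clock argument is left out, and the listener signature (source, value) is inferred from the two-argument lambda in the example.

```python
from typing import Any, Callable, List, Optional


class MiniStream:
    """Hypothetical stand-in for Stream: s() reads, s(value) pushes."""

    _MISSING = object()  # sentinel, so that None can still be pushed as a value

    def __init__(self, hook: Optional[Callable[[Any], None]] = None) -> None:
        self.value: Any = None
        self.hook = hook
        self.listeners: List[Callable[[Any, Any], None]] = []

    def __call__(self, value: Any = _MISSING) -> Any:
        if value is MiniStream._MISSING:
            return self.value            # s(): query the current value
        self.value = value               # s(value): store the new event
        if self.hook is not None:
            self.hook(value)             # the hook sees the event first
        for listener in list(self.listeners):
            listener(self, value)        # then each listener is notified
        return None


seen: List[Any] = []
s = MiniStream(hook=seen.append)
s.listeners.append(lambda _, x: seen.append(x + 1))
s(42)
s(10)
```

After these calls, seen holds the hook and listener outputs in arrival order and s() returns the last pushed value.
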
frpy.core.clock() → Tuple[frpy.core.Stream[float], Callable[[], None]]

create a clock stream that produces real-world time, driven by an infinite async loop

Parameters:
  • loop (asyncio.AbstractEventLoop, optional) – async event loop
  • time_res (int, optional) – seconds to sleep before the next tick; set it to a positive decimal only if performance is an issue
Returns:

A clock stream and a function to start the clock (run forever)

Return type:

Tuple[Stream[float], Callable[[], None]]

frpy.core.combine(fn: Callable[[List[frpy.core.Stream[Any]], frpy.core.Stream[T], frpy.core.Stream[Any], T], None], deps: List[frpy.core.Stream[Any]]) → frpy.core.Stream[T]

Combine several upstream streams into a new one

For examples, see the source code of the unary, multiary and timely modules.

Parameters:
  • fn (Callable[[List[Stream[Any]], Stream[T], Stream[Any]], None]) –

    combining function: (dependents, self, src) -> None
    • dependents: the dependent streams (deps)
    • self: the returned stream
    • src: the stream that triggered the update
  • deps (List[Stream[Any]]) – dependent streams

Returns:

Return type:

Stream[T]
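
A minimal sketch of how a combining function might be wired to its dependencies. It follows the four-argument form in the signature above (dependents, self, src, value); MiniStream, combine_sketch and sum_latest are hypothetical names, and the listener mechanism is an assumption, not frpy's actual code.

```python
from typing import Any, Callable, List


class MiniStream:
    """Hypothetical minimal stream: s() reads, s(v) pushes and notifies."""

    def __init__(self) -> None:
        self.value: Any = None
        self.listeners: List[Callable[[Any, Any], None]] = []

    def __call__(self, *args: Any) -> Any:
        if not args:
            return self.value
        self.value = args[0]
        for listener in list(self.listeners):
            listener(self, args[0])
        return None


def combine_sketch(fn: Callable[..., None], deps: List[MiniStream]) -> MiniStream:
    """Create a stream that fn may update whenever any dependency emits."""
    out = MiniStream()
    for dep in deps:
        # fn decides whether, and what, to push into `out`
        dep.listeners.append(lambda src, value: fn(deps, out, src, value))
    return out


def sum_latest(deps, self, src, value):
    """Emit the sum of the latest values once every input has one."""
    current = [d() for d in deps]
    if all(v is not None for v in current):
        self(sum(current))


a, b = MiniStream(), MiniStream()
total = combine_sketch(sum_latest, [a, b])
a(1)       # b has no value yet, so nothing is emitted
first = total()
b(2)       # both inputs have values now
a(10)      # the sum is recomputed from the latest values
```

The point of the design is that the combining function receives the output stream itself, so it can choose to emit, skip, or emit several events per upstream update.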

frpy.fp module

frpy.fp.const(value: T) → Callable[..., T]

create a function that ignores its input and returns a constant value

>>> f = const(123)
>>> f()
123
>>> f('aaa', 'bbb')
123
Parameters:
  • value (T) – value to return
Returns:

a function that returns value

Return type:

Callable[..., T]

frpy.fp.pipe(*funcs)
>>> from frpy.fp import pipe
>>> add1 = lambda x: x + 1
>>> add2 = lambda x: x + 2
>>> pipe(add1, add2)(2)
5
frpy.fp.soft(fn)

create a function that ignores its input and calls the wrapped function without arguments (a “soft” version of the original function)

>>> s = iter(range(0, 10))
>>> g = lambda: next(s)
>>> f = soft(g)
>>> f('whatever', 456)
0
>>> f('xxx')
1
Parameters:
  • fn (Callable[[], T]) – function that produces the value
Returns:

a function that calls fn and returns its value

Return type:

Callable[..., T]
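
The three helpers in this module are small enough to sketch in plain Python. The versions below are illustrations with hypothetical names (const_sketch, pipe_sketch, soft_sketch), not the library's code; in particular, left-to-right application in pipe_sketch is an assumption, since the pipe example above uses two additions and does not reveal the order.

```python
from functools import reduce
from typing import Any, Callable


def const_sketch(value: Any) -> Callable[..., Any]:
    """Return a function that ignores its arguments and returns value."""
    return lambda *args, **kwargs: value


def pipe_sketch(*funcs: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Compose functions, applying them left to right (assumed order)."""
    return lambda x: reduce(lambda acc, f: f(acc), funcs, x)


def soft_sketch(fn: Callable[[], Any]) -> Callable[..., Any]:
    """Return a function that ignores its arguments and calls fn()."""
    return lambda *args, **kwargs: fn()


add1 = lambda x: x + 1
double = lambda x: x * 2
counter = iter(range(10))
next_item = soft_sketch(lambda: next(counter))

results = [
    const_sketch(123)("aaa", "bbb"),
    pipe_sketch(add1, double)(2),    # (2 + 1) * 2 under left-to-right order
    next_item("whatever", 456),
    next_item("xxx"),
]
```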

frpy.multiary module

frpy.multiary.merge(ss: List[frpy.core.Stream[Any]], topics: List[Any] = None) → frpy.core.Stream[Any]

merge multiple streams

>>> t = []
>>> s1 = Stream(None)
>>> s2 = Stream(None)
>>> ms = merge([s1, s2])
>>> ms.hook = t.append
>>> s1(1)
>>> s1(5)
>>> s2(7)
>>> s1(1)
>>> s2(8)
>>> t
[1, 5, 7, 1, 8]
Parameters:
  • ss (List[Stream[Any]]) – streams to be merged
  • topics (List[Any], optional) – if provided, events become tuples of the form (topic, original event); the i-th topic is applied to the i-th stream
Returns:

Return type:

Stream[Any]
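
The topics parameter is not covered by the example above. The following sketch shows the tagging behaviour described in the parameter list, using a hypothetical MiniStream stand-in and a hypothetical merge_sketch function rather than frpy itself.

```python
from typing import Any, Callable, List, Optional


class MiniStream:
    """Hypothetical minimal stream: s() reads, s(v) pushes and notifies."""

    def __init__(self) -> None:
        self.value: Any = None
        self.listeners: List[Callable[..., None]] = []

    def __call__(self, *args: Any) -> Any:
        if not args:
            return self.value
        self.value = args[0]
        for listener in list(self.listeners):
            listener(self, args[0])
        return None


def merge_sketch(streams: List[MiniStream],
                 topics: Optional[List[Any]] = None) -> MiniStream:
    """Forward every event of every input stream into one output stream."""
    out = MiniStream()
    for i, stream in enumerate(streams):
        if topics is None:
            stream.listeners.append(lambda _src, value: out(value))
        else:
            # bind the topic as a default argument; a bare closure would
            # see only the last value of the loop variable
            stream.listeners.append(
                lambda _src, value, topic=topics[i]: out((topic, value)))
    return out


log: List[Any] = []
s1, s2 = MiniStream(), MiniStream()
tagged = merge_sketch([s1, s2], topics=["left", "right"])
tagged.listeners.append(lambda _src, event: log.append(event))
s1(1)
s2(7)
s1(5)
```

With topics supplied, each merged event carries the label of the stream it came from, which lets a downstream consumer tell the sources apart.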

frpy.producer module

frpy.producer.repeat(interval: float, clock: frpy.core.Stream[T]) → frpy.core.Stream[T]

repeatedly inject the Unix timestamp

>>> clk = Stream(None)
>>> clk.clock = clk
>>> s = repeat(3, clk)
>>> s.hook = print
>>> clk(0)
0
>>> clk(1)
>>> clk(2)
>>> clk(3)
3
>>> clk(4)
>>> clk(5)
>>> clk(6)
6
Parameters:
  • interval (float) – interval between events
  • clock (Stream[T]) – clock stream of world
Returns:

a stream that injects the clock event every interval time units

Return type:

Stream[T]

frpy.producer.sequence(interval: float, it: Iterator[S], clock: frpy.core.Stream[T]) → frpy.core.Stream[S]

inject the next item of an iterator

>>> clk = Stream(None)
>>> clk.clock = clk
>>> s = sequence(3, iter(range(5, 10, 2)), clk)
>>> s.hook = print
>>> clk(0)
5
>>> clk(1)
>>> clk(2)
>>> clk(3)
7
>>> clk(4)
>>> clk(5)
>>> clk(6)
9
Parameters:
  • interval (float) – interval between events
  • it (Iterator[S]) – the iterator to generate values
  • clock (Stream[T]) – clock stream of world
Returns:

a stream that emits an event every interval time units

Return type:

Stream[S]
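
The interval logic shared by repeat and sequence can be sketched without any stream machinery. make_sequencer below is a hypothetical helper, not part of frpy; staying silent once the iterator is exhausted is an assumption, since the documentation above does not say what happens in that case.

```python
from typing import Callable, Iterator, List, Optional, TypeVar

S = TypeVar("S")


def make_sequencer(interval: float, it: Iterator[S],
                   emit: Callable[[S], None]) -> Callable[[float], None]:
    """Return an on_tick(now) callback emitting one item per interval."""
    last_fired: Optional[float] = None

    def on_tick(now: float) -> None:
        nonlocal last_fired
        if last_fired is not None and now - last_fired < interval:
            return                  # not due yet
        try:
            item = next(it)
        except StopIteration:
            return                  # assumption: exhausted iterator emits nothing
        last_fired = now
        emit(item)

    return on_tick


out: List[int] = []
on_tick = make_sequencer(3, iter(range(5, 10, 2)), out.append)
for now in range(7):                # clock ticks 0, 1, ..., 6
    on_tick(now)
```

Replaying the ticks of the example above yields the same three items. A repeat-like producer follows the same pattern but emits the tick value itself instead of the next iterator item.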

frpy.timely module

frpy.timely.delay(t: R, s: frpy.core.Stream[T]) → frpy.core.Stream[T]

delay every event by a specific time; when t is 0, every event is delayed by one tick

>>> clk = Stream(None)
>>> clk.clock = clk
>>> src = Stream(clk)
>>> s = delay(2, src)
>>> s.hook = print
>>> src(0)
>>> clk(0)
>>> src(1)
>>> clk(1)
>>> src(2)
>>> clk(2)
0
>>> clk(3)
1
>>> clk(4)
2
Parameters:
  • t (float) – delay in seconds
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[T]
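
The timing in the example above is consistent with events being timestamped on the tick that follows them and released once the delay has elapsed. The sketch below models that reading with a hypothetical DelayLine class; the stamping rule is inferred from the example output and is not confirmed by the library source.

```python
from collections import deque
from typing import Any, Deque, List, Tuple


class DelayLine:
    """Hypothetical delay buffer driven by explicit clock ticks."""

    def __init__(self, delay: float) -> None:
        self.delay = delay
        self.unstamped: List[Any] = []                 # waiting for a tick
        self.pending: Deque[Tuple[float, Any]] = deque()  # (due time, value)

    def push(self, value: Any) -> None:
        self.unstamped.append(value)

    def tick(self, now: float) -> List[Any]:
        # events pushed since the last tick get their timestamp now
        for value in self.unstamped:
            self.pending.append((now + self.delay, value))
        self.unstamped.clear()
        # release everything whose due time has been reached
        released: List[Any] = []
        while self.pending and self.pending[0][0] <= now:
            released.append(self.pending.popleft()[1])
        return released


line = DelayLine(2)
outputs: List[List[Any]] = []
for now in range(5):
    if now < 3:
        line.push(now)              # push 0, 1, 2 just before ticks 0, 1, 2
    outputs.append(line.tick(now))
```

Because events are only released inside tick, a delay of 0 still holds each event until the next tick, which matches the behaviour described for t = 0.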

frpy.timely.timeout(t: R, responds: frpy.core.Stream[T], s: frpy.core.Stream[S]) → frpy.core.Stream[R]

inject a timeout event if t seconds pass after the last event of the source stream without an event arriving from the responding stream; the source stream and the responding stream can be the same

>>> clk = Stream(None)
>>> clk.clock = clk
>>> requests = Stream(clk)
>>> responds = Stream(clk)
>>> s = timeout(2, responds, requests)
>>> s.hook = print
>>> requests(0)
>>> clk(0)
>>> responds(0)
>>> requests(0)
>>> clk(1)
>>> clk(2)
>>> clk(3)
>>> clk(4)
4
>>> clk(5)
>>> responds(0)
Parameters:
  • t (float) – limit of elapsed time
  • responds (Stream[T]) – responding stream to s
  • s (Stream[S]) – source stream (request stream)
Returns:

timestamps of timeout

Return type:

Stream[float]
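
A request/response watchdog of this kind can be sketched as a small state machine. TimeoutWatch is a hypothetical class; stamping a request on the following tick and using a strict comparison are choices made so that the sketch replays the example above, and frpy's actual rule is not shown in this document.

```python
from typing import List, Optional


class TimeoutWatch:
    """Hypothetical watchdog: reports a timeout when a request stays unanswered."""

    def __init__(self, limit: float) -> None:
        self.limit = limit
        self.waiting = False                 # request seen, not yet timestamped
        self.since: Optional[float] = None   # tick time of the open request

    def request(self) -> None:
        self.waiting = True
        self.since = None                    # the latest request is the one timed

    def respond(self) -> None:
        self.waiting = False
        self.since = None                    # answered: nothing left to time out

    def tick(self, now: float) -> Optional[float]:
        if self.waiting:
            self.since = now                 # stamp the request on this tick
            self.waiting = False
            return None
        if self.since is not None and now - self.since > self.limit:
            self.since = None                # fire once per unanswered request
            return now
        return None


watch = TimeoutWatch(2)
fired: List[Optional[float]] = []
watch.request()
fired.append(watch.tick(0))
watch.respond()                              # first request answered in time
watch.request()                              # second request is never answered
for now in range(1, 6):
    fired.append(watch.tick(now))
timeouts = [t for t in fired if t is not None]
```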

frpy.unary module

frpy.unary.changed(eq_fn: Callable[[T, T], bool], s: frpy.core.Stream[T]) → frpy.core.Stream[T]

emit an event only when it differs from the previous event

>>> # detect sharp increasing
>>> src = Stream(None)
>>> s = changed(lambda x, y: y - x <= 1, src)
>>> s.hook = print
>>> src(12)
12
>>> src(13)
>>> src(14)
>>> src(17)
17
>>> src(18)
Parameters:
  • eq_fn (Callable[[T, T], bool]) – function to determine whether two events are equal
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[T]
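
The example output implies that each event is compared with the previous source event rather than with the last emitted one (otherwise 14 would be reported, since 14 - 12 > 1). A generator-based sketch of that reading, with the hypothetical name changed_sketch and a plain iterable standing in for the stream:

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def changed_sketch(eq_fn: Callable[[T, T], bool],
                   events: Iterable[T]) -> Iterator[T]:
    """Yield events that eq_fn does not consider equal to their predecessor."""
    first = True
    prev = None
    for event in events:
        if first or not eq_fn(prev, event):
            yield event
        prev = event        # always track the source, even when nothing is emitted
        first = False


sharp = list(changed_sketch(lambda x, y: y - x <= 1, [12, 13, 14, 17, 18]))
```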

frpy.unary.diff(fn: Callable[[T, T], S], init: T, s: frpy.core.Stream[T]) → frpy.core.Stream[S]

stream of events computed from each pair of neighbouring events

>>> # later minus previous
>>> src = Stream(None)
>>> s = diff(lambda x, y: y - x, 0, src)
>>> s.hook = print
>>> src(3)
3
>>> src(5)
2
>>> src(11)
6
Parameters:
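  • fn (Callable[[T, T], S]) – function computing an output event from the previous event and the current event, in that order
  • init (T) – value used as the previous event when the first event arrives
  • s (Stream[T]) – source stream
Returns:

stream of the values computed by fn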

Return type:

Stream[S]
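
The pairing of each event with its predecessor can be sketched as a generator. diff_sketch is a hypothetical name, and the argument order fn(previous, current) is read off the example above, where the first output equals the first event minus init.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
S = TypeVar("S")


def diff_sketch(fn: Callable[[T, T], S], init: T,
                events: Iterable[T]) -> Iterator[S]:
    """Yield fn(previous, current) for every event, starting from init."""
    prev = init
    for event in events:
        yield fn(prev, event)
        prev = event


steps = list(diff_sketch(lambda x, y: y - x, 0, [3, 5, 11]))
```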

frpy.unary.each(fn: Callable[[T], None], s: frpy.core.Stream[T]) → None

perform an impure action for each event

>>> # for each to print
>>> s = Stream(None)
>>> each(print, s)
>>> s(1)
1
>>> s(5)
5
>>> s(11)
11
Parameters:
  • fn (Callable[[T], None]) – the action
  • s (Stream[T]) – source stream
Returns:

Return type:

None

frpy.unary.flatten(ss: frpy.core.Stream[frpy.core.Stream[T]]) → frpy.core.Stream[T]

redirect every event in stream of streams to a flattened stream

>>> # flatten a stream of streams
>>> s1 = Stream(None)
>>> s2 = Stream(None)
>>> s3 = Stream(None)
>>> ss = Stream(None)
>>> s = flatten(ss)
>>> footprint = []
>>> s.hook = footprint.append
>>> ss(s1)
>>> s1(1)
>>> s1(2)
>>> ss(s2)
>>> s1(3)
>>> s2(11)
>>> s2(12)
>>> ss(s3)
>>> s3(10)
>>> s1(4)
>>> s2(13)
>>> footprint
[1, 2, 3, 11, 12, 10, 4, 13]
Parameters:
  • ss (Stream[Stream[T]]) – source stream
Returns:

Return type:

Stream[T]
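
In the example above, s1 keeps contributing events after s2 and s3 have arrived, so earlier inner streams stay subscribed. A sketch of that behaviour with a hypothetical MiniStream stand-in and a hypothetical flatten_sketch function follows; note that in this sketch an inner stream pushed twice would be subscribed twice.

```python
from typing import Any, Callable, List


class MiniStream:
    """Hypothetical minimal stream: s() reads, s(v) pushes and notifies."""

    def __init__(self) -> None:
        self.value: Any = None
        self.listeners: List[Callable[[Any, Any], None]] = []

    def __call__(self, *args: Any) -> Any:
        if not args:
            return self.value
        self.value = args[0]
        for listener in list(self.listeners):
            listener(self, args[0])
        return None


def flatten_sketch(outer: MiniStream) -> MiniStream:
    """Forward the events of every inner stream that appears on outer."""
    out = MiniStream()

    def subscribe(_src: MiniStream, inner: MiniStream) -> None:
        # each new inner stream is subscribed and never unsubscribed
        inner.listeners.append(lambda _s, value: out(value))

    outer.listeners.append(subscribe)
    return out


footprint: List[int] = []
s1, s2, ss = MiniStream(), MiniStream(), MiniStream()
flat = flatten_sketch(ss)
flat.listeners.append(lambda _s, value: footprint.append(value))
ss(s1)
s1(1)
ss(s2)
s1(3)       # s1 is still forwarded after s2 arrived
s2(11)
```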

frpy.unary.fmap(fn: Callable[[T], S], s: frpy.core.Stream[T]) → frpy.core.Stream[S]

apply a function to every event of a stream; the map operator for Stream as a functor

>>> # plus 3 to the varying value
>>> src = Stream(None)
>>> s = fmap(lambda x: x + 3, src)
>>> s.hook = print
>>> src(1)
4
>>> src(9)
12
Parameters:
  • fn (Callable[[T], S]) – function to process a single event
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[S]

frpy.unary.fmap_async(fn: Callable[[AsyncIterator[T]], AsyncIterator[T]], s: frpy.core.Stream[T]) → frpy.core.Stream[T]

map async generator transformer fn to stream transformer

A running AbstractEventLoop is necessary. The most convenient way is to use the clock offered by core.clock

>>> from frpy.api import Stream, clock
>>> async def transform(s):
...     async for e in s:
...         if e % 2 != 0:
...             yield e + 1
>>> clk, tick = clock()
>>> s = Stream(clk)
>>> s1 = fmap_async(transform, s)
>>> footprint = []
>>> s1.hook = footprint.append
>>> import asyncio
>>> import threading
>>> loop = asyncio.get_event_loop()
>>> t = threading.Thread(target=tick, kwargs={'duration': 0.1, 'loop': loop})  # Note
>>> t.start()
>>> s(1)
>>> s(10)
>>> s(25)
>>> s(131)
>>> s(18)
>>> t.join()
>>> footprint
[2, 26, 132]

Note: terminating async routines right after the events have been handled is very hard, so the example simply uses a reasonable timeout

Parameters:
  • fn (Callable[[AsyncIterator[T]], AsyncIterator[T]]) – function from async generator to async generator
  • s (Stream[T]) – source stream
Returns:

result stream

Return type:

Stream[T]
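
The kind of function expected for fn, an async generator transformer, can be tried out on its own with plain asyncio, without a stream or a clock. In the sketch below, source and collect are hypothetical helpers that stand in for the stream plumbing; only transform has the shape that fmap_async takes.

```python
import asyncio
from typing import AsyncIterator, Iterable, List


async def source(items: Iterable[int]) -> AsyncIterator[int]:
    """Hypothetical stand-in for a stream: yield the items one by one."""
    for item in items:
        yield item


async def transform(s: AsyncIterator[int]) -> AsyncIterator[int]:
    """Same transformer as in the example: keep odd events, add one."""
    async for e in s:
        if e % 2 != 0:
            yield e + 1


async def collect(ait: AsyncIterator[int]) -> List[int]:
    """Drain an async iterator into a list."""
    return [x async for x in ait]


result = asyncio.run(collect(transform(source([1, 10, 25, 131, 18]))))
```

Testing the transformer this way avoids the thread and timeout used in the example above, because the finite source lets the event loop finish on its own.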

frpy.unary.once(fn: Callable[[T], None], s: frpy.core.Stream[T]) → None

run fn once, on the next event (tick) of the stream; with this, a clock can act as a message queue

>>> s = Stream(None)
>>> once(print, s)
>>> s(1)
1
>>> s(3)
>>> s(6)
Parameters:
  • fn (Callable[[T], None]) – the action
  • s (Stream[T]) – source stream
Returns:

Return type:

None

frpy.unary.scan(fn: Callable[[S, T], S], init: S, s: frpy.core.Stream[T]) → frpy.core.Stream[S]

stream whose events are accumulations, each computed from the arriving event and the previous accumulation

>>> # add all numbers
>>> src = Stream(None)
>>> s = scan(lambda acc, x: acc + x, 0, src)
>>> s.hook = print
>>> src(12)
12
>>> src(30)
42
Parameters:
  • fn (Callable[[S, T], S]) – accumulate function, from previous accumulation and arrived event to result event
  • init (S) – initial value of the accumulation; if None, the first event is used as the initial value
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[S]
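
For a finite sequence of events, the same accumulation can be expressed with itertools.accumulate from the standard library (the initial keyword requires Python 3.8 or later). scan_sketch is a hypothetical name; emitting the first event unchanged when init is None is an assumption based on the parameter description.

```python
from itertools import accumulate, islice
from typing import Any, Callable, Iterable, Iterator


def scan_sketch(fn: Callable[[Any, Any], Any], init: Any,
                events: Iterable[Any]) -> Iterator[Any]:
    """Yield the running accumulation of events under fn."""
    if init is None:
        # assumption: the first event seeds the accumulation and is emitted
        return accumulate(events, fn)
    # accumulate yields the seed first; drop it so only real events produce output
    return islice(accumulate(events, fn, initial=init), 1, None)


running = list(scan_sketch(lambda acc, x: acc + x, 0, [12, 30]))
seeded_by_first = list(scan_sketch(lambda acc, x: acc + x, None, [12, 30]))
```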

frpy.unary.skip(n: int, s: frpy.core.Stream[T]) → frpy.core.Stream[T]

Skip first n events

>>> s = Stream(None)
>>> s2 = skip(2, s)
>>> s2.hook = print
>>> s(1)
>>> s(1)
>>> s(2)
2
>>> s(3)
3
Parameters:
  • n (int) – the number of events to skip
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[T]

frpy.unary.trace(key_fn: Callable[[T], S], stale: float, s: frpy.core.Stream[T]) → frpy.core.Stream[frpy.core.Stream[T]]

trace events that share a key, turning a single stream into a stream of streams; each sub-stream is created when the first event with its key arrives

>>> # create sub streams for different ranges of values
>>> # one sub stream per block of 10 values, e.g. 0..9 and 10..19 are two sub streams
>>> src = Stream(None)
>>> s = trace(lambda x: x // 10, 100, src)
>>> footprints = []
>>> def update_footprints(sub):
...     i = len(footprints)
...     footprints.append([sub()])
...     sub.hook = footprints[i].append
>>> s.hook = update_footprints
>>> src(1)
>>> footprints
[[1]]
>>> src(21)
>>> footprints
[[1], [21]]
>>> src(2)
>>> footprints
[[1, 2], [21]]
>>> src(15)
>>> footprints
[[1, 2], [21], [15]]
>>> src(11)
>>> footprints
[[1, 2], [21], [15, 11]]
>>> src(7)
>>> footprints
[[1, 2, 7], [21], [15, 11]]
Parameters:
  • key_fn (Callable[[T], S]) – function to generate key from the event
  • stale (float) – the seconds before a stream gets trimmed if no events have arrived; works with real time no matter which clock is provided
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[Stream[T]]
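
The grouping step of trace can be sketched over a finite list of events. trace_sketch is a hypothetical name, and the sketch deliberately ignores the stale parameter, so groups are never trimmed.

```python
from typing import Callable, Dict, Hashable, Iterable, List, TypeVar

T = TypeVar("T")


def trace_sketch(key_fn: Callable[[T], Hashable],
                 events: Iterable[T]) -> List[List[T]]:
    """Group events by key, keeping groups in order of first appearance."""
    groups: Dict[Hashable, List[T]] = {}    # dicts preserve insertion order
    for event in events:
        groups.setdefault(key_fn(event), []).append(event)
    return list(groups.values())


traced = trace_sketch(lambda x: x // 10, [1, 21, 2, 15, 11, 7])
```

The result has the same shape as the final footprints list in the example above: one group per key, created when that key is first seen.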

frpy.unary.where(fn: Callable[[T], bool], s: frpy.core.Stream[T]) → frpy.core.Stream[T]

keep only the events for which fn returns true

>>> # preserve only even number
>>> src = Stream(None)
>>> s = where(lambda x: x % 2 == 0, src)
>>> s.hook = print
>>> src(17)
>>> src(4)
4
>>> src(10)
10
>>> src(5)
Parameters:
  • fn (Callable[[T], bool]) – predicate; an event is kept when fn returns true for it
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[T]

frpy.unary.window(n: int, s: frpy.core.Stream[T]) → frpy.core.Stream[List[T]]

convert to stream of sliding windows of events

>>> # sliding window of width 3
>>> src = Stream(None)
>>> s = window(3, src)
>>> s.hook = print
>>> src(1)
[1]
>>> src(2)
[1, 2]
>>> src(3)
[1, 2, 3]
>>> src(4)
[2, 3, 4]
>>> src(5)
[3, 4, 5]
Parameters:
  • n (int) – width of the sliding window
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[List[T]]
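
A sliding window of this kind is commonly built on collections.deque with a maxlen, which discards the oldest item automatically. The generator below is a sketch over a plain iterable with the hypothetical name window_sketch, not the library's implementation.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def window_sketch(n: int, events: Iterable[T]) -> Iterator[List[T]]:
    """Yield the last n events (fewer while the window is filling up)."""
    buf: Deque[T] = deque(maxlen=n)
    for event in events:
        buf.append(event)       # the oldest item drops out once n is reached
        yield list(buf)         # copy, so later appends do not alter past output


windows = list(window_sketch(3, [1, 2, 3, 4, 5]))
```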

Module contents