frpy: minimal functional reactive programming powered by modern python

Require python 3.6+

FRP, a.k.a. functional reactive programming is a programming paradigm to model time related system better. As a rather radical idea, FRP mainly exists in theoretical articles. But its (mental) concept is very useful in some cases and has been adopted partially in many tools like Elm or React among others. Comparing to other python frp libraries, frpy is:

  • super lightweight, core part is merely 64 lines of code without comments.
  • carefully designed clock system, support both clock free style and streams running in real world time
  • totally thread-safe given a clock exits
  • take advantage of async python, allowing near-sequential style style with fmap_async

Full documentation can be found at API Doc.

Key concepts

Stream is a sequence of events. It could also be interpret as states changed stepwise. Continuous changing values like sin(x) over [0, 1] is not within the scope unless the changing is sampled as discrete events. Each stream has a clock property, an orphan stream is a stream whose clock is None.

Clock is a stream whose clock is itself. Clock is the most important concept supporting the Stream to ensure thread-safety and greatly reduce the pain of concurrency. The simplest clock can be constructed by Stream(None) whose clock will be set to self, and the time value can then be injected manually. However, in most cases other than tests it’s better to use the clock function to get a self-ticking clock.

Different clocks will provide different levels of capabilities. All clock with increasing substractable values enables all operators, no matter the values are real timestamp, natural numbers or event more complex structures (though the time unit differs). Clocks with non-substractable or non-increasing values will not support time sensitive operators like delay or timeout.

Example

Clock-free style streams

from frpy.api import Stream, fmap, where, merge

# even items from s1 merged with s2
s1 = Stream(None)
s2 = Stream(None)
s3 = fmap(where(lambda x: x % 2 == 0, s1))
s4 = merge([s2, s3])
s4.hook = print
s1(1)
s1(2)  # 2
s1(3)
s2(10)  # 10
s1(4)  # 4
s2(9)  # 9
s1(5)
s1(6)  # 6
# The footprint of s4 is: 2, 10, 4, 9, 6

Streams with manual clock

# even items from s1 delayed by 2 time units
clk = Stream(None)
clk.clock = clk
src = Stream(clk)
s = delay(2, src)
s.hook = print
src(0)
clk(0)  # src will be set here (the next clock tick)
src(1)
clk(1)
clk(2)  # 0
clk(3)  # empty since value 1 is odd
clk(4)  # 2

Complex case: number accumulation with timeout

Numbers randomly spawn and accumulated. If the accumulated number reaches a certain value, output “met!” and start next try. If the accumulated number does not reach the value after a given period, output “fail!” and start next try.

This simulation is a simplified “try with timeout” problem, which is quite uneasy for tranditional sequential paradigm (usually threads or event queues are necessary and may involve concurrency). Following we provide three methods with frpy to address this problem, each with only about 20-30 lines of code.

Method 1: pure stream-style approach

from frpy.api import Stream, fmap, repeat, scan, changed, \
    merge, each, timeout, clock
from frpy.fp import soft, const  # functional programming helpers
from functools import partial as bind

# options
value_thres = 3
time_thres = 1.2

# init the clock
clk, tick = clock()

# construct streams
sp = fmap(soft(random.random), repeat(0.2, clk))
term = Stream(clk)
interrupt = timeout(time_thres, term, term)
value = merge([sp, fmap(const(-1), term)])
acc = scan(lambda acc, v: acc + v if v >= 0 else 0, 0, value)
met = changed(lambda _, y: y <= value_thres, acc)
each(term, merge([met, interrupt]))

# hook to print trace
acc.hook = print
met.hook = bind(print, 'met!')
interrupt.hook = bind(print, 'fail!')

# start clock
tick()

Method 2: more sequential approach with async generator

import math
from frpy.api import Stream, fmap, repeat, merge, fmap_async, clock
from frpy.fp import soft

# options
value_thres = 3
time_thres = 1.2

clk, tick = clock()
sp = fmap(soft(random.random), repeat(0.2, clk))

# aysnc generator transformation
async def fn(s):
    acc = 0
    last = math.inf
    async for topic, v in s:
        if topic == 'clock':
            if acc > value_thres:
                met = True
            if v - last > time_thres or acc > value_thres:
                yield 'met' if met else 'fail'
                yield 0
                met = False
                acc = 0
                last = v

        elif topic == 'value':
            acc += v
            yield acc

# map the transformation over async generators to that over streams
res = fmap_async(fn, merge([clk, sp], ['clock', 'value']))

# hook to print trace
res.hook = print
tick()

Method 3: state reducer approach resembling React and Elm

from frpy.api import Stream, fmap, repeat, scan, merge, clock
from frpy.fp import soft

# options
value_thres = 3
time_thres = 1.2

clk, tick = clock()
sp = fmap(soft(random.random), repeat(0.2, clk))
events = merge([clk, sp], ['clock', 'value'])

# the reducer function to update state, print directly for convenience
def update(state: Tuple[float, float], event) -> Tuple[float, float]:
    channel, data = event
    start_at, acc = state
    if channel == 'clock':
        if data - start_at > time_thres:
            print('failed')
            return (data, 0)
        return state
    if channel == 'value':
        new_value = acc + data
        print(new_value)
        if new_value >= value_thres:
            print('met')
            return (time.time(), 0)
        return (start_at, new_value)
    else:
        return state

# we do not use states so just print changes in reducer
scan(update, (time.time(), 0), events)
tick()

For detailed docs please refer to API Doc.

Note

Thread-safety

Injecting an event to a stream with a clock is thread-safe since all actions will be scheduled by its clock. Injecting an event to an orphan stream is NOT thread-safe. Users have to be careful if use streams in a clock-free style.

Clock compatiblities

Frpy will try its best to construct compatible streams. For unary operators, clock will always be proporgated. This also means that orphan streams will always derives orphan streams. For multiary operators like merge, if all non-orphan upstreams have the same clock, inherit that clock, otherwise dettach the stream to be orphan to avoid problems. This behavior is implemented in the combine. It is highly recommended to avoid mixing clocks or do that only if with good reason, and always manually set the derived stream’s clock.

Attribution

This module is heavily inspired by flyd, with some important design decisions:

  1. The atomic update feature is not ported

    The atomic update is quite useful but adds too much complexity in my opinion, also the performance gain should not be too much since the diamond style dependencies could be avoided in many scenarios.

  2. Racial conditions are handled by a central event loop, a.k.a a clock stream

    Python unlike js has no event loop, and the new async API is not easy to use in this case. We use the conception of clock when necessary with asyncio event loops underhood. Per thread has its clock.

  3. No end stream mechanism is implemented

    End streams are useful but may introduce too much dynamism and it has an implact on the complexity ofimplementation. It may be added in the future after thorough consideration.

Usage

class frpy.api.Stream(clock, hook=None)[source]

Stream: The elemental class of frp

mainly two operations will be used:

  • s() to get current value
  • s(value) to push an event
>>> # basic query/push operations
>>> from frpy.api import Stream
>>> s = Stream(None)
>>> s.hook = print
>>> s(42)
42
>>> s()
42
>>> s.listeners.append(lambda _, x: print(x + 1))
>>> s(10)
10
11
frpy.api.changed(eq_fn: Callable[[T, T], bool], s: frpy.core.Stream[T]) → frpy.core.Stream[T][source]

output event when new event is different from the last

>>> # detect sharp increasing
>>> src = Stream(None)
>>> s = changed(lambda x, y: y - x <= 1, src)
>>> s.hook = print
>>> src(12)
12
>>> src(13)
>>> src(14)
>>> src(17)
17
>>> src(18)
Parameters:
  • eq_fn (Callable[[T, T], bool]) – function to determine whether 2 evengs are equal
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[T]

frpy.api.clock() → Tuple[frpy.core.Stream[float], Callable[None]][source]

create a clock stream producing the real world time, using an infinite async loop

Parameters:
  • loop (asyncio.AbstractEventLoop, optional) – async event loop
  • time_res (int, optional) – seconds to sleep before next tick, set it to positive decimals if only performance issue exists
Returns:

A clock stream and a function to start the clock (run forever)

Return type:

Tuple[Stream[float], Callable[[], None]]

frpy.api.delay(t: R, s: frpy.core.Stream[T]) → frpy.core.Stream[T][source]

delay every events by specific time when delayed t is 0, delay every events by a tick

>>> clk = Stream(None)
>>> clk.clock = clk
>>> src = Stream(clk)
>>> s = delay(2, src)
>>> s.hook = print
>>> src(0)
>>> clk(0)
>>> src(1)
>>> clk(1)
>>> src(2)
>>> clk(2)
0
>>> clk(3)
1
>>> clk(4)
2
Parameters:
  • t (float) – delayed seconds
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[T]

frpy.api.diff(fn: Callable[[T, T], S], init: T, s: frpy.core.Stream[T]) → frpy.core.Stream[S][source]

stream of events generated using neighbouring events

>>> # later minus previous
>>> src = Stream(None)
>>> s = diff(lambda x, y: y - x, 0, src)
>>> s.hook = print
>>> src(3)
3
>>> src(5)
2
>>> src(11)
6
Parameters:
  • fn (Callable[[T, T], S]) – [description]
  • init (T) – [description]
  • s (Stream[T]) – [description]
Returns:

[description]

Return type:

Stream[S]

frpy.api.each(fn: Callable[T, None], s: frpy.core.Stream[T]) → None[source]

for each event perform an unpure action

>>> # for each to print
>>> s = Stream(None)
>>> each(print, s)
>>> s(1)
1
>>> s(5)
5
>>> s(11)
11
Parameters:
  • fn (Callable[[T], None]) – the action
  • s (Stream[T]) – source stream
Returns:

Return type:

None

frpy.api.flatten(ss: frpy.core.Stream[frpy.core.Stream[T]]) → frpy.core.Stream[T][source]

redirect every event in stream of streams to a flattened stream

>>> # flatten a stream of streams
>>> s1 = Stream(None)
>>> s2 = Stream(None)
>>> s3 = Stream(None)
>>> ss = Stream(None)
>>> s = flatten(ss)
>>> footprint = []
>>> s.hook = footprint.append
>>> ss(s1)
>>> s1(1)
>>> s1(2)
>>> ss(s2)
>>> s1(3)
>>> s2(11)
>>> s2(12)
>>> ss(s3)
>>> s3(10)
>>> s1(4)
>>> s2(13)
>>> footprint
[1, 2, 3, 11, 12, 10, 4, 13]
Parameters:ss (Stream[Stream[T]]) – source stream
Returns:
Return type:Stream[T]
frpy.api.fmap(fn: Callable[T, S], s: frpy.core.Stream[T]) → frpy.core.Stream[S][source]

apply function to every event of a stream map operator for Stream the functor

>>> # plus 3 to the varying value
>>> src = Stream(None)
>>> s = fmap(lambda x: x + 3, src)
>>> s.hook = print
>>> src(1)
4
>>> src(9)
12
Parameters:
  • fn (Callable[[T], S]) – function to process a single event
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[S]

frpy.api.fmap_async(fn: Callable[AsyncIterator[T], AsyncIterator[T]], s: frpy.core.Stream[T]) → frpy.core.Stream[T][source]

map async generator transformer fn to stream transformer

A running AbstractEventLoop is necessary. The most convenient way is to use the clock offered by core.clock

>>> from frpy.api import Stream, clock
>>> async def transform(s):
...     async for e in s:
...         if e % 2 != 0:
...             yield e + 1
>>> clk, tick = clock()
>>> s = Stream(clk)
>>> s1 = fmap_async(transform, s)
>>> footprint = []
>>> s1.hook = footprint.append
>>> import threading
>>> loop = asyncio.get_event_loop()
>>> t = threading.Thread(target=tick, kwargs={'duration': 0.1, 'loop': loop})  # Note
>>> t.start()
>>> s(1)
>>> s(10)
>>> s(25)
>>> s(131)
>>> s(18)
>>> t.join()
>>> footprint
[2, 26, 132]

Note: terminating async routines right after events handled is very hard, here just use a reasonable timeout

Parameters:
  • fn (Callable[[AsyncIterator[T]], AsyncIterator[T]]) – function from async generator to async generator
  • s (Stream[T]) – source stream
Returns:

result stream

Return type:

Stream[T]

frpy.api.merge(ss: List[frpy.core.Stream[Any]], topics: List[Any] = None) → frpy.core.Stream[Any][source]

merge multiple streams

>>> t = []
>>> s1 = Stream(None)
>>> s2 = Stream(None)
>>> ms = merge([s1, s2])
>>> ms.hook = t.append
>>> s1(1)
>>> s1(5)
>>> s2(7)
>>> s1(1)
>>> s2(8)
>>> t
[1, 5, 7, 1, 8]
Parameters:
  • ss (List[Stream[Any]]) – streams to be merged
  • topics (List[Any], optional) – if provided, events becomes tuples like (topic, original event), the i-th topic is applied to the i-th stream
Returns:

Return type:

Stream[Any]

frpy.api.once(fn: Callable[T, None], s: frpy.core.Stream[T]) → None[source]

run once on tick, clock can act as msg queue with this

>>> s = Stream(None)
>>> once(print, s)
>>> s(1)
1
>>> s(3)
>>> s(6)
Parameters:
  • fn (Callable[[T], None]) – the action
  • s (Stream[T]) – source stream
Returns:

Return type:

None

frpy.api.repeat(interval: float, clock: frpy.core.Stream[T]) → frpy.core.Stream[T][source]

repeatedly inject unix timestamp

>>> clk = Stream(None)
>>> clk.clock = clk
>>> s = repeat(3, clk)
>>> s.hook = print
>>> clk(0)
0
>>> clk(1)
>>> clk(2)
>>> clk(3)
3
>>> clk(4)
>>> clk(5)
>>> clk(6)
6
Parameters:
  • interval (float) – interval between events
  • clock (Stream[T]) – clock stream of world
Returns:

every interval time units, inject clock event

Return type:

Stream[T]

frpy.api.scan(fn: Callable[[S, T], S], init: S, s: frpy.core.Stream[T]) → frpy.core.Stream[S][source]

streams whose event is an accumulation created from the arrived event and the previous accumulation

>>> # add all numbers
>>> src = Stream(None)
>>> s = scan(lambda acc, x: acc + x, 0, src)
>>> s.hook = print
>>> src(12)
12
>>> src(30)
42
Parameters:
  • fn (Callable[[S, T], S]) – accumulate function, from previous accumulation and arrived event to result event
  • init (S) – initial value of the accumulation, if None, use first event as init
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[S]

frpy.api.sequence(interval: float, it: Iterator[S], clock: frpy.core.Stream[T]) → frpy.core.Stream[S][source]

inject next item in iterator

>>> clk = Stream(None)
>>> clk.clock = clk
>>> s = sequence(3, iter(range(5, 10, 2)), clk)
>>> s.hook = print
>>> clk(0)
5
>>> clk(1)
>>> clk(2)
>>> clk(3)
7
>>> clk(4)
>>> clk(5)
>>> clk(6)
9
Parameters:
  • interval (float) – interval between events
  • it (Iterator[S]) – the iterator to generate values
  • clock (Stream[T]) – clock stream of world
Returns:

every interval time units, bumps an event

Return type:

Stream[S]

frpy.api.skip(n: int, s: frpy.core.Stream[T]) → frpy.core.Stream[T][source]

Skip first n events

>>> s = Stream(None)
>>> s2 = skip(2, s)
>>> s2.hook = print
>>> s(1)
>>> s(1)
>>> s(2)
2
>>> s(3)
3
Parameters:
  • n (int) – the number of events to skip
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[T]

frpy.api.timeout(t: R, responds: frpy.core.Stream[T], s: frpy.core.Stream[S]) → frpy.core.Stream[R][source]

inject a timeout event if t seconds pass after the last event of the source stream and no event from the responding stream has arrived, the source stream and the responding stream can be the same

>>> clk = Stream(None)
>>> clk.clock = clk
>>> requests = Stream(clk)
>>> responds = Stream(clk)
>>> s = timeout(2, responds, requests)
>>> s.hook = print
>>> requests(0)
>>> clk(0)
>>> responds(0)
>>> requests(0)
>>> clk(1)
>>> clk(2)
>>> clk(3)
>>> clk(4)
4
>>> clk(5)
>>> responds(0)
Parameters:
  • t (float) – limit of elapsed time
  • responds (Stream[T]) – responding stream to s
  • s (Stream[S]) – source stream (request stream)
Returns:

timestamps of timeout

Return type:

Stream[float]

frpy.api.trace(key_fn: Callable[T, S], stale: float, s: frpy.core.Stream[T]) → frpy.core.Stream[frpy.core.Stream[T]][source]

trace events with the same keys, from a mono stream to a stream of streams, each is a tracing stream generating when the first event with the key arrives

>>> # create sub streams for different ranges of values
>>> # every 10 create a stream, 1..10, 11..20 are two sub streams
>>> src = Stream(None)
>>> s = trace(lambda x: x // 10, 100, src)
>>> footprints = []
>>> def update_footprints(sub):
...     i = len(footprints)
...     footprints.append([sub()])
...     sub.hook = footprints[i].append
>>> s.hook = update_footprints
>>> src(1)
>>> footprints
[[1]]
>>> src(21)
>>> footprints
[[1], [21]]
>>> src(2)
>>> footprints
[[1, 2], [21]]
>>> src(15)
>>> footprints
[[1, 2], [21], [15]]
>>> src(11)
>>> footprints
[[1, 2], [21], [15, 11]]
>>> src(7)
>>> footprints
[[1, 2, 7], [21], [15, 11]]
Parameters:
  • key_fn (Callable[[T], S]) – function to generate key from the event
  • stale (float) – the seconds before a stream gets trimmed if no events arrived, works with real time no matter which lock is provided
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[Stream[T]]

frpy.api.where(fn: Callable[T, bool], s: frpy.core.Stream[T]) → frpy.core.Stream[T][source]

preserve events making fn to be true

>>> # preserve only even number
>>> src = Stream(None)
>>> s = where(lambda x: x % 2 == 0, src)
>>> s.hook = print
>>> src(17)
>>> src(4)
4
>>> src(10)
10
>>> src(5)
Parameters:
  • fn (Callable[[T], bool]) – when fn is true then preserve the event
  • s (Stream[T]) – source stream
Returns:

Return type:

Stream[T]

Indices and tables