Source code for frpy.producer
from typing import TypeVar, Iterator
from .core import Stream, combine
T = TypeVar('T')
S = TypeVar('S')
[docs]def repeat(interval: float, clock: Stream[T]) -> Stream[T]:
""" repeatedly inject unix timestamp
>>> clk = Stream(None)
>>> clk.clock = clk
>>> s = repeat(3, clk)
>>> s.hook = print
>>> clk(0)
0
>>> clk(1)
>>> clk(2)
>>> clk(3)
3
>>> clk(4)
>>> clk(5)
>>> clk(6)
6
Parameters
----------
interval : float
interval between events
clock : Stream[T]
clock stream of world
Returns
-------
Stream[T]
every interval time units, inject clock event
"""
def g(deps, this, src, value):
if this() is None or value - this() >= interval:
return value
return combine(g, [clock])
[docs]def sequence(interval: float, it: Iterator[S], clock: Stream[T]) -> Stream[S]:
""" inject next item in iterator
>>> clk = Stream(None)
>>> clk.clock = clk
>>> s = sequence(3, iter(range(5, 10, 2)), clk)
>>> s.hook = print
>>> clk(0)
5
>>> clk(1)
>>> clk(2)
>>> clk(3)
7
>>> clk(4)
>>> clk(5)
>>> clk(6)
9
Parameters
----------
interval : float
interval between events
it : Iterator[S]
the iterator to generate values
clock : Stream[T]
clock stream of world
Returns
-------
Stream[S]
every interval time units, bumps an event
"""
buffer = [clock()]
def g(deps, this, src, value):
if buffer[0] is None or value - buffer[0] >= interval:
try:
buffer[0] = value
return next(it)
except StopIteration:
pass
return combine(g, [clock])