Source code for frpy.timely
import time
from typing import TypeVar, List, Tuple
from .core import Stream
T = TypeVar('T')
S = TypeVar('S')
R = TypeVar('R')
[docs]def timeout(t: R, responds: Stream[T], s: Stream[S]) -> Stream[R]:
""" inject a timeout event if t seconds pass after the last
event of the source stream and no event from the responding stream has
arrived, the source stream and the responding stream can be the same
>>> clk = Stream(None)
>>> clk.clock = clk
>>> requests = Stream(clk)
>>> responds = Stream(clk)
>>> s = timeout(2, responds, requests)
>>> s.hook = print
>>> requests(0)
>>> clk(0)
>>> responds(0)
>>> requests(0)
>>> clk(1)
>>> clk(2)
>>> clk(3)
>>> clk(4)
4
>>> clk(5)
>>> responds(0)
Parameters
----------
t : float
limit of elapsed time
responds : Stream[T]
responding stream to s
s : Stream[S]
source stream (request stream)
Returns
-------
Stream[float]
timestamps of timeout
"""
buffer = [time.time()]
res: Stream[R] = Stream(s.clock)
# TODO: use combine
def on_s(src, value):
buffer[0] = res.clock()
def on_respond(src, value):
buffer[0] = None
def on_clock(clock, value):
if buffer[0] is not None and res.clock() - buffer[0] > t:
res(res.clock())
buffer[0] = None
responds.listeners.append(on_respond)
s.clock.listeners.append(on_clock)
s.listeners.append(on_s)
return res
[docs]def delay(t: R, s: Stream[T]) -> Stream[T]:
""" delay every events by specific time
when delayed t is 0, delay every events by a tick
>>> clk = Stream(None)
>>> clk.clock = clk
>>> src = Stream(clk)
>>> s = delay(2, src)
>>> s.hook = print
>>> src(0)
>>> clk(0)
>>> src(1)
>>> clk(1)
>>> src(2)
>>> clk(2)
0
>>> clk(3)
1
>>> clk(4)
2
Parameters
----------
t : float
delayed seconds
s : Stream[T]
source stream
Returns
-------
Stream[T]
"""
buffer: List[Tuple[R, T]] = []
res: Stream[T] = Stream(s.clock)
def on_t(src, now):
for item in list(buffer):
v_t, v = item
if now - v_t >= t:
res(v_t)
buffer.remove(item)
def on_s(src, value):
buffer.append((res.clock(), value))
s.clock.listeners.append(on_t)
s.listeners.append(on_s)
return res