# Source code for frpy.multiary

from typing import Any, List, Optional, TypeVar

from .core import Stream, combine

T = TypeVar('T')
S = TypeVar('S')


def merge(ss: List[Stream[Any]],
          topics: Optional[List[Any]] = None) -> Stream[Any]:
    """
    Merge multiple streams into a single stream.

    >>> t = []
    >>> s1 = Stream(None)
    >>> s2 = Stream(None)
    >>> ms = merge([s1, s2])
    >>> ms.hook = t.append
    >>> s1(1)
    >>> s1(5)
    >>> s2(7)
    >>> s1(1)
    >>> s2(8)
    >>> t
    [1, 5, 7, 1, 8]

    Parameters
    ----------
    ss : List[Stream[Any]]
        streams to be merged
    topics : List[Any], optional
        if provided, events become tuples like (topic, original event);
        the i-th topic is applied to the i-th stream. When omitted (or
        ``None``), events are forwarded unchanged.

    Returns
    -------
    Stream[Any]
        the stream obtained from ``combine`` over ``ss``
    """
    def g(deps, this, src, value):
        # Without topics the merged stream simply forwards the event.
        if topics is None:
            return value
        # ``list.index`` compares with ``==`` and returns the first match,
        # so a stream listed more than once in ``ss`` always receives the
        # topic at its first position.
        return (topics[ss.index(src)], value)

    return combine(g, ss)