tests package

Subpackages

Submodules

tests.test_import module

tests.test_import.test_import_actfw_core(from_: str, import_: str) None

tests.test_pipeline module

class tests.test_pipeline.Adder

Bases: actfw_core.task.pipe.Pipe[int, Tuple[int, …]]

proc(xs: Tuple[int, ...]) int

Pipeline Task Processor

class tests.test_pipeline.Counter

Bases: actfw_core.task.producer.Producer[int]

proc() int

Pipeline Task Processor

class tests.test_pipeline.Incrementer

Bases: actfw_core.task.pipe.Pipe[int, int]

proc(x: int) int

Pipeline Task Processor

class tests.test_pipeline.Logger

Bases: actfw_core.task.consumer.Consumer[int]

property logs: List[int]
proc(x: int) None

Pipeline Task Processor

Parameters

i – task input

xs: List[int]
class tests.test_pipeline.ThrouputBottleneck(duration_secs: float)

Bases: actfw_core.task.pipe.Pipe[int, int]

proc(x: int) int

Pipeline Task Processor

tests.test_pipeline.test_pipeline() None
tests.test_pipeline.test_pipeline_slow_but_no_loss() None

Module contents