API¶

class pipeline.Message(*, kind: Kind = Kind.Message, id: str = None, created: datetime = None, logs: List[Log] = [], content: Dict[str, Any] = {})[source]¶
as_model(model_class: Type[BaseModel], mappings: Optional[Dict[str, str]] = None) → BaseModel¶

Return message content as another BaseModel instance.

Parameters
  • model_class (class) – the class type to return

Returns

a model_class instance built from the message content
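
A minimal sketch, assuming a hypothetical Summary model whose field name matches a content key:

>>> from pydantic import BaseModel
>>>
>>> class Summary(BaseModel):
...     text: str = ''
>>>
>>> message = Message(content={'text': 'hello'})
>>> summary = message.as_model(Summary)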

classmethod construct(_fields_set: Optional[SetStr] = None, **values: Any) → Model¶

Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed. Behaves as if Config.extra = ‘allow’ was set, since it adds all passed values.
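
For example, building a Message from pre-validated data without running validation:

>>> msg = Message.construct(content={'text': 'hello'})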

copy(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, update: Optional[DictStrAny] = None, deep: bool = False) → Model¶

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters
  • include – fields to include in new model

  • exclude – fields to exclude from new model, as with values this takes precedence over include

  • update – values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data

  • deep – set to True to make a deep copy of the model

Returns

new model instance
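
For instance, duplicating the message from the as_model sketch above while overriding a field (the update value is illustrative):

>>> message_copy = message.copy(update={'id': 'copy-1'}, deep=True)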

dict(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False) → DictStrAny¶

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
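
For example, to drop unset and None-valued fields from the result:

>>> data = message.dict(exclude_unset=True, exclude_none=True)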

get(key: str, default: Optional[Any] = None) → Any¶

Access any field in the message content.

Parameters
  • key (str) – field name

Returns

the field value
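
Continuing the sketch above:

>>> text = message.get('text', default='')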

json(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any) → unicode¶

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
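
For example, indent here is passed through to json.dumps():

>>> serialized = message.json(exclude_none=True, indent=2)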

update_content(other: BaseModel) → KeysView[str]¶

Add fields from another model to update the message’s content.

Parameters
  • other (BaseModel) – other BaseModel object

Returns

the keys that were updated
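
Continuing the sketch above, merging the Summary instance back into the message:

>>> updated_keys = message.update_content(summary)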

classmethod update_forward_refs(**localns: Any) → None¶

Try to update ForwardRefs on fields based on this Model, globalns and localns.

class pipeline.Producer(settings: ProducerSettings, output_class: Type[BaseModel], logger: Logger = <Logger pipeline (INFO)>)[source]¶

Producer is a worker that generates new messages; a web crawler, for example, can be a producer. It reads no input and produces output until it exits.

Parameters
  • settings (ProducerSettings) – settings

  • output_class (Type[BaseModel]) – output class

  • logger (Logger) – logger

Usage:

>>> from pydantic import BaseModel
>>>
>>> class Output(BaseModel):
...     pass
>>>
>>> settings = ProducerSettings(name='', version='', description='', out_kind='MEM')
>>> producer = Producer(settings, output_class=Output)
>>> producer.parse_args()
>>> #producer.start()
duplicate_destination(topic)¶

helper function

generate() → Iterator[BaseModel][source]¶

Generate new messages; override this to yield output_class instances.
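
A minimal override sketch, reusing the Output model from the Usage example above:

>>> class MyProducer(Producer):
...     def generate(self):
...         yield Output()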

parse_args(args: Union[List[str], str] = sys.argv[1:]) → None¶

Parse command line arguments; args is a list of arguments or a command line string.

setup() → None¶

Loading code goes here; override to run initialization before processing starts.

shutdown() → None¶

Clean-up code goes here; override to release resources on exit.

step() → Tuple[int, Message][source]¶

Make a new message and write it to the destination.

class pipeline.Splitter(settings: SplitterSettings, logger: Logger = <Logger pipeline (INFO)>)[source]¶

Splitter writes each message to a topic whose name is determined by a function.
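
A hedged sketch of content-based routing; the get_topic hook name is an assumption, not confirmed by this page:

>>> class MySplitter(Splitter):
...     def get_topic(self, msg):  # hypothetical hook; check the source for the actual override point
...         return msg.get('language', 'default')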

duplicate_destination(topic)¶

helper function

parse_args(args: Union[List[str], str] = sys.argv[1:]) → None¶

Parse command line arguments; args is a list of arguments or a command line string.

setup() → None¶

Loading code goes here; override to run initialization before processing starts.

shutdown() → None¶

Clean-up code goes here; override to release resources on exit.

class pipeline.Processor(settings: ProcessorSettings, input_class: Type[BaseModel], output_class: Type[BaseModel], logger: Logger = <Logger pipeline (INFO)>)[source]¶

Processor is a worker that processes incoming messages and outputs new messages.

duplicate_destination(topic)¶

helper function

parse_args(args: Union[List[str], str] = sys.argv[1:]) → None¶

Parse command line arguments; args is a list of arguments or a command line string.

process(message_content: BaseModel, message_id: str) → BaseModel[source]¶

Process function to be overridden by users. For streaming processing, this function needs to do an in-place update on msg.dct and return an error, or a list of errors for batch processing. Messages that have been terminated through .terminates() will be skipped in the output.

A typical process definition will be:

newValue = msg.value
return OutputModel(value=newValue)
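
A minimal subclass sketch; the Input and Output models here are hypothetical:

>>> from pydantic import BaseModel
>>>
>>> class Input(BaseModel):
...     value: int = 0
>>>
>>> class Output(BaseModel):
...     value: int = 0
>>>
>>> class MyProcessor(Processor):
...     def process(self, message_content, message_id):
...         return Output(value=message_content.value + 1)
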
setup() → None¶

Loading code goes here; override to run initialization before processing starts.

shutdown() → None¶

Clean-up code goes here; override to release resources on exit.

start() → None[source]¶

Start processing.

class pipeline.Pipeline(**kwargs: Any)[source]¶

Pipeline manages SourceTaps and DestinationTaps for cases where you don’t want to use the predefined worker logic; instead, you access the taps directly.

Usage:

from pipeline.tap import TapKind, MemorySourceSettings, MemoryDestinationSettings
in_settings = MemorySourceSettings()
out_settings = MemoryDestinationSettings()
pipeline = Pipeline(
  in_kind=TapKind.MEM,
  in_settings=in_settings,
  out_kind=TapKind.MEM,
  out_settings=out_settings
)

Take command line arguments:

pipeline = Pipeline(args=sys.argv)

Take environment settings:

pipeline = Pipeline()
add_destination_topic(name: str) → None[source]¶

Add a new DestinationTap with a defined topic (queue) name.

Parameters

name – a name given for the destination topic

add_source_topic(name: str) → None[source]¶

Add a new SourceTap with a defined topic (queue) name.

Parameters

name – a name given for the source topic

destination_of(name: str) → DestinationTap[source]¶

Return the DestinationTap of the specified topic (queue) name.

source_of(name: str) → SourceTap[source]¶

Return the SourceTap of the specified topic (queue) name.
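
Putting the topic helpers together (a sketch; the topic names are illustrative):

pipeline.add_source_topic('input')
pipeline.add_destination_topic('output')
source = pipeline.source_of('input')
destination = pipeline.destination_of('output')

The returned taps can then be used to read and write messages directly.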