API

class pipeline.Message(*, kind: pipeline.message.Kind = Kind.Message, id: str = <generated UUID hex>, created: datetime.datetime = <creation time>, logs: List[pipeline.message.Log] = [], content: Dict[str, Any] = {})
as_model(model_class: Type[pydantic.main.BaseModel]) → pydantic.main.BaseModel

Return the message content as an instance of another BaseModel class.

Parameters
  • model_class (class) – class of the model to return

Returns

a BaseModel instance

classmethod construct(_fields_set: Optional[SetStr] = None, **values: Any) → Model

Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed.
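
Since Message subclasses pydantic's BaseModel, construct behaves as in pydantic's v1-style API. A sketch, assuming pydantic is installed; the User model is illustrative:

```python
from pydantic import BaseModel

class User(BaseModel):
    # illustrative model, not part of the pipeline package
    name: str
    age: int = 0

# construct() skips validation entirely, so only use it on trusted data
u = User.construct(name="ada", age=36)

# invalid values pass through unchecked
bad = User.construct(age="not-an-int")
```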

copy(*, include: Union[AbstractSetIntStr, MappingIntStrAny] = None, exclude: Union[AbstractSetIntStr, MappingIntStrAny] = None, update: DictStrAny = None, deep: bool = False) → Model

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters
  • include – fields to include in new model

  • exclude – fields to exclude from new model, as with values this takes precedence over include

  • update – values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data

  • deep – set to True to make a deep copy of the model

Returns

new model instance
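
A short sketch of copy with the update argument, assuming pydantic is installed; the Point model is illustrative:

```python
from pydantic import BaseModel

class Point(BaseModel):
    # illustrative model
    x: int = 0
    y: int = 0

p = Point(x=1, y=2)
# update replaces fields without validation; the original is untouched
q = p.copy(update={"y": 9})
```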

dict(*, include: Union[AbstractSetIntStr, MappingIntStrAny] = None, exclude: Union[AbstractSetIntStr, MappingIntStrAny] = None, by_alias: bool = False, skip_defaults: bool = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False) → DictStrAny

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
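
A sketch of the include and exclude_defaults options, assuming pydantic is installed; the User model is illustrative:

```python
from pydantic import BaseModel

class User(BaseModel):
    # illustrative model
    name: str
    email: str = ""
    age: int = 0

u = User(name="ada", age=36)

# keep only the selected fields
subset = u.dict(include={"name", "age"})

# drop fields that still hold their defaults (email here)
non_defaults = u.dict(exclude_defaults=True)
```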

get(key: str, default: Optional[Any] = None) → Any

Access a field in the message content, returning default if the key is absent.

Parameters
  • key (str) – field name

  • default – value to return when the key is absent

Returns

the field value

json(*, include: Union[AbstractSetIntStr, MappingIntStrAny] = None, exclude: Union[AbstractSetIntStr, MappingIntStrAny] = None, by_alias: bool = False, skip_defaults: bool = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False, encoder: Optional[Callable[[Any], Any]] = None, **dumps_kwargs: Any) → unicode

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
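
A sketch of json with exclude, assuming pydantic is installed; the User model is illustrative:

```python
import json
from pydantic import BaseModel

class User(BaseModel):
    # illustrative model
    name: str
    email: str = ""

u = User(name="ada")
# exclude works exactly as it does for dict()
payload = u.json(exclude={"email"})
```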

update_content(other: pydantic.main.BaseModel) → KeysView[str]

Update the message's content with the fields of another model.

Parameters
  • other (BaseModel) – model whose fields update the content

Returns

the keys that were updated

classmethod update_forward_refs(**localns: Any) None¶

Try to update ForwardRefs on fields based on this Model, globalns and localns.

class pipeline.Producer(settings: pipeline.worker.ProducerSettings, output_class: Type[pydantic.main.BaseModel], logger: logging.Logger = <Logger pipeline (INFO)>)

Producer is a worker that generates new messages; a web crawler, for example, can be a producer. It reads no input and produces output until it exits.

Parameters
  • settings (ProducerSettings) – settings

  • output_class (Type[BaseModel]) – output class

  • logger (Logger) – logger

Usage:

>>> from pydantic import BaseModel
>>>
>>> class Output(BaseModel):
...     pass
>>>
>>> settings = ProducerSettings(name='', version='', description='', out_kind='MEM')
>>> producer = Producer(settings, output_class=Output)
>>> producer.parse_args()
>>> #producer.start()

generate() → Iterator[pydantic.main.BaseModel]

Generator function, to be overridden by users, that yields new output messages.

setup() → None

loading code goes here

shutdown() → None

clean up code goes here

class pipeline.Splitter(settings: pipeline.worker.SplitterSettings, logger: logging.Logger = <Logger pipeline (INFO)>)

Splitter writes each message to a topic whose name is determined by a function.

setup() → None

loading code goes here

shutdown() → None

clean up code goes here

class pipeline.Processor(settings: pipeline.worker.ProcessorSettings, input_class: Type[pydantic.main.BaseModel], output_class: Type[pydantic.main.BaseModel], logger: logging.Logger = <Logger pipeline (INFO)>)

Processor is a worker that processes incoming messages and outputs new messages.

process(message_content: pydantic.main.BaseModel, message_id: str) → pydantic.main.BaseModel

Process function to be overridden by users. For streaming processing, this function needs to update msg.dct in place and return an error, or a list of errors for batch processing. Messages that have been terminated through .terminates() will be skipped in the output.

A typical process definition will be:

newValue = msg.value
return OutputModel(value=newValue)
setup() → None

loading code goes here

shutdown() → None

clean up code goes here

start() → None

Start processing.

class pipeline.Pipeline(**kwargs: Any)

Pipeline manages SourceTap and DestinationTap for cases where you don't want to use the predefined worker logic; instead, you access the SourceTap and DestinationTap directly.

Usage:

from pipeline.tap import TapKind, MemorySourceSettings, MemoryDestinationSettings

in_settings = MemorySourceSettings()
out_settings = MemoryDestinationSettings()
pipeline = Pipeline(in_kind=TapKind.MEM, in_settings=in_settings, out_kind=TapKind.MEM, out_settings=out_settings)

Take command line arguments:

pipeline = Pipeline(args=sys.argv)

Take environment settings:

pipeline = Pipeline()

add_destination_topic(name: str) → None

Add a new DestinationTap with a defined topic (queue) name

Parameters

name – a name given for the destination topic

add_source_topic(name: str) → None

Add a new SourceTap with a defined topic (queue) name

Parameters

name – a name given for the source topic

destination_of(name: str) → pipeline.tap.DestinationTap

Return the DestinationTap of the specified topic (queue) name

source_of(name: str) → pipeline.tap.SourceTap

Return the SourceTap of the specified topic (queue) name