import logging
import sys
import traceback
from abc import ABC
from argparse import ArgumentParser
from copy import copy
from datetime import datetime
from enum import IntEnum, Enum
from typing import Optional, List, Dict, Iterator, Type, KeysView, Union, cast, Any
from logging import Logger
from pydantic import BaseModel, ByteSize, Field, ValidationError, parse_obj_as
from pydantic.schema import model_schema
from .version import version
from .exception import PipelineError, PipelineInputError, PipelineOutputError
from .message import Message, Command, Log, Kind, MessageBase
from .monitor import WorkerMonitor as Monitor
from .tap import DestinationTap, SourceTap
from .tap import TapKind, SourceSettings, DestinationSettings # noqa: F401
from .helpers import Settings, Timer
pipelineLogger = logging.getLogger("pipeline")
pipelineLogger.setLevel(logging.INFO)
class CommandActions(str, Enum):
Reset = "RESET"
Define = "DEFINE"
class Definition(BaseModel):
""" """
name: str
version: str
description: str
source: SourceSettings
destination: Optional[DestinationSettings]
input_schema: Dict[str, Any] = dict()
output_schema: Dict[str, Any] = dict()
    @classmethod
    def new(cls, **data: Any) -> "Definition":
        """Build a Definition, converting tap settings and model classes
        into their serializable settings and JSON schemas."""
source = data.get("source")
data["source"] = SourceSettings(
**source.dict(include=SourceSettings.__fields__.keys())
)
destination = data.get("destination")
if destination:
data["destination"] = DestinationSettings(
**destination.dict(include=DestinationSettings.__fields__.keys())
)
if "input_class" in data:
input_class = data.get("input_class")
input_schema = model_schema(input_class, ref_prefix="#/components/schemas/")
del data["input_class"]
data["input_schema"] = input_schema
if "output_class" in data:
output_class = data.get("output_class")
output_schema = model_schema(
output_class, ref_prefix="#/components/schemas/"
)
del data["output_class"]
data["output_schema"] = output_schema
return cls(**data)
class WorkerType(IntEnum):
Normal = 0
NoInput = 1
NoOutput = 2
class WorkerSettings(Settings):
name: str
version: str
description: str
    in_kind: Optional[TapKind] = Field(None, title="input kind")
    out_kind: Optional[TapKind] = Field(None, title="output kind")
debug: bool = Field(False, title="print DEBUG log")
monitoring: bool = Field(False, title="enable prometheus monitoring")
class Worker(ABC):
"""Internal base class for pulsar-worker, DO NOT use it in your program!"""
def __init__(
self,
settings: WorkerSettings,
worker_type: WorkerType = WorkerType.Normal,
logger: Logger = pipelineLogger,
) -> None:
self.name = settings.name
self.version = settings.version
self.description = settings.description
self.worker_type = worker_type
self.settings = settings
self.logger = logger
self.timer = Timer()
self.monitor = Monitor(self)
self.logger.info(
f"Pipeline Worker {self.name} {self.version} (pipeline {version})"
)
def setup(self) -> None:
"""loading code goes here"""
pass
def shutdown(self) -> None:
"""clean up code goes here"""
pass
def has_input(self) -> bool:
return self.worker_type != WorkerType.NoInput
def has_output(self) -> bool:
return self.worker_type != WorkerType.NoOutput
    def parse_args(self, args: Optional[List[str]] = None) -> None:
        if args is None:
            # evaluate sys.argv at call time, not at function definition time
            args = sys.argv[1:]
parser = ArgumentParser(add_help=False)
parser.add_argument(
"-V",
action="version",
version=f"{self.version} (pipeline {version})",
help="show version",
)
parser.add_argument(
"-h", "--help", action="store_true", default=False, help="display help"
)
options, unknown = self.settings.parse_args(args, parser)
if self.has_input() and self.settings.in_kind:
self.source_and_settings_classes = SourceTap.of(self.settings.in_kind)
source_settings = self.source_and_settings_classes.settings_class()
source_settings.parse_args(args, parser)
if not options.help:
self.source = self.source_and_settings_classes.source_class(
settings=source_settings, logger=self.logger
)
self.logger.info(f"Source: {self.source}")
if self.settings.out_kind:
self.destination_and_settings_classes = DestinationTap.of(
self.settings.out_kind
)
destination_settings = (
self.destination_and_settings_classes.settings_class()
)
destination_settings.parse_args(args, parser)
if not options.help:
self.destination = (
self.destination_and_settings_classes.destination_class(
settings=destination_settings, logger=self.logger
)
)
self.logger.info(f"Destination: {self.destination}")
if options.help:
print(parser.format_help())
sys.exit(0)
if self.has_input() and self.settings.in_kind is None:
self.logger.critical("Please specify '--in-kind' or environment 'IN_KIND'!")
raise PipelineError("Please specify '--in-kind' or environment 'IN_KIND'!")
elif self.has_output() and self.settings.out_kind is None:
self.logger.critical(
"Please specify '--out-kind' or environment 'OUT_KIND'!"
)
raise PipelineError(
"Please specify '--out-kind' or environment 'OUT_KIND'!"
)
# report worker info to monitor
self.monitor.record_worker_info()
class ProducerSettings(WorkerSettings):
"""Producer settings"""
pass
class Producer(Worker):
"""Producer is a worker to generate new messages. For example, a webcrawler can
be a producer. It reads no input, and produce outputs until it exits.
Parameters:
:param settings: settings
:type settings: ProducerSettings
:param output_class: output class
:type output_class: Type[BaseModel]
:param logger: logger
:type logger: Logger
Usage:
.. code-block:: python
>>> from pydantic import BaseModel
>>>
>>> class Output(BaseModel):
... pass
>>>
>>> settings = ProducerSettings(name='', version='', description='', out_kind='MEM')
>>> producer = Producer(settings, output_class=Output)
>>> producer.parse_args()
>>> #producer.start()
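
    A real producer overrides :meth:`generate`. A minimal sketch
    (``CountProducer`` and ``Output2`` are illustrative names, not part of
    the library):

    .. code-block:: python

        >>> class Output2(BaseModel):
        ...     value: int = 0
        >>>
        >>> class CountProducer(Producer):
        ...     def generate(self):
        ...         # yield one model per message; returning ends the worker
        ...         for value in range(3):
        ...             yield Output2(value=value)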
"""
generator: Iterator[BaseModel]
def __init__(
self,
settings: ProducerSettings,
output_class: Type[BaseModel],
logger: Logger = pipelineLogger,
):
super().__init__(settings, worker_type=WorkerType.NoInput, logger=logger)
self.output_class = output_class
    def generate(self) -> Iterator[BaseModel]:
        """Override to yield output model instances; the worker stops when
        this generator is exhausted."""
        yield self.output_class()
def _step(self) -> BaseModel:
return next(self.generator)
    def start(self) -> None:
        options = self.settings
        if not hasattr(self, "destination"):
            # self.destination is only created by .parse_args()
            self.logger.critical("Did you forget to run .parse_args before start?")
            return
self.logger.setLevel(level=logging.INFO)
if options.debug:
self.logger.setLevel(level=logging.DEBUG)
self.logger.info("settings: write topic %s", self.destination.topic)
self.setup()
if self.settings.monitoring:
self.monitor.expose()
self.monitor.record_start()
try:
self.generator = self.generate()
i = 0
while True:
i += 1
log = Log(
name=self.name,
version=self.version,
updated=set(),
received=datetime.now(),
)
self.timer.start()
output = self._step()
self.timer.log(self.logger)
if hasattr(output, "id"):
msg = Message(id=output.id, content=output.dict()) # type: ignore
else:
msg = Message(content=output.dict())
self.logger.info("Generated %d-th message %s", i, msg)
log.updated.update(output.dict().keys())
msg.logs.append(log)
self.logger.info("Writing message %s", msg)
size = ByteSize(self.destination.write(msg))
self.logger.info(f"Message size: {size.human_readable()}")
self.monitor.record_write(self.destination.topic)
except StopIteration:
pass
except PipelineError as e:
e.log(self.logger)
except Exception as e:
self.logger.error(traceback.format_exc())
self.monitor.record_error(str(e))
self.shutdown()
self.destination.close()
self.monitor.record_finish()
class SplitterSettings(WorkerSettings):
    in_kind: Optional[TapKind] = Field(None, title="input kind")
    out_kind: Optional[TapKind] = Field(None, title="output kind")
class Splitter(Worker):
"""Splitter will write to a topic whose name is based on a function"""
def __init__(
self,
settings: SplitterSettings,
logger: Logger = pipelineLogger,
) -> None:
super().__init__(settings, logger=logger)
        # map topic name to its DestinationTap; self.destination is only used
        # to parse command-line arguments
self.destinations: Dict[str, DestinationTap] = {}
    def get_topic(self, msg: Message) -> str:
        """Return the destination topic name for ``msg``; subclasses must override."""
        raise NotImplementedError("You need to implement .get_topic(self, msg)")
def _run_streaming(self) -> None:
for msg in self.source.read():
self.logger.info("Received message '%s'", str(msg))
self.monitor.record_read(self.source.topic)
log = Log(
name=self.name,
version=self.version,
updated=set(),
received=datetime.now(),
)
if msg.kind == Kind.Message:
topic = self.get_topic(cast(Message, msg))
if topic not in self.destinations:
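                    # lazily create one DestinationTap per topic, reusing the
                    # parsed destination settings with the topic swapped in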
settings = copy(self.destination.settings)
settings.topic = topic
self.destinations[
topic
] = self.destination_and_settings_classes.destination_class(
settings, logger=self.logger
)
destination = self.destinations[topic]
msg.logs.append(log)
self.logger.info("Writing message %s to topic <%s>", str(msg), topic)
                msg_size = destination.write(msg)
                self.logger.info(f"Message size: {msg_size}")
self.monitor.record_write(topic)
self.source.acknowledge()
def start(self) -> None:
self.logger.setLevel(level=logging.INFO)
if self.settings.debug:
self.logger.setLevel(level=logging.DEBUG)
# if options.rewind:
# # consumer.seek(pulsar.MessageId.earliest)
# self.logger.info("seeked to earliest message available as requested")
self.setup()
self.logger.info("start listening")
# if batch_mode:
# self._run_batch()
# else:
if self.settings.monitoring:
self.monitor.expose()
self.monitor.record_start()
try:
self._run_streaming()
except PipelineError as e:
e.log(self.logger)
except Exception as e:
self.logger.error(traceback.format_exc())
self.monitor.record_error(str(e))
self.shutdown()
        self.source.close()
        # close every per-topic destination as well as the template destination
        for destination in self.destinations.values():
            destination.close()
        self.destination.close()
self.monitor.record_finish()
class ProcessorSettings(WorkerSettings):
    limit: Optional[int] = Field(
        None, title="number of messages to process before exiting"
    )
class Processor(Worker):
"""Processor is a worker which will process incoming messages and output
new messages
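
    Usage:

    .. code-block:: python

        >>> # a minimal sketch, assuming MEM taps as in the Producer example;
        >>> # Input, Output and Echo are illustrative names
        >>> from pydantic import BaseModel
        >>>
        >>> class Input(BaseModel):
        ...     value: int = 0
        >>>
        >>> class Output(BaseModel):
        ...     value: int = 0
        >>>
        >>> class Echo(Processor):
        ...     def process(self, message_content, message_id):
        ...         return Output(value=message_content.value)
        >>>
        >>> settings = ProcessorSettings(name='', version='', description='',
        ...                             in_kind='MEM', out_kind='MEM')
        >>> processor = Echo(settings, input_class=Input, output_class=Output)
        >>> processor.parse_args()
        >>> #processor.start()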
"""
settings: ProcessorSettings
input_class: Type[BaseModel]
    output_class: Optional[Type[BaseModel]]
destination_class: Optional[Type[DestinationTap]]
def __init__(
self,
settings: ProcessorSettings,
input_class: Type[BaseModel],
        output_class: Optional[Type[BaseModel]],
logger: Logger = pipelineLogger,
) -> None:
if output_class is None:
super().__init__(settings, worker_type=WorkerType.NoOutput, logger=logger)
else:
super().__init__(settings, worker_type=WorkerType.Normal, logger=logger)
self.retryEnabled = False
self.input_class = input_class
self.output_class = output_class
self.destination_class = None
    def process(self, message_content: BaseModel, message_id: str) -> BaseModel:
        """Process function to be overridden by users. It receives the
        validated input model together with the message id, and must return
        a new output model, which will be validated against ``output_class``
        before being written to the destination.

        A typical process definition will be:

        .. code-block:: python
            :linenos:

            new_value = message_content.value
            return OutputModel(value=new_value)
        """
raise NotImplementedError("You need to implement .process()")
def _step(self, msg: MessageBase) -> None:
"""process one message"""
self.logger.info(f"Receive message {msg}")
log = Log(
name=self.name,
version=self.version,
updated=set(),
received=datetime.now(),
)
if msg.kind == Kind.Command:
updated = self.process_command(cast(Command, msg))
elif msg.kind == Kind.Message:
updated = self.process_message(cast(Message, msg))
else:
raise PipelineError("Unrecognized message kind")
if updated:
log.updated.update(updated)
log.processed = datetime.now()
log.elapsed = self.timer.elapsed_time()
msg.logs.append(log)
if self.has_output():
size = self.destination.write(msg)
self.logger.info(f"Wrote message {msg}(size:{size})")
self.monitor.record_write(self.destination.topic)
def process_message(self, msg: Message) -> Union[KeysView[str], None]:
try:
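            # if input_class was supplied as a plain dict rather than a model
            # class, pass the raw content through without validation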
if isinstance(self.input_class, dict):
input_data = msg.content
else:
input_data = msg.as_model(self.input_class)
self.logger.info(f"Prepared input {input_data}")
except ValidationError as e:
self.logger.exception(
f"Input validation failed for message {msg}", exc_info=e
)
raise PipelineInputError(f"Input validation failed for message {msg}")
        try:
            setattr(self, "message", msg)
            output_data = self.process(input_data, msg.id)
            self.logger.info(f"Processed message {msg}")
        finally:
            delattr(self, "message")
updated = None
if self.has_output():
try:
output_model = parse_obj_as(self.output_class, output_data)
self.logger.info(f"Validated output {output_model}")
except ValidationError as e:
self.logger.exception(
f"Output validation failed for message {msg}", exc_info=e
)
raise PipelineOutputError(f"Output validation failed for message {msg}")
if output_model:
updated = msg.update_content(output_model)
return updated
def process_command(self, cmd: Command) -> Union[KeysView[str], None]:
updated = None
if cmd.action == CommandActions.Define:
dct = {
"name": self.name,
"version": self.version,
"description": self.description,
}
dct["source"] = self.source.settings
if self.has_output():
dct["destination"] = self.destination.settings
if self.input_class:
dct["input_class"] = self.input_class
if self.output_class:
dct["output_class"] = self.output_class
definition = Definition.new(**dct)
updated = cmd.update_content(definition)
else:
raise PipelineInputError("Unknown command")
return updated
    def start(self) -> None:
        """start processing."""
self.logger.setLevel(level=logging.INFO)
if self.settings.debug:
self.logger.setLevel(level=logging.DEBUG)
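        # enumerate() below is zero-based, so the loop stops once i reaches
        # limit - 1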
self.limit = (
self.settings.limit - 1 if self.settings.limit else self.settings.limit
)
# if options.rewind:
# # consumer.seek(pulsar.MessageId.earliest)
# self.logger.info("seeked to earliest message available as requested")
self.setup()
self.logger.info(f"start reading from source {self.source}")
if self.has_output():
self.logger.info(f"will write to destination {self.destination}")
if self.settings.monitoring:
self.monitor.expose()
self.monitor.record_start()
for i, msg in enumerate(self.source.read()):
self.monitor.record_read(self.source.topic)
self.logger.info("Received %d-th message '%s'", i, str(msg))
self.timer.start()
with self.monitor.process_timer.time():
try:
self._step(msg)
except PipelineInputError as e:
# If input error, upstream worker should act
self.logger.error(traceback.format_exc())
self.monitor.record_error(str(e))
except PipelineOutputError as e:
                    # If output error, worker needs to exit without acknowledgement
self.logger.error(traceback.format_exc())
self.monitor.record_error(str(e))
raise
except Exception as e:
self.logger.error(traceback.format_exc())
self.monitor.record_error(str(e))
self.timer.log(self.logger)
self.source.acknowledge()
if i == self.limit:
self.logger.info(f"Limit {self.limit} reached, exiting")
break
self.shutdown()
self.source.close()
if self.has_output():
self.destination.close()
self.monitor.record_finish()