Source code for tamarco.resources.bases
from typing import List
from tamarco.resources.basic.status.status_codes import StatusCodes
[docs]class BaseResource:
"""Define the basic interface of a resource.
All the tamarco resources should inherit from this class.
Resource start call chain:
1. bind
2. configure_settings
3. pre_start
4. start
5. post_start
Resource stop call chain:
1. stop
2. post_stop
"""
depends_on = []
loggers_names = []
def __init__(self):
self.name = None
self.microservice = None
self.settings = None
self._status = StatusCodes.NOT_STARTED
[docs] async def bind(self, microservice, name):
"""Build method, the microservice binds all its resources. Microservice starts and stops the resources.
Args:
microservice (Microservice): Microservice instance managing the resource.
name (str): Name of the resource instance in the microservice class.
"""
self.microservice = microservice
self.name = name
[docs] async def pre_start(self):
"""Pre start stage of the resource lifecycle."""
pass
[docs] async def start(self):
"""Start stage of the resource lifecycle."""
self._status = StatusCodes.STARTED
[docs] async def post_start(self):
"""Post start stage of the resource lifecycle."""
pass
[docs] async def stop(self):
"""Stop stage of the resource lifecycle."""
self._status = StatusCodes.STOPPED
[docs] async def post_stop(self):
"""Post stop stage of the resource lifecycle."""
pass
[docs] async def status(self) -> dict:
"""Return information about the state of the resource."""
return {"status": self._status}
def __repr__(self):
return f"<{self.__class__} name={self.name}>"
class StreamBase:
def __init__(self, name, codec=None, resource=None):
"""
Args:
name (str): Name of the stream.
codec (CodecInterface): Used to decode the input and output from the stream.
resource (BaseResource): Owner resource of the stream.
"""
self.name = name
self.codec = codec
self.resource = resource
class InputBase(StreamBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.on_message_callback = None
if self.resource:
self.resource.add_input(self)
def __call__(self, callback):
"""Allow the input to behave as a decorator."""
self.on_message_callback = callback
return self
def __aiter__(self):
"""Allow the input to behave as an asyncronous iterator."""
return self
def __anext__(self):
"""Allow the input to behave as an asyncronous iterator."""
raise NotImplementedError("The __anext__ method should be implemented in the child class.")
def __repr__(self):
return f"Tamarco Input {self.name} from resource {self.resource}"
class OutputBase(StreamBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.resource:
self.resource.add_output(self)
def __repr__(self):
return f"Tamarco Output {self.name} from resource {self.resource}"
[docs]class IOResource(BaseResource):
"""Extended resource that manages I/O streams, like Kafka and AMQP."""
def __init__(self, inputs: List = None, outputs: List = None):
super().__init__()
self.inputs = {}
self.outputs = {}
inputs = [] if inputs is None else inputs
outputs = [] if outputs is None else outputs
for input_element in inputs:
self.add_input(input_element)
for output_element in outputs:
self.add_output(output_element)
[docs] def add_output(self, output):
"""Add one output.
Args:
output (OutputBase): Output to add.
"""
assert output.name not in self.outputs, "Error two outputs with the same name"
self.outputs[output.name] = output
[docs]class DatabaseResource(BaseResource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.client = None
[docs] async def start(self, clean_database=False, register_scripts=True):
await self.connect()
await self.init_db(clean_database=clean_database, register_scripts=register_scripts)
await super().start()
[docs] async def stop(self):
self._status = StatusCodes.STOPPING
await self.disconnect()
await super().stop()
async def connect(self, *args, **kwargs):
pass
async def disconnect(self, *args, **kwargs):
pass
async def init_db(self, *args, **kwargs):
pass
def clear(self):
pass
[docs] async def status(self):
return {"status": self._status}