#!/usr/bin/env python
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2017 Stephen Bunn (stephen@bunn.io)
# GNU GPLv3 <https://www.gnu.org/licenses/gpl-3.0.en.html>
from typing import Dict, List, Optional

import blinker

from . import const
from .models.record import Record
from .scheduler._common import AbstractScheduler
from .requester._common import AbstractRequester
from .translator._common import AbstractTranslator
from .pipe._common import AbstractPipe
from .translator import get_translator
class Engine(object):
    """ Provides communication between all of the subpackages.

    The engine connects its own event handlers to the ``signal`` attribute
    of each collaborator so that data flows in one direction:
    scheduler -> requester -> translator -> pipes.

    * a scheduler signal triggers :meth:`on_scheduled`, which asks the
      mapped requester to make a request
    * a requester signal triggers :meth:`on_data`, which hands the data to
      a translator looked up by the requester's class name
    * a translator signal triggers :meth:`on_record`, which validates the
      record and offers it to every pipe
    * a pipe signal triggers :meth:`on_commit`, which only logs
    """

    # NOTE: these signals are class attributes, so they are shared by every
    # ``Engine`` instance; the sending engine is passed as the sender.
    on_start = blinker.Signal()
    on_stop = blinker.Signal()

    def __init__(
        self,
        register: Optional[Dict[AbstractScheduler, AbstractRequester]]=None,
        pipes: Optional[List[AbstractPipe]]=None,
    ):
        """ Initializes an instance of the engine.

        Containers passed in are stored by reference (not copied). When an
        argument is omitted a fresh container is created for this instance.

        :param register: A dictionary of schedulers mapped to requesters
        :type register: dict
        :param pipes: A list of pipes that should be used for records
        :type pipes: list
        """
        # ``None`` sentinels instead of ``{}`` / ``[]`` defaults: default
        # values are evaluated once, so literal containers would be shared
        # between every engine constructed without arguments.
        self._register = {} if register is None else register
        self._pipes = [] if pipes is None else pipes
        # Translator instances keyed by requester class name; filled lazily
        # by ``on_data``.
        self._translators = {}

    @property
    def register(self) -> Dict[AbstractScheduler, AbstractRequester]:
        """ The mapping of schedulers to requesters.
        """
        return self._register

    @property
    def translators(self) -> Dict[str, AbstractTranslator]:
        """ The translator objects that have been needed so far.

        A dictionary keyed by requester class name, populated on demand by
        :meth:`on_data`.
        """
        return self._translators

    @property
    def pipes(self) -> List[AbstractPipe]:
        """ The list of pipe objects that are handling created records.
        """
        return self._pipes

    def on_scheduled(self, scheduler: AbstractScheduler) -> None:
        """ Event handler for when schedulers trigger their mapped requesters.

        :param scheduler: The scheduler that needs to run its requester
        :type scheduler: AbstractScheduler
        :raises KeyError: If ``scheduler`` is not a key of the register
        :returns: Nothing
        :rtype: None
        """
        const.log.debug((
            'scheduled request from scheduler `{scheduler}` ...'
        ).format(scheduler=scheduler))
        self.register[scheduler].request()

    def on_data(
        self,
        requester: AbstractRequester, data: str, meta: dict
    ) -> None:
        """ Event handler for when requesters get a response from their device.

        The translator for a requester is looked up by the requester's class
        name. It is created and connected to :meth:`on_record` the first
        time that name is seen, then reused for later data.

        :param requester: The requester who retrieved the data
        :type requester: AbstractRequester
        :param data: The data returned from the device
        :type data: str
        :param meta: Any additional fields required to properly interpret data
        :type meta: dict
        :returns: Nothing
        :rtype: None
        """
        const.log.debug((
            'recieved data from requester `{requester}` ...'
        ).format(requester=requester))
        requester_name = requester.__class__.__name__
        if requester_name not in self.translators:
            translator = get_translator(requester_name)()
            translator.signal.connect(self.on_record)
            # Cache the instance that was just connected. Building a second
            # translator here would cache one whose signal was never
            # connected to ``on_record``.
            self.translators[requester_name] = translator
        self.translators[requester_name].translate(data, meta=meta)

    def on_record(self, record: Record) -> None:
        """ Event handler for when translators finish translation of some data.

        Records that fail ``record.validate()`` are logged as errors and
        dropped; valid records are passed to ``accept`` on every pipe.

        :param record: The translated record
        :type record: Record
        :returns: Nothing
        :rtype: None
        """
        if not record.validate():
            const.log.error((
                'invalid record recieved `{record}` ...'
            ).format(record=record))
        else:
            const.log.debug((
                'adding record `{record}` to pipes ...'
            ).format(record=record))
            for piper in self.pipes:
                piper.accept(record)

    def on_commit(self, piper: AbstractPipe, record: Record) -> None:
        """ Event handler for when pipes finish writing out a record.

        Only logs the event.

        :param piper: The pipe who wrote the record out
        :type piper: AbstractPipe
        :param record: The record that was written
        :type record: Record
        :returns: Nothing
        :rtype: None
        """
        const.log.debug((
            'pipe `{piper}` successfully handled record `{record}` ...'
        ).format(piper=piper, record=record))

    def start(self) -> None:
        """ Starts the engine.

        Sends ``on_start``, drops every pipe that fails ``validate()`` from
        :attr:`pipes` (in place), connects the remaining pipes to
        :meth:`on_commit`, then connects and starts every registered
        scheduler.

        :returns: Nothing
        :rtype: None
        """
        self.on_start.send(self)
        # Iterate over a snapshot: removing from the list being iterated
        # would skip the pipe that follows each removed one.
        for piper in list(self.pipes):
            if not piper.validate():
                const.log.warning((
                    'pipe `{piper}` did not pass validation, '
                    'removing from pipes ...'
                ).format(piper=piper))
                self.pipes.remove(piper)
            else:
                const.log.debug((
                    'utilizing pipe `{piper}` ...'
                ).format(piper=piper))
                piper.signal.connect(self.on_commit)
        for (scheduler, requester) in self.register.items():
            scheduler.signal.connect(self.on_scheduled)
            requester.signal.connect(self.on_data)
            scheduler.start()
            # Logged after ``start()`` because the message includes
            # ``scheduler.pid`` -- presumably only meaningful once the
            # scheduler is running; verify against AbstractScheduler.
            const.log.info((
                'starting scheduler `{scheduler}` signal as daemon '
                'with pid `{scheduler.pid}` for `{requester}` ...'
            ).format(scheduler=scheduler, requester=requester))

    def stop(self) -> None:
        """ Stops the engine.

        Calls ``terminate()`` on every registered scheduler and then sends
        ``on_stop``.

        :returns: Nothing
        :rtype: None
        """
        const.log.info((
            'stopping engine and scheduler threads with pids '
            '{scheduler_pids} ...'
        ).format(scheduler_pids=[
            scheduler.pid for scheduler in self.register
        ]))
        for scheduler in self.register:
            const.log.debug((
                'ensuring scheduler `{scheduler}` with pid '
                '`{scheduler.pid}` is terminated ...'
            ).format(scheduler=scheduler))
            scheduler.terminate()
        self.on_stop.send(self)