#!/usr/bin/env python
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2017 Stephen Bunn (stephen@bunn.io)
# GNU GPLv3 <https://www.gnu.org/licenses/gpl-3.0.en.html>
import os
import abc
from . import const
from .engine import Engine
from . import (
scheduler,
requester,
pipe
)
import yaml
[docs]class AbstractClient(object, metaclass=abc.ABCMeta):
""" The basic class for all valid clients.
"""
[docs] @staticmethod
@abc.abstractmethod
def from_config(config_path: str):
""" Creates a client from a config file.
:param config: The path of the config file to load from
:type config: str
:returns: An instance of the created client
:rtype: AbstractClient
"""
raise NotImplementedError()
[docs] @abc.abstractmethod
def start(self) -> None:
""" Starts the engine.
:returns: Does not return
:rtype: None
"""
raise NotImplementedError()
[docs]class BasicClient(AbstractClient):
""" A very basic client for engine initalization.
"""
def __init__(self, config: dict):
""" Initializes the client.
:param config: A dictionary read in directly from a config file.
:type config: dict
"""
device_pairs = []
pipers = []
for (device_index, device) in enumerate(config['devices']):
device_setup = []
try:
device_scheduler_config = {
k: v
for (k, v) in device['scheduler'].items()
if k != '$'
}
device_setup.append(getattr(
scheduler, device['scheduler']['$']
)(**device_scheduler_config))
except Exception as exc:
const.log.error((
'could not initialize scheduler '
'for device at index `{device_index}` with config '
'{device_scheduler_config} ...'
).format(
device_index=device_index,
device_scheduler_config=device_scheduler_config
))
try:
device_requester_config = {
k: v
for (k, v) in device['requester'].items()
if k != '$'
}
device_setup.append(getattr(
requester, device['requester']['$']
)(**device_requester_config))
except Exception as exc:
const.log.error((
'could not initialize requester for device at index '
'`{device_index}` with config '
'{device_requester_config} ...'
).format(
device_index=device_index,
device_requester_config=device_requester_config
))
if len(device_setup) == 2:
device_pairs.append(tuple(device_setup))
for piper in config['pipes']:
try:
piper_config = {
k: v for (k, v) in piper.items() if k != '$'
}
piper = getattr(pipe, piper['$'])(**piper_config)
pipers.append(piper)
except Exception as exc:
const.log.error((
'could not initialize pipe `{piper[$]}` with config '
'{piper_config} ...'
).format(piper=piper, piper_config=piper_config))
self.engine = Engine(dict(device_pairs), pipers)
[docs] @staticmethod
def from_config(config: str):
""" Creates a BasicClient from a config file.
:param config: The path of the config file to load from
:type config: str
:returns: An instance of the created BasicClient
:rtype: BasicClient
"""
config = os.path.abspath(os.path.expanduser(config))
if os.path.isfile(config):
with open(config, 'r') as fp:
try:
return BasicClient(yaml.load(fp))
except yaml.YAMLError as exc:
const.log.error((
'client failed loading from config at `{config}`, '
'{exc.message} ...'
).format(config=config, exc=exc))
else:
raise FileNotFoundError((
"given file at '{config}' does not exist"
).format(config=config))
[docs] def start(self) -> None:
""" Starts the engine.
:returns: Does not return
:rtype: None
"""
try:
self.engine.start()
for _ in self.engine.register.keys():
_.join()
except KeyboardInterrupt:
const.log.info(('terminating client `{self}`').format(self=self))
self.engine.stop()