Source code for neat.pipe.mongodb

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2017 Stephen Bunn (stephen@bunn.io)
# GNU GPLv3 <https://www.gnu.org/licenses/gpl-3.0.en.html>

import time

from .. import const
from ..models import Record
from ._common import AbstractPipe

import pymongo


[docs]class MongoDBPipe(AbstractPipe): """ A record pipe for MongoDB. .. note:: Records are always placed in the `neat` table. """ def __init__(self, ip: str, port: int, table: str, entry_delay: int=600): """ Initializes the MongoDB pipe. :param ip: The IP of the MongoDB instance :type ip: str :param port: The port of the MongoDB instance :type port: int :param table: The table name of the table to place records into :type table: str :param entry_delay: Seconds between allowing records into the database :type entry_delay: int """ (self._ip, self._port) = (ip, port) self._table_name = table (self._entry_register, self._entry_delay) = ({}, entry_delay) def __repr__(self): """ A string representation of the pipe. :returns: A string representation of the pipe :rtype: str """ return (( '<{self.__class__.__name__} ({self._ip}:{self._port})>' ).format(self=self)) @property def client(self) -> pymongo.MongoClient: """ The client attached to the MongoDB uri. .. warning:: MongoDB driver connections are not fork safe """ # NOTE: since mongodb driver connections are not fork safe, setting # connect to False is required if not hasattr(self, '_client'): try: self._client = pymongo.MongoClient( self._ip, self._port, connect=False ) except pymongo.errors.ConnectionFailure as exc: const.log.error(( 'could not connect to mongodb server at ' '`{self._ip}:{self._port}`, {exc.message} ...' ).format(self=self)) raise exc return self._client @property def db(self): """ The database of the client to write to. """ if not hasattr(self, '_db'): self._db = self.client[const.module_name] return self._db @property def table(self): """ The table of the database to write to. """ if not hasattr(self, '_table'): self._table = self.db.collection[self._table_name] return self._table
[docs] def accept(self, record: Record) -> None: """ Accepts a record to be placed into the MongoDB instance. :param record: The record to be placed in the MongoDB instance :type record: Record :returns: Does not return :rtype: None """ const.log.debug(( 'handling `{record}` with `{self}` ...' ).format(self=self, record=record)) last_write = time.time() insert = False try: last_write = (time.time() - self._entry_register[record.name]) if last_write >= self._entry_delay: insert = True except KeyError as exc: self._entry_register[record.name] = time.time() insert = True if insert: const.log.debug(( 'commiting `{record}` records into `{self}` ...' ).format(self=self, record=record)) self.table.insert_one({ (k.replace('$', '') if k.startswith('$') else k): v for (k, v) in record.to_dict().items() }) else: const.log.debug(( 'dropping `{record}` for `{self}`, time till next write is ' '`{next_write}` seconds ...' ).format( self=self, record=record, next_write=(self._entry_delay - last_write) )) self.signal.send(self, record=record)
[docs] def validate(self) -> bool: """ Self validates the MongoDB pipe. :returns: True if the pipe is valid, otherwise False :rtype: bool """ try: self.client return True except pymongo.errors.ConnectionFailure as exc: pass return False