Module event_processor.base.aggregator_base

Expand source code
import os
import json
import re
import logging
import logging.handlers

from multiprocessing import Lock
from event_processor.config import config
from event_processor.util.object_hash import ObjectHash
from event_processor.models.event import EventManager
from datetime import datetime
from dateutil.relativedelta import relativedelta

from event_processor.util.time_utils import TimeUtils
from event_processor.util.http_utils import HttpUtils

class AggregatorBase:
    """Functionality shared by spiders and API-based aggregator classes.

    Configures per-aggregator logging (an in-memory buffer used for error
    reporting plus a stream handler), an HTTP session, and the date window
    used for requests (today through one month from today).
    """

    # Subclasses may set this to False to disable the aggregator.
    enabled = True
    name = 'AggregatorBase'

    @property
    def is_errored(self):
        """True if any buffered log record was emitted at ERROR level."""
        return any(log.levelno == logging.ERROR for log in self.memory_handler.buffer)

    def __init__(self, organization, base_url, date_format, request_date_format=None, **kwargs):
        """Initialize shared aggregator state.

        Args:
            organization: Name of the organization this aggregator covers.
            base_url: Root URL of the target site; non-word characters are
                stripped from it to build ``identifier``.
            date_format: strftime-style format string that specifies the
                date style of the target website.
            request_date_format: Format used when building request dates;
                defaults to ``date_format`` when not given.
            **kwargs: May include ``_job`` (the scheduler job id), stored
                as ``jobid``.
        """
        if config.debug:
            try:
                import ptvsd
                try:
                    ptvsd.enable_attach(address=('0.0.0.0', 5860))
                except Exception:
                    # attach already enabled
                    pass
                if not ptvsd.is_attached():
                    ptvsd.wait_for_attach()
            except ImportError:
                # Debugger not installed: skip attaching entirely. (The
                # previous bare except also swallowed the ImportError and
                # then raised NameError on ptvsd.is_attached().)
                pass

        self.organization = organization
        if request_date_format is None:
            request_date_format = date_format

        self.jobid = kwargs.get('_job')

        self.session = HttpUtils.get_session()

        self.date_format = date_format
        self.time_utils = TimeUtils(date_format)
        self.base_url = base_url
        self.identifier = re.sub(r'\W', '', base_url)
        self.event_manager = EventManager()

        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        # No flush target is set, so records simply accumulate in .buffer
        # for later inspection by is_errored / notify_spider_complete.
        self.memory_handler = logging.handlers.MemoryHandler(0)
        self.memory_handler.setFormatter(formatter)

        self.stream_handler = logging.StreamHandler()
        self.stream_handler.setFormatter(formatter)

        # Capture this aggregator's own logs at INFO; keep the noisy
        # scrapy/twisted loggers at WARNING on both handlers.
        self.configure_logger(self.name, self.memory_handler, logging.INFO)
        self.configure_logger(self.name, self.stream_handler, logging.INFO)
        self.configure_logger('scrapy', self.memory_handler, logging.WARNING)
        self.configure_logger('scrapy', self.stream_handler, logging.WARNING)
        self.configure_logger('twisted', self.memory_handler, logging.WARNING)
        self.configure_logger('twisted', self.stream_handler, logging.WARNING)

        start_date = datetime.now().strftime('%m-%d-%Y')
        end_date = (datetime.now() + relativedelta(months=+1)).strftime('%m-%d-%Y')

        request_format_utils = TimeUtils('%m-%d-%Y')
        # When this is running for multiple days, validating if the date is in the past causes issues
        self.start_date = request_format_utils.convert_date_format(start_date, request_date_format, validate_past=False)
        self.end_date = request_format_utils.convert_date_format(end_date, request_date_format, validate_past=False)
        self.start_timestamp = request_format_utils.min_timestamp_for_day(start_date)
        self.end_timestamp = request_format_utils.max_timestamp_for_day(end_date)

    def configure_logger(self, logger, handler, log_level):
        """Attach ``handler`` to the logger named ``logger`` and set its level."""
        logging.getLogger(logger).addHandler(handler)
        logging.getLogger(logger).setLevel(log_level)

    def notify_spider_complete(self):
        """POST the job id, error flag, and formatted logs to the scheduler."""
        logs = [self.memory_handler.format(log) for log in self.memory_handler.buffer]
        return self.session.post(config.scheduler_spider_complete,
                            json={'jobid': self.jobid, 'errored': self.is_errored, 'logs': logs},
                            headers={'Content-Type': 'application/json'})

    def item_filter(self, item):
        """Filter hook for event items.

        This method should return True if the item should be included in
        the results and False if not. The base implementation keeps all items.
        """
        return True

Classes

class AggregatorBase (organization, base_url, date_format, request_date_format=None, **kwargs)
Expand source code
class AggregatorBase:
    """Functionality shared by spiders and API-based aggregator classes.

    Configures per-aggregator logging (an in-memory buffer used for error
    reporting plus a stream handler), an HTTP session, and the date window
    used for requests (today through one month from today).
    """

    # Subclasses may set this to False to disable the aggregator.
    enabled = True
    name = 'AggregatorBase'

    @property
    def is_errored(self):
        """True if any buffered log record was emitted at ERROR level."""
        return any(log.levelno == logging.ERROR for log in self.memory_handler.buffer)

    def __init__(self, organization, base_url, date_format, request_date_format=None, **kwargs):
        """Initialize shared aggregator state.

        Args:
            organization: Name of the organization this aggregator covers.
            base_url: Root URL of the target site; non-word characters are
                stripped from it to build ``identifier``.
            date_format: strftime-style format string that specifies the
                date style of the target website.
            request_date_format: Format used when building request dates;
                defaults to ``date_format`` when not given.
            **kwargs: May include ``_job`` (the scheduler job id), stored
                as ``jobid``.
        """
        if config.debug:
            try:
                import ptvsd
                try:
                    ptvsd.enable_attach(address=('0.0.0.0', 5860))
                except Exception:
                    # attach already enabled
                    pass
                if not ptvsd.is_attached():
                    ptvsd.wait_for_attach()
            except ImportError:
                # Debugger not installed: skip attaching entirely. (The
                # previous bare except also swallowed the ImportError and
                # then raised NameError on ptvsd.is_attached().)
                pass

        self.organization = organization
        if request_date_format is None:
            request_date_format = date_format

        self.jobid = kwargs.get('_job')

        self.session = HttpUtils.get_session()

        self.date_format = date_format
        self.time_utils = TimeUtils(date_format)
        self.base_url = base_url
        self.identifier = re.sub(r'\W', '', base_url)
        self.event_manager = EventManager()

        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        # No flush target is set, so records simply accumulate in .buffer
        # for later inspection by is_errored / notify_spider_complete.
        self.memory_handler = logging.handlers.MemoryHandler(0)
        self.memory_handler.setFormatter(formatter)

        self.stream_handler = logging.StreamHandler()
        self.stream_handler.setFormatter(formatter)

        # Capture this aggregator's own logs at INFO; keep the noisy
        # scrapy/twisted loggers at WARNING on both handlers.
        self.configure_logger(self.name, self.memory_handler, logging.INFO)
        self.configure_logger(self.name, self.stream_handler, logging.INFO)
        self.configure_logger('scrapy', self.memory_handler, logging.WARNING)
        self.configure_logger('scrapy', self.stream_handler, logging.WARNING)
        self.configure_logger('twisted', self.memory_handler, logging.WARNING)
        self.configure_logger('twisted', self.stream_handler, logging.WARNING)

        start_date = datetime.now().strftime('%m-%d-%Y')
        end_date = (datetime.now() + relativedelta(months=+1)).strftime('%m-%d-%Y')

        request_format_utils = TimeUtils('%m-%d-%Y')
        # When this is running for multiple days, validating if the date is in the past causes issues
        self.start_date = request_format_utils.convert_date_format(start_date, request_date_format, validate_past=False)
        self.end_date = request_format_utils.convert_date_format(end_date, request_date_format, validate_past=False)
        self.start_timestamp = request_format_utils.min_timestamp_for_day(start_date)
        self.end_timestamp = request_format_utils.max_timestamp_for_day(end_date)

    def configure_logger(self, logger, handler, log_level):
        """Attach ``handler`` to the logger named ``logger`` and set its level."""
        logging.getLogger(logger).addHandler(handler)
        logging.getLogger(logger).setLevel(log_level)

    def notify_spider_complete(self):
        """POST the job id, error flag, and formatted logs to the scheduler."""
        logs = [self.memory_handler.format(log) for log in self.memory_handler.buffer]
        return self.session.post(config.scheduler_spider_complete,
                            json={'jobid': self.jobid, 'errored': self.is_errored, 'logs': logs},
                            headers={'Content-Type': 'application/json'})

    def item_filter(self, item):
        """Filter hook for event items.

        This method should return True if the item should be included in
        the results and False if not. The base implementation keeps all items.
        """
        return True

Subclasses

Class variables

var enabled

bool(x) -> bool

Returns True when the argument x is true, False otherwise. The builtins True and False are the only two instances of the class bool. The class bool is a subclass of the class int, and cannot be subclassed.

var name

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

Instance variables

var is_errored
Expand source code
@property
def is_errored(self):
    """Report whether the in-memory log buffer holds any ERROR record."""
    for record in self.memory_handler.buffer:
        if record.levelno == logging.ERROR:
            return True
    return False

Methods

def configure_logger(self, logger, handler, log_level)
Expand source code
def configure_logger(self, logger, handler, log_level):
    """Attach *handler* to the logger named *logger* and set its level."""
    target = logging.getLogger(logger)
    target.addHandler(handler)
    target.setLevel(log_level)
def item_filter(self, item)

Specify a filter for event items. This method should return True if the item should be included in the results and False if not.

Expand source code
def item_filter(self, item):
    """Filter hook for event items.

    Return True when the item should be included in the results and
    False when it should be dropped; this base version keeps everything.
    """
    return True
def notify_spider_complete(self)
Expand source code
def notify_spider_complete(self):
    """Send the job id, error flag, and formatted log lines to the scheduler."""
    formatted_logs = []
    for record in self.memory_handler.buffer:
        formatted_logs.append(self.memory_handler.format(record))
    payload = {'jobid': self.jobid, 'errored': self.is_errored, 'logs': formatted_logs}
    return self.session.post(config.scheduler_spider_complete,
                             json=payload,
                             headers={'Content-Type': 'application/json'})