Module event_processor.base.aggregator_base
import os
import json
import re
import logging
import logging.handlers
from multiprocessing import Lock
from event_processor.config import config
from event_processor.util.object_hash import ObjectHash
from event_processor.models.event import EventManager
from datetime import datetime
from dateutil.relativedelta import relativedelta
from event_processor.util.time_utils import TimeUtils
from event_processor.util.http_utils import HttpUtils
class AggregatorBase:
    """Functionality shared by spiders and API-based aggregator classes."""

    enabled = True
    name = 'AggregatorBase'

    @property
    def is_errored(self):
        return any(log.levelno == logging.ERROR for log in self.memory_handler.buffer)

    def __init__(self, organization, base_url, date_format, request_date_format=None, **kwargs):
        if config.debug:
            try:
                import ptvsd
                ptvsd.enable_attach(address=('0.0.0.0', 5860))
            except Exception:
                # Attach is already enabled.
                pass
            if not ptvsd.is_attached():
                ptvsd.wait_for_attach()
        self.organization = organization
        # date_format specifies the date format used by the target website.
        if request_date_format is None:
            request_date_format = date_format
        self.jobid = kwargs.get('_job')
        self.session = HttpUtils.get_session()
        self.date_format = date_format
        self.time_utils = TimeUtils(date_format)
        self.base_url = base_url
        # Strip non-word characters from the base URL to build a stable identifier.
        self.identifier = re.sub(r'\W', '', base_url)
        self.event_manager = EventManager()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        self.memory_handler = logging.handlers.MemoryHandler(0)
        self.memory_handler.setFormatter(formatter)
        self.stream_handler = logging.StreamHandler()
        self.stream_handler.setFormatter(formatter)
        self.configure_logger(self.name, self.memory_handler, logging.INFO)
        self.configure_logger(self.name, self.stream_handler, logging.INFO)
        self.configure_logger('scrapy', self.memory_handler, logging.WARNING)
        self.configure_logger('scrapy', self.stream_handler, logging.WARNING)
        self.configure_logger('twisted', self.memory_handler, logging.WARNING)
        self.configure_logger('twisted', self.stream_handler, logging.WARNING)
        # Scrape window: today through one month from today.
        start_date = datetime.now().strftime('%m-%d-%Y')
        end_date = (datetime.now() + relativedelta(months=+1)).strftime('%m-%d-%Y')
        request_format_utils = TimeUtils('%m-%d-%Y')
        # When a run spans multiple days, validating that dates are not in the past
        # causes issues, so that check is skipped here.
        self.start_date = request_format_utils.convert_date_format(start_date, request_date_format, validate_past=False)
        self.end_date = request_format_utils.convert_date_format(end_date, request_date_format, validate_past=False)
        self.start_timestamp = request_format_utils.min_timestamp_for_day(start_date)
        self.end_timestamp = request_format_utils.max_timestamp_for_day(end_date)

    def configure_logger(self, logger, handler, log_level):
        logging.getLogger(logger).addHandler(handler)
        logging.getLogger(logger).setLevel(log_level)

    def notify_spider_complete(self):
        logs = [self.memory_handler.format(log) for log in self.memory_handler.buffer]
        return self.session.post(config.scheduler_spider_complete,
                                 json={'jobid': self.jobid, 'errored': self.is_errored, 'logs': logs},
                                 headers={'Content-Type': 'application/json'})

    def item_filter(self, item):
        """Filter for event items. Return True if the item should be
        included in the results and False otherwise."""
        return True
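The constructor derives its scrape window from the current date: start_date is today and end_date is one month ahead, both rendered as '%m-%d-%Y' before being converted to the site's request_date_format by the project's TimeUtils. A minimal, runnable sketch of just the window arithmetic, using only the standard library and python-dateutil (TimeUtils itself is project-specific and omitted):

from datetime import datetime
from dateutil.relativedelta import relativedelta

today = datetime.now()
start_date = today.strftime('%m-%d-%Y')
# relativedelta lands on the same day of the next month, unlike a fixed timedelta(days=30).
end_date = (today + relativedelta(months=+1)).strftime('%m-%d-%Y')
print(start_date, end_date)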
Classes
class AggregatorBase (organization, base_url, date_format, request_date_format=None, **kwargs)
-
Functionality shared by spiders and API-based aggregator classes.
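A hypothetical subclass might look like the following sketch. The organization name, URL, and date format are illustrative assumptions, not part of the module; only the constructor signature comes from the source above.

from event_processor.base.aggregator_base import AggregatorBase

class ExampleAggregator(AggregatorBase):
    name = 'ExampleAggregator'

    def __init__(self, **kwargs):
        # Hypothetical organization, base URL, and site date format.
        super().__init__('Example Org', 'https://example.com/events', '%m/%d/%Y', **kwargs)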
Class variables
var enabled
-
Flag indicating whether this aggregator is enabled. Defaults to True.
var name
-
Name used for this aggregator's logger in configure_logger. Defaults to 'AggregatorBase'.
Instance variables
var is_errored
-
True if any record in the memory handler's buffer was logged at ERROR level.
@property
def is_errored(self):
    return any(log.levelno == logging.ERROR for log in self.memory_handler.buffer)
Methods
def configure_logger(self, logger, handler, log_level)
-
Attaches the handler to the named logger and sets that logger's level.
def configure_logger(self, logger, handler, log_level):
    logging.getLogger(logger).addHandler(handler)
    logging.getLogger(logger).setLevel(log_level)
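Standalone, the method is equivalent to this runnable sketch; the logger name 'demo' is illustrative:

import logging

def configure_logger(logger, handler, log_level):
    logging.getLogger(logger).addHandler(handler)
    logging.getLogger(logger).setLevel(log_level)

# Route a named logger to stderr at DEBUG level.
configure_logger('demo', logging.StreamHandler(), logging.DEBUG)
logging.getLogger('demo').debug('handler and level are now configured')

Note that each call sets the level on the logger itself, so the most recent call for a given logger name determines its level.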
def item_filter(self, item)
-
Filter for event items. Returns True if the item should be included in the results and False otherwise; the base implementation accepts every item.
def item_filter(self, item):
    """Filter for event items. Return True if the item should be
    included in the results and False otherwise."""
    return True
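A subclass would override this to drop unwanted items. A hedged sketch; the dict shape, 'title' field, and keyword are assumptions, since the base class does not constrain what an item looks like:

from event_processor.base.aggregator_base import AggregatorBase

class KeywordAggregator(AggregatorBase):
    def item_filter(self, item):
        # Hypothetical: keep only items whose title mentions 'concert'.
        title = item.get('title', '') if isinstance(item, dict) else ''
        return 'concert' in title.lower()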
def notify_spider_complete(self)
-
Posts the job id, error status, and formatted log buffer to the scheduler's spider-complete endpoint and returns the response.
def notify_spider_complete(self):
    logs = [self.memory_handler.format(log) for log in self.memory_handler.buffer]
    return self.session.post(config.scheduler_spider_complete,
                             json={'jobid': self.jobid, 'errored': self.is_errored, 'logs': logs},
                             headers={'Content-Type': 'application/json'})
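The return value is the session's HTTP response, so callers can check whether the scheduler accepted the notification. A hedged usage sketch, assuming 'aggregator' is a fully constructed instance and HttpUtils.get_session() returns a requests-style session:

import logging

response = aggregator.notify_spider_complete()
if response.status_code != 200:
    logging.getLogger(aggregator.name).error(
        'scheduler rejected completion notice: %s', response.status_code)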