Module event_processor.scrapy_impl.pipelines
Expand source code
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
from event_processor.models.event import EventManager, Event, EventLoader
from event_processor.util.object_hash import ObjectHash
from threading import Lock
from event_processor.config import config
from event_processor.util.data_utils import DataUtils
from event_processor.util.time_utils import TimeUtils
from event_processor.util.http_utils import HttpUtils
from scrapy.exceptions import DropItem
from datetime import datetime
import json
class EventTransformPipeline:
    """Stamp spider metadata onto items and drop the ones that should not be kept.

    Items are tagged with the spider's organization (and date format, when an
    ``event_time`` is present), loaded through ``EventLoader``, passed through
    the spider's ``item_filter`` and finally checked against the spider's
    configured start/end timestamps.
    """

    def __init__(self):
        self.time_utils = TimeUtils()

    def process_item(self, item, spider):
        """Return the loaded event item, or raise ``DropItem`` to discard it.

        Raises:
            DropItem: when ``spider.item_filter(item)`` is falsy, or when the
                loaded item has an ``event_time`` whose start/end timestamps
                are not within ``spider.start_timestamp``/``spider.end_timestamp``
                according to ``TimeUtils.time_range_is_between``.
        """
        item['organization'] = spider.organization
        if 'event_time' in item:
            # Pass the spider's date format along with the raw event time
            # (presumably consumed by EventLoader — confirm).
            item['event_time']['date_format'] = spider.date_format
        loader = EventLoader(**item)

        # Spiders can reject individual items with their own predicate.
        allowed = spider.item_filter(item)
        if not allowed:
            raise DropItem('Custom item filter did not allow this event')

        if 'event_time' not in loader.item:
            return loader.item

        window = loader.item['event_time']
        within_range = self.time_utils.time_range_is_between(
            window['start_timestamp'],
            window['end_timestamp'],
            spider.start_timestamp,
            spider.end_timestamp,
        )
        if not within_range:
            raise DropItem('Event is not in the configured timeframe')
        return loader.item
class GeocodePipeline:
    """Get the Geocodes from the parsed address, if an address was found.

    The lookup is best-effort: any failure is logged as a warning and the item
    is still passed on to the next pipeline.
    """

    def __init__(self):
        self.session = HttpUtils.get_session()

    def process_item(self, item, spider):
        """Attach ``geocode_id`` to items that carry an ``address``; return the item.

        The address plus any already-known ``lat``/``lon`` are sent as query
        parameters to ``config.get_geocode``. The ``id`` field of the JSON
        response is stored as ``item['geocode_id']``. Items without an
        ``address`` are returned untouched.
        """
        if 'address' in item:
            try:
                params = {
                    'address': item['address'],
                    'lat': item['lat'] if 'lat' in item else None,
                    'lon': item['lon'] if 'lon' in item else None,
                }
                geocode = self.session.get(config.get_geocode, params=params)
                geocode_json = geocode.json()
                item['geocode_id'] = geocode_json['id']
                # A null latitude means the service could not resolve the address.
                if geocode_json['lat'] is None:
                    spider.logger.warning(f'No geocode response for address {item["address"]}')
            except Exception as e:
                # Deliberately best-effort: a geocoding failure must not drop the event.
                spider.logger.warning(f'Exception while getting geocode for address {item["address"]}: {e}')
        return item
class EventBuildPipeline:
    """Register every processed item with the spider's event manager."""

    def process_item(self, item, spider):
        """Record ``item`` in ``spider.event_manager`` under its ``url`` and pass it on.

        The item itself is returned unchanged so later pipelines receive it.
        """
        record = spider.event_manager.update
        record(item['url'], item)
        return item
class EventSavePipeline:
    """Upload the events a spider collected once the spider closes.

    The upload is skipped when the events hash to the same value that is
    stored in ``ObjectHash`` for the spider, or when the spider reports errors.
    """

    def __init__(self):
        self.session = HttpUtils.get_session()

    def close_spider(self, spider):
        """Save the spider's events (if any), then notify the scheduler if enabled."""
        if len(spider.event_manager.events) == 0:
            spider.logger.info(f'No data returned for {spider.base_url}')
        else:
            # NOTE(review): if save_events raises, the scheduler is not notified —
            # confirm that is intended.
            self.save_events(spider)
        if config.run_scheduler:
            spider.notify_spider_complete()

    def save_events(self, spider):
        """POST the spider's events to ``config.put_events`` if they changed.

        The event-list hash is stored with ``ObjectHash`` only after the server
        accepts the upload. A skipped (errored) or failed upload is therefore
        retried on the next run instead of being reported as "Nothing to update".

        Raises:
            Exception: carrying the response body when the upload is not ok.
        """
        event_list = spider.event_manager.to_dicts()
        if not event_list:
            spider.logger.info('No events to save.')
            return
        new_hash = ObjectHash.create_hash(event_list)
        organization = event_list[0]['organization']
        spider.logger.info(f'Found {len(event_list)} events for {organization}.')
        if new_hash == ObjectHash.get(spider.identifier):
            spider.logger.info('Nothing to update.')
            return
        if spider.is_errored:
            spider.logger.info('Errors occurred during processing so events will not be saved')
            return
        response = self.session.post(config.put_events, json={'events': event_list})
        if not response.ok:
            raise Exception(response.text)
        # Remember the hash only once the events have actually been persisted.
        ObjectHash.set(spider.identifier, new_hash)
        spider.logger.info(f'Saved {len(event_list)} events for {organization}')
Classes
class EventBuildPipeline (*args, **kwargs)
-
Record each item in the spider's event manager, keyed by the item's URL.
Expand source code
class EventBuildPipeline:
    """Register every processed item with the spider's event manager."""

    def process_item(self, item, spider):
        """Record ``item`` in ``spider.event_manager`` under its ``url`` and pass it on.

        The item itself is returned unchanged so later pipelines receive it.
        """
        record = spider.event_manager.update
        record(item['url'], item)
        return item
Methods
def process_item(self, item, spider)
-
Given an item and a spider, record the item in the spider's event manager under the item's URL, and return the item unchanged.
Expand source code
def process_item(self, item, spider):
    """Record ``item`` in ``spider.event_manager`` under its ``url`` and pass it on.

    The item itself is returned unchanged so later pipelines receive it.
    """
    record = spider.event_manager.update
    record(item['url'], item)
    return item
class EventSavePipeline
-
Expand source code
class EventSavePipeline:
    """Upload the events a spider collected once the spider closes.

    The upload is skipped when the events hash to the same value that is
    stored in ``ObjectHash`` for the spider, or when the spider reports errors.
    """

    def __init__(self):
        self.session = HttpUtils.get_session()

    def close_spider(self, spider):
        """Save the spider's events (if any), then notify the scheduler if enabled."""
        if len(spider.event_manager.events) == 0:
            spider.logger.info(f'No data returned for {spider.base_url}')
        else:
            # NOTE(review): if save_events raises, the scheduler is not notified —
            # confirm that is intended.
            self.save_events(spider)
        if config.run_scheduler:
            spider.notify_spider_complete()

    def save_events(self, spider):
        """POST the spider's events to ``config.put_events`` if they changed.

        The event-list hash is stored with ``ObjectHash`` only after the server
        accepts the upload. A skipped (errored) or failed upload is therefore
        retried on the next run instead of being reported as "Nothing to update".

        Raises:
            Exception: carrying the response body when the upload is not ok.
        """
        event_list = spider.event_manager.to_dicts()
        if not event_list:
            spider.logger.info('No events to save.')
            return
        new_hash = ObjectHash.create_hash(event_list)
        organization = event_list[0]['organization']
        spider.logger.info(f'Found {len(event_list)} events for {organization}.')
        if new_hash == ObjectHash.get(spider.identifier):
            spider.logger.info('Nothing to update.')
            return
        if spider.is_errored:
            spider.logger.info('Errors occurred during processing so events will not be saved')
            return
        response = self.session.post(config.put_events, json={'events': event_list})
        if not response.ok:
            raise Exception(response.text)
        # Remember the hash only once the events have actually been persisted.
        ObjectHash.set(spider.identifier, new_hash)
        spider.logger.info(f'Saved {len(event_list)} events for {organization}')
Methods
def close_spider(self, spider)
-
Expand source code
def close_spider(self, spider):
    """Save the spider's events (if any), then notify the scheduler if enabled."""
    if len(spider.event_manager.events) == 0:
        spider.logger.info(f'No data returned for {spider.base_url}')
    else:
        # NOTE(review): if save_events raises, the scheduler is not notified —
        # confirm that is intended.
        self.save_events(spider)
    if config.run_scheduler:
        spider.notify_spider_complete()
def save_events(self, spider)
-
Send the events collected by a spider to the configured events endpoint, unless they are unchanged since the last save or the spider reported errors.
Expand source code
def save_events(self, spider):
    """POST the spider's events to ``config.put_events`` if they changed.

    The event-list hash is stored with ``ObjectHash`` only after the server
    accepts the upload. A skipped (errored) or failed upload is therefore
    retried on the next run instead of being reported as "Nothing to update".

    Raises:
        Exception: carrying the response body when the upload is not ok.
    """
    event_list = spider.event_manager.to_dicts()
    if not event_list:
        spider.logger.info('No events to save.')
        return
    new_hash = ObjectHash.create_hash(event_list)
    organization = event_list[0]['organization']
    spider.logger.info(f'Found {len(event_list)} events for {organization}.')
    if new_hash == ObjectHash.get(spider.identifier):
        spider.logger.info('Nothing to update.')
        return
    if spider.is_errored:
        spider.logger.info('Errors occurred during processing so events will not be saved')
        return
    response = self.session.post(config.put_events, json={'events': event_list})
    if not response.ok:
        raise Exception(response.text)
    # Remember the hash only once the events have actually been persisted.
    ObjectHash.set(spider.identifier, new_hash)
    spider.logger.info(f'Saved {len(event_list)} events for {organization}')
class EventTransformPipeline
-
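Stamp each item with the spider's organization and date format, apply the spider's custom item filter, and drop events outside the spider's configured time range.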
Expand source code
class EventTransformPipeline:
    """Stamp spider metadata onto items and drop the ones that should not be kept.

    Items are tagged with the spider's organization (and date format, when an
    ``event_time`` is present), loaded through ``EventLoader``, passed through
    the spider's ``item_filter`` and finally checked against the spider's
    configured start/end timestamps.
    """

    def __init__(self):
        self.time_utils = TimeUtils()

    def process_item(self, item, spider):
        """Return the loaded event item, or raise ``DropItem`` to discard it.

        Raises:
            DropItem: when ``spider.item_filter(item)`` is falsy, or when the
                loaded item has an ``event_time`` whose start/end timestamps
                are not within ``spider.start_timestamp``/``spider.end_timestamp``
                according to ``TimeUtils.time_range_is_between``.
        """
        item['organization'] = spider.organization
        if 'event_time' in item:
            # Pass the spider's date format along with the raw event time
            # (presumably consumed by EventLoader — confirm).
            item['event_time']['date_format'] = spider.date_format
        loader = EventLoader(**item)

        # Spiders can reject individual items with their own predicate.
        allowed = spider.item_filter(item)
        if not allowed:
            raise DropItem('Custom item filter did not allow this event')

        if 'event_time' not in loader.item:
            return loader.item

        window = loader.item['event_time']
        within_range = self.time_utils.time_range_is_between(
            window['start_timestamp'],
            window['end_timestamp'],
            spider.start_timestamp,
            spider.end_timestamp,
        )
        if not within_range:
            raise DropItem('Event is not in the configured timeframe')
        return loader.item
Methods
def process_item(self, item, spider)
-
Expand source code
def process_item(self, item, spider):
    """Return the loaded event item, or raise ``DropItem`` to discard it.

    Raises:
        DropItem: when ``spider.item_filter(item)`` is falsy, or when the
            loaded item has an ``event_time`` whose start/end timestamps
            are not within ``spider.start_timestamp``/``spider.end_timestamp``
            according to ``TimeUtils.time_range_is_between``.
    """
    item['organization'] = spider.organization
    if 'event_time' in item:
        # Pass the spider's date format along with the raw event time
        # (presumably consumed by EventLoader — confirm).
        item['event_time']['date_format'] = spider.date_format
    loader = EventLoader(**item)

    # Spiders can reject individual items with their own predicate.
    allowed = spider.item_filter(item)
    if not allowed:
        raise DropItem('Custom item filter did not allow this event')

    if 'event_time' not in loader.item:
        return loader.item

    window = loader.item['event_time']
    within_range = self.time_utils.time_range_is_between(
        window['start_timestamp'],
        window['end_timestamp'],
        spider.start_timestamp,
        spider.end_timestamp,
    )
    if not within_range:
        raise DropItem('Event is not in the configured timeframe')
    return loader.item
class GeocodePipeline
-
Get the Geocodes from the parsed address, if an address was found.
Expand source code
class GeocodePipeline:
    """Get the Geocodes from the parsed address, if an address was found.

    The lookup is best-effort: any failure is logged as a warning and the item
    is still passed on to the next pipeline.
    """

    def __init__(self):
        self.session = HttpUtils.get_session()

    def process_item(self, item, spider):
        """Attach ``geocode_id`` to items that carry an ``address``; return the item.

        The address plus any already-known ``lat``/``lon`` are sent as query
        parameters to ``config.get_geocode``. The ``id`` field of the JSON
        response is stored as ``item['geocode_id']``. Items without an
        ``address`` are returned untouched.
        """
        if 'address' in item:
            try:
                params = {
                    'address': item['address'],
                    'lat': item['lat'] if 'lat' in item else None,
                    'lon': item['lon'] if 'lon' in item else None,
                }
                geocode = self.session.get(config.get_geocode, params=params)
                geocode_json = geocode.json()
                item['geocode_id'] = geocode_json['id']
                # A null latitude means the service could not resolve the address.
                if geocode_json['lat'] is None:
                    spider.logger.warning(f'No geocode response for address {item["address"]}')
            except Exception as e:
                # Deliberately best-effort: a geocoding failure must not drop the event.
                spider.logger.warning(f'Exception while getting geocode for address {item["address"]}: {e}')
        return item
Methods
def process_item(self, item, spider)
-
Expand source code
def process_item(self, item, spider):
    """Attach ``geocode_id`` to items that carry an ``address``; return the item.

    The address plus any already-known ``lat``/``lon`` are sent as query
    parameters to ``config.get_geocode``. The ``id`` field of the JSON
    response is stored as ``item['geocode_id']``. Items without an
    ``address`` are returned untouched. Failures are logged, not raised.
    """
    if 'address' in item:
        try:
            params = {
                'address': item['address'],
                'lat': item['lat'] if 'lat' in item else None,
                'lon': item['lon'] if 'lon' in item else None,
            }
            geocode = self.session.get(config.get_geocode, params=params)
            geocode_json = geocode.json()
            item['geocode_id'] = geocode_json['id']
            # A null latitude means the service could not resolve the address.
            if geocode_json['lat'] is None:
                spider.logger.warning(f'No geocode response for address {item["address"]}')
        except Exception as e:
            # Deliberately best-effort: a geocoding failure must not drop the event.
            spider.logger.warning(f'Exception while getting geocode for address {item["address"]}: {e}')
    return item