Module event_processor.apis.ical_reader

Expand source code
from icalendar import Calendar
from event_processor.util.cache_call import cache_call
from datetime import datetime, date
from event_processor.util.http_utils import HttpUtils

class ICal:
    """??? Reader for any events that are published as ICal feeds."""
    def __init__(self, cal, default_timezone):
        self.cal = cal
        self.default_timezone = default_timezone

    @staticmethod
    def from_file(filename, default_timezone):
        with open(filename) as f:
            data = f.read()
        return ICal(Calendar.from_ical(data), default_timezone)

    @staticmethod
    @cache_call
    def from_url(url, default_timezone):
        session = HttpUtils.get_session()
        r = session.get(url)
        return ICal(Calendar.from_ical(r.text), default_timezone)

    def parse_events(self):
        org = self.cal.get('X-WR-CALNAME', 'Unknown Organization')
        return [self.create_event(event, org) for event in self.cal.subcomponents if event.name == 'VEVENT']

    # TODO Double-check that unicode is handled correctly
    def create_event(self, event, org):
        start_time = int(self.localize_min(event.get('DTSTART', '').dt).timestamp())
        end_time = int(self.localize_max(event.get('DTEND', '').dt).timestamp())
        return {
            'event_time': {
                'start_timestamp': start_time,
                'end_timestamp': end_time,
            },
            'title': '' + event.get('SUMMARY', ''),
            'description': '' + event.get('DESCRIPTION', ''),
            'address': '' + event.get('LOCATION', ''),
            'url': '' + event.get('URL', ''),
            'organization': '' + org
        }

    def localize_min(self, time):
        return self.localize(time, datetime.min.time())

    def localize_max(self, time):
        return self.localize(time, datetime.max.time())

    def localize(self, time, default_time):
        if type(time) == date:
            time = datetime.combine(time, default_time)

        if time.tzinfo is None:
            return self.default_timezone.localize(time)
        return time

Classes

class ICal (cal, default_timezone)

??? Reader for any events that are published as ICal feeds.

Expand source code
class ICal:
    """??? Reader for any events that are published as ICal feeds."""
    def __init__(self, cal, default_timezone):
        self.cal = cal
        self.default_timezone = default_timezone

    @staticmethod
    def from_file(filename, default_timezone):
        with open(filename) as f:
            data = f.read()
        return ICal(Calendar.from_ical(data), default_timezone)

    @staticmethod
    @cache_call
    def from_url(url, default_timezone):
        session = HttpUtils.get_session()
        r = session.get(url)
        return ICal(Calendar.from_ical(r.text), default_timezone)

    def parse_events(self):
        org = self.cal.get('X-WR-CALNAME', 'Unknown Organization')
        return [self.create_event(event, org) for event in self.cal.subcomponents if event.name == 'VEVENT']

    # TODO Double-check that unicode is handled correctly
    def create_event(self, event, org):
        start_time = int(self.localize_min(event.get('DTSTART', '').dt).timestamp())
        end_time = int(self.localize_max(event.get('DTEND', '').dt).timestamp())
        return {
            'event_time': {
                'start_timestamp': start_time,
                'end_timestamp': end_time,
            },
            'title': '' + event.get('SUMMARY', ''),
            'description': '' + event.get('DESCRIPTION', ''),
            'address': '' + event.get('LOCATION', ''),
            'url': '' + event.get('URL', ''),
            'organization': '' + org
        }

    def localize_min(self, time):
        return self.localize(time, datetime.min.time())

    def localize_max(self, time):
        return self.localize(time, datetime.max.time())

    def localize(self, time, default_time):
        if type(time) == date:
            time = datetime.combine(time, default_time)

        if time.tzinfo is None:
            return self.default_timezone.localize(time)
        return time

Static methods

def from_file(filename, default_timezone)
Expand source code
@staticmethod
def from_file(filename, default_timezone):
    with open(filename) as f:
        data = f.read()
    return ICal(Calendar.from_ical(data), default_timezone)
def from_url(*args, **kwargs)
Expand source code
def try_call(*args, **kwargs):
    try:
        return cache.cache('web_call', expire=config.api_cache_expiration)(target)(*args, **kwargs)
    except Exception as e:
        logging.getLogger('scrapy').warning('Exception while calling cache: ' + str(e))
    return target(*args, **kwargs)

Methods

def create_event(self, event, org)
Expand source code
def create_event(self, event, org):
    start_time = int(self.localize_min(event.get('DTSTART', '').dt).timestamp())
    end_time = int(self.localize_max(event.get('DTEND', '').dt).timestamp())
    return {
        'event_time': {
            'start_timestamp': start_time,
            'end_timestamp': end_time,
        },
        'title': '' + event.get('SUMMARY', ''),
        'description': '' + event.get('DESCRIPTION', ''),
        'address': '' + event.get('LOCATION', ''),
        'url': '' + event.get('URL', ''),
        'organization': '' + org
    }
def localize(self, time, default_time)
Expand source code
def localize(self, time, default_time):
    if type(time) == date:
        time = datetime.combine(time, default_time)

    if time.tzinfo is None:
        return self.default_timezone.localize(time)
    return time
def localize_max(self, time)
Expand source code
def localize_max(self, time):
    return self.localize(time, datetime.max.time())
def localize_min(self, time)
Expand source code
def localize_min(self, time):
    return self.localize(time, datetime.min.time())
def parse_events(self)
Expand source code
def parse_events(self):
    org = self.cal.get('X-WR-CALNAME', 'Unknown Organization')
    return [self.create_event(event, org) for event in self.cal.subcomponents if event.name == 'VEVENT']