Module event_processor.base.api_base

Expand source code
import json
import time

from event_processor.base.aggregator_base import AggregatorBase
from event_processor.util.cache_call import cache_call
from event_processor.util.http_utils import HttpUtils
from event_processor.config import config


class ApiBase(AggregatorBase):
    # Placeholder values that can be used if no request needs to be made through Scrapy
    allowed_domains = ['wikipedia.org','en.wikipedia.org']
    start_urls = ['https://www.wikipedia.org/']
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.session = HttpUtils.get_session({
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'en-US,en;q=0.9',
            'Connection': 'keep-alive'
        })

    def wait(self, sleep_time=None):
        if sleep_time == None:
            time.sleep(config.api_delay_seconds)
        else:
            time.sleep(sleep_time)

    @cache_call
    def get_response(self, url=None, endpoint='', request_params=None, headers=None, method='GET'):
        method = method.upper()
        if url == None:
            url = self.base_url + endpoint
        if method == 'GET':
            response = self.session.get(url, params = request_params, headers = headers)
        elif method == 'POST':
            response = self.session.post(url, json = request_params, headers = headers)
        if not response.ok:
            raise ValueError(response.text)
        return response

    def parse_response_json(self, response):
        loads = json.loads(response.content)
        if isinstance(loads, list):
            # Don't return an array if it only contains one element
            return loads if (len(loads) != 1) else loads[0]
        return loads
        
    def get_response_json(self, url=None, endpoint='', request_params=None, property_to_return=None, method='GET'):
        response = self.get_response(url, endpoint, request_params, {'Accept': 'application/json, text/javascript, */*; q=0.01'}, method)
        if not response.ok:
            raise ValueError(response.text)
        response_json = self.parse_response_json(response)
        return response_json if property_to_return == None else response_json[property_to_return]

    @cache_call
    def get_response_graphql(self, url=None, endpoint='', gql_query=None, params=None):
        if params is not None:
            params['query'] = gql_query
        else:
            params = {'query': gql_query}
        response = self.get_response_json(url, endpoint, params, 'data', 'POST')
        return response

    def get_events(self):
        # Override me
        pass

Classes

class ApiBase (*args, **kwargs)
Expand source code
class ApiBase(AggregatorBase):
    # Placeholder values that can be used if no request needs to be made through Scrapy
    allowed_domains = ['wikipedia.org','en.wikipedia.org']
    start_urls = ['https://www.wikipedia.org/']
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.session = HttpUtils.get_session({
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'en-US,en;q=0.9',
            'Connection': 'keep-alive'
        })

    def wait(self, sleep_time=None):
        if sleep_time == None:
            time.sleep(config.api_delay_seconds)
        else:
            time.sleep(sleep_time)

    @cache_call
    def get_response(self, url=None, endpoint='', request_params=None, headers=None, method='GET'):
        method = method.upper()
        if url == None:
            url = self.base_url + endpoint
        if method == 'GET':
            response = self.session.get(url, params = request_params, headers = headers)
        elif method == 'POST':
            response = self.session.post(url, json = request_params, headers = headers)
        if not response.ok:
            raise ValueError(response.text)
        return response

    def parse_response_json(self, response):
        loads = json.loads(response.content)
        if isinstance(loads, list):
            # Don't return an array if it only contains one element
            return loads if (len(loads) != 1) else loads[0]
        return loads
        
    def get_response_json(self, url=None, endpoint='', request_params=None, property_to_return=None, method='GET'):
        response = self.get_response(url, endpoint, request_params, {'Accept': 'application/json, text/javascript, */*; q=0.01'}, method)
        if not response.ok:
            raise ValueError(response.text)
        response_json = self.parse_response_json(response)
        return response_json if property_to_return == None else response_json[property_to_return]

    @cache_call
    def get_response_graphql(self, url=None, endpoint='', gql_query=None, params=None):
        if params is not None:
            params['query'] = gql_query
        else:
            params = {'query': gql_query}
        response = self.get_response_json(url, endpoint, params, 'data', 'POST')
        return response

    def get_events(self):
        # Override me
        pass

Ancestors

Subclasses

Class variables

var allowed_domains

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

var start_urls

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

Methods

def get_events(self)
Expand source code
def get_events(self):
    # Override me
    pass
def get_response(*args, **kwargs)
Expand source code
def try_call(*args, **kwargs):
    try:
        return cache.cache('web_call', expire=config.api_cache_expiration)(target)(*args, **kwargs)
    except Exception as e:
        logging.getLogger('scrapy').warning('Exception while calling cache: ' + str(e))
    return target(*args, **kwargs)
def get_response_graphql(*args, **kwargs)
Expand source code
def try_call(*args, **kwargs):
    try:
        return cache.cache('web_call', expire=config.api_cache_expiration)(target)(*args, **kwargs)
    except Exception as e:
        logging.getLogger('scrapy').warning('Exception while calling cache: ' + str(e))
    return target(*args, **kwargs)
def get_response_json(self, url=None, endpoint='', request_params=None, property_to_return=None, method='GET')
Expand source code
def get_response_json(self, url=None, endpoint='', request_params=None, property_to_return=None, method='GET'):
    response = self.get_response(url, endpoint, request_params, {'Accept': 'application/json, text/javascript, */*; q=0.01'}, method)
    if not response.ok:
        raise ValueError(response.text)
    response_json = self.parse_response_json(response)
    return response_json if property_to_return == None else response_json[property_to_return]
def parse_response_json(self, response)
Expand source code
def parse_response_json(self, response):
    loads = json.loads(response.content)
    if isinstance(loads, list):
        # Don't return an array if it only contains one element
        return loads if (len(loads) != 1) else loads[0]
    return loads
def wait(self, sleep_time=None)
Expand source code
def wait(self, sleep_time=None):
    if sleep_time == None:
        time.sleep(config.api_delay_seconds)
    else:
        time.sleep(sleep_time)

Inherited members