123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- import os
- import requests
- import logging
- import contextlib
- from xml.etree import ElementTree
- from requests_oauthlib import OAuth1
- from .models import (Object, Download, Ingest, Investigation, User,
- AccessPoint)
- from .exceptions import (Timeout, PermissionError, NotFoundError,
- ArbitraryError, InvalidInput)
# Default service root; used as the default ``base_url`` of ``Client``.
BASE_URL = 'http://kitdm.anka.kit.edu:8080/KITDM/rest'

# OAuth consumer credentials. The ``or`` fallback applies both when the
# variable is unset and when it is set to an empty string.
# NOTE(review): 'secret' looks like a placeholder default -- confirm it is
# not expected to work against a production service.
KEY = os.environ.get('ASTOR_RECO_KEY') or 'secret'
SECRET = os.environ.get('ASTOR_RECO_SECRET') or 'secret'

# OAuth resource-owner token pair; both are None when the variables are unset.
TOKEN = os.environ.get('ASTOR_RECO_TOKEN')
TOKEN_SECRET = os.environ.get('ASTOR_RECO_TOKEN_SECRET')
def check_response(response):
    """Raise the package exception matching a non-200 HTTP response.

    :param response: object exposing a ``status_code`` attribute.
    :raises PermissionError: if the status code is 403.
    :raises NotFoundError: if the status code is 404.
    :raises InvalidInput: if the status code is 412.
    :raises ArbitraryError: for any other status code that is not 200; the
        response is passed to the exception.
    :returns: ``None`` when the status code is exactly 200.
    """
    status = response.status_code

    # Only a plain 200 counts as success; every other code is an error.
    if status == 200:
        return
    if status == 403:
        raise PermissionError()
    if status == 404:
        raise NotFoundError()
    if status == 412:
        raise InvalidInput()
    raise ArbitraryError(response)
@contextlib.contextmanager
def check_timeouts():
    """Context manager translating ``requests`` timeouts.

    Runs the wrapped block unchanged. If the block raises
    ``requests.exceptions.Timeout``, the package's own ``Timeout`` exception
    is raised in its place; any other exception propagates untouched.
    """
    try:
        # Hand control to the body of the ``with`` statement.
        yield
    except requests.exceptions.Timeout:
        # Hide the transport library's exception type from callers.
        raise Timeout()
class Client(object):
    """Client for the REST service rooted at ``base_url``.

    Every request is sent through a single ``requests.Session`` whose
    ``auth`` is an ``OAuth1`` object configured with the PLAINTEXT signature
    method. Responses are validated with ``check_response`` and transport
    timeouts are re-raised as the package's ``Timeout`` exception via
    ``check_timeouts``.
    """

    #: Seconds to wait for the server unless overridden in ``__init__``.
    DEFAULT_TIMEOUT = 5

    def __init__(self, base_url=BASE_URL, key=KEY, secret=SECRET,
                 token=TOKEN, token_secret=TOKEN_SECRET,
                 timeout=DEFAULT_TIMEOUT):
        """Create the HTTP session and remember the service root.

        :param base_url: root URL that all resource paths are appended to.
        :param key: OAuth client key.
        :param secret: OAuth client secret.
        :param token: OAuth resource-owner key.
        :param token_secret: OAuth resource-owner secret.
        :param timeout: value passed as ``timeout`` to every request made
            by this client (defaults to 5, the previously hard-coded value).
        """
        self.session = requests.Session()
        self.session.auth = OAuth1(key, client_secret=secret,
                                   resource_owner_key=token,
                                   resource_owner_secret=token_secret,
                                   signature_method='PLAINTEXT')
        self.base_url = base_url
        self.timeout = timeout
        self.log = logging.getLogger(__name__)

    def url(self, *args):
        """Join ``base_url`` and the stringified ``args`` with ``/``."""
        return '/'.join([self.base_url] + [str(arg) for arg in args])

    # -- ingests ----------------------------------------------------------

    def get_ingest_ids(self, limit=None):
        """Return the ids listed under ``staging/ingests``."""
        return self.get_collection('staging/ingests', 'id', limit)

    def get_ingests(self, limit=-1):
        """Return an ``Ingest`` for every id from ``get_ingest_ids``.

        The default ``limit`` of -1 is truthy and is therefore sent to the
        server as ``results=-1``.
        NOTE(review): presumably the service reads -1 as "no limit" --
        confirm against the service documentation.
        """
        return [Ingest(self, oid) for oid in self.get_ingest_ids(limit)]

    def delete_ingest(self, ingest):
        """Issue a DELETE for ``staging/ingests/<ingest.id>``."""
        self.delete(self.url('staging/ingests', ingest.id))

    # -- investigations ---------------------------------------------------

    def get_investigation_ids(self, limit=None):
        """Return the ids listed under ``basemetadata/investigations``."""
        return self.get_collection('basemetadata/investigations',
                                   'investigationId', limit)

    def get_investigations(self, limit=-1):
        """Return an ``Investigation`` for every investigation id.

        ``limit`` is forwarded unchanged; see ``get_ingests`` regarding the
        -1 default.
        """
        return [Investigation(self, oid) for oid
                in self.get_investigation_ids(limit)]

    # -- digital objects --------------------------------------------------

    def get_object_ids(self, limit=None, since=None):
        """Return the ids listed under ``basemetadata/digitalObjects``.

        :param limit: forwarded to ``get_collection``.
        :param since: currently ignored; kept so existing callers that pass
            it keep working.
        """
        return self.get_collection('basemetadata/digitalObjects',
                                   'baseId', limit)

    def get_objects(self, limit=-1, predicate=lambda o: True):
        """Return the ``Object`` instances for which ``predicate`` is truthy.

        Objects are built lazily, one per id, and filtered as they are
        produced. ``limit`` restricts the ids requested from the server, so
        it is applied before ``predicate``.
        """
        objects = (Object(self, oid) for oid in self.get_object_ids(limit))
        return [o for o in objects if predicate(o)]

    def create_object(self, investigation_id, uploader, label=None, note=None):
        """POST a new digital object below the given investigation.

        ``label``, ``uploader.id`` (as ``uploaderId``) and ``note`` are sent
        as form data. The parsed XML response is handed to ``Object`` as
        ``root``, together with -1 as the object id argument.
        """
        url = self.url('basemetadata/investigations', investigation_id,
                       'digitalObjects')
        root = self.post(url, label=label, uploaderId=uploader.id, note=note)
        return Object(self, -1, root=root)

    def delete_object(self, obj):
        """Issue a DELETE for ``basemetadata/digitalObjects/<obj.id>``."""
        self.delete(self.url('basemetadata/digitalObjects', obj.id))

    # -- downloads --------------------------------------------------------

    def get_download_ids(self, limit=None):
        """Return the ids listed under ``staging/downloads``."""
        return self.get_collection('staging/downloads', 'id', limit)

    def get_downloads(self, limit=-1):
        """Return a ``Download`` for every id from ``get_download_ids``."""
        return [Download(self, oid) for oid in self.get_download_ids(limit)]

    # -- access points ----------------------------------------------------

    def get_accesspoint_ids(self):
        """Return the ids listed under ``staging/accesspoints``."""
        return self.get_collection('staging/accesspoints', 'id')

    def get_accesspoints(self, scheme=None):
        """Return access points, optionally only those matching ``scheme``.

        When ``scheme`` is truthy, only access points whose ``url`` starts
        with it are returned.
        """
        aps = (AccessPoint(self, oid) for oid in self.get_accesspoint_ids())
        if scheme:
            return [ap for ap in aps if ap.url.startswith(scheme)]
        return list(aps)

    # -- generic helpers --------------------------------------------------

    def get_collection(self, url, id_name, limit=None):
        """Return the integer ids of a collection resource.

        :param url: resource path relative to ``base_url``.
        :param id_name: tag name of the id element; ids are read from
            ``./entities/entity/<id_name>`` in the parsed response.
        :param limit: sent as the ``results`` query parameter when truthy;
            ``None`` or ``0`` sends no parameter at all.
        """
        params = {'results': limit} if limit else {}
        root = self.get(self.url(url), **params)
        entities = root.findall('./entities/entity/{}'.format(id_name))
        return [int(e.text) for e in entities]

    def get_user(self, oid):
        """Return a ``User`` bound to this client for the given id."""
        return User(self, oid)

    # -- HTTP verbs -------------------------------------------------------

    def _request(self, method, url, **kwargs):
        """Send one request and return the validated response.

        Logs ``"<METHOD> <url>"`` at debug level, calls the session method
        named after ``method`` with ``self.timeout``, converts transport
        timeouts through ``check_timeouts`` and validates the status code
        with ``check_response``.
        """
        self.log.debug("%s %s", method, url)
        send = getattr(self.session, method.lower())
        # Only the network call can raise a requests timeout, so only it
        # sits inside the guard.
        with check_timeouts():
            response = send(url, timeout=self.timeout, **kwargs)
        check_response(response)
        return response

    def delete(self, url):
        """Send a DELETE request to ``url``; returns ``None``."""
        self._request('DELETE', url)

    def get(self, url, **params):
        """GET ``url`` with ``params`` as query string; return parsed XML."""
        response = self._request('GET', url, params=params)
        return ElementTree.fromstring(response.text)

    def post(self, url, **params):
        """POST ``params`` as form data to ``url``; return parsed XML."""
        response = self._request('POST', url, data=params)
        return ElementTree.fromstring(response.text)

    def put(self, url, **params):
        """PUT ``params`` as form data to ``url``; returns ``None``."""
        self._request('PUT', url, data=params)
|