# -*- coding: utf-8 -*-
"""
Peony Clients
:class:`BasePeonyClient` only handles requests while
:class:`PeonyClient` adds some methods that could help when using
the Twitter APIs, with a method to upload a media
"""
import asyncio
import io
import logging
import sys
import warnings
from contextlib import suppress
from typing import Dict, Set
from urllib.parse import urlparse
import aiohttp
if sys.version_info < (3, 8): # pragma: no cover
from concurrent.futures import CancelledError
else:
from asyncio.exceptions import CancelledError
from . import data_processing, exceptions, general, oauth, utils
from .api import APIPath
from .commands import EventStreams, task
from .exceptions import PeonyUnavailableMethod
from .oauth import OAuth1Headers
from .stream import StreamResponse
logger = logging.getLogger(__name__)
[docs]class BasePeonyClient(metaclass=MetaPeonyClient):
"""
Access the Twitter API easily
You can create tasks by decorating a function from a child
class with :class:`peony.task`
You also attach a :class:`EventStream` to a subclass using
the :func:`event_stream` of the subclass
After creating an instance of the child class you will be able
to run all the tasks easily by executing :func:`get_tasks`
Parameters
----------
base_url : str, optional
Format of the url for all the requests
api_version : str, optional
Default API version
suffix : str, optional
Default suffix of API endpoints
loads : function, optional
Function used to load JSON data
error_handler : function, optional
Requests decorator
session : aiohttp.ClientSession, optional
Session to use to make requests
proxy : str
Proxy used with every request
compression : bool, optional
Activate data compression on every requests, defaults to True
user_agent : str, optional
Set a custom user agent header
encoding : str, optional
text encoding of the response from the server
loop : event loop, optional
An event loop, if not specified :func:`asyncio.get_event_loop`
is called
"""
_tasks: Dict[str, Set[task]]
_streams: EventStreams
def __init__(
self,
consumer_key=None,
consumer_secret=None,
access_token=None,
access_token_secret=None,
bearer_token=None,
auth=None,
headers=None,
base_url=None,
api_version=None,
suffix=".json",
loads=data_processing.loads,
error_handler=utils.DefaultErrorHandler,
session=None,
proxy=None,
compression=True,
user_agent=None,
encoding=None,
loop=None,
**kwargs
):
if base_url is None:
self.base_url = general.twitter_base_api_url
else:
self.base_url = base_url
if api_version is None:
self.api_version = general.twitter_api_version
else:
self.api_version = api_version
if auth is None:
auth = OAuth1Headers
self.proxy = proxy
self._suffix = suffix
self.error_handler = error_handler
self.encoding = encoding
if encoding is not None:
def _loads(*args, **kwargs):
return loads(*args, encoding=encoding, **kwargs)
self._loads = _loads
else:
self._loads = loads
self.loop = asyncio.get_running_loop() if loop is None else loop
self._session = session
self._user_session = session is not None
self._gathered_tasks = None
if consumer_key is None or consumer_secret is None:
raise TypeError(
"missing 2 required arguments: 'consumer_key' and 'consumer_secret'"
)
# all the possible args required by headers in :mod:`peony.oauth`
kwargs = {
"consumer_key": consumer_key,
"consumer_secret": consumer_secret,
"access_token": access_token,
"access_token_secret": access_token_secret,
"bearer_token": bearer_token,
"compression": compression,
"user_agent": user_agent,
"headers": headers,
"client": self,
}
# get the args needed by the auth parameter on initialization
args = utils.get_args(auth.__init__, skip=1)
# keep only the arguments required by auth on init
kwargs = {key: value for key, value in kwargs.items() if key in args}
self.headers = auth(**kwargs)
self.setup = self.loop.create_task(self._setup())
async def _setup(self):
if self._session is None:
logger.debug("Creating session")
self._session = aiohttp.ClientSession()
@staticmethod
def _get_base_url(base_url, api, version):
"""
create the base url for the api
Parameters
----------
base_url : str
format of the base_url using {api} and {version}
api : str
name of the api to use
version : str
version of the api
Returns
-------
str
the base url of the api you want to use
"""
format_args = {}
if "{api}" in base_url:
if api == "":
base_url = base_url.replace("{api}.", "")
else:
format_args["api"] = api
if "{version}" in base_url:
if version == "":
base_url = base_url.replace("/{version}", "")
else:
format_args["version"] = version
return base_url.format(api=api, version=version)
def __getitem__(self, values):
"""
Access the api you want
This permits the use of any API you could know about
For most api you only need to type
>>> self[api] # api is the api you want to access
You can specify a custom api version using the syntax
>>> self[api, version] # version is the api version as a str
For more complex requests
>>> self[api, version, suffix, base_url]
Returns
-------
.api.BaseAPIPath
To access an API endpoint
"""
defaults = None, self.api_version, self._suffix, self.base_url
keys = ["api", "version", "suffix", "base_url"]
if isinstance(values, dict):
# set values in the right order
values = [values.get(key, defaults[i]) for i, key in enumerate(keys)]
elif isinstance(values, set):
raise TypeError(
"Cannot use a set to access an api, "
"please use a dict, a tuple or a list instead"
)
elif isinstance(values, str):
values = [values, *defaults[1:]]
elif isinstance(values, tuple):
if len(values) < len(keys):
padding = (None,) * (len(keys) - len(values))
values += padding
values = [
default if value is None else value
for value, default in zip(values, defaults)
if (value, default) != (None, None)
]
else:
raise TypeError(
"Could not create an endpoint from an object of "
"type " + values.__class__.__name__
)
api, version, suffix, base_url = values
base_url = self._get_base_url(base_url, api, version)
return APIPath([base_url], suffix=suffix, client=self)
__getattr__ = __getitem__
def __del__(self):
if self.loop.is_closed(): # pragma: no cover
pass
elif self.loop.is_running():
self.loop.create_task(self.close())
else:
self.loop.run_until_complete(self.close())
[docs] async def request(
self, method, url, future, headers=None, session=None, encoding=None, **kwargs
):
"""
Make requests to the REST API
Parameters
----------
future : asyncio.Future
Future used to return the response
method : str
Method to be used by the request
url : str
URL of the resource
headers : .oauth.PeonyHeaders
Custom headers (doesn't overwrite `Authorization` headers)
session : aiohttp.ClientSession, optional
Client session used to make the request
Returns
-------
data.PeonyResponse
Response to the request
"""
await self.setup
# prepare request arguments, particularly the headers
req_kwargs = await self.headers.prepare_request(
method=method, url=url, headers=headers, proxy=self.proxy, **kwargs
)
if encoding is None:
encoding = self.encoding
session = session if (session is not None) else self._session
logger.debug("making request with parameters: %s" % req_kwargs)
async with session.request(**req_kwargs) as response:
if response.status < 400:
data = await data_processing.read(
response, self._loads, encoding=encoding
)
future.set_result(
data_processing.PeonyResponse(
data=data,
headers=response.headers,
url=response.url,
request=req_kwargs,
)
)
else: # throw exception if status is not 2xx
await exceptions.throw(
response, loads=self._loads, encoding=encoding, url=url
)
[docs] def stream_request(self, method, url, headers=None, _session=None, *args, **kwargs):
"""
Make requests to the Streaming API
Parameters
----------
method : str
Method to be used by the request
url : str
URL of the resource
headers : dict
Custom headers (doesn't overwrite `Authorization` headers)
_session : aiohttp.ClientSession, optional
The session to use for this specific request, the session
given as argument of :meth:`__init__` is used by default
Returns
-------
.stream.StreamResponse
Stream context for the request
"""
return StreamResponse(
method=method,
url=url,
client=self,
headers=headers,
session=_session,
proxy=self.proxy,
**kwargs
)
[docs] @classmethod
def event_stream(cls, event_stream):
"""Decorator to attach an event stream to the class"""
cls._streams.append(event_stream)
return event_stream
def _get_tasks(self):
return [task(self) for task in self._tasks["tasks"]]
[docs] def get_tasks(self):
"""
Get the tasks attached to the instance
Returns
-------
list
List of tasks (:class:`asyncio.Task`)
"""
tasks = self._get_tasks()
tasks.extend(self._streams.get_tasks(self))
return tasks
[docs] async def run_tasks(self):
"""Run the tasks attached to the instance"""
tasks = self.get_tasks()
self._gathered_tasks = asyncio.gather(*tasks)
try:
await self._gathered_tasks
except CancelledError:
pass
[docs] async def arun(self):
try:
await self.run_tasks()
except KeyboardInterrupt:
pass
finally:
await self.close()
[docs] def run(self):
"""Run the tasks attached to the instance"""
self.loop.run_until_complete(self.arun())
def _get_close_tasks(self):
tasks = []
# cancel setup
if isinstance(self.setup, asyncio.Future):
if not self.setup.done():
async def cancel_setup():
self.setup.cancel()
try:
await self.setup
except CancelledError: # pragma: no cover
pass
tasks.append(self.loop.create_task(cancel_setup()))
# close currently running tasks
if self._gathered_tasks is not None:
async def cancel_tasks():
self._gathered_tasks.cancel()
try:
await self._gathered_tasks
except CancelledError:
pass
tasks.append(self.loop.create_task(cancel_tasks()))
return tasks
[docs] async def close(self):
"""properly close the client"""
tasks = self._get_close_tasks()
if tasks:
await asyncio.wait(tasks)
# close the session only if it was created by peony
if not self._user_session and self._session is not None:
with suppress(TypeError, AttributeError):
await self._session.close()
self._session = None
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
[docs]class PeonyClient(BasePeonyClient):
"""
A client with some useful methods for most usages
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = self.loop.create_task(self._get_user())
async def _get_user(self, init=False):
"""
create a ``user`` attribute with the response of the endpoint
https://api.twitter.com/1.1/account/verify_credentials.json
"""
api = self[
"api", general.twitter_api_version, ".json", general.twitter_base_api_url
]
if isinstance(self.headers, oauth.OAuth1Headers):
return await api.account.verify_credentials.get()
raise PeonyUnavailableMethod(
"user attribute is only available with OAuth 1 authentification."
)
def _get_close_tasks(self):
tasks = super()._get_close_tasks()
if not self.user.done():
async def cancel_user():
self.user.cancel()
try:
await self.user
except CancelledError: # pragma: no cover
pass
tasks.append(self.loop.create_task(cancel_user()))
return tasks
async def _chunked_upload(
self,
media,
media_size,
path=None,
media_type=None,
media_category=None,
chunk_size=2**20,
**params
):
"""
upload media in chunks
Parameters
----------
media : file object
a file object of the media
media_size : int
size of the media
path : str, optional
filename of the media
media_type : str, optional
mime type of the media
media_category : str, optional
twitter media category, must be used with ``media_type``
chunk_size : int, optional
size of a chunk in bytes
params : dict, optional
additional parameters of the request
Returns
-------
.data_processing.PeonyResponse
Response of the request
"""
if isinstance(media, bytes):
media = io.BytesIO(media)
chunk = media.read(chunk_size)
is_coro = asyncio.iscoroutine(chunk)
if is_coro:
chunk = await chunk
if media_type is None:
media_metadata = await utils.get_media_metadata(chunk, path)
media_type, media_category = media_metadata
elif media_category is None:
media_category = utils.get_category(media_type)
response = await self.upload.media.upload.post(
command="INIT",
total_bytes=media_size,
media_type=media_type,
media_category=media_category,
**params
)
media_id = response["media_id"]
i = 0
while chunk:
req = self.upload.media.upload.post(
command="APPEND", media_id=media_id, media=chunk, segment_index=i
)
if is_coro:
chunk, _ = await asyncio.gather(media.read(chunk_size), req)
else:
await req
chunk = media.read(chunk_size)
i += 1
status = await self.upload.media.upload.post(
command="FINALIZE", media_id=media_id
)
if "processing_info" in status:
while status["processing_info"].get("state") != "succeeded":
processing_info = status["processing_info"]
if processing_info.get("state") == "failed":
error = processing_info.get("error", {})
message = error.get("message", str(status))
raise exceptions.MediaProcessingError(
data=status, message=message, **params
)
delay = processing_info["check_after_secs"]
await asyncio.sleep(delay)
status = await self.upload.media.upload.get(
command="STATUS", media_id=media_id, **params
)
return response