# -*- coding: utf-8 -*-
import asyncio
import logging
import sys
import aiohttp
import async_timeout
if sys.version_info < (3, 8): # pragma: no cover
from concurrent.futures import CancelledError
else:
from asyncio.exceptions import CancelledError
from . import data_processing, exceptions, utils
from .exceptions import StreamLimit
from .general import rate_limit_notices
ClientPayloadError = aiohttp.ClientPayloadError
ClientConnectionError = aiohttp.ClientConnectionError
RECONNECTION_TIMEOUT = 5
MAX_RECONNECTION_TIMEOUT = 320
DISCONNECTION_TIMEOUT = 0.25
ERROR_TIMEOUT = DISCONNECTION_TIMEOUT
MAX_DISCONNECTION_TIMEOUT = 16
ENHANCE_YOUR_CALM_TIMEOUT = 60
NORMAL = 0
DISCONNECTION = 1
ERROR = DISCONNECTION
RECONNECTION = 2
ENHANCE_YOUR_CALM = 3
EOF = 4
HandledErrors = asyncio.TimeoutError, ClientPayloadError, TimeoutError
logger = logging.getLogger(__name__)
[docs]class StreamResponse:
"""
Asynchronous iterator for streams
Parameters
----------
*args : list, optional
Positional arguments of the request
client : .client.BasePeonyClient
client used to make the request
session : aiohttp.ClientSession, optional
Session used by the request
loads : function, optional
function used to decode the JSON data received
timeout : int, optional
Timeout on connection
kwargs : dict, optional
Keyword parameters of the request
"""
def __init__(
self, client, session=None, loads=data_processing.loads, timeout=10, **kwargs
):
self.client = client
self.session = session
self.loads = loads
self.timeout = timeout
self.kwargs = kwargs
self.response = None
self._reconnecting = False
self._state = NORMAL
self._error_timeout = 0
async def _connect(self):
"""
Connect to the stream
Returns
-------
asyncio.coroutine
The streaming response
"""
logger.debug("connecting to the stream")
await self.client.setup
if self.session is None:
self.session = self.client._session
kwargs = await self.client.headers.prepare_request(**self.kwargs)
request = self.client.error_handler(self.session.request)
return await request(timeout=0, **kwargs)
[docs] async def connect(self):
"""
Create the connection
Returns
-------
self
Raises
------
exception.PeonyException
On a response status in 4xx that are not status 420 or 429
Also on statuses in 1xx or 3xx since this should not be the status
received here
"""
with async_timeout.timeout(self.timeout):
self.response = await self._connect()
if self.response.status in range(200, 300):
self._error_timeout = 0
self.state = NORMAL
elif self.response.status == 500:
self.state = DISCONNECTION
elif self.response.status in range(501, 600):
self.state = RECONNECTION
elif self.response.status in (420, 429):
self.state = ENHANCE_YOUR_CALM
else:
logger.debug("raising error during stream connection")
await exceptions.throw(
self.response, loads=self.client._loads, url=self.kwargs["url"]
)
logger.debug("stream state: %d" % self.state)
def __aiter__(self):
return self
if sys.version_info < (3, 5, 2): # pragma: no cover
__aiter__ = asyncio.coroutine(__aiter__)
async def __anext__(self):
"""
Decode each line using json
Returns
-------
dict
Decoded JSON data
"""
if self.response is None:
logger.info("first connection to the stream")
await self.connect()
return {"connected": True}
line = b""
try:
if self.state != NORMAL:
if self._reconnecting:
return await self.restart_stream()
else:
return await self.init_restart()
while not line:
if self.response.content.at_eof():
logger.debug("Received EOF")
self.state = EOF
return await self.init_restart()
with async_timeout.timeout(90):
line = await self.response.content.readline()
line = line.strip(b"\r\n")
logger.debug("received data: %s" % line)
if line in rate_limit_notices:
logger.debug("raising StreamLimit")
raise StreamLimit(line)
logger.debug("decoding data")
return self.loads(line)
except HandledErrors as e:
logger.debug("handling error %s: %s" % (e.__class__.__name__, e))
self.state = ERROR
return await self.init_restart()
except ClientConnectionError:
logger.debug("Disconnected from stream")
self.state = DISCONNECTION
return await self.init_restart()
except CancelledError:
logger.debug("Stopping stream")
raise
except Exception as e:
self.state = ERROR
return await self.init_restart(error=e)
@property
def state(self):
return self._state
@state.setter
def state(self, value):
if value == NORMAL or self.state < value:
self._state = value
[docs] async def init_restart(self, error=None):
"""
Restart the stream on error
Parameters
----------
error : bool, optional
Whether to print the error or not
"""
if error:
utils.log_error(logger=logger)
if self.state == DISCONNECTION:
if self._error_timeout < MAX_DISCONNECTION_TIMEOUT:
self._error_timeout += DISCONNECTION_TIMEOUT
logger.info(
"The stream was disconnected, will reconnect in %ss"
% self._error_timeout
)
elif self.state == RECONNECTION:
if self._error_timeout < RECONNECTION_TIMEOUT:
self._error_timeout = RECONNECTION_TIMEOUT
elif self._error_timeout < MAX_RECONNECTION_TIMEOUT:
self._error_timeout *= 2
logger.info(
"Could not connect to the stream, reconnection in %ss"
% self._error_timeout
)
elif self.state == ENHANCE_YOUR_CALM:
if self._error_timeout < ENHANCE_YOUR_CALM_TIMEOUT:
self._error_timeout = ENHANCE_YOUR_CALM_TIMEOUT
else:
self._error_timeout *= 2
logger.warning(
"Enhance Your Calm response received from Twitter. "
"If you didn't restart your program frenetically "
"then there is probably something wrong with it. "
"Make sure you are not opening too many connections"
" to the endpoint you are currently using by "
"checking out Twitter's Streaming API "
"documentation: "
"https://dev.twitter.com/streaming/overview\n"
"The stream will restart in %ss." % self._error_timeout
)
elif self.state == EOF:
pass # no timeout
else:
raise RuntimeError("Incorrect state: %d" % self.state)
self._reconnecting = True
return {"reconnecting_in": self._error_timeout, "error": error}
[docs] async def restart_stream(self):
"""
Restart the stream on error
"""
assert self.response is not None
await self.response.release()
await asyncio.sleep(self._error_timeout)
await self.connect()
logger.info("Reconnected to the stream")
self._reconnecting = False
return {"stream_restart": True}
def __enter__(self):
"""
Prepare the stream
Returns
-------
StreamResponse
The stream iterator
"""
return self
async def __aenter__(self):
"""
Prepare the stream
Returns
-------
StreamResponse
The stream iterator
"""
await self.client.setup
return self
def __exit__(self, *args):
"""Close the response on error"""
if getattr(self, "response", None) is not None:
assert self.response is not None
if not self.response.closed:
logger.debug("Closing the stream")
self.response.close()
async def __aexit__(self, *args):
"""Close the response on error"""
self.__exit__(*args)