from __future__ import absolute_import
import io
import logging
import sys
import warnings
import zlib
from contextlib import contextmanager
from socket import error as SocketError
from socket import timeout as SocketTimeout
brotli = None
from . import util
from ._collections import HTTPHeaderDict
from .connection import BaseSSLError, HTTPException
from .exceptions import (
BodyNotHttplibCompatible,
DecodeError,
HTTPError,
IncompleteRead,
InvalidChunkLength,
InvalidHeader,
ProtocolError,
ReadTimeoutError,
ResponseNotChunked,
SSLError,
)
from .packages import six
from .util.response import is_fp_closed, is_response_to_head
log = logging.getLogger(__name__)
class DeflateDecoder(object):
def __init__(self):
self._first_try = True
self._data = b""
self._obj = zlib.decompressobj()
def __getattr__(self, name):
return getattr(self._obj, name)
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
decompressed = self._obj.decompress(data)
if decompressed:
self._first_try = False
self._data = None
return decompressed
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
class GzipDecoderState(object):
FIRST_MEMBER = 0
OTHER_MEMBERS = 1
SWALLOW_DATA = 2
class GzipDecoder(object):
def __init__(self):
self._obj = zlib.decompressobj(16 + zlib.MAX_WBITS)
self._state = GzipDecoderState.FIRST_MEMBER
def __getattr__(self, name):
return getattr(self._obj, name)
def decompress(self, data):
ret = bytearray()
if self._state == GzipDecoderState.SWALLOW_DATA or not data:
return bytes(ret)
while True:
try:
ret += self._obj.decompress(data)
except zlib.error:
previous_state = self._state
self._state = GzipDecoderState.SWALLOW_DATA
if previous_state == GzipDecoderState.OTHER_MEMBERS:
return bytes(ret)
raise
data = self._obj.unused_data
if not data:
return bytes(ret)
self._state = GzipDecoderState.OTHER_MEMBERS
self._obj = zlib.decompressobj(16 + zlib.MAX_WBITS)
if brotli is not None:
class BrotliDecoder(object):
def __init__(self):
self._obj = brotli.Decompressor()
if hasattr(self._obj, "decompress"):
self.decompress = self._obj.decompress
else:
self.decompress = self._obj.process
def flush(self):
if hasattr(self._obj, "flush"):
return self._obj.flush()
return b""
class MultiDecoder(object):
def __init__(self, modes):
self._decoders = [_get_decoder(m.strip()) for m in modes.split(",")]
def flush(self):
return self._decoders[0].flush()
def decompress(self, data):
for d in reversed(self._decoders):
data = d.decompress(data)
return data
def _get_decoder(mode):
if "," in mode:
return MultiDecoder(mode)
if mode == "gzip":
return GzipDecoder()
if brotli is not None and mode == "br":
return BrotliDecoder()
return DeflateDecoder()
class HTTPResponse(io.IOBase):
CONTENT_DECODERS = ["gzip", "deflate"]
if brotli is not None:
CONTENT_DECODERS += ["br"]
REDIRECT_STATUSES = [301, 302, 303, 307, 308]
def __init__(
self,
body="",
headers=None,
status=0,
version=0,
reason=None,
strict=0,
preload_content=True,
decode_content=True,
original_response=None,
pool=None,
connection=None,
msg=None,
retries=None,
enforce_content_length=False,
request_method=None,
request_url=None,
auto_close=True,
):
if isinstance(headers, HTTPHeaderDict):
self.headers = headers
else:
self.headers = HTTPHeaderDict(headers)
self.status = status
self.version = version
self.reason = reason
self.strict = strict
self.decode_content = decode_content
self.retries = retries
self.enforce_content_length = enforce_content_length
self.auto_close = auto_close
self._decoder = None
self._body = None
self._fp = None
self._original_response = original_response
self._fp_bytes_read = 0
self.msg = msg
self._request_url = request_url
if body and isinstance(body, (six.string_types, bytes)):
self._body = body
self._pool = pool
self._connection = connection
if hasattr(body, "read"):
self._fp = body
self.chunked = False
self.chunk_left = None
tr_enc = self.headers.get("transfer-encoding", "").lower()
encodings = (enc.strip() for enc in tr_enc.split(","))
if "chunked" in encodings:
self.chunked = True
self.length_remaining = self._init_length(request_method)
if preload_content and not self._body:
self._body = self.read(decode_content=decode_content)
def get_redirect_location(self):
if self.status in self.REDIRECT_STATUSES:
return self.headers.get("location")
return False
def release_conn(self):
if not self._pool or not self._connection:
return
self._pool._put_conn(self._connection)
self._connection = None
def drain_conn(self):
try:
self.read()
except (HTTPError, SocketError, BaseSSLError, HTTPException):
pass
@property
def data(self):
if self._body:
return self._body
if self._fp:
return self.read(cache_content=True)
@property
def connection(self):
return self._connection
def isclosed(self):
return is_fp_closed(self._fp)
def tell(self):
return self._fp_bytes_read
def _init_length(self, request_method):
length = self.headers.get("content-length")
if length is not None:
if self.chunked:
log.warning(
"Received response with both Content-Length and "
"Transfer-Encoding set. This is expressly forbidden "
"by RFC 7230 sec 3.3.2. Ignoring Content-Length and "
"attempting to process response as Transfer-Encoding: "
"chunked."
)
return None
try:
lengths = set([int(val) for val in length.split(",")])
if len(lengths) > 1:
raise InvalidHeader(
"Content-Length contained multiple "
"unmatching values (%s)" % length
)
length = lengths.pop()
except ValueError:
length = None
else:
if length < 0:
length = None
try:
status = int(self.status)
except ValueError:
status = 0
if status in (204, 304) or 100 <= status < 200 or request_method == "HEAD":
length = 0
return length
def _init_decoder(self):
content_encoding = self.headers.get("content-encoding", "").lower()
if self._decoder is None:
if content_encoding in self.CONTENT_DECODERS:
self._decoder = _get_decoder(content_encoding)
elif "," in content_encoding:
encodings = [
e.strip()
for e in content_encoding.split(",")
if e.strip() in self.CONTENT_DECODERS
]
if len(encodings):
self._decoder = _get_decoder(content_encoding)
DECODER_ERROR_CLASSES = (IOError, zlib.error)
if brotli is not None:
DECODER_ERROR_CLASSES += (brotli.error,)
def _decode(self, data, decode_content, flush_decoder):
if not decode_content:
return data
try:
if self._decoder:
data = self._decoder.decompress(data)
except self.DECODER_ERROR_CLASSES as e:
content_encoding = self.headers.get("content-encoding", "").lower()
raise DecodeError(
"Received response with content-encoding: %s, but "
"failed to decode it." % content_encoding,
e,
)
if flush_decoder:
data += self._flush_decoder()
return data
def _flush_decoder(self):
if self._decoder:
buf = self._decoder.decompress(b"")
return buf + self._decoder.flush()
return b""
@contextmanager
def _error_catcher(self):
clean_exit = False
try:
try:
yield
except SocketTimeout:
raise ReadTimeoutError(self._pool, None, "Read timed out.")
except BaseSSLError as e:
if "read operation timed out" not in str(e):
raise SSLError(e)
raise ReadTimeoutError(self._pool, None, "Read timed out.")
except (HTTPException, SocketError) as e:
raise ProtocolError("Connection broken: %r" % e, e)
clean_exit = True
finally:
if not clean_exit:
if self._original_response:
self._original_response.close()
if self._connection:
self._connection.close()
if self._original_response and self._original_response.isclosed():
self.release_conn()
def _fp_read(self, amt):
assert self._fp
c_int_max = 2 ** 31 - 1
if (
(
(amt and amt > c_int_max)
or (self.length_remaining and self.length_remaining > c_int_max)
)
and not util.IS_SECURETRANSPORT
and (util.IS_PYOPENSSL or sys.version_info < (3, 10))
):
buffer = io.BytesIO()
max_chunk_amt = 2 ** 28
while amt is None or amt != 0:
if amt is not None:
chunk_amt = min(amt, max_chunk_amt)
amt -= chunk_amt
else:
chunk_amt = max_chunk_amt
data = self._fp.read(chunk_amt)
if not data:
break
buffer.write(data)
del data return buffer.getvalue()
else:
return self._fp.read(amt) if amt is not None else self._fp.read()
def read(self, amt=None, decode_content=None, cache_content=False):
self._init_decoder()
if decode_content is None:
decode_content = self.decode_content
if self._fp is None:
return
flush_decoder = False
fp_closed = getattr(self._fp, "closed", False)
with self._error_catcher():
data = self._fp_read(amt) if not fp_closed else b""
if amt is None:
flush_decoder = True
else:
cache_content = False
if (
amt != 0 and not data
): self._fp.close()
flush_decoder = True
if self.enforce_content_length and self.length_remaining not in (
0,
None,
):
raise IncompleteRead(self._fp_bytes_read, self.length_remaining)
if data:
self._fp_bytes_read += len(data)
if self.length_remaining is not None:
self.length_remaining -= len(data)
data = self._decode(data, decode_content, flush_decoder)
if cache_content:
self._body = data
return data
def stream(self, amt=2 ** 16, decode_content=None):
if self.chunked and self.supports_chunked_reads():
for line in self.read_chunked(amt, decode_content=decode_content):
yield line
else:
while not is_fp_closed(self._fp):
data = self.read(amt=amt, decode_content=decode_content)
if data:
yield data
@classmethod
def from_httplib(ResponseCls, r, **response_kw):
headers = r.msg
if not isinstance(headers, HTTPHeaderDict):
if six.PY2:
headers = HTTPHeaderDict.from_httplib(headers)
else:
headers = HTTPHeaderDict(headers.items())
strict = getattr(r, "strict", 0)
resp = ResponseCls(
body=r,
headers=headers,
status=r.status,
version=r.version,
reason=r.reason,
strict=strict,
original_response=r,
**response_kw
)
return resp
def getheaders(self):
warnings.warn(
"HTTPResponse.getheaders() is deprecated and will be removed "
"in urllib3 v2.1.0. Instead access HTTPResponse.headers directly.",
category=DeprecationWarning,
stacklevel=2,
)
return self.headers
def getheader(self, name, default=None):
warnings.warn(
"HTTPResponse.getheader() is deprecated and will be removed "
"in urllib3 v2.1.0. Instead use HTTPResponse.headers.get(name, default).",
category=DeprecationWarning,
stacklevel=2,
)
return self.headers.get(name, default)
def info(self):
return self.headers
def close(self):
if not self.closed:
self._fp.close()
if self._connection:
self._connection.close()
if not self.auto_close:
io.IOBase.close(self)
@property
def closed(self):
if not self.auto_close:
return io.IOBase.closed.__get__(self)
elif self._fp is None:
return True
elif hasattr(self._fp, "isclosed"):
return self._fp.isclosed()
elif hasattr(self._fp, "closed"):
return self._fp.closed
else:
return True
def fileno(self):
if self._fp is None:
raise IOError("HTTPResponse has no file to get a fileno from")
elif hasattr(self._fp, "fileno"):
return self._fp.fileno()
else:
raise IOError(
"The file-like object this HTTPResponse is wrapped "
"around has no file descriptor"
)
def flush(self):
if (
self._fp is not None
and hasattr(self._fp, "flush")
and not getattr(self._fp, "closed", False)
):
return self._fp.flush()
def readable(self):
return True
def readinto(self, b):
temp = self.read(len(b))
if len(temp) == 0:
return 0
else:
b[: len(temp)] = temp
return len(temp)
def supports_chunked_reads(self):
return hasattr(self._fp, "fp")
def _update_chunk_length(self):
if self.chunk_left is not None:
return
line = self._fp.fp.readline()
line = line.split(b";", 1)[0]
try:
self.chunk_left = int(line, 16)
except ValueError:
self.close()
raise InvalidChunkLength(self, line)
def _handle_chunk(self, amt):
returned_chunk = None
if amt is None:
chunk = self._fp._safe_read(self.chunk_left)
returned_chunk = chunk
self._fp._safe_read(2) self.chunk_left = None
elif amt < self.chunk_left:
value = self._fp._safe_read(amt)
self.chunk_left = self.chunk_left - amt
returned_chunk = value
elif amt == self.chunk_left:
value = self._fp._safe_read(amt)
self._fp._safe_read(2) self.chunk_left = None
returned_chunk = value
else: returned_chunk = self._fp._safe_read(self.chunk_left)
self._fp._safe_read(2) self.chunk_left = None
return returned_chunk
def read_chunked(self, amt=None, decode_content=None):
self._init_decoder()
if not self.chunked:
raise ResponseNotChunked(
"Response is not chunked. "
"Header 'transfer-encoding: chunked' is missing."
)
if not self.supports_chunked_reads():
raise BodyNotHttplibCompatible(
"Body should be http.client.HTTPResponse like. "
"It should have have an fp attribute which returns raw chunks."
)
with self._error_catcher():
if self._original_response and is_response_to_head(self._original_response):
self._original_response.close()
return
if self._fp.fp is None:
return
while True:
self._update_chunk_length()
if self.chunk_left == 0:
break
chunk = self._handle_chunk(amt)
decoded = self._decode(
chunk, decode_content=decode_content, flush_decoder=False
)
if decoded:
yield decoded
if decode_content:
decoded = self._flush_decoder()
if decoded: yield decoded
while True:
line = self._fp.fp.readline()
if not line:
break
if line == b"\r\n":
break
if self._original_response:
self._original_response.close()
def geturl(self):
if self.retries is not None and len(self.retries.history):
return self.retries.history[-1].redirect_location
else:
return self._request_url
def __iter__(self):
buffer = []
for chunk in self.stream(decode_content=True):
if b"\n" in chunk:
chunk = chunk.split(b"\n")
yield b"".join(buffer) + chunk[0] + b"\n"
for x in chunk[1:-1]:
yield x + b"\n"
if chunk[-1]:
buffer = [chunk[-1]]
else:
buffer = []
else:
buffer.append(chunk)
if buffer:
yield b"".join(buffer)