from __future__ import absolute_import
import errno
import logging
import re
import socket
import sys
import warnings
from socket import error as SocketError
from socket import timeout as SocketTimeout
from .connection import (
BaseSSLError,
BrokenPipeError,
DummyConnection,
HTTPConnection,
HTTPException,
HTTPSConnection,
VerifiedHTTPSConnection,
port_by_scheme,
)
from .exceptions import (
ClosedPoolError,
EmptyPoolError,
HeaderParsingError,
HostChangedError,
InsecureRequestWarning,
LocationValueError,
MaxRetryError,
NewConnectionError,
ProtocolError,
ProxyError,
ReadTimeoutError,
SSLError,
TimeoutError,
)
from .packages import six
from .packages.six.moves import queue
from .request import RequestMethods
from .response import HTTPResponse
from .util.connection import is_connection_dropped
from .util.proxy import connection_requires_http_tunnel
from .util.queue import LifoQueue
from .util.request import set_file_position
from .util.response import assert_header_parsing
from .util.retry import Retry
from .util.ssl_match_hostname import CertificateError
from .util.timeout import Timeout
from .util.url import Url, _encode_target
from .util.url import _normalize_host as normalize_host
from .util.url import get_host, parse_url
try: import weakref
weakref_finalize = weakref.finalize
except AttributeError: from .packages.backports.weakref_finalize import weakref_finalize
xrange = six.moves.xrange
log = logging.getLogger(__name__)
_Default = object()
class ConnectionPool(object):
scheme = None
QueueCls = LifoQueue
def __init__(self, host, port=None):
if not host:
raise LocationValueError("No host specified.")
self.host = _normalize_host(host, scheme=self.scheme)
self._proxy_host = host.lower()
self.port = port
def __str__(self):
return "%s(host=%r, port=%r)" % (type(self).__name__, self.host, self.port)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
def close(self):
pass
_blocking_errnos = {errno.EAGAIN, errno.EWOULDBLOCK}
class HTTPConnectionPool(ConnectionPool, RequestMethods):
scheme = "http"
ConnectionCls = HTTPConnection
ResponseCls = HTTPResponse
def __init__(
self,
host,
port=None,
strict=False,
timeout=Timeout.DEFAULT_TIMEOUT,
maxsize=1,
block=False,
headers=None,
retries=None,
_proxy=None,
_proxy_headers=None,
_proxy_config=None,
**conn_kw
):
ConnectionPool.__init__(self, host, port)
RequestMethods.__init__(self, headers)
self.strict = strict
if not isinstance(timeout, Timeout):
timeout = Timeout.from_float(timeout)
if retries is None:
retries = Retry.DEFAULT
self.timeout = timeout
self.retries = retries
self.pool = self.QueueCls(maxsize)
self.block = block
self.proxy = _proxy
self.proxy_headers = _proxy_headers or {}
self.proxy_config = _proxy_config
for _ in xrange(maxsize):
self.pool.put(None)
self.num_connections = 0
self.num_requests = 0
self.conn_kw = conn_kw
if self.proxy:
self.conn_kw.setdefault("socket_options", [])
self.conn_kw["proxy"] = self.proxy
self.conn_kw["proxy_config"] = self.proxy_config
pool = self.pool
weakref_finalize(self, _close_pool_connections, pool)
def _new_conn(self):
self.num_connections += 1
log.debug(
"Starting new HTTP connection (%d): %s:%s",
self.num_connections,
self.host,
self.port or "80",
)
conn = self.ConnectionCls(
host=self.host,
port=self.port,
timeout=self.timeout.connect_timeout,
strict=self.strict,
**self.conn_kw
)
return conn
def _get_conn(self, timeout=None):
conn = None
try:
conn = self.pool.get(block=self.block, timeout=timeout)
except AttributeError: raise ClosedPoolError(self, "Pool is closed.")
except queue.Empty:
if self.block:
raise EmptyPoolError(
self,
"Pool reached maximum size and no more connections are allowed.",
)
pass
if conn and is_connection_dropped(conn):
log.debug("Resetting dropped connection: %s", self.host)
conn.close()
if getattr(conn, "auto_open", 1) == 0:
conn = None
return conn or self._new_conn()
def _put_conn(self, conn):
try:
self.pool.put(conn, block=False)
return except AttributeError:
pass
except queue.Full:
log.warning(
"Connection pool is full, discarding connection: %s. Connection pool size: %s",
self.host,
self.pool.qsize(),
)
if conn:
conn.close()
def _validate_conn(self, conn):
pass
def _prepare_proxy(self, conn):
pass
def _get_timeout(self, timeout):
if timeout is _Default:
return self.timeout.clone()
if isinstance(timeout, Timeout):
return timeout.clone()
else:
return Timeout.from_float(timeout)
def _raise_timeout(self, err, url, timeout_value):
if isinstance(err, SocketTimeout):
raise ReadTimeoutError(
self, url, "Read timed out. (read timeout=%s)" % timeout_value
)
if hasattr(err, "errno") and err.errno in _blocking_errnos:
raise ReadTimeoutError(
self, url, "Read timed out. (read timeout=%s)" % timeout_value
)
if "timed out" in str(err) or "did not complete (read)" in str(
err
): raise ReadTimeoutError(
self, url, "Read timed out. (read timeout=%s)" % timeout_value
)
def _make_request(
self, conn, method, url, timeout=_Default, chunked=False, **httplib_request_kw
):
self.num_requests += 1
timeout_obj = self._get_timeout(timeout)
timeout_obj.start_connect()
conn.timeout = Timeout.resolve_default_timeout(timeout_obj.connect_timeout)
try:
self._validate_conn(conn)
except (SocketTimeout, BaseSSLError) as e:
self._raise_timeout(err=e, url=url, timeout_value=conn.timeout)
raise
try:
if chunked:
conn.request_chunked(method, url, **httplib_request_kw)
else:
conn.request(method, url, **httplib_request_kw)
except BrokenPipeError:
pass
except IOError as e:
if e.errno not in {
errno.EPIPE,
errno.ESHUTDOWN,
errno.EPROTOTYPE,
}:
raise
read_timeout = timeout_obj.read_timeout
if getattr(conn, "sock", None):
if read_timeout == 0:
raise ReadTimeoutError(
self, url, "Read timed out. (read timeout=%s)" % read_timeout
)
if read_timeout is Timeout.DEFAULT_TIMEOUT:
conn.sock.settimeout(socket.getdefaulttimeout())
else: conn.sock.settimeout(read_timeout)
try:
try:
httplib_response = conn.getresponse(buffering=True)
except TypeError:
try:
httplib_response = conn.getresponse()
except BaseException as e:
six.raise_from(e, None)
except (SocketTimeout, BaseSSLError, SocketError) as e:
self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
raise
http_version = getattr(conn, "_http_vsn_str", "HTTP/?")
log.debug(
'%s://%s:%s "%s %s %s" %s %s',
self.scheme,
self.host,
self.port,
method,
url,
http_version,
httplib_response.status,
httplib_response.length,
)
try:
assert_header_parsing(httplib_response.msg)
except (HeaderParsingError, TypeError) as hpe: log.warning(
"Failed to parse headers (url=%s): %s",
self._absolute_url(url),
hpe,
exc_info=True,
)
return httplib_response
def _absolute_url(self, path):
return Url(scheme=self.scheme, host=self.host, port=self.port, path=path).url
def close(self):
if self.pool is None:
return
old_pool, self.pool = self.pool, None
_close_pool_connections(old_pool)
def is_same_host(self, url):
if url.startswith("/"):
return True
scheme, host, port = get_host(url)
if host is not None:
host = _normalize_host(host, scheme=scheme)
if self.port and not port:
port = port_by_scheme.get(scheme)
elif not self.port and port == port_by_scheme.get(scheme):
port = None
return (scheme, host, port) == (self.scheme, self.host, self.port)
def urlopen(
self,
method,
url,
body=None,
headers=None,
retries=None,
redirect=True,
assert_same_host=True,
timeout=_Default,
pool_timeout=None,
release_conn=None,
chunked=False,
body_pos=None,
**response_kw
):
parsed_url = parse_url(url)
destination_scheme = parsed_url.scheme
if headers is None:
headers = self.headers
if not isinstance(retries, Retry):
retries = Retry.from_int(retries, redirect=redirect, default=self.retries)
if release_conn is None:
release_conn = response_kw.get("preload_content", True)
if assert_same_host and not self.is_same_host(url):
raise HostChangedError(self, url, retries)
if url.startswith("/"):
url = six.ensure_str(_encode_target(url))
else:
url = six.ensure_str(parsed_url.url)
conn = None
release_this_conn = release_conn
http_tunnel_required = connection_requires_http_tunnel(
self.proxy, self.proxy_config, destination_scheme
)
if not http_tunnel_required:
headers = headers.copy()
headers.update(self.proxy_headers)
err = None
clean_exit = False
body_pos = set_file_position(body, body_pos)
try:
timeout_obj = self._get_timeout(timeout)
conn = self._get_conn(timeout=pool_timeout)
conn.timeout = timeout_obj.connect_timeout
is_new_proxy_conn = self.proxy is not None and not getattr(
conn, "sock", None
)
if is_new_proxy_conn and http_tunnel_required:
self._prepare_proxy(conn)
httplib_response = self._make_request(
conn,
method,
url,
timeout=timeout_obj,
body=body,
headers=headers,
chunked=chunked,
)
response_conn = conn if not release_conn else None
response_kw["request_method"] = method
response = self.ResponseCls.from_httplib(
httplib_response,
pool=self,
connection=response_conn,
retries=retries,
**response_kw
)
clean_exit = True
except EmptyPoolError:
clean_exit = True
release_this_conn = False
raise
except (
TimeoutError,
HTTPException,
SocketError,
ProtocolError,
BaseSSLError,
SSLError,
CertificateError,
) as e:
clean_exit = False
def _is_ssl_error_message_from_http_proxy(ssl_error):
message = " ".join(re.split("[^a-z]", str(ssl_error).lower()))
return (
"wrong version number" in message or "unknown protocol" in message
)
if (
isinstance(e, BaseSSLError)
and self.proxy
and _is_ssl_error_message_from_http_proxy(e)
and conn.proxy
and conn.proxy.scheme == "https"
):
e = ProxyError(
"Your proxy appears to only use HTTP and not HTTPS, "
"try changing your proxy URL to be HTTP. See: "
"https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html"
"#https-proxy-error-http-proxy",
SSLError(e),
)
elif isinstance(e, (BaseSSLError, CertificateError)):
e = SSLError(e)
elif isinstance(e, (SocketError, NewConnectionError)) and self.proxy:
e = ProxyError("Cannot connect to proxy.", e)
elif isinstance(e, (SocketError, HTTPException)):
e = ProtocolError("Connection aborted.", e)
retries = retries.increment(
method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
)
retries.sleep()
err = e
finally:
if not clean_exit:
conn = conn and conn.close()
release_this_conn = True
if release_this_conn:
self._put_conn(conn)
if not conn:
log.warning(
"Retrying (%r) after connection broken by '%r': %s", retries, err, url
)
return self.urlopen(
method,
url,
body,
headers,
retries,
redirect,
assert_same_host,
timeout=timeout,
pool_timeout=pool_timeout,
release_conn=release_conn,
chunked=chunked,
body_pos=body_pos,
**response_kw
)
redirect_location = redirect and response.get_redirect_location()
if redirect_location:
if response.status == 303:
method = "GET"
try:
retries = retries.increment(method, url, response=response, _pool=self)
except MaxRetryError:
if retries.raise_on_redirect:
response.drain_conn()
raise
return response
response.drain_conn()
retries.sleep_for_retry(response)
log.debug("Redirecting %s -> %s", url, redirect_location)
return self.urlopen(
method,
redirect_location,
body,
headers,
retries=retries,
redirect=redirect,
assert_same_host=assert_same_host,
timeout=timeout,
pool_timeout=pool_timeout,
release_conn=release_conn,
chunked=chunked,
body_pos=body_pos,
**response_kw
)
has_retry_after = bool(response.headers.get("Retry-After"))
if retries.is_retry(method, response.status, has_retry_after):
try:
retries = retries.increment(method, url, response=response, _pool=self)
except MaxRetryError:
if retries.raise_on_status:
response.drain_conn()
raise
return response
response.drain_conn()
retries.sleep(response)
log.debug("Retry: %s", url)
return self.urlopen(
method,
url,
body,
headers,
retries=retries,
redirect=redirect,
assert_same_host=assert_same_host,
timeout=timeout,
pool_timeout=pool_timeout,
release_conn=release_conn,
chunked=chunked,
body_pos=body_pos,
**response_kw
)
return response
class HTTPSConnectionPool(HTTPConnectionPool):
scheme = "https"
ConnectionCls = HTTPSConnection
def __init__(
self,
host,
port=None,
strict=False,
timeout=Timeout.DEFAULT_TIMEOUT,
maxsize=1,
block=False,
headers=None,
retries=None,
_proxy=None,
_proxy_headers=None,
key_file=None,
cert_file=None,
cert_reqs=None,
key_password=None,
ca_certs=None,
ssl_version=None,
assert_hostname=None,
assert_fingerprint=None,
ca_cert_dir=None,
**conn_kw
):
HTTPConnectionPool.__init__(
self,
host,
port,
strict,
timeout,
maxsize,
block,
headers,
retries,
_proxy,
_proxy_headers,
**conn_kw
)
self.key_file = key_file
self.cert_file = cert_file
self.cert_reqs = cert_reqs
self.key_password = key_password
self.ca_certs = ca_certs
self.ca_cert_dir = ca_cert_dir
self.ssl_version = ssl_version
self.assert_hostname = assert_hostname
self.assert_fingerprint = assert_fingerprint
def _prepare_conn(self, conn):
if isinstance(conn, VerifiedHTTPSConnection):
conn.set_cert(
key_file=self.key_file,
key_password=self.key_password,
cert_file=self.cert_file,
cert_reqs=self.cert_reqs,
ca_certs=self.ca_certs,
ca_cert_dir=self.ca_cert_dir,
assert_hostname=self.assert_hostname,
assert_fingerprint=self.assert_fingerprint,
)
conn.ssl_version = self.ssl_version
return conn
def _prepare_proxy(self, conn):
conn.set_tunnel(self._proxy_host, self.port, self.proxy_headers)
if self.proxy.scheme == "https":
conn.tls_in_tls_required = True
conn.connect()
def _new_conn(self):
self.num_connections += 1
log.debug(
"Starting new HTTPS connection (%d): %s:%s",
self.num_connections,
self.host,
self.port or "443",
)
if not self.ConnectionCls or self.ConnectionCls is DummyConnection:
raise SSLError(
"Can't connect to HTTPS URL because the SSL module is not available."
)
actual_host = self.host
actual_port = self.port
if self.proxy is not None:
actual_host = self.proxy.host
actual_port = self.proxy.port
conn = self.ConnectionCls(
host=actual_host,
port=actual_port,
timeout=self.timeout.connect_timeout,
strict=self.strict,
cert_file=self.cert_file,
key_file=self.key_file,
key_password=self.key_password,
**self.conn_kw
)
return self._prepare_conn(conn)
def _validate_conn(self, conn):
super(HTTPSConnectionPool, self)._validate_conn(conn)
if not getattr(conn, "sock", None): conn.connect()
if not conn.is_verified:
warnings.warn(
(
"Unverified HTTPS request is being made to host '%s'. "
"Adding certificate verification is strongly advised. See: "
"https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html"
"#ssl-warnings" % conn.host
),
InsecureRequestWarning,
)
if getattr(conn, "proxy_is_verified", None) is False:
warnings.warn(
(
"Unverified HTTPS connection done to an HTTPS proxy. "
"Adding certificate verification is strongly advised. See: "
"https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html"
"#ssl-warnings"
),
InsecureRequestWarning,
)
def connection_from_url(url, **kw):
scheme, host, port = get_host(url)
port = port or port_by_scheme.get(scheme, 80)
if scheme == "https":
return HTTPSConnectionPool(host, port=port, **kw)
else:
return HTTPConnectionPool(host, port=port, **kw)
def _normalize_host(host, scheme):
host = normalize_host(host, scheme)
if host.startswith("[") and host.endswith("]"):
host = host[1:-1]
return host
def _close_pool_connections(pool):
try:
while True:
conn = pool.get(block=False)
if conn:
conn.close()
except queue.Empty:
pass