# Copyright 2020 Allan Feldman
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import os.path
import ssl
from dataclasses import dataclass
from types import TracebackType
from typing import Dict, Mapping, Optional, Type, Union
try:
with open(os.path.join(os.path.dirname(__file__), "version.txt")) as f:
__version__ = f.read()
except Exception: # pragma: no cover
__version__ = "0.0.0" # pragma: no cover
[docs]@dataclass(frozen=True)
class Response:
"""HTTP Response Container"""
status: int
headers: Dict[str, str]
data: bytes
[docs]class Connection:
"""Streaming connection wrapper
Provides an interface to make HTTP requests via a streaming connection.
It is not recommended to instantiate :class:`Connection` objects directly;
use :meth:`Connection.create()` instead.
"""
def __init__(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
lock: asyncio.Lock,
host: str,
close_timeout: Optional[float],
):
self._reader = reader
self._writer = writer
self._lock = lock
self._host = host
self._close_timeout = close_timeout
async def _readline_ascii(self) -> str:
result = await self._reader.readline()
return result.decode("latin1")
def _write_ascii(self, data: str) -> None:
return self._writer.write(data.encode("latin1"))
[docs] @classmethod
async def create(
cls,
host: str,
port: int = 443,
ssl: Union[bool, ssl.SSLContext] = True,
close_timeout: Optional[float] = 5.0,
) -> "Connection":
"""Create a Connection
:param host: Host string controlling both the DNS request and the
host header.
:type host: str
:param port: (optional) The TCP port used for the connection.
Default: 443
:type port: int
:param ssl: (optional) Indicates if SSL is to be used in
establishing the connection. Also accepts an SSLContext object.
Default: True
:type ssl: bool or ssl.SSLContext
:param close_timeout: (optional) The amount of time to wait in seconds
for the connection to close before forcing the connection to close
via TCP RST. Default: 5 seconds
:type close_timeout: float
:rtype: Connection
"""
reader, writer = await asyncio.open_connection(host, port, ssl=ssl)
return Connection(reader, writer, asyncio.Lock(), host, close_timeout)
async def __aenter__(self) -> "Connection":
return self
[docs] async def request(
self,
method: str,
uri: str,
payload: bytes = b"",
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = 5.0,
) -> Response:
"""Make an HTTP request
Drives a full request/response cycle using the HTTP/1.1 protocol.
No exception handling is done for:
* Malformed HTTP/1.1 responses
* Network communication errors
The raw exceptions will be raised for any violation of HTTP/1.1
protocol.
:param method: HTTP method string send in the request line.
:type method: str
:param uri: Request-Uniform Resource Identifier (URI), usually
the absolute path to the resource being requested.
:type uri: str
:param payload: (optional) The encoded HTTP request body bytes to be
sent. Default: b""
:type payload: bytes
:param headers: (optional) A dictionary of headers to be sent
with the request. Default: {}
:type headers: dict
:param timeout: (optional) Timeout in seconds for the request.
Default: 5 seconds.
:type timeout: float
:rtype: Response
"""
coro = self._request(method, uri, payload, headers)
if timeout is not None:
return await asyncio.wait_for(coro, timeout=timeout)
return await coro
async def _request(
self,
method: str,
uri: str,
payload: bytes = b"",
headers: Optional[Mapping[str, str]] = None,
) -> Response:
# Since the connection can be a shared resource, we must ensure
# exclusive access for the duration of the request/response cycle
async with self._lock:
reader, writer = self._reader, self._writer
request = f"{method} {uri} HTTP/1.1\r\n"
self._write_ascii(request)
request_headers = {
"host": self._host,
"user-agent": f"pywreck/{__version__}",
}
if payload:
request_headers["content-length"] = str(len(payload))
if headers:
request_headers.update(headers)
for header_name, header_value in request_headers.items():
self._write_ascii(f"{header_name}:{header_value}\r\n")
# Finish request metadata section
writer.write(b"\r\n")
# Send payload
writer.write(payload)
response_line = await self._readline_ascii()
status = int(response_line.split(" ", 2)[1])
response_headers: Dict[str, str] = {}
content_length = 0
chunked = False
while True:
header_line = await self._readline_ascii()
header_line = header_line.rstrip()
if not header_line:
break
header_name, header_value = header_line.split(":", 1)
header_name = header_name.rstrip().lower()
header_value = header_value.lstrip()
if header_name in response_headers:
separator = "," if header_name != "set-cookie" else ";"
response_headers[header_name] += separator + header_value
else:
response_headers[header_name] = header_value
if method != "HEAD":
if "content-length" in response_headers:
content_length = int(response_headers["content-length"])
chunked = response_headers.get("transfer-encoding", "") == "chunked"
if chunked:
response_chunks = []
while True:
chunk_len_bytes = await reader.readuntil(b"\r\n")
content_length = int(chunk_len_bytes.rstrip(), 16)
part = await reader.readexactly(content_length + 2)
if not content_length:
break
response_chunks.append(part[:-2])
response_data = b"".join(response_chunks)
else:
response_data = await reader.readexactly(content_length)
return Response(status, response_headers, response_data)
async def __aexit__(
self,
exc: Optional[Type[BaseException]],
value: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
await self.close()
[docs] async def close(self) -> None:
"""Close the connection"""
coro = self._close()
timeout = self._close_timeout
if timeout is not None:
return await asyncio.wait_for(coro, timeout=timeout)
return await coro
async def _close(self) -> None:
writer = self._writer
writer.close()
try:
await writer.wait_closed()
finally:
writer.transport.abort()
[docs]async def request(
method: str,
host: str,
uri: str,
payload: bytes = b"",
headers: Optional[Mapping[str, str]] = None,
port: int = 443,
timeout: Optional[float] = 5.0,
ssl: Union[bool, ssl.SSLContext] = True,
) -> Response:
"""Make a full HTTP request
Drives a full request/response cycle using the HTTP/1.1 protocol.
No exception handling is done for:
* Malformed HTTP/1.1 responses
* Network communication errors
The raw exceptions will be raised for any violation of HTTP/1.1
protocol.
:param method: HTTP method string send in the request line.
:type method: str
:param host: Host string controlling both the DNS request and the
host header.
:type host: str
:param uri: Request-Uniform Resource Identifier (URI), usually
the absolute path to the resource being requested.
:type uri: str
:param payload: The encoded HTTP request body bytes to be sent.
:type payload: bytes
:param headers: (optional) A dictionary of headers to be sent
with the request. Default: {}
:type headers: dict
:param port: (optional) The TCP port used for the connection. Default: 443
:type port: int
:param timeout: (optional) Timeout in seconds for the request.
Default: 5 seconds.
:type timeout: float
:param ssl: (optional) Indicates if SSL is to be used in
establishing the connection. Also accepts an SSLContext object.
Default: True
:type ssl: bool or ssl.SSLContext
:rtype: Response
"""
async def _request() -> Response:
async with await Connection.create(
host,
port,
ssl=ssl,
close_timeout=None,
) as connection:
return await connection.request(
method,
uri,
payload,
headers,
None,
)
coro = _request()
if timeout is not None:
return await asyncio.wait_for(coro, timeout=timeout)
return await coro
async def get(
host: str,
uri: str,
payload: bytes = b"",
headers: Optional[Mapping[str, str]] = None,
port: int = 443,
timeout: Optional[float] = 5.0,
ssl: Union[bool, ssl.SSLContext] = True,
) -> Response:
return await request("GET", host, uri, payload, headers, port, timeout, ssl)
async def head(
host: str,
uri: str,
payload: bytes = b"",
headers: Optional[Mapping[str, str]] = None,
port: int = 443,
timeout: Optional[float] = 5.0,
ssl: Union[bool, ssl.SSLContext] = True,
) -> Response:
return await request("HEAD", host, uri, payload, headers, port, timeout, ssl)
async def post(
host: str,
uri: str,
payload: bytes = b"",
headers: Optional[Mapping[str, str]] = None,
port: int = 443,
timeout: Optional[float] = 5.0,
ssl: Union[bool, ssl.SSLContext] = True,
) -> Response:
return await request("POST", host, uri, payload, headers, port, timeout, ssl)
async def put(
host: str,
uri: str,
payload: bytes = b"",
headers: Optional[Mapping[str, str]] = None,
port: int = 443,
timeout: Optional[float] = 5.0,
ssl: Union[bool, ssl.SSLContext] = True,
) -> Response:
return await request("PUT", host, uri, payload, headers, port, timeout, ssl)
async def delete(
host: str,
uri: str,
payload: bytes = b"",
headers: Optional[Mapping[str, str]] = None,
port: int = 443,
timeout: Optional[float] = 5.0,
ssl: Union[bool, ssl.SSLContext] = True,
) -> Response:
return await request("DELETE", host, uri, payload, headers, port, timeout, ssl)