Fixed timing issues in HTTP/2 server implementation

parent 884850a1
......@@ -10,6 +10,8 @@ from yangson.datamodel import DataModel
from .config import CONFIG
CERT_TEST = True
class PathFormat(Enum):
URL = 0
......@@ -19,7 +21,14 @@ class PathFormat(Enum):
class CertHelpers:
@staticmethod
def get_field(cert: Dict[str, Any], key: str) -> str:
return ([x[0][1] for x in cert["subject"] if x[0][0] == key] or [None])[0]
if CERT_TEST and (key == "emailAddress"):
return "test-user"
try:
retval = ([x[0][1] for x in cert["subject"] if x[0][0] == key] or [None])[0]
except (IndexError, KeyError, TypeError):
retval = None
return retval
class DataHelpers:
......
......@@ -99,6 +99,7 @@ def _get(prot: "H2Protocol", stream_id: int, ds: BaseDatastore, pth: str, yl_dat
for data_chunk in split_arr(response_bytes, prot.conn.max_outbound_frame_size):
prot.conn.send_data(stream_id, data_chunk, end_stream=False)
prot.conn.send_data(stream_id, bytes(), end_stream=True)
except DataLockError as e:
warn(epretty(e))
prot.send_empty(stream_id, "500", "Internal Server Error")
......@@ -228,8 +229,13 @@ def _get_staging(prot: "H2Protocol", stream_id: int, ds: BaseDatastore, pth: str
response_headers.append(("Content-Type", "application/yang.api+json"))
response_headers.append(("content-length", len(response_bytes)))
prot.conn.send_headers(stream_id, response_headers)
prot.conn.send_data(stream_id, response_bytes, end_stream=True)
def split_arr(arr, chunk_size):
for i in range(0, len(arr), chunk_size):
yield arr[i:i + chunk_size]
for data_chunk in split_arr(response_bytes, prot.conn.max_outbound_frame_size):
prot.conn.send_data(stream_id, data_chunk, end_stream=False)
prot.conn.send_data(stream_id, bytes(), end_stream=True)
except DataLockError as e:
warn(epretty(e))
prot.send_empty(stream_id, "500", "Internal Server Error")
......@@ -245,9 +251,6 @@ def _get_staging(prot: "H2Protocol", stream_id: int, ds: BaseDatastore, pth: str
except InstanceValueError as e:
warn(epretty(e))
prot.send_empty(stream_id, "400", "Bad Request")
except NoHandlerError as e:
warn(epretty(e))
prot.send_empty(stream_id, "400", "Bad Request")
except KnotError as e:
error(epretty(e))
prot.send_empty(stream_id, "500", "Internal Server Error")
......
import io
import asyncio
import ssl
from collections import OrderedDict
......@@ -5,7 +6,8 @@ from colorlog import error, warning as warn, info, debug
from typing import List, Tuple, Dict, Any, Callable
from h2.connection import H2Connection
from h2.events import DataReceived, RequestReceived, RemoteSettingsChanged
from h2.errors import PROTOCOL_ERROR
from h2.events import DataReceived, RequestReceived, RemoteSettingsChanged, StreamEnded
import jetconf.http_handlers as handlers
from .config import CONFIG_HTTP, API_ROOT_data, API_ROOT_STAGING_data, API_ROOT_ops
......@@ -15,6 +17,14 @@ from .data import BaseDatastore
# Function(method, path) -> bool
HandlerConditionT = Callable[[str, str], bool]
h2_handlers = None # type: HandlerList
class RequestData:
def __init__(self, headers=None, data=None):
self.headers = headers
self.data = data
class HandlerList:
def __init__(self):
......@@ -39,7 +49,7 @@ class H2Protocol(asyncio.Protocol):
def __init__(self):
self.conn = H2Connection(client_side=False)
self.transport = None
self.reqs_waiting_upload = dict()
self.stream_data = {}
self.client_cert = None # type: Dict[str, Any]
def connection_made(self, transport: asyncio.Transport):
......@@ -63,42 +73,53 @@ class H2Protocol(asyncio.Protocol):
def data_received(self, data: bytes):
events = self.conn.receive_data(data)
self.transport.write(self.conn.data_to_send())
for event in events:
if isinstance(event, RequestReceived):
# Handle request
# Store request headers
headers = OrderedDict(event.headers)
http_method = headers[":method"]
if http_method in ("GET", "DELETE"):
# Handle immediately, no need to wait for incoming data
self.handle_get_delete(headers, event.stream_id)
elif http_method in ("PUT", "POST"):
if headers.get("content-length"):
# Store headers and wait for data upload
self.reqs_waiting_upload[event.stream_id] = headers
else:
# Handle immediately, incoming data empty
self.handle_put_post(headers, event.stream_id, bytes())
else:
warn("Unknown http method \"{}\"".format(headers[":method"]))
request_data = RequestData(headers, io.BytesIO())
self.stream_data[event.stream_id] = request_data
elif isinstance(event, DataReceived):
self.http_handle_upload(event.data, event.stream_id)
elif isinstance(event, RemoteSettingsChanged):
self.conn.update_settings(event.changed_settings)
def http_handle_upload(self, data: bytes, stream_id: int):
try:
headers = self.reqs_waiting_upload.pop(stream_id)
except KeyError:
return
self.handle_put_post(headers, stream_id, data)
# Store incoming data
try:
stream_data = self.stream_data[event.stream_id]
except KeyError:
self.conn.reset_stream(event.stream_id, error_code=PROTOCOL_ERROR)
else:
stream_data.data.write(event.data)
elif isinstance(event, StreamEnded):
# Process request
try:
request_data = self.stream_data.pop(event.stream_id)
except KeyError:
self.send_empty(event.stream_id, "400", "Bad Request")
else:
headers = request_data.headers
body = request_data.data.getvalue().decode('utf-8')
http_method = headers[":method"]
if http_method in ("GET", "DELETE"):
self.handle_get_delete(headers, event.stream_id)
elif http_method in ("PUT", "POST"):
self.handle_put_post(headers, event.stream_id, body)
else:
warn("Unknown http method \"{}\"".format(headers[":method"]))
self.send_empty(event.stream_id, "405", "Method Not Allowed")
# elif isinstance(event, RemoteSettingsChanged):
# changed_settings = {}
# for s in event.changed_settings.items():
# changed_settings[s[0]] = s[1].new_value
# self.conn.update_settings(changed_settings)
# else:
# print(type(event))
dts = self.conn.data_to_send()
if dts:
self.transport.write(dts)
def handle_put_post(self, headers: OrderedDict, stream_id: int, data: bytes):
# Handle PUT, POST
url_split = headers[":path"].split("?")
url_path = url_split[0]
url_path = headers[":path"].split("?")[0]
h = h2_handlers.get_handler(headers[":method"], url_path)
if h:
......@@ -108,8 +129,7 @@ class H2Protocol(asyncio.Protocol):
def handle_get_delete(self, headers: OrderedDict, stream_id: int):
# Handle GET, DELETE
url_split = headers[":path"].split("?")
url_path = url_split[0]
url_path = headers[":path"].split("?")[0]
h = h2_handlers.get_handler(headers[":method"], url_path)
if h:
......@@ -130,7 +150,7 @@ class RestServer:
except AttributeError:
info("Python not compiled with ALPN support, using NPN instead.")
ssl_context.set_npn_protocols(["h2"])
ssl_context.verify_mode = ssl.CERT_REQUIRED
# ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_verify_locations(cafile=CONFIG_HTTP["CA_CERT"])
self.loop = asyncio.get_event_loop()
......@@ -171,12 +191,9 @@ class RestServer:
def run(self):
info("Server started on {}".format(self.server.sockets[0].getsockname()))
self.loop.run_forever()
try:
self.loop.run_forever()
except KeyboardInterrupt:
pass
def shutdown(self):
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.loop.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment