Basic HTTP/2 server and NACM rule verification working

parent a846ef0a
......@@ -6,3 +6,6 @@ lib
pip-selfcheck.json
pyvenv.cfg
TAGS
.idea
__pycache__
jetconf/__pycache__
-----BEGIN CERTIFICATE-----
MIIDUDCCAjgCCQCy5aByD0x4HTANBgkqhkiG9w0BAQsFADBqMQswCQYDVQQGEwJD
WjELMAkGA1UEBwwCQ0IxGDAWBgNVBAoMD0F1dG9yaXRhIHMuci5vLjEVMBMGA1UE
AwwMYXV0b3JpdGEuY29tMR0wGwYJKoZIhvcNAQkBFg5hQGF1dG9yaXRhLmNvbTAe
Fw0xNjAxMjcxNTE0NTlaFw0xNjAyMjYxNTE0NTlaMGoxCzAJBgNVBAYTAkNaMQsw
CQYDVQQHDAJDQjEYMBYGA1UECgwPQXV0b3JpdGEgcy5yLm8uMRUwEwYDVQQDDAxh
dXRvcml0YS5jb20xHTAbBgkqhkiG9w0BCQEWDmFAYXV0b3JpdGEuY29tMIIBIjAN
BgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnMgi+WX1wcORWiq5Mv/1PbcSwAnM
mAlU1dZvYsCSmX96pBOowFtkF5QDFkgwUMzaefXeiliBd3IhNfTbYeToNx1un8o1
7TKz1vAl9ZL9pbRTKPXrOC6mKFZ2BVeeRg7I3/8Kik7a4OhScnuhaGzq5mEs1QN3
PAOCqefnsJPZCROxnuZcdkZDKpwjI2+oBaM7EvVTJeoAimKarremil4vobKs1g4b
0Pfmh6wigLWNlAFQlSJzL9RWcnI4WT1gLmrC0OMEsGwgdSHY2qcpLIOa8sOOXAtV
kLeUathGQIvUKAkCC8XKqZ6wAu0OcaqfV4fMw7xnUZ4reiN+NybisLv4lQIDAQAB
MA0GCSqGSIb3DQEBCwUAA4IBAQBycAb0rQ3EVU/9HY+ED7kGZ/B+aRaGdtG3xwAH
d191mA4c6qo10D8gwBgG/0eTn69mZzsyayXGc9wmqyDC7NsP/jXhH9JoT5uz0mI2
oBccYtDo48bDhscbHL4AlKDUPO5r71rq9fcgd1cQRTR3nbG3mylZKbxC+IZykUik
biDW9zur0pkmIHeb4x6UBWCpFmRbkqKfGFgoGnaGVtLX42wcvVSF8n86i+/iWy+6
ZBhd9GXhE7NoC0GBF/INmoLRheDSvli3oOSANZWa076kB+uMo+uxaQ28LWaJLonr
E3eIXnQI56McR2RiGhSW1wN0yacdHeZTro1l/A8GvKpqyugX
-----END CERTIFICATE-----
HTTP_SERVER:
DOC_ROOT: "doc-root"
DOC_DEFAULT_NAME: "index.html"
RESTCONF_API_ROOT: "/restconf"
RESTCONF_NACM_API_ROOT: "/restconf_nacm"
SERVER_NAME: "hyper-h2"
SERVER_SSL_CERT: "server.crt"
SERVER_SSL_PRIVKEY: "server.key"
CA_CERT: "ca.pem"
<html>
<head>
<title>Index</title>
</head>
<body>
<h1>It really works!</h1>
</body>
</html>
-----BEGIN CERTIFICATE-----
MIICeDCCAWACAQowDQYJKoZIhvcNAQEFBQAwajELMAkGA1UEBhMCQ1oxCzAJBgNV
BAcMAkNCMRgwFgYDVQQKDA9BdXRvcml0YSBzLnIuby4xFTATBgNVBAMMDGF1dG9y
aXRhLmNvbTEdMBsGCSqGSIb3DQEJARYOYUBhdXRvcml0YS5jb20wHhcNMTYwMTI5
MTAyNTMwWhcNMTYwMjI4MTAyNTMwWjAeMRwwGgYJKoZIhvcNAQkBFg1sb2p6YUBt
YWlsLmN6MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDZsaIBTzwjoHMGGiiK
uxoGwUJCBsAIMX5BUEBCIoX7A3Yuit3TKWSVuTZrikPEEOXGCfJw3odQAKlK7QU7
u732iP0+VIRMxhMh8BGS7p76QgDOGe+gBYRvuJBPgYdHqWu7kCuL/FrKUeu/Ui4Z
/7QP6AbWwT2aSswlJwZWRYhsvwIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQCLaMhp
4pbMpYUahrFupPecEgmrFiSDZOtXA113wiLk+8u1u2Yo0mvpyDMRa1PWaSEh4kAL
WJ8uKr6FLrmLQc+TJz7kpKxw1fOmB68t7as+gCjJKrQmT1Cw8D9bWZIzr7TLqquw
B8dBrDBW25xbfE8IJOpJ3kNATpiIpjt3Oa5xiD6uJyC3adL/VrBKSmkyAZXi9sll
5vm329/BKOgasmNdTdpqA3OAfnf5CrV0rrp57ygEf5vYW+e5p+6Ee+mMWpZYAhXN
Pxe5i8QPtpD+mHLBJnE8+SWGFYn+j9Jyq/8qUrKPejoVDPUp1FrsKj7IiQ+pKYL7
3st+hPwePhZPuEhn
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQDZsaIBTzwjoHMGGiiKuxoGwUJCBsAIMX5BUEBCIoX7A3Yuit3T
KWSVuTZrikPEEOXGCfJw3odQAKlK7QU7u732iP0+VIRMxhMh8BGS7p76QgDOGe+g
BYRvuJBPgYdHqWu7kCuL/FrKUeu/Ui4Z/7QP6AbWwT2aSswlJwZWRYhsvwIDAQAB
AoGADrOqZ0A8pYNOgVj+5++e4Fno6aCfE26UQkDzNgQy3DLtplMGjB49hscNX9Hk
pbcmbWt6jhjZNpP02vSK5+/T8lSFdkpj5rx9huI+jTF/xROw6WtPeZsNkKfzYn0g
FZgh1U78VZRst7KkbqRj3sxC5nKd7jLq4p1qAm5s4i+vlFECQQDw+HLcZ/pyK1gC
J8b0YQ/Wl9rntEr1tv4tk8jPt0cJvDCNT2EFQ0+t6L2NgA4LrdYzHpNfzet932ZV
OJggz5nlAkEA50WHV5frHlSQcxf8VC9sIMwGCgn/5AH6ptzricEJv/5afm1QSTUW
CTlq/VVd9niXjRdp2l/6NOKTETxpx5zx0wJBALPrfa+Nv1ShgkvRsV5kXnQG3D6m
wlVmtbAIKSbWnKbrKYFJJJLOaF7caTOOym3z9PwkNzuQP+Wy+PBmmkfdWfUCQBph
wBKp7dRD6UZ30fz4e+kx3O6APYSRBTrLsMLCFo6kQUZyydnQCHd86UuE1XcsQn2B
0j1q/WvS/NvWfGqc6KkCQC/H+wNfWm8b0cU7jhuxofiqTGMRGUhCdZJjb19M/Sbt
QPZ6CLpEzeko5gbmCF3FcquEERmKGX7RPUY46q1a9W0=
-----END RSA PRIVATE KEY-----
This diff is collapsed.
This diff is collapsed.
import asyncio
import json
import mimetypes
import os
import ssl
from collections import OrderedDict
import sys
import logging
import colorlog
from colorlog import error, warning as warn, info, debug
from typing import List, Tuple, Dict, Any
import yaml
import copy
import nacm
import yang_json_path
from h2.connection import H2Connection
from h2.events import DataReceived, RequestReceived, RemoteSettingsChanged
CONFIG = {
"DOC_ROOT": "doc-root",
"DOC_DEFAULT_NAME": "index.html",
"RESTCONF_API_ROOT": "/restconf",
"RESTCONF_NACM_API_ROOT": "/restconf_nacm",
"SERVER_NAME": "hyper-h2",
"PORT": 8443,
"SERVER_SSL_CERT": "server.crt",
"SERVER_SSL_PRIVKEY": "server.key",
"CA_CERT": "ca.pem"
}
class CertHelpers:
@staticmethod
def get_field(cert: Dict[str, Any], key: str) -> str:
return ([x[0][1] for x in cert["subject"] if x[0][0] == key] or [None])[0]
class H2Protocol(asyncio.Protocol):
def __init__(self):
self.conn = H2Connection(client_side=False)
self.transport = None
self.reqs_waiting_upload = dict()
self.client_cert = None
def connection_made(self, transport: asyncio.Transport):
self.transport = transport
self.conn.initiate_connection()
self.transport.write(self.conn.data_to_send())
self.client_cert = self.transport.get_extra_info('peercert')
# print("cert = {}".format(self.client_cert))
def data_received(self, data: bytes):
events = self.conn.receive_data(data)
self.transport.write(self.conn.data_to_send())
for event in events:
if isinstance(event, RequestReceived):
# Handle request
headers = OrderedDict(event.headers)
http_method = headers[":method"]
if http_method == "GET":
# Handle GET
self.handle_get(headers, event.stream_id)
elif http_method in ("PUT", "POST"):
# Store headers and wait for data upload
self.reqs_waiting_upload[event.stream_id] = headers
else:
warn("Unknown http method \"{}\"".format(headers[":method"]))
elif isinstance(event, DataReceived):
self.http_handle_upload(event.data, event.stream_id)
elif isinstance(event, RemoteSettingsChanged):
self.conn.acknowledge_settings(event)
self.transport.write(self.conn.data_to_send())
def http_handle_upload(self, data: bytes, stream_id: int):
try:
req_headers = self.reqs_waiting_upload.pop(stream_id)
except KeyError:
return
{"PUT": self.handle_put, "POST": self.handle_post}[req_headers[":method"]](req_headers, data, stream_id)
def handle_get(self, headers: OrderedDict, stream_id: int):
response = None
parsed_url = yang_json_path.URLPath(headers[":path"])
if parsed_url.path_equals(CONFIG["RESTCONF_NACM_API_ROOT"]):
# Top level api resource (appendix D.1.1)
response = ("{\n"
" \"ietf-restconf:restconf\": {\n"
" \"data\" : [ null ],\n"
" \"operations\" : [ null ]\n"
" }\n"
"}")
response_headers = (
(':status', '200'),
('content-type', 'application/yang.api+json'),
('content-length', len(response)),
('server', CONFIG["SERVER_NAME"]),
)
self.conn.send_headers(stream_id, response_headers)
self.conn.send_data(stream_id, response.encode(), end_stream=True)
elif parsed_url.path_contains(os.path.join(CONFIG["RESTCONF_NACM_API_ROOT"], "data")):
# NACM api request
info(("nacm_api_get: " + headers[":path"]))
username = CertHelpers.get_field(self.client_cert, "emailAddress")
parsed_url.path_segments = parsed_url.path_segments[2:] # TODO: Hack
print(parsed_url.path_segments)
nacm_config.lock_data(username)
_node = copy.deepcopy(nacm_config.json.select_data_node(parsed_url))
nacm_config.unlock_data()
if not _node:
warn("Node not found")
response = "Not found"
http_status = "404"
else:
_doc = nacm.JsonDoc(_node[-1], parsed_url)
_rpc = nacm.NacmRpc(nacm_config, None, username)
_rpc.check_data_read(_node[-1], _doc)
response = json.dumps(_doc.root)
http_status = "200"
response_headers = (
(':status', http_status),
('content-type', 'application/yang.api+json'),
('content-length', len(response)),
('server', CONFIG["SERVER_NAME"]),
)
self.conn.send_headers(stream_id, response_headers)
self.conn.send_data(stream_id, response.encode(), end_stream=True)
elif parsed_url.path_contains(CONFIG["RESTCONF_API_ROOT"]):
# api request
pass
else:
# Ordinary file on filesystem
file_path = CONFIG["DOC_ROOT"] + "/" + parsed_url.path_str.replace("..", "").replace("&", "")
if os.path.isdir(file_path):
file_path = os.path.join(file_path, CONFIG["DOC_DEFAULT_NAME"])
(ctype, encoding) = mimetypes.guess_type(file_path)
if ctype is None:
ctype = "application/octet-stream"
subtype = ctype.split('/', 1)[1]
try:
fd = open(file_path, 'rb')
response = fd.read()
fd.close()
except FileNotFoundError:
warn("Cannot open requested file \"{}\"".format(file_path))
response_headers = (
(':status', '404'),
('content-length', 0),
('server', CONFIG["SERVER_NAME"]),
)
self.conn.send_headers(stream_id, response_headers, end_stream=True)
return
info("Serving ordinary file {} of type \"{}\"".format(file_path, ctype))
response_headers = (
(':status', '200'),
('content-type', ctype),
('content-length', len(response)),
('server', CONFIG["SERVER_NAME"]),
)
self.conn.send_headers(stream_id, response_headers)
def split_arr(arr, chunk_size):
for i in range(0, len(arr), chunk_size):
yield arr[i:i + chunk_size]
for data_chunk in split_arr(response, self.conn.max_outbound_frame_size):
self.conn.send_data(stream_id, data_chunk, end_stream=False)
self.conn.send_data(stream_id, bytes(), end_stream=True)
def handle_put(self, headers: OrderedDict, data: bytes, stream_id: int):
print("put")
def handle_post(self, headers: OrderedDict, data: bytes, stream_id: int):
print("post")
parsed_url = yang_json_path.URLPath(headers[":path"])
print(json.dumps({"query": parsed_url.query_table, "path": parsed_url.path_list}, indent=4))
print("prijato: " + data.decode("utf-8"))
print(json.dumps({"headers": headers}, indent=4))
response = "Jmenujes se {} a tvuj e-mail je {}\n".format(
CertHelpers.get_field(self.client_cert, "organizationName"),
CertHelpers.get_field(self.client_cert, "emailAddress")
).encode()
response_headers = (
(':status', '200'),
('content-type', 'application/yang.api+json'),
('content-length', len(response)),
('server', CONFIG["SERVER_NAME"]),
)
self.conn.send_headers(stream_id, response_headers)
self.conn.send_data(stream_id, response, end_stream=True)
if __name__ == "__main__":
colorlog.basicConfig(format="%(asctime)s %(log_color)s%(levelname)-8s%(reset)s %(message)s", level=logging.INFO,
stream=sys.stdout)
try:
with open("config.yaml") as conf_fd:
conf_yaml = yaml.load(conf_fd)
CONFIG.update(conf_yaml.get("HTTP_SERVER", {}))
except FileNotFoundError:
warn("Configuration file does not exist")
info("Using config:\n" + yaml.dump([CONFIG, ], default_flow_style=False))
global nacm_config
nacm_config = nacm.NacmConfig()
nacm_config.load_json("example-data.json")
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.options |= (ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION)
ssl_context.load_cert_chain(certfile=CONFIG["SERVER_SSL_CERT"], keyfile=CONFIG["SERVER_SSL_PRIVKEY"])
try:
ssl_context.set_alpn_protocols(["h2"])
except AttributeError:
info("Python not compiled with ALPN support, using NPN instead.")
ssl_context.set_npn_protocols(["h2"])
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_verify_locations(cafile=CONFIG["CA_CERT"])
loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
listener = loop.create_server(H2Protocol, "127.0.0.1", CONFIG["PORT"], ssl=ssl_context)
server = loop.run_until_complete(listener)
info("Server started on {}".format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
-----BEGIN CERTIFICATE-----
MIIDOjCCAiKgAwIBAgIBATANBgkqhkiG9w0BAQsFADAWMRQwEgYDVQQDEwtIMk8g
VGVzdCBDQTAeFw0xNDEyMTAxOTMzMDVaFw0yNDEyMDcxOTMzMDVaMBsxGTAXBgNV
BAMTEDEyNy4wLjAuMS54aXAuaW8wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQDvNmF5nimH3wlp50E2/2SqxUD0JKaF3r2QFz1kB9UUwDhVDCms6PdkavF/
bQcHcWS+oa97D1miBQXo2Ns+6Z6JQ5sak/bVjnBxiU8vhqiOWvAwH947E4Km5HJB
NFJJ7WEM+90kAFB2ayEM/llIQEt1RKCs2fgpaEgOMWPUAdcgyp6pNd60W5GA3Md2
1tdDH5RYGKzYHqpkm6pICtvaaxU4LwPmA3Oc8+VDDsVt08Jos1dJvoacjQTS6PpC
ZiUDD2zqeSA//PGN8WV2o81SmsZwSpPCYBvxVW13tdsA1ivO5tng2fr9ZesKtXFZ
SaH/tKmB3Br8jg2vUke/0cfIvbP/AgMBAAGjgY0wgYowCQYDVR0TBAIwADAsBglg
hkgBhvhCAQ0EHxYdT3BlblNTTCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwHQYDVR0O
BBYEFJXhddVQ68vtPvxoHWHsYkLnu3+4MDAGA1UdIwQpMCehGqQYMBYxFDASBgNV
BAMTC0gyTyBUZXN0IENBggkAmqS1V7DvzbYwDQYJKoZIhvcNAQELBQADggEBAJQ2
uvzL/lZnrsF4cvHhl/mg+s/RjHwvqFRrxOWUeWu2BQOGdd1Izqr8ZbF35pevPkXe
j3zQL4Nf8OxO/gx4w0165KL4dYxEW7EaxsDQUI2aXSW0JNSvK2UGugG4+E4aT+9y
cuBCtfWbL4/N6IMt2QW17B3DcigkreMoZavnnqRecQWkOx4nu0SmYg1g2QV4kRqT
nvLt29daSWjNhP3dkmLTxn19umx26/JH6rqcgokDfHHO8tlDbc9JfyxYH01ZP2Ps
esIiGa/LBXfKiPXxyHuNVQI+2cMmIWYf+Eu/1uNV3K55fA8806/FeklcQe/vvSCU
Vw6RN5S/14SQnMYWr7E=
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA7zZheZ4ph98JaedBNv9kqsVA9CSmhd69kBc9ZAfVFMA4VQwp
rOj3ZGrxf20HB3FkvqGvew9ZogUF6NjbPumeiUObGpP21Y5wcYlPL4aojlrwMB/e
OxOCpuRyQTRSSe1hDPvdJABQdmshDP5ZSEBLdUSgrNn4KWhIDjFj1AHXIMqeqTXe
tFuRgNzHdtbXQx+UWBis2B6qZJuqSArb2msVOC8D5gNznPPlQw7FbdPCaLNXSb6G
nI0E0uj6QmYlAw9s6nkgP/zxjfFldqPNUprGcEqTwmAb8VVtd7XbANYrzubZ4Nn6
/WXrCrVxWUmh/7Spgdwa/I4Nr1JHv9HHyL2z/wIDAQABAoIBAEVPf2zKrAPnVwXt
cJLr6xIj908GM43EXS6b3TjXoCDUFT5nOMgV9GCPMAwY3hmE/IjTtlG0v+bXB8BQ
3S3caQgio5VO3A1CqUfsXhpKLRqaNM/s2+pIG+oZdRV5gIJVGnK1o3yj7qxxG/F0
3Q+3OWXwDZIn0eTFh2M9YkxygA/KtkREZWv8Q8qZpdOpJSBYZyGE97Jqy/yGc+DQ
Vpoa9B8WwnIdUn47TkZfsbzqGIYZxatJQDC1j7Y+F8So7zBbUhpz7YqATQwf5Efm
K2xwvlwfdwykq6ffEr2M/Xna0220G2JZlGq3Cs2X9GT9Pt9OS86Bz+EL46ELo0tZ
yfHQe/kCgYEA+zh4k2be6fhQG+ChiG3Ue5K/kH2prqyGBus61wHnt8XZavqBevEy
4pdmvJ6Q1Ta9Z2YCIqqNmlTdjZ6B35lvAK8YFITGy0MVV6K5NFYVfhALWCQC2r3B
6uH39FQ0mDo3gS5ZjYlUzbu67LGFnyX+pyMr2oxlhI1fCY3VchXQAOsCgYEA88Nt
CwSOaZ1fWmyNAgXEAX1Jx4XLFYgjcA/YBXW9gfQ0AfufB346y53PsgjX1lB+Bbcg
cY/o5W7F0b3A0R4K5LShlPCq8iB2DC+VnpKwTgo8ylh+VZCPy2BmMK0jrrmyqWeg
PzwgP0lp+7l/qW8LDImeYi8nWoqd6f1ye4iJdD0CgYEAlIApJljk5EFYeWIrmk3y
EKoKewsNRqfNAkICoh4KL2PQxaAW8emqPq9ol47T5nVZOMnf8UYINnZ8EL7l3psA
NtNJ1Lc4G+cnsooKGJnaUo6BZjTDSzJocsPoopE0Fdgz/zS60yOe8Y5LTKcTaaQ4
B+yOe74KNHSs/STOS4YBUskCgYAIqaRBZPsOo8oUs5DbRostpl8t2QJblIf13opF
v2ZprN0ASQngwUqjm8sav5e0BQ5Fc7mSb5POO36KMp0ckV2/vO+VFGxuyFqJmlNN
3Fapn1GDu1tZ/RYvGxDmn/CJsA26WXVnaeKXfStoB7KSueCBpI5dXOGgJRbxjtE3
tKV13QKBgQCtmLtTJPJ0Z+9n85C8kBonk2MCnD9JTYWoDQzNMYGabthzSqJqcEek
dvhr82XkcHM+r6+cirjdQr4Qj7/2bfZesHl5XLvoJDB1YJIXnNJOELwbktrJrXLc
dJ+MMvPvBAMah/tqr2DqgTGfWLDt9PJiCJVsuN2kD9toWHV08pY0Og==
-----END RSA PRIVATE KEY-----
from colorlog import error, warning as warn, info, debug
from typing import List, Any, Dict, TypeVar, Tuple, Callable
import urllib.parse
import re
JsonNodeT = Dict[str, Any]
class PathSegment:
def __init__(self, segment: str, selector: Tuple[str] = None, ns: str = None):
self.val = segment
self.select = selector
self.ns = ns
def get_val(self, fully_qualified=False) -> str:
ns = (self.ns + ":") if (fully_qualified and self.ns) else ""
return ns + self.val
def __repr__(self):
ns = (self.ns + ":") if self.ns else ""
sel = "[{}='{}']".format(self.select[0], self.select[1]) if self.select else ""
return ns + self.val + sel
def __eq__(self, other):
return (self.val == other.val) and (self.select == other.select) and (self.ns == other.ns)
def __ne__(self, other):
return not self.__eq__(other)
class BasePath:
def __init__(self, path: str, segment_parser: Callable[[str], PathSegment] = None):
self.path_str = path
self.path_segments = [] # type: List[PathSegment]
self._is_absolute = False
if self.path_str[0] == "/":
self._is_absolute = True
self.path_str = self.path_str[1:]
_segments = filter(lambda x: len(x) > 0, self.path_str.split("/"))
if segment_parser is not None:
self.path_segments = list(map(segment_parser, _segments))
else:
self.path_segments = list(map(lambda x: PathSegment(x), _segments))
def is_absolute(self) -> bool:
return self._is_absolute
def path_contains(self, subpath) -> bool:
if subpath[0] == "/":
subpath = subpath[1:]
return (self.path_str == subpath) or (
(len(self.path_str) > len(subpath)) and self.path_str.startswith(subpath) and (
self.path_str[len(subpath)] == "/"))
else:
return self.path_str.find(subpath) != -1
def path_equals(self, path_to_compare: str) -> bool:
if (path_to_compare[0] == "/") and self._is_absolute:
return self.path_str.strip("/") == path_to_compare.strip("/")
elif (path_to_compare[0] != "/") and not self._is_absolute:
return self.path_str.rstrip("/") == path_to_compare.rstrip("/")
else:
return False
class URLPath(BasePath):
def __init__(self, url_path: str):
self.query_string = None
self.query_table = None
_last_ns = None
if url_path.find("?") != -1:
self.path_str, self.query_string = url_path.split("?", 1)
else:
self.path_str = url_path
path_str_unquoted = urllib.parse.unquote(self.path_str)
def parse_segment(s: str) -> PathSegment:
nonlocal _last_ns
sre_match = re.search("^(?:([\w-]*?):)?([\w-]*)(?:=(.+))?$", s)
if sre_match is None:
raise ValueError("Wrong formatting of path segment: {}".format(s))
else:
if sre_match.group(1) is not None:
_last_ns = sre_match.group(1)
# elif _last_ns is None and sre_match.group(2) != "":
# raise ValueError("First segment of path must be in namespace-qualified form")
return PathSegment(sre_match.group(2), ("name", sre_match.group(3)) if sre_match.group(3) else None,
_last_ns)
super().__init__(path_str_unquoted, parse_segment)
print(self.path_segments)
if self.query_string:
self.query_table = urllib.parse.parse_qs(self.query_string, keep_blank_values=True)
# Parses path in "Yang Json" format as defined in
# https://tools.ietf.org/html/draft-ietf-netmod-yang-json-07#section-6.11
class YangJsonPath(BasePath):
def __init__(self, path: str):
_last_ns = None
def parse_segment(s: str) -> PathSegment:
nonlocal _last_ns
sre_match = re.search("^(?:([\w-]*?):)?([\w-]*)(?:\[(.+)\])?$", s)
if sre_match is None:
raise ValueError("Wrong formatting of path segment: {}".format(s))
else:
if sre_match.group(1) is not None:
_last_ns = sre_match.group(1)
elif _last_ns is None and sre_match.group(2) != "":
raise ValueError(
"First segment of path must be in namespace-qualified form, see draft-ietf-netmod-yang-json-07")
return PathSegment(sre_match.group(2), tuple(
map(lambda x: x.strip("'\""), sre_match.group(3).split("="))) if sre_match.group(3) else None,
_last_ns)
super().__init__(path, parse_segment)
# print(self.segments)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment