Commit d870ebbe authored by Pavel Spirek's avatar Pavel Spirek

Basic support for operations

parent 4f7235bd
Pipeline #80 skipped
......@@ -4,10 +4,43 @@ import logging
import sys
from importlib import import_module
from . import rest_server
from . import op_handlers
from .rest_server import RestServer
from .config import load_config, print_config
from .nacm import NacmConfig
from .data import JsonDatastore
from .handler_list import OP_HANDLERS
def main():
# Load configuration
load_config("jetconf/config.yaml")
print_config()
# NACM init
nacm_data = JsonDatastore("./data", "./data/yang-library-data.json", "NACM data")
nacm_data.load("jetconf/example-data-nacm.json")
nacmc = NacmConfig(nacm_data)
# Datastore init
ex_datastore = JsonDatastore("./data", "./data/yang-library-data.json", "DNS data")
ex_datastore.load("jetconf/example-data.json")
ex_datastore.register_nacm(nacmc)
# Register op handlers
OP_HANDLERS.register_handler("play", op_handlers.play_op_handler)
# Create HTTP server
rest_srv = RestServer()
rest_srv.register_api_handlers(ex_datastore)
rest_srv.register_static_handlers()
# Run HTTP server
rest_srv.run()
if __name__ == "__main__":
opts, args = (None, None)
colorlog.basicConfig(
......@@ -34,12 +67,8 @@ def main():
tm.test()
except ImportError as e:
print(e.msg)
# except AttributeError:
# print("Module \"{}\" has no test() function".format(test_module))
# except AttributeError:
# print("Module \"{}\" has no test() function".format(test_module))
else:
rest_server.run()
if __name__ == "__main__":
main()
main()
......@@ -30,11 +30,13 @@ CONFIG = {
NACM_ADMINS = CONFIG["NACM"]["ALLOWED_USERS"]
API_ROOT_data = os.path.join(CONFIG_HTTP["API_ROOT"], "data")
API_ROOT_ops = os.path.join(CONFIG_HTTP["API_ROOT"], "operations")
def load_config(filename: str):
global NACM_ADMINS
global API_ROOT_data
global API_ROOT_ops
try:
with open(filename) as conf_fd:
......@@ -51,6 +53,7 @@ def load_config(filename: str):
# Shortcuts
NACM_ADMINS = CONFIG["NACM"]["ALLOWED_USERS"]
API_ROOT_data = os.path.join(CONFIG_HTTP["API_ROOT"], "data")
API_ROOT_ops = os.path.join(CONFIG_HTTP["API_ROOT"], "operations")
def print_config():
......
......@@ -194,6 +194,14 @@ class BaseDatastore:
self._data = new_n.top()
def check_operation_rpc(self, rpc: Rpc):
ii = self.parse_ii(rpc.path, rpc.path_format)
if self.nacm:
nrpc = NacmRpc(self.nacm, self, rpc.username)
if nrpc.check_data_node_path(ii, Permission.NACM_ACCESS_EXEC) == Action.DENY:
raise NacmForbiddenError("Op \"{}\" invocation denied for user \"{}\"".format(rpc.path, rpc.username))
# Locks datastore data
def lock_data(self, username: str = None, blocking: bool=True):
ret = self._data_lock.acquire(blocking=blocking, timeout=1)
......
from typing import List, Tuple, Callable
class OpHandlerList:
def __init__(self):
self.handlers = [] # type: List[Tuple[str, Callable]]
self.default_handler = None # type: Callable
def register_handler(self, name: str, handler: Callable):
self.handlers.append((name, handler))
def register_default_handler(self, handler: Callable):
self.default_handler = handler
def get_handler(self, name: str) -> Callable:
for h in self.handlers:
if h[0] == name:
return h[1]
return self.default_handler
OP_HANDLERS = OpHandlerList()
......@@ -9,9 +9,10 @@ from typing import Dict, List
from yangson.schema import NonexistentSchemaNode
from yangson.instance import NonexistentInstance, InstanceTypeError, DuplicateMember
from .config import CONFIG_GLOBAL, CONFIG_HTTP, NACM_ADMINS, API_ROOT_data
from .config import CONFIG_GLOBAL, CONFIG_HTTP, NACM_ADMINS, API_ROOT_data, API_ROOT_ops
from .helpers import CertHelpers, DataHelpers, DateTimeHelpers, ErrorHelpers
from .data import BaseDatastore, Rpc, DataLockError, NacmForbiddenError
from .handler_list import OP_HANDLERS
QueryStrT = Dict[str, List[str]]
epretty = ErrorHelpers.epretty
......@@ -354,3 +355,70 @@ def create_api_delete(ds: BaseDatastore):
_delete(prot, stream_id, ds, api_pth)
return api_delete_closure
def create_api_op(ds: BaseDatastore):
def api_op_closure(prot: "H2Protocol", stream_id: int, headers: OrderedDict, data: bytes):
info("invoke_op: " + headers[":path"])
data_str = data.decode("utf-8")
op_name_fq = headers[":path"][len(API_ROOT_ops) + 1:]
op_name_splitted = op_name_fq.split(":", maxsplit=1)
if len(op_name_splitted) < 2:
warn("Operation name must be in fully-qualified format")
prot.send_empty(stream_id, "400", "Bad Request")
return
ns = op_name_splitted[0]
op_name = op_name_splitted[1]
username = CertHelpers.get_field(prot.client_cert, "emailAddress")
try:
json_data = json.loads(data_str)
except ValueError as e:
error("Invalid HTTP data: " + str(e))
prot.send_empty(stream_id, "400", "Bad Request")
return
input_args = json_data.get(ns + ":input")
op_handler = OP_HANDLERS.get_handler(op_name)
if op_handler:
try:
# Skip NACM check for privileged users
if username not in NACM_ADMINS:
rpc1 = Rpc()
rpc1.username = username
rpc1.path = "/" + op_name_fq
ds.check_operation_rpc(rpc1)
ret_data = op_handler(input_args)
if ret_data is None:
prot.send_empty(stream_id, "204", "No Content", False)
else:
response = json.dumps(ret_data, indent=4) + "\n"
response_bytes = response.encode()
response_headers = (
(':status', '200'),
('content-type', 'application/yang.api+json'),
('content-length', len(response_bytes)),
('server', CONFIG_HTTP["SERVER_NAME"]),
)
prot.conn.send_headers(stream_id, response_headers)
prot.conn.send_data(stream_id, response_bytes, end_stream=True)
except NacmForbiddenError as e:
warn(epretty(e))
prot.send_empty(stream_id, "403", "Forbidden")
except NonexistentSchemaNode as e:
warn(epretty(e))
prot.send_empty(stream_id, "404", "Not Found")
else:
warn("Nonexistent handler for operation \"{}\"".format(op_name))
prot.send_empty(stream_id, "400", "Bad Request")
return api_op_closure
from typing import Dict, Any
JsonNodeT = Dict[str, Any]
def play_op_handler(input_args: JsonNodeT) -> JsonNodeT:
print("Playing song {} in playlist \"{}\"".format(input_args.get("song-number"), input_args.get("playlist")))
ret = {"status": "OK"}
return ret
......@@ -8,9 +8,8 @@ from h2.connection import H2Connection
from h2.events import DataReceived, RequestReceived, RemoteSettingsChanged
import jetconf.http_handlers as handlers
from .config import CONFIG_HTTP, API_ROOT_data, load_config, print_config
from .nacm import NacmConfig
from .data import JsonDatastore
from .config import CONFIG_HTTP, API_ROOT_data, API_ROOT_ops
from .data import BaseDatastore
# Function(method, path) -> bool
......@@ -112,66 +111,63 @@ class H2Protocol(asyncio.Protocol):
self.send_empty(stream_id, "400", "Bad Request")
def run():
global h2_handlers
# Load configuration
load_config("jetconf/config.yaml")
print_config()
# NACM init
nacm_data = JsonDatastore("./data", "./data/yang-library-data.json", "NACM data")
nacm_data.load("jetconf/example-data-nacm.json")
nacmc = NacmConfig(nacm_data)
# Datastore init
ex_datastore = JsonDatastore("./data", "./data/yang-library-data.json", "DNS data")
ex_datastore.load("jetconf/example-data.json")
ex_datastore.register_nacm(nacmc)
# Register HTTP handlers
api_get_root = handlers.api_root_handler
api_get = handlers.create_get_api(ex_datastore)
api_post = handlers.create_post_api(ex_datastore)
api_put = handlers.create_put_api(ex_datastore)
api_delete = handlers.create_api_delete(ex_datastore)
h2_handlers = HandlerList()
h2_handlers.register_handler(lambda m, p: (m == "GET") and (p == CONFIG_HTTP["API_ROOT"]), api_get_root)
h2_handlers.register_handler(lambda m, p: (m == "GET") and (p.startswith(API_ROOT_data)), api_get)
h2_handlers.register_handler(lambda m, p: (m == "POST") and (p.startswith(API_ROOT_data)), api_post)
h2_handlers.register_handler(lambda m, p: (m == "PUT") and (p.startswith(API_ROOT_data)), api_put)
h2_handlers.register_handler(lambda m, p: (m == "DELETE") and (p.startswith(API_ROOT_data)), api_delete)
h2_handlers.register_handler(lambda m, p: m == "GET", handlers.get_file)
h2_handlers.register_default_handler(handlers.unknown_req_handler)
# HTTP server init
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.options |= (ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION)
ssl_context.load_cert_chain(certfile=CONFIG_HTTP["SERVER_SSL_CERT"], keyfile=CONFIG_HTTP["SERVER_SSL_PRIVKEY"])
try:
ssl_context.set_alpn_protocols(["h2"])
except AttributeError:
info("Python not compiled with ALPN support, using NPN instead.")
ssl_context.set_npn_protocols(["h2"])
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_verify_locations(cafile=CONFIG_HTTP["CA_CERT"])
loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
listener = loop.create_server(H2Protocol, "127.0.0.1", CONFIG_HTTP["PORT"], ssl=ssl_context)
server = loop.run_until_complete(listener)
info("Server started on {}".format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
class RestServer:
def __init__(self):
# HTTP server init
self.http_handlers = HandlerList()
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.options |= (ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION)
ssl_context.load_cert_chain(certfile=CONFIG_HTTP["SERVER_SSL_CERT"], keyfile=CONFIG_HTTP["SERVER_SSL_PRIVKEY"])
try:
ssl_context.set_alpn_protocols(["h2"])
except AttributeError:
info("Python not compiled with ALPN support, using NPN instead.")
ssl_context.set_npn_protocols(["h2"])
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_verify_locations(cafile=CONFIG_HTTP["CA_CERT"])
self.loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
listener = self.loop.create_server(H2Protocol, "127.0.0.1", CONFIG_HTTP["PORT"], ssl=ssl_context)
self.server = self.loop.run_until_complete(listener)
def register_api_handlers(self, datastore: BaseDatastore):
global h2_handlers
# Register HTTP handlers
api_get_root = handlers.api_root_handler
api_get = handlers.create_get_api(datastore)
api_post = handlers.create_post_api(datastore)
api_put = handlers.create_put_api(datastore)
api_delete = handlers.create_api_delete(datastore)
api_op = handlers.create_api_op(datastore)
self.http_handlers.register_handler(lambda m, p: (m == "GET") and (p == CONFIG_HTTP["API_ROOT"]), api_get_root)
self.http_handlers.register_handler(lambda m, p: (m == "GET") and (p.startswith(API_ROOT_data)), api_get)
self.http_handlers.register_handler(lambda m, p: (m == "POST") and (p.startswith(API_ROOT_data)), api_post)
self.http_handlers.register_handler(lambda m, p: (m == "PUT") and (p.startswith(API_ROOT_data)), api_put)
self.http_handlers.register_handler(lambda m, p: (m == "DELETE") and (p.startswith(API_ROOT_data)), api_delete)
self.http_handlers.register_handler(lambda m, p: (m == "POST") and (p.startswith(API_ROOT_ops)), api_op)
h2_handlers = self.http_handlers
def register_static_handlers(self):
global h2_handlers
self.http_handlers.register_handler(lambda m, p: m == "GET", handlers.get_file)
self.http_handlers.register_default_handler(handlers.unknown_req_handler)
h2_handlers = self.http_handlers
def run(self):
info("Server started on {}".format(self.server.sockets[0].getsockname()))
try:
self.loop.run_forever()
except KeyboardInterrupt:
pass
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.loop.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment