code refactor

parent e5e28f49
HTTP_SERVER:
DOC_ROOT: "doc-root"
DOC_ROOT: "jetconf/doc-root"
DOC_DEFAULT_NAME: "index.html"
RESTCONF_API_ROOT: "/restconf"
RESTCONF_NACM_API_ROOT: "/restconf_nacm"
SERVER_NAME: "hyper-h2"
SERVER_SSL_CERT: "server.crt"
SERVER_SSL_PRIVKEY: "server.key"
CA_CERT: "ca.pem"
SERVER_SSL_CERT: "jetconf/server.crt"
SERVER_SSL_PRIVKEY: "jetconf/server.key"
CA_CERT: "jetconf/ca.pem"
NACM:
ALLOWED_USERS: ["lojza@mail.cz"]
\ No newline at end of file
......@@ -12,14 +12,12 @@ import yangson.instance
from yangson.instance import Instance, NonexistentInstance, ArrayValue, ObjectValue
from yangson import DataModel
from yangson.datamodel import InstanceIdentifier
from nacm import Permission, Action
from nacm import NacmConfig, NacmRpc
class Rpc:
def __init__(self):
self.username = None
self.path = None # type: str
self.path = None # type: str
class BaseDatastore:
......@@ -34,14 +32,17 @@ class BaseDatastore:
yl = ylfile.read()
self.dm = DataModel.from_yang_library(yl, module_dir)
def register_nacm(self, nacm_config: NacmConfig):
def register_nacm(self, nacm_config: "NacmConfig"):
self.nacm = nacm_config
def get_data_root(self) -> Instance:
return self.data
def get_node(self, ii: InstanceIdentifier) -> Instance:
return self.data.goto(ii)
self.lock_data()
n = self.data.goto(ii)
self.unlock_data()
return n
def get_node_path(self, ii_str: str) -> Instance:
ii = self.dm.parse_instance_id(ii_str)
......@@ -55,7 +56,7 @@ class BaseDatastore:
self.unlock_data()
if self.nacm:
nrpc = NacmRpc(self.nacm, None, rpc.username)
nrpc = NacmRpc(self.nacm, self, None, rpc.username)
if nrpc.check_data_node(n, Permission.NACM_ACCESS_READ) == Action.DENY:
return None
else:
......@@ -65,19 +66,16 @@ class BaseDatastore:
return n
def lock_data(self, username: str = None):
res = self._data_lock.acquire(blocking=False)
if res:
ret = self._data_lock.acquire(blocking=False)
if ret:
self._lock_username = username or "(unknown)"
debug("Acquired data lock for user {}".format(username))
info("Acquired data lock for user {}".format(username))
else:
debug("Failed to acquire lock for user {}, already locked by {}".format(username, self._lock_username))
info("Failed to acquire lock for user {}, already locked by {}".format(username, self._lock_username))
return res
return ret
def unlock_data(self):
self._data_lock.release()
debug("Released data lock for user {}".format(self._lock_username))
info("Released data lock for user {}".format(self._lock_username))
self._lock_username = None
......@@ -91,20 +89,20 @@ class JsonDatastore(BaseDatastore):
with open(filename, "w") as jfd:
json.dump(self.data, jfd)
if __name__ == "__main__":
def test():
colorlog.basicConfig(format="%(asctime)s %(log_color)s%(levelname)-8s%(reset)s %(message)s", level=logging.INFO,
stream=sys.stdout)
nacm = NacmConfig()
nacm.load_json("example-data.json")
data = JsonDatastore("../data", "../data/yang-library-data.json")
data.load_json("example-data.json")
data.register_nacm(nacm)
data = JsonDatastore("./data", "./data/yang-library-data.json")
data.load_json("jetconf/example-data.json")
rpc = Rpc()
rpc.username = "dominik"
rpc.path = "/ietf-netconf-acm:nacm/groups"
rpc.path = "/dns-server:dns-server/zones/zone[domain='example.com']/query-module"
n = data.get_node_rpc(rpc)
print(n.value)
from .nacm import NacmConfig, NacmRpc, Permission, Action
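Review bot: The new locking in `get_node` ignores the result of `lock_data()`, which uses `acquire(blocking=False)` and returns a bool, and it calls `unlock_data()` without a `try`/`finally`. When the lock is already held, the read proceeds unlocked and `unlock_data()` then releases a lock this caller never acquired (if `_data_lock` is a plain `threading.Lock`, that silently unlocks the real holder). When `goto()` raises, the lock stays held and every later `lock_data()` fails. This lock guards the data that the NACM read check in `get_node_rpc` evaluates, so it should fail closed; `get_node_rpc` starts outside the visible hunk but appears to use the same lock/unlock pattern and would need the same treatment. The exception type below is a placeholder for whatever error the project prefers.

```python
    def get_node(self, ii: InstanceIdentifier) -> Instance:
        if not self.lock_data():
            # lock is held elsewhere: refuse instead of reading unlocked
            raise RuntimeError("data lock is held by {}".format(self._lock_username))
        try:
            return self.data.goto(ii)
        finally:
            # release only a lock this call actually acquired, even on error
            self.unlock_data()
```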
......@@ -5,15 +5,12 @@ from threading import Lock
import colorlog
import sys
from enum import Enum, unique
from enum import Enum
from colorlog import error, warning as warn, info, debug
from typing import List, Any, Dict, TypeVar, Tuple, Set
import copy
import yangson.instance
from yangson.instance import Instance, NonexistentInstance, ArrayValue, ObjectValue
from yangson.schema import NonexistentSchemaNode
from yangson import DataModel
import data
JsonNodeT = Dict[str, Any]
......@@ -73,7 +70,7 @@ class NacmRuleList:
class NacmConfig:
def __init__(self, nacm_ds: "BaseDatastore"):
self.nacm_ds = nacm_ds # type: BaseDatastore
self.nacm_ds = nacm_ds
self.enabled = False
self.default_read = Action.PERMIT
self.default_write = Action.PERMIT
......@@ -212,7 +209,7 @@ class NacmRpc:
continue
try:
selected = data.get_node_path(rule.type_data.path)
selected = self.data.get_node_path(rule.type_data.path)
if selected.value == n.value:
# Success!
# the path selects the node
......@@ -278,17 +275,17 @@ class NacmRpc:
return self._check_data_read_recursion(node)
if __name__ == "__main__":
def test():
colorlog.basicConfig(format="%(asctime)s %(log_color)s%(levelname)-8s%(reset)s %(message)s", level=logging.INFO,
stream=sys.stdout)
nacm_data = data.JsonDatastore("../data", "../data/yang-library-data.json")
nacm_data.load_json("example-data-nacm.json")
nacm_data = JsonDatastore("./data", "./data/yang-library-data.json")
nacm_data.load_json("jetconf/example-data-nacm.json")
nacm = NacmConfig(nacm_data)
data = data.JsonDatastore("../data", "../data/yang-library-data.json")
data.load_json("example-data.json")
data = JsonDatastore("./data", "./data/yang-library-data.json")
data.load_json("jetconf/example-data.json")
data.register_nacm(nacm)
rpc = NacmRpc(nacm, data, None, "dominik")
......@@ -332,3 +329,5 @@ if __name__ == "__main__":
info("OK")
else:
warn("FAILED")
from .data import JsonDatastore
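Review bot: The import cycle between this module and the datastore module is handled by placing `from .data import JsonDatastore` on the last line of the file (the datastore module does the same with `from .nacm import …`), which makes the code depend on import order and hides the dependency from anyone reading the import block. As far as the visible hunks show, `BaseDatastore` is needed only for the string annotation in `NacmConfig.__init__` and `JsonDatastore` only inside `test()`. A `typing.TYPE_CHECKING` guard at the top, `if TYPE_CHECKING: from .data import BaseDatastore`, plus a local `from .data import JsonDatastore` as the first statement of `test()` would express that without the trailing import. The dropped `# type: BaseDatastore` comment on `self.nacm_ds` could then stay.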
......@@ -11,7 +11,8 @@ from colorlog import error, warning as warn, info, debug
from typing import List, Tuple, Dict, Any
import yaml
import copy
from jetconf import nacm
from .nacm import NacmConfig
from .data import JsonDatastore
from h2.connection import H2Connection
from h2.events import DataReceived, RequestReceived, RemoteSettingsChanged
......@@ -48,7 +49,6 @@ class H2Protocol(asyncio.Protocol):
self.conn.initiate_connection()
self.transport.write(self.conn.data_to_send())
self.client_cert = self.transport.get_extra_info('peercert')
# print("cert = {}".format(self.client_cert))
def data_received(self, data: bytes):
events = self.conn.receive_data(data)
......@@ -210,12 +210,12 @@ class H2Protocol(asyncio.Protocol):
self.conn.send_data(stream_id, response, end_stream=True)
if __name__ == "__main__":
def run():
colorlog.basicConfig(format="%(asctime)s %(log_color)s%(levelname)-8s%(reset)s %(message)s", level=logging.INFO,
stream=sys.stdout)
try:
with open("config.yaml") as conf_fd:
with open("jetconf/config.yaml") as conf_fd:
conf_yaml = yaml.load(conf_fd)
CONFIG.update(conf_yaml.get("HTTP_SERVER", {}))
except FileNotFoundError:
......@@ -223,9 +223,16 @@ if __name__ == "__main__":
info("Using config:\n" + yaml.dump([CONFIG, ], default_flow_style=False))
global nacm_config
nacm_config = nacm.NacmConfig()
nacm_config.load_json("example-data.json")
global ex_datastore
nacm_data = JsonDatastore("./data", "./data/yang-library-data.json")
nacm_data.load_json("jetconf/example-data-nacm.json")
nacmc = NacmConfig(nacm_data)
ex_datastore = JsonDatastore("./data", "./data/yang-library-data.json")
ex_datastore.load_json("jetconf/example-data.json")
ex_datastore.register_nacm(nacmc)
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.options |= (ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION)
......
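Review bot: The configuration in `run()` is still parsed with `yaml.load(conf_fd)`, which, depending on the installed PyYAML version and its default loader, can instantiate arbitrary Python objects from tags in the file. Only plain mappings and strings are read from it, so `conf_yaml = yaml.safe_load(conf_fd)` is sufficient and removes that exposure. The path this commit changes to `jetconf/config.yaml` is resolved against the process working directory, as are the certificate, key and CA paths in the config, so which files get loaded depends on where the server is started. Resolving them against the package directory, e.g. `os.path.join(os.path.dirname(__file__), "config.yaml")`, would pin them down.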