another commit

parent b3ac2858
......@@ -8,3 +8,6 @@ HTTP_SERVER:
SERVER_SSL_CERT: "server.crt"
SERVER_SSL_PRIVKEY: "server.key"
CA_CERT: "ca.pem"
NACM:
ALLOWED_USERS: ["lojza@mail.cz"]
\ No newline at end of file
import json
import logging
from threading import Lock
import colorlog
import sys
from enum import Enum, unique
from colorlog import error, warning as warn, info, debug
from typing import List, Any, Dict, TypeVar, Tuple, Set
import copy
import yangson.instance
from yangson.instance import Instance, NonexistentInstance, ArrayValue, ObjectValue
from yangson import DataModel
from yangson.datamodel import InstanceIdentifier
from nacm import Permission, Action
from nacm import NacmConfig, NacmRpc
class Rpc:
def __init__(self):
self.username = None
self.path = None # type: str
class BaseDatastore:
def __init__(self, module_dir: str, yang_library_file: str):
self.data = None # type: Instance
self.dm = None # type: DataModel
self.nacm = None # type: NacmConfig
self._data_lock = Lock()
self._lock_username = None
with open(yang_library_file) as ylfile:
yl = ylfile.read()
self.dm = DataModel.from_yang_library(yl, module_dir)
def register_nacm(self, nacm_config: NacmConfig):
self.nacm = nacm_config
def get_data_root(self) -> Instance:
return self.data
def get_node(self, ii: InstanceIdentifier) -> Instance:
return self.data.goto(ii)
def get_node_path(self, ii_str: str) -> Instance:
ii = self.dm.parse_instance_id(ii_str)
return self.data.goto(ii)
def get_node_rpc(self, rpc: Rpc) -> Instance:
n = None
ii = self.dm.parse_instance_id(rpc.path)
self.lock_data(rpc.username)
n = self.data.goto(ii)
self.unlock_data()
if self.nacm:
nrpc = NacmRpc(self.nacm, None, rpc.username)
if nrpc.check_data_node(n, Permission.NACM_ACCESS_READ) == Action.DENY:
return None
else:
# Prun subtree data
n = nrpc.check_data_read(n)
return n
def lock_data(self, username: str = None):
res = self._data_lock.acquire(blocking=False)
if res:
self._lock_username = username or "(unknown)"
debug("Acquired data lock for user {}".format(username))
info("Acquired data lock for user {}".format(username))
else:
debug("Failed to acquire lock for user {}, already locked by {}".format(username, self._lock_username))
info("Failed to acquire lock for user {}, already locked by {}".format(username, self._lock_username))
return res
def unlock_data(self):
self._data_lock.release()
debug("Released data lock for user {}".format(self._lock_username))
info("Released data lock for user {}".format(self._lock_username))
self._lock_username = None
class JsonDatastore(BaseDatastore):
def load_json(self, filename: str):
with open(filename, "rt") as fp:
self.data = self.dm.from_raw(json.load(fp))
def save_json(self, filename: str):
with open(filename, "w") as jfd:
json.dump(self.data, jfd)
if __name__ == "__main__":
colorlog.basicConfig(format="%(asctime)s %(log_color)s%(levelname)-8s%(reset)s %(message)s", level=logging.INFO,
stream=sys.stdout)
nacm = NacmConfig()
nacm.load_json("example-data.json")
data = JsonDatastore("../data", "../data/yang-library-data.json")
data.load_json("example-data.json")
data.register_nacm(nacm)
rpc = Rpc()
rpc.username = "dominik"
rpc.path = "/ietf-netconf-acm:nacm/groups"
n = data.get_node_rpc(rpc)
print(n.value)
{
"ietf-netconf-acm:nacm": {
"enable-nacm": true,
"read-default": "permit",
"write-default": "deny",
"exec-default": "deny",
"denied-operations": 123,
"denied-data-writes": 456,
"denied-notifications": 0,
"groups": {
"group": [
{
"name": "admin",
"user-name": [
"root"
]
},
{
"name": "users",
"user-name": [
"lada",
"pavel",
"dominik",
"lojza@mail.cz"
]
}
]
},
"rule-list": [
{
"name": "admin-acl",
"group": [
"admin"
],
"rule": [
{
"name": "permit-all",
"module-name": "*",
"access-operations": "*",
"comment": "The 'admin' group has unlimited access.",
"action": "permit"
}
]
},
{
"name": "users-acl",
"group": [
"users"
],
"rule": [
{
"name": "no-writes-on-example.com",
"path": "/dns-server:dns-server/zones/zone[domain='example.com']",
"access-operations": "create update delete",
"comment": "Users cannot write example.com.",
"action": "deny"
},
{
"name": "permit-zone-access",
"path": "/dns-server:dns-server/zones/zone",
"access-operations": "*",
"comment": "Users can write other zones.",
"action": "permit"
},
{
"name": "povol pristup ke groups",
"path": "/ietf-netconf-acm:nacm/groups",
"access-operations": "read",
"comment": "komentar",
"action": "permit"
},
{
"name": "zakaz pristup k admin group",
"path": "/ietf-netconf-acm:nacm/groups/group[name='admin']",
"access-operations": "*",
"comment": "komentar",
"action": "deny"
},
{
"name": "permit-zone-reload",
"module-name": "dns-server",
"rpc-name": "zone-reload",
"comment": "Users can reload zones",
"action": "permit"
},
{
"name": "permit-zone-sign",
"module-name": "dnssec-signing",
"rpc-name": "zone-sign",
"comment": "Users can sign zones",
"action": "permit"
}
]
}
]
}
}
......@@ -334,107 +334,5 @@
}
}
]
},
"ietf-netconf-acm:nacm": {
"enable-nacm": true,
"read-default": "permit",
"write-default": "deny",
"exec-default": "deny",
"denied-operations": 123,
"denied-data-writes": 456,
"denied-notifications": 0,
"groups": {
"group": [
{
"name": "admin",
"user-name": [
"root"
]
},
{
"name": "users",
"user-name": [
"lada",
"pavel",
"dominik",
"lojza@mail.cz"
]
}
]
},
"rule-list": [
{
"name": "admin-acl",
"group": [
"admin"
],
"rule": [
{
"name": "permit-all",
"module-name": "*",
"access-operations": "*",
"comment": "The 'admin' group has unlimited access.",
"action": "permit"
}
]
},
{
"name": "users-acl",
"group": [
"users"
],
"rule": [
{
"name": "no-writes-on-example.com",
"path": "/dns-server:dns-server/zones/zone[domain='example.com']/statistics",
"access-operations": "create update delete",
"comment": "Users cannot write example.com.",
"action": "deny"
},
{
"name": "permit-zone-access",
"path": "/dns-server:dns-server/zones/zone",
"access-operations": "*",
"comment": "Users can write other zones.",
"action": "permit"
},
{
"name": "povol pristup ke groups",
"path": "/ietf-netconf-acm:nacm/groups",
"access-operations": "read",
"comment": "komentar",
"action": "permit"
},
{
"name": "povol pristup ke group",
"path": "/ietf-netconf-acm:nacm/groups/group",
"access-operations": "read",
"comment": "komentar",
"action": "permit"
},
{
"name": "zakaz pristup k admin group",
"path": "/ietf-netconf-acm:nacm/groups/group[name='admin']",
"access-operations": "read",
"comment": "komentar",
"action": "deny"
},
{
"name": "permit-zone-reload",
"module-name": "dns-server",
"rpc-name": "zone-reload",
"comment": "Users can reload zones",
"action": "permit"
},
{
"name": "permit-zone-sign",
"module-name": "dnssec-signing",
"rpc-name": "zone-sign",
"comment": "Users can sign zones",
"action": "permit"
}
]
}
]
}
}
This diff is collapsed.
......@@ -12,7 +12,6 @@ from typing import List, Tuple, Dict, Any
import yaml
import copy
from jetconf import nacm
from jetconf import yang_json_path
from h2.connection import H2Connection
from h2.events import DataReceived, RequestReceived, RemoteSettingsChanged
......
from colorlog import error, warning as warn, info, debug
from typing import List, Any, Dict, TypeVar, Tuple, Callable
import urllib.parse
import re
class PathSegment:
def __init__(self, segment: str, selector: Tuple[str] = None, ns: str = None):
self.val = segment
self.select = selector
self.ns = ns
def get_val(self, fully_qualified=False) -> str:
ns = (self.ns + ":") if (fully_qualified and self.ns) else ""
return ns + self.val
def __repr__(self):
ns = (self.ns + ":") if self.ns else ""
sel = "[{}='{}']".format(self.select[0], self.select[1]) if self.select else ""
return ns + self.val + sel
def __eq__(self, other):
return (self.val == other.val) and (self.select == other.select) and (self.ns == other.ns)
def __ne__(self, other):
return not self.__eq__(other)
class BasePath:
def __init__(self, path: str, segment_parser: Callable[[str], PathSegment] = None):
self.path_str = path
self.path_segments = [] # type: List[PathSegment]
self._is_absolute = False
if self.path_str[0] == "/":
self._is_absolute = True
self.path_str = self.path_str[1:]
_segments = filter(lambda x: len(x) > 0, self.path_str.split("/"))
if segment_parser is not None:
self.path_segments = list(map(segment_parser, _segments))
else:
self.path_segments = list(map(lambda x: PathSegment(x), _segments))
def is_absolute(self) -> bool:
return self._is_absolute
def path_contains(self, subpath) -> bool:
if subpath[0] == "/":
subpath = subpath[1:]
return (self.path_str == subpath) or (
(len(self.path_str) > len(subpath)) and self.path_str.startswith(subpath) and (
self.path_str[len(subpath)] == "/"))
else:
return self.path_str.find(subpath) != -1
def path_equals(self, path_to_compare: str) -> bool:
if (path_to_compare[0] == "/") and self._is_absolute:
return self.path_str.strip("/") == path_to_compare.strip("/")
elif (path_to_compare[0] != "/") and not self._is_absolute:
return self.path_str.rstrip("/") == path_to_compare.rstrip("/")
else:
return False
class URLPath(BasePath):
def __init__(self, url_path: str):
self.query_string = None
self.query_table = None
_last_ns = None
if url_path.find("?") != -1:
self.path_str, self.query_string = url_path.split("?", 1)
else:
self.path_str = url_path
path_str_unquoted = urllib.parse.unquote(self.path_str)
def parse_segment(s: str) -> PathSegment:
nonlocal _last_ns
sre_match = re.search("^(?:([\w-]*?):)?([\w-]*)(?:=(.+))?$", s)
if sre_match is None:
raise ValueError("Wrong formatting of path segment: {}".format(s))
else:
if sre_match.group(1) is not None:
_last_ns = sre_match.group(1)
# elif _last_ns is None and sre_match.group(2) != "":
# raise ValueError("First segment of path must be in namespace-qualified form")
return PathSegment(sre_match.group(2), ("name", sre_match.group(3)) if sre_match.group(3) else None,
_last_ns)
super().__init__(path_str_unquoted, parse_segment)
print(self.path_segments)
if self.query_string:
self.query_table = urllib.parse.parse_qs(self.query_string, keep_blank_values=True)
# Parses path in "Yang Json" format as defined in
# https://tools.ietf.org/html/draft-ietf-netmod-yang-json-07#section-6.11
class YangJsonPath(BasePath):
def __init__(self, path: str):
_last_ns = None
def parse_segment(s: str) -> PathSegment:
nonlocal _last_ns
sre_match = re.search("^(?:([\w-]*?):)?([\w-]*)(?:\[(.+)\])?$", s)
if sre_match is None:
raise ValueError("Wrong formatting of path segment: {}".format(s))
else:
if sre_match.group(1) is not None:
_last_ns = sre_match.group(1)
elif _last_ns is None and sre_match.group(2) != "":
raise ValueError(
"First segment of path must be in namespace-qualified form, see draft-ietf-netmod-yang-json-07")
return PathSegment(sre_match.group(2), tuple(
map(lambda x: x.strip("'\""), sre_match.group(3).split("="))) if sre_match.group(3) else None,
_last_ns)
super().__init__(path, parse_segment)
# print(self.segments)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment