Refactors in NACM module

parent dbb6bfd6
......@@ -22,7 +22,6 @@ from .usr_conf_data_handlers import *
def main():
# Load configuration
load_config("jetconf/config.yaml")
print_config()
log_level = {
"error": logging.ERROR,
......@@ -83,6 +82,9 @@ def main():
logger.addHandler(log_handler)
logger.setLevel(log_level)
# Print configuration
print_config()
# Create pidfile
fl = os.open(CONFIG["GLOBAL"]["PIDFILE"], os.O_WRONLY + os.O_CREAT, 0o666)
try:
......
......@@ -3,8 +3,8 @@ GLOBAL:
LOGFILE: "-"
PIDFILE: "/tmp/jetconf.pid"
PERSISTENT_CHANGES: true
LOG_LEVEL: "error"
LOG_DBG_MODULES: ["usr_conf_data_handlers", "knot_api"]
LOG_LEVEL: "debug"
LOG_DBG_MODULES: ["usr_conf_data_handlers", "knot_api", "nacm"]
HTTP_SERVER:
DOC_ROOT: "jetconf/doc-root"
......@@ -15,7 +15,7 @@ HTTP_SERVER:
SERVER_SSL_CERT: "jetconf/server.crt"
SERVER_SSL_PRIVKEY: "jetconf/server.key"
CA_CERT: "jetconf/ca.pem"
DBG_DISABLE_CERTS: true
DBG_DISABLE_CERTS: false
NACM:
ALLOWED_USERS: ["lojza@mail.cz"]
......
......@@ -306,6 +306,13 @@
"comment": "Users cannot write example.com.",
"action": "deny"
},
{
"name": "no-reads-on-example.com qm",
"path": "/dns-server:dns-server/zones/zone[domain='example.com']/query-module",
"access-operations": "read",
"comment": "Users cannot read example.com. qm",
"action": "deny"
},
{
"name": "permit-zone-access",
"path": "/dns-server:dns-server/zones",
......
......@@ -78,7 +78,7 @@ def _get(prot: "H2Protocol", stream_id: int, ds: BaseDatastore, pth: str, yl_dat
n = ds.get_node_rpc(rpc1, yl_data)
ds.unlock_data()
response = json.dumps(n.raw_value(), indent=4) + "\n"
response = json.dumps(n.raw_value(), indent=4) # + "\n"
response_bytes = response.encode()
response_headers = [
......
import collections
import copy
from io import StringIO
from threading import Lock
from enum import Enum
from colorlog import error, info
from typing import List, Set
from typing import List, Set, Optional
from yangson.instance import (
InstanceNode,
......@@ -83,7 +83,7 @@ class RuleTreeNode:
self.up = up
self.children = [] # type: List[RuleTreeNode]
def get_rule(self, perm: Permission) -> NacmRule:
def get_rule(self, perm: Permission) -> Optional[NacmRule]:
n = self
while n:
if (n.rule is not None) and (perm in n.rule.access):
......@@ -92,7 +92,7 @@ class RuleTreeNode:
return None
def get_action(self, perm: Permission) -> Action:
def get_action(self, perm: Permission) -> Optional[Action]:
rule = self.get_rule(perm)
return rule.action if rule is not None else None
......@@ -136,17 +136,19 @@ class DataRuleTree:
nl = node_match.children
def _print_rule_tree(self, io_str: StringIO, rule_node_list: List[RuleTreeNode], depth: int, vbars: List[int]):
ind_str = (" " * depth) + "+--"
indent_str_list = list((" " * depth) + "+--")
for vb in vbars:
isl = list(ind_str)
isl[vb * 3] = "|"
ind_str = "".join(isl)
indent_str_list[vb * 3] = "|"
indent_str = "".join(indent_str_list)
for rule_node in rule_node_list:
action = rule_node.get_action(Permission.NACM_ACCESS_READ)
action_str = str(action.name) if action is not None else ""
io_str.write(ind_str + " " + str(rule_node.isel) + " " + action_str + "\n")
rule = rule_node.rule
if rule is not None:
action_str = rule.action.name
access = sorted(list(map(lambda n: n.name.split("_")[-1].lower(), rule.access)))
io_str.write(indent_str + " " + str(rule_node.isel) + " " + action_str + str(access) + "\n")
else:
io_str.write(indent_str + " " + str(rule_node.isel) + "\n")
if rule_node is rule_node_list[-1]:
self._print_rule_tree(io_str, rule_node.children, depth + 1, vbars)
else:
......@@ -301,57 +303,54 @@ class UserNacm:
self.rule_tree = DataRuleTree(self.rule_lists)
debug_nacm("Rule tree for user \"{}\":\n{}".format(username, self.rule_tree.print_rule_tree()))
def check_data_node_path(self, root: InstanceNode, ii: InstanceRoute, access: Permission, out_matching_rule: List[NacmRule]=None) -> Action:
def check_data_node_path(self, root: InstanceNode, ii: InstanceRoute, access: Permission) -> Action:
if not self.nacm_enabled:
return Action.PERMIT
retval = None
data_node = root # type: InstanceNode
data_node_value = root.value # type: InstanceNode
nl = self.rule_tree.root
nl = self.rule_tree.root # type: List[RuleTreeNode]
node_match = None # type: RuleTreeNode
for isel in ii:
node_match = None
# Find child by instance selector
node_match_step = None # type: RuleTreeNode
for rule_node in nl:
if (type(rule_node.isel) == type(isel)) and (rule_node.isel == isel):
node_match = rule_node
node_match_step = rule_node
break
if isinstance(isel, EntryIndex) and isinstance(rule_node.isel, EntryKeys):
if isel.peek_step(data_node.value) is rule_node.isel.peek_step(data_node.value):
node_match = rule_node
break
data_node = isel.goto_step(data_node)
if isinstance(isel, EntryIndex) and isinstance(rule_node.isel, EntryKeys) and \
(isel.peek_step(data_node_value) is rule_node.isel.peek_step(data_node_value)):
node_match_step = rule_node
break
if node_match:
matching_rule = node_match.get_rule(access)
retval = matching_rule.action if matching_rule else None
if (matching_rule is not None) and (out_matching_rule is not None):
out_matching_rule.insert(0, matching_rule)
nl = node_match.children
if node_match_step:
nl = node_match_step.children
node_match = node_match_step
data_node_value = isel.peek_step(data_node_value)
else:
break
if retval is None:
# No matching rule
if access in {Permission.NACM_ACCESS_READ}:
retval = self.default_read
else:
retval = self.default_write
if node_match is not None:
# Matching rule found
retval = node_match.get_action(access)
else:
# No matching rule, return default action
retval = self.default_read if access == Permission.NACM_ACCESS_READ else self.default_write
debug_nacm("check_data_node_path, result = {}".format(retval.name))
# debug_nacm("check_data_node_path, result = {}".format(retval.name))
return retval
def _check_data_read_path(self, node: InstanceNode, root: InstanceNode, ii: InstanceRoute) -> InstanceNode:
if isinstance(node.value, ObjectValue):
# print("obj: {}".format(node.value))
for child_key in sorted(node.value.keys()):
nsel = MemberName(child_key)
nsel = MemberName("")
mii = ii + [nsel]
for child_key in node.value.keys():
nsel.name = child_key
m = nsel.goto_step(node)
mii = copy.copy(ii)
mii.append(nsel)
debug_nacm("checking mii {}".format(mii))
# debug_nacm("checking mii {}".format(mii))
if self.check_data_node_path(root, mii, Permission.NACM_ACCESS_READ) == Action.DENY:
# debug_nacm("Pruning node {} {}".format(id(node.value[child_key]), node.value[child_key]))
debug_nacm("Pruning node {}".format(mii))
......@@ -360,15 +359,15 @@ class UserNacm:
node = self._check_data_read_path(m, root, mii).up()
elif isinstance(node.value, ArrayValue):
# print("array: {}".format(node.value))
nsel = EntryIndex(0)
eii = ii + [nsel]
i = 0
arr_len = len(node.value)
while i < arr_len:
nsel = EntryIndex(i)
nsel.index = i
e = nsel.goto_step(node)
eii = copy.copy(ii)
eii.append(nsel)
debug_nacm("checking eii {}".format(eii))
# debug_nacm("checking eii {}".format(eii))
if self.check_data_node_path(root, eii, Permission.NACM_ACCESS_READ) == Action.DENY:
# debug_nacm("Pruning node {} {}".format(id(node.value[i]), node.value[i]))
debug_nacm("Pruning node {}".format(eii))
......
......@@ -2,7 +2,7 @@ import asyncio
import ssl
from io import BytesIO
from collections import OrderedDict
from colorlog import error, warning as warn, info, debug
from colorlog import error, warning as warn, info
from typing import List, Tuple, Dict, Any, Callable
from h2.connection import H2Connection
......@@ -126,7 +126,7 @@ class H2Protocol(asyncio.Protocol):
if dts:
self.transport.write(dts)
def handle_put_post(self, headers: OrderedDict, stream_id: int, data: bytes):
def handle_put_post(self, headers: OrderedDict, stream_id: int, data: str):
# Handle PUT, POST
url_path = headers[":path"].split("?")[0]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment