Implemented some handlers for knot configuration

parent 30581c06
......@@ -36,7 +36,7 @@ module knot-dns {
reference
"https://www.knot-dns.cz/docs/2.0/html/";
revision 2016-01-13 {
revision 2016-06-08 {
description
"Initial revision.";
reference
......@@ -215,7 +215,7 @@ module knot-dns {
description
"Knot-specific configuration data.";
list log {
key "name";
key "target";
description
"List of log options.
......@@ -252,33 +252,20 @@ module knot-dns {
description
"Severity levels.";
}
uses dnss:entry-name;
uses dnss:description;
choice target {
mandatory "true";
leaf target {
type string;
description
"Destination of log messages.";
leaf stdout {
type empty;
description
"Standard output.";
}
leaf stderr {
type empty;
description
"Standard error.";
}
leaf syslog {
type empty;
description
"Syslog service.";
}
leaf file {
type dnss:fs-path;
description
"File name.";
}
"Destination of log messages. The value can be either a
file name, or one of the following special strings:
- stdout: log messages are sent to standard output,
- stderr: log messages are sent to standard error,
- stderr: log messages are passed to the syslog
facility.";
}
uses dnss:description;
leaf server {
type severity;
description
......
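Review bot: The new `target` description lists `stderr` twice; the third bullet ("log messages are passed to the syslog facility") should read `- syslog:`, since `"target": "syslog"` is the string the example data uses. Replacing the `choice` with a bare `type string` also drops whatever constraint `dnss:fs-path` applied to the old `file` leaf, so any string is now accepted as a log destination. A union keeps both forms checked: `type union { type enumeration { enum stdout; enum stderr; enum syslog; } type dnss:fs-path; }`. The list key also changes from `name` to `target` in the earlier hunk, so two entries can no longer share a destination; if that is intended, say so in the list description.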
This diff is collapsed.
......@@ -23,10 +23,15 @@ CONFIG_NACM = {
"ALLOWED_USERS": "lojza@mail.cz"
}
CONFIG_KNOT = {
"SOCKET": "/tmp/knot.sock"
}
CONFIG = {
"GLOBAL": CONFIG_GLOBAL,
"HTTP_SERVER": CONFIG_HTTP,
"NACM": CONFIG_NACM
"NACM": CONFIG_NACM,
"KNOT": CONFIG_KNOT
}
NACM_ADMINS = CONFIG["NACM"]["ALLOWED_USERS"]
......
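Review bot: The default `"SOCKET": "/tmp/knot.sock"` places the Knot control socket in a world-writable directory, where the path is not protected from other local accounts and the client cannot tell which process owns the socket it connects to. A default under a directory owned by the service account is safer, for example `"SOCKET": "/run/knot/knot.sock"`, adjusted to the rundir the Knot instance actually uses. The YAML hunk that follows sets `KNOT: SOCKET:` to a `/tmp/knottest-…` path that looks like a leftover from one test run; it should carry the same default rather than a machine-specific path.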
......@@ -12,4 +12,7 @@ HTTP_SERVER:
CA_CERT: "jetconf/ca.pem"
NACM:
ALLOWED_USERS: ["lojza@mail.cz"]
\ No newline at end of file
ALLOWED_USERS: ["lojza@mail.cz"]
KNOT:
SOCKET: "/tmp/knottest-1462525244-b67pmzm_/ddns/ttl/knot1/knot.sock"
......@@ -177,7 +177,10 @@ class UsrChangeJournal:
for cl in self.clists:
for change in cl.journal:
ii = ds.parse_ii(change.rpc_info.path, change.rpc_info.path_format)
ds.notify_edit(ii)
if change.change_type != ChangeType.DELETE:
ds.notify_edit(ii)
else:
ds.notify_edit(ii[0:-1])
# Clear user changelists
self.clists.clear()
......@@ -537,59 +540,7 @@ class JsonDatastore(BaseDatastore):
def test():
datamodel = DataHelpers.load_data_model("./data", "./data/yang-library-data.json")
data = JsonDatastore(datamodel)
data.load("jetconf/example-data.json")
rpc = RpcInfo()
rpc.username = "dominik"
rpc.path = "/dns-server:dns-server/zones/zone[domain='example.com']/query-module"
rpc.path_format = PathFormat.XPATH
info("Testing read of " + rpc.path)
n = data.get_node_rpc(rpc)
info("Result =")
print(n.value)
expected_value = \
[
{'name': 'test1', 'type': 'knot-dns:synth-record'},
{'name': 'test2', 'type': 'knot-dns:synth-record'}
]
if json.loads(json.dumps(n.value)) == expected_value:
info("OK")
else:
warn("FAILED")
rpc.path = "/dns-server:dns-server/zones"
rpc.path_format = PathFormat.URL
info("Testing creation of new list item (zone myzone.com) in " + rpc.path)
new_root = data.create_node_rpc(data.get_data_root(), rpc, {"zone": {"domain": "myzone.com"}})
new_node_ii = data.parse_ii("/dns-server:dns-server/zones/zone", PathFormat.URL)
new_node = new_root.goto(new_node_ii)
info("Result =")
print(json.dumps(new_node.value, indent=4))
if "myzone.com" in map(lambda x: x.get("domain"), new_node.value):
info("OK")
else:
warn("FAILED")
rpc.path = "/dns-server:dns-server/zones/zone=myzone.com"
rpc.path_format = PathFormat.URL
info("Testing creation of new leaf-list inside object " + rpc.path)
new_root2 = data.create_node_rpc(new_root, rpc, {"access-control-list": "acl-notify-pokus"})
new_node_ii = data.parse_ii("/dns-server:dns-server/zones/zone=myzone.com", PathFormat.URL)
new_node2 = new_root2.goto(new_node_ii)
info("Result =")
print(json.dumps(new_node2.value, indent=4))
if "acl-notify-pokus" in new_node2.member("access-control-list").value:
info("OK")
else:
warn("FAILED")
error("Tests moved to tests/tests_jetconf.py")
from .nacm import NacmConfig, Permission, Action
......
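Review bot: In the `UsrChangeJournal` hunk, the new DELETE branch calls `ds.notify_edit(ii[0:-1])` with no comment on why the parent is notified, and for a one-element instance identifier the slice is empty. Presumably the parent is used because the deleted node no longer exists; a comment such as `# node is gone after DELETE, notify its parent instead` would record that. The empty-path case should be handled explicitly, for example by checking `len(ii) > 1` before slicing and choosing what a top-level delete should notify. `notify_edit` and the enclosing method start outside the hunk, so whether an empty identifier is tolerated cannot be confirmed from here.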
......@@ -2,7 +2,7 @@
"ietf-netconf-acm:nacm": {
"enable-nacm": true,
"read-default": "permit",
"write-default": "deny",
"write-default": "permit",
"exec-default": "permit",
"denied-operations": 123,
"denied-data-writes": 456,
......@@ -69,6 +69,13 @@
"users"
],
"rule": [
{
"name": "permit-server-config",
"path": "/dns-server:dns-server/server-options",
"access-operations": "*",
"comment": "Permit server config",
"action": "permit"
},
{
"name": "no-writes-on-example.com",
"path": "/dns-server:dns-server/zones/zone[domain='example.com']",
......
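Review bot: Changing `"write-default"` to `"permit"` makes every data node that no deny rule matches writable for the users NACM is enforced on, which reverses the deny-by-default write policy that RFC 6536 specifies. The `permit-server-config` rule added in the second hunk already grants access to `/dns-server:dns-server/server-options`, so the default can stay `"write-default": "deny"`. That rule also uses `"access-operations": "*"`; if only some operations are needed, list them instead of the wildcard. The rule list continues outside the hunk, so the interaction with later rules is not visible here.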
......@@ -40,13 +40,11 @@
},
"knot-dns:log": [
{
"name": "01",
"syslog": [null],
"target": "syslog",
"any": "warning"
},
{
"name": "02",
"file": "/tmp/knot.log",
"target": "/tmp/knot.log",
"server": "info",
"zone": "info"
}
......
......@@ -422,87 +422,7 @@ class UserNacm:
def test():
datamodel = DataHelpers.load_data_model("./data", "./data/yang-library-data.json")
nacm_data = JsonDatastore(datamodel)
nacm_data.load("jetconf/example-data-nacm.json")
nacm = NacmConfig(nacm_data)
data = JsonDatastore(datamodel)
data.load("jetconf/example-data.json")
data.register_nacm(nacm)
nacm.set_ds(data)
test_user = "dominik"
test_paths = (
(
"/dns-server:dns-server/zones/zone[domain='example.com']/query-module",
Permission.NACM_ACCESS_UPDATE,
Action.DENY
),
(
"/dns-server:dns-server/zones/zone",
Permission.NACM_ACCESS_READ,
Action.PERMIT
),
(
"/dns-server:dns-server/server-options",
Permission.NACM_ACCESS_READ,
Action.DENY
)
)
for test_path in test_paths:
info("Testing path \"{}\"".format(test_path[0]))
ii = data.parse_ii(test_path[0], PathFormat.XPATH)
datanode = data.get_node(data.get_data_root(), ii)
if datanode:
info("Node found")
debug("Node contents: {}".format(datanode.value))
test_ii = data.parse_ii(test_path[0], PathFormat.XPATH)
rule = []
action = nacm.get_user_nacm(test_user).check_data_node_path(data.get_data_root(), test_ii, test_path[1], out_matching_rule=rule)
if action == test_path[2]:
info("Action = {}, OK ({})\n".format(action.name, rule[0].name if len(rule) > 0 else "default"))
else:
info("Action = {}, FAILED ({})\n".format(action.name, rule[0].name if len(rule) > 0 else "default"))
else:
info("Node not found!")
test_ii2 = data.parse_ii("/dns-server:dns-server/zones/zone[domain='example.com']", PathFormat.XPATH)
info("Reading: " + str(test_ii2))
res = nacm.get_user_nacm(test_user).check_data_read_path(data.get_data_root(), test_ii2)
res = json.dumps(res.value, indent=4, sort_keys=True)
print("Result =")
print(res)
res_expected = """
{
"master": [
"server1"
],
"access-control-list": [
"acl-xfr-update",
"acl-notify"
],
"any-to-tcp": false,
"template": "default",
"notify": {
"recipient": [
"server0"
]
},
"domain": "example.com"
}"""
if json.loads(res) == json.loads(res_expected):
info("OK")
else:
warn("FAILED")
error("Tests moved to tests/tests_jetconf.py")
from .data import JsonDatastore, PathFormat
import json
import pytest
from jetconf.helpers import DataHelpers
from jetconf.data import JsonDatastore, RpcInfo, PathFormat
from jetconf.nacm import NacmConfig, Permission, Action
@pytest.fixture
def data_model():
return DataHelpers.load_data_model("./data", "./data/yang-library-data.json")
@pytest.fixture
def datastore_1(data_model):
ds = JsonDatastore(data_model)
ds.load("jetconf/example-data.json")
return ds
@pytest.fixture
def nacm_datastore_1(data_model):
ds = JsonDatastore(data_model)
ds.load("jetconf/example-data-nacm.json")
return ds
def test_datastore(datastore_1):
data = datastore_1
rpc = RpcInfo()
rpc.username = "dominik"
rpc.path = "/dns-server:dns-server/zones/zone[domain='example.com']/query-module"
rpc.path_format = PathFormat.XPATH
# info("Testing read of " + rpc.path)
n = data.get_node_rpc(rpc)
expected_value = \
[
{'name': 'test1', 'type': 'knot-dns:synth-record'},
{'name': 'test2', 'type': 'knot-dns:synth-record'}
]
assert json.loads(json.dumps(n.value)) == expected_value
rpc.path = "/dns-server:dns-server/zones"
rpc.path_format = PathFormat.URL
# info("Testing creation of new list item (zone myzone.com) in " + rpc.path)
new_root = data.create_node_rpc(data.get_data_root(), rpc, {"zone": {"domain": "myzone.com"}})
new_node_ii = data.parse_ii("/dns-server:dns-server/zones/zone", PathFormat.URL)
new_node = new_root.goto(new_node_ii)
assert "myzone.com" in map(lambda x: x.get("domain"), new_node.value)
rpc.path = "/dns-server:dns-server/zones/zone=myzone.com"
rpc.path_format = PathFormat.URL
# info("Testing creation of new leaf-list inside object " + rpc.path)
new_root2 = data.create_node_rpc(new_root, rpc, {"access-control-list": "acl-notify-pokus"})
new_node_ii = data.parse_ii("/dns-server:dns-server/zones/zone=myzone.com", PathFormat.URL)
new_node2 = new_root2.goto(new_node_ii)
assert "acl-notify-pokus" in new_node2.member("access-control-list").value
def test_nacm(datastore_1, nacm_datastore_1):
nacm_data = nacm_datastore_1
nacm_conf = NacmConfig(nacm_data)
data = datastore_1
data.register_nacm(nacm_conf)
nacm_conf.set_ds(data)
test_user = "dominik"
test_paths = (
(
"/dns-server:dns-server/zones/zone[domain='example.com']/query-module",
Permission.NACM_ACCESS_UPDATE,
Action.DENY
),
(
"/dns-server:dns-server/zones/zone",
Permission.NACM_ACCESS_READ,
Action.PERMIT
),
(
"/dns-server:dns-server/server-options",
Permission.NACM_ACCESS_READ,
Action.PERMIT
)
)
for test_path in test_paths:
print("Testing path \"{}\"".format(test_path[0]))
ii = data.parse_ii(test_path[0], PathFormat.XPATH)
datanode = data.get_node(data.get_data_root(), ii)
if datanode:
print("Node found")
# debug("Node contents: {}".format(datanode.value))
test_ii = data.parse_ii(test_path[0], PathFormat.XPATH)
rule = []
action = nacm_conf.get_user_nacm(test_user).check_data_node_path(data.get_data_root(), test_ii, test_path[1],
out_matching_rule=rule)
assert action == test_path[2]
"""
if action == test_path[2]:
info("Action = {}, OK ({})\n".format(action.name, rule[0].name if len(rule) > 0 else "default"))
else:
info("Action = {}, FAILED ({})\n".format(action.name, rule[0].name if len(rule) > 0 else "default"))
"""
else:
pytest.fail("Node not found!")
test_ii2 = data.parse_ii("/dns-server:dns-server/zones/zone[domain='example.com']", PathFormat.XPATH)
# info("Reading: " + str(test_ii2))
res = nacm_conf.get_user_nacm(test_user).check_data_read_path(data.get_data_root(), test_ii2)
res = json.dumps(res.value, indent=4, sort_keys=True)
res_expected = """
{
"master": [
"server1"
],
"access-control-list": [
"acl-xfr-update",
"acl-notify"
],
"any-to-tcp": false,
"template": "default",
"notify": {
"recipient": [
"server0"
]
},
"domain": "example.com"
}"""
assert json.loads(res) == json.loads(res_expected)
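Review bot: If this file is `tests/tests_jetconf.py`, as the `error(...)` stubs left in the two `test()` functions say, pytest's default discovery (`test_*.py` or `*_test.py`) will not collect it; rename it to `tests/test_jetconf.py` or set `python_files`. In `test_nacm` the triple-quoted block after `assert action == test_path[2]` is dead code kept as a string and can be deleted. The only write-side check is the `NACM_ACCESS_UPDATE` deny on `example.com`; with the example NACM data now defaulting writes to permit, a case asserting the outcome for a path that no rule covers would pin that default. The fixtures load `./data` and `jetconf/…` relative to the working directory, so the tests pass only when run from the repository root.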