test_jetconf.py 4.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
import json
import pytest
from jetconf.helpers import DataHelpers
from jetconf.data import JsonDatastore, RpcInfo, PathFormat
from jetconf.nacm import NacmConfig, Permission, Action


@pytest.fixture
def data_model():
    return DataHelpers.load_data_model("./data", "./data/yang-library-data.json")


@pytest.fixture
def datastore_1(data_model):
    ds = JsonDatastore(data_model)
    ds.load("jetconf/example-data.json")
    return ds


@pytest.fixture
def nacm_datastore_1(data_model):
    ds = JsonDatastore(data_model)
    ds.load("jetconf/example-data-nacm.json")
    return ds


def test_datastore(datastore_1):
    data = datastore_1
    rpc = RpcInfo()
    rpc.username = "dominik"
    rpc.path = "/dns-server:dns-server/zones/zone[domain='example.com']/query-module"
    rpc.path_format = PathFormat.XPATH

    # info("Testing read of " + rpc.path)
    n = data.get_node_rpc(rpc)

    expected_value = \
        [
            {'name': 'test1', 'type': 'knot-dns:synth-record'},
            {'name': 'test2', 'type': 'knot-dns:synth-record'}
        ]

    assert json.loads(json.dumps(n.value)) == expected_value

    rpc.path = "/dns-server:dns-server/zones"
    rpc.path_format = PathFormat.URL

    # info("Testing creation of new list item (zone myzone.com) in " + rpc.path)
    new_root = data.create_node_rpc(data.get_data_root(), rpc, {"zone": {"domain": "myzone.com"}})
    new_node_ii = data.parse_ii("/dns-server:dns-server/zones/zone", PathFormat.URL)
    new_node = new_root.goto(new_node_ii)
    assert "myzone.com" in map(lambda x: x.get("domain"), new_node.value)

    rpc.path = "/dns-server:dns-server/zones/zone=myzone.com"
    rpc.path_format = PathFormat.URL

    # info("Testing creation of new leaf-list inside object " + rpc.path)
    new_root2 = data.create_node_rpc(new_root, rpc, {"access-control-list": "acl-notify-pokus"})
    new_node_ii = data.parse_ii("/dns-server:dns-server/zones/zone=myzone.com", PathFormat.URL)
    new_node2 = new_root2.goto(new_node_ii)
    assert "acl-notify-pokus" in new_node2.member("access-control-list").value


def test_nacm(datastore_1, nacm_datastore_1):
    nacm_data = nacm_datastore_1
    nacm_conf = NacmConfig(nacm_data)

    data = datastore_1
    data.register_nacm(nacm_conf)
    nacm_conf.set_ds(data)

    test_user = "dominik"

    test_paths = (
        (
            "/dns-server:dns-server/zones/zone[domain='example.com']/query-module",
            Permission.NACM_ACCESS_UPDATE,
            Action.DENY
        ),
        (
            "/dns-server:dns-server/zones/zone",
            Permission.NACM_ACCESS_READ,
            Action.PERMIT
        ),
        (
            "/dns-server:dns-server/server-options",
            Permission.NACM_ACCESS_READ,
            Action.PERMIT
        )
    )

    for test_path in test_paths:
        print("Testing path \"{}\"".format(test_path[0]))

        ii = data.parse_ii(test_path[0], PathFormat.XPATH)
        datanode = data.get_node(data.get_data_root(), ii)
        if datanode:
            print("Node found")
            # debug("Node contents: {}".format(datanode.value))
            test_ii = data.parse_ii(test_path[0], PathFormat.XPATH)
            rule = []
Pavel Spirek's avatar
Pavel Spirek committed
102 103
            action = nacm_conf.get_user_rules(test_user).check_data_node_permission(data.get_data_root(), test_ii, test_path[1],
                                                                                    out_matching_rule=rule)
104 105 106 107 108 109 110 111 112 113 114 115 116
            assert action == test_path[2]
            """
            if action == test_path[2]:
                info("Action = {}, OK ({})\n".format(action.name, rule[0].name if len(rule) > 0 else "default"))
            else:
                info("Action = {}, FAILED ({})\n".format(action.name, rule[0].name if len(rule) > 0 else "default"))
            """
        else:
            pytest.fail("Node not found!")

    test_ii2 = data.parse_ii("/dns-server:dns-server/zones/zone[domain='example.com']", PathFormat.XPATH)

    # info("Reading: " + str(test_ii2))
Pavel Spirek's avatar
Pavel Spirek committed
117
    res = nacm_conf.get_user_rules(test_user).prun_data_tree(data.get_data_root(), test_ii2)
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
    res = json.dumps(res.value, indent=4, sort_keys=True)

    res_expected = """
        {
        "master": [
            "server1"
        ],
        "access-control-list": [
            "acl-xfr-update",
            "acl-notify"
        ],
        "any-to-tcp": false,
        "template": "default",
        "notify": {
            "recipient": [
                "server0"
            ]
        },
        "domain": "example.com"
        }"""

    assert json.loads(res) == json.loads(res_expected)