data.py 17.2 KB
Newer Older
Pavel Spirek's avatar
Pavel Spirek committed
1
import json
2
from threading import Lock
Pavel Spirek's avatar
Pavel Spirek committed
3
from enum import Enum
Pavel Spirek's avatar
Pavel Spirek committed
4 5
from colorlog import error, warning as warn, info, debug
from typing import List, Any, Dict, TypeVar, Tuple, Set
6
from pydispatch import dispatcher
7

8
from yangson.schema import SchemaRoute, SchemaNode, NonexistentSchemaNode, ListNode, LeafListNode
9
from yangson.context import Context
10 11
from yangson.datamodel import InstanceIdentifier, DataModel
from yangson.instance import \
12
    InstanceNode, \
13
    NonexistentInstance, \
Pavel Spirek's avatar
Pavel Spirek committed
14
    InstanceTypeError, \
15 16 17 18 19 20
    ArrayValue, \
    ObjectValue, \
    MemberName, \
    EntryKeys, \
    EntryIndex

Pavel Spirek's avatar
Pavel Spirek committed
21
from .helpers import DataHelpers
Pavel Spirek's avatar
Pavel Spirek committed
22 23


Pavel Spirek's avatar
Pavel Spirek committed
24 25 26 27 28
class PathFormat(Enum):
    """Syntax in which a data path string is written.

    ``BaseDatastore.parse_ii`` uses this to pick the data-model parser:
    ``URL`` paths go through ``parse_resource_id`` and every other value
    through ``parse_instance_id``.
    """

    # Resource-identifier style path (handled by ``parse_resource_id``).
    URL = 0
    # Instance-identifier style path (handled by ``parse_instance_id``).
    XPATH = 1


29 30 31 32 33 34
class ChangeType(Enum):
    """Kind of edit recorded in a changelist journal entry.

    The values are plain integers. The original definition carried trailing
    commas on the first two members, which silently turned their values into
    one-element tuples (``(0,)`` and ``(1,)``) while ``DELETE`` stayed ``2``.
    """

    CREATE = 0
    REPLACE = 1
    DELETE = 2


Pavel Spirek's avatar
Pavel Spirek committed
35
class NacmForbiddenError(Exception):
    """Raised when NACM denies access to a data node or an operation.

    Attributes:
        msg: Human-readable description, also returned by ``str()``.
        rulename: Value passed as ``rule`` (``None`` when not given).
    """

    def __init__(self, msg="Access to data node rejected by NACM", rule=None):
        # Initialise the base class so ``args`` carries the message even when
        # the default text is used.
        super().__init__(msg)
        self.msg = msg
        self.rulename = rule

    def __str__(self):
        return self.msg

Pavel Spirek's avatar
Pavel Spirek committed
43 44 45 46 47

class DataLockError(Exception):
    """Raised when the datastore lock cannot be acquired.

    Attributes:
        msg: Human-readable description, also returned by ``str()``.
    """

    def __init__(self, msg=""):
        # Initialise the base class so ``args`` carries the message even when
        # it is passed by keyword or left at its default.
        super().__init__(msg)
        self.msg = msg

    def __str__(self):
        return self.msg
Pavel Spirek's avatar
Pavel Spirek committed
50 51


52
class NoHandlerError(Exception):
    """Base error for requests that have no registered handler.

    Attributes:
        msg: Human-readable description, also returned by ``str()``.
    """

    def __init__(self, msg=""):
        # Initialise the base class so ``args`` carries the message even when
        # it is passed by keyword or left at its default.
        super().__init__(msg)
        self.msg = msg

    def __str__(self):
        return self.msg


60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
class InstanceAlreadyPresent(Exception):
    """Raised when a node being created already exists in the data tree.

    Attributes:
        msg: Human-readable description, also returned by ``str()``.
    """

    def __init__(self, msg=""):
        # Initialise the base class so ``args`` carries the message even when
        # it is passed by keyword or left at its default.
        super().__init__(msg)
        self.msg = msg

    def __str__(self):
        return self.msg


class NoHandlerForOpError(NoHandlerError):
    """Raised when no handler is registered for a requested operation."""


class NoHandlerForStateDataError(NoHandlerError):
    """Raised when no handler is registered for a state data subtree."""


76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
class BaseDataListener:
    """Base class for receivers of datastore edit notifications.

    A listener subscribes to schema nodes by path; subclasses override
    ``process`` to react when a signal for one of those nodes is dispatched.
    """

    def __init__(self, ds: "BaseDatastore"):
        # Schema paths this listener has subscribed to, in subscription order.
        self.schema_paths = []
        self._ds = ds

    def add_schema_node(self, sch_pth: str):
        """Subscribe ``process`` to the schema node found at ``sch_pth``.

        The lookup goes through the datastore, so any error it raises for an
        unknown path propagates before anything is recorded.
        """
        schema_node = self._ds.get_schema_node(sch_pth)
        self.schema_paths.append(sch_pth)
        # The signal name is the string form of the schema node's id().
        signal_name = str(id(schema_node))
        dispatcher.connect(self.process, signal_name)

    def process(self, sn: SchemaNode, ii: InstanceIdentifier):
        """Handle a notification; must be overridden by subclasses."""
        raise NotImplementedError("Not implemented in base class")

    def __str__(self):
        return "{}: listening at {}".format(self.__class__.__name__, self.schema_paths)


93
class RpcInfo:
    """Plain container describing one incoming request to the datastore.

    All fields start unset (or at a default) and are filled in by the caller.
    """

    def __init__(self):
        # Name of the requesting user; used for NACM lookups and changelists.
        self.username = None    # type: str
        # Target data path, parsed according to ``path_format``.
        self.path = None        # type: str
        # NOTE(review): presumably the parsed URL query string - confirm with caller.
        self.qs = None          # type: Dict[str, List[str]]
        self.path_format = PathFormat.URL   # type: PathFormat
        # When True, ``invoke_op_rpc`` skips the NACM check for the operation.
        self.skip_nacm_check = False        # type: bool
        # Operation name and its input arguments, used by ``invoke_op_rpc``.
        self.op_name = None                 # type: str
        self.op_input_args = None           # type: ObjectValue
Pavel Spirek's avatar
Pavel Spirek committed
102 103


104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
class DataChange:
    """Single journal record: what kind of edit, who/where, and the payload."""

    def __init__(self, change_type: ChangeType, rpc_info: RpcInfo, data: Any):
        # Payload supplied with the edit.
        self.data = data
        # Request that caused the edit (carries the target path and username).
        self.rpc_info = rpc_info
        # Kind of edit being recorded.
        self.change_type = change_type


class ChangeList:
    """Named, ordered journal of ``DataChange`` records."""

    def __init__(self, changelist_name: str):
        self.journal = []   # type: List[DataChange]
        self.changelist_name = changelist_name

    def add(self, change: DataChange):
        """Append ``change`` to the end of the journal."""
        entries = self.journal
        entries.append(change)


Pavel Spirek's avatar
Pavel Spirek committed
120
class BaseDatastore:
    """In-memory datastore bound to a data model, with optional NACM checks.

    The class keeps the current data tree, a lock guarding it, and per-user
    stacks of changelists. Persistence (``load``/``save``) is left to
    subclasses.
    """

    def __init__(self, dm: DataModel, name: str=""):
        self.name = name
        self.nacm = None    # type: NacmConfig
        self._data = None   # type: InstanceNode
        self._dm = dm       # type: DataModel
        self._data_lock = Lock()
        # Name recorded by the current lock holder (None while unlocked).
        self._lock_username = None  # type: str
        # Per-user stack of changelists; the last one is the active one.
        self._usr_changelist = {}   # type: Dict[str, List[ChangeList]]

    def register_nacm(self, nacm_config: "NacmConfig"):
        """Register the NACM module to be consulted by the ``*_rpc`` methods."""
        self.nacm = nacm_config

    def get_data_root(self) -> InstanceNode:
        """Return the root node of the data tree (``None`` before loading)."""
        return self._data

    def get_schema_node(self, sch_pth: str) -> SchemaNode:
        """Return the schema node at schema address ``sch_pth``.

        Raises:
            NonexistentSchemaNode: if the data model returns ``None``.
        """
        sn = self._dm.get_schema_node(sch_pth)
        if sn is None:
            raise NonexistentSchemaNode(sch_pth)
        return sn

    def get_schema_node_ii(self, ii: InstanceIdentifier) -> SchemaNode:
        """Return the schema node for the data node addressed by ``ii``."""
        return Context.schema.get_data_descendant(ii)

    def parse_ii(self, path: str, path_format: PathFormat) -> InstanceIdentifier:
        """Parse ``path`` into an instance identifier.

        ``PathFormat.URL`` paths are parsed with ``parse_resource_id``; any
        other format is parsed with ``parse_instance_id``.
        """
        if path_format == PathFormat.URL:
            return self._dm.parse_resource_id(path)
        return self._dm.parse_instance_id(path)

    def notify_edit(self, ii: InstanceIdentifier):
        """Notify data observers about a change at ``ii``.

        A signal named after ``id()`` of the schema node is sent for the
        target's schema node and then for each of its ancestors, so listeners
        registered higher in the schema are informed as well.
        """
        sn = self.get_schema_node_ii(ii)
        while sn is not None:
            dispatcher.send(str(id(sn)), **{'sn': sn, 'ii': ii})
            sn = sn.parent

    def get_node(self, ii: InstanceIdentifier) -> InstanceNode:
        """Return the node at ``ii`` without evaluating NACM (for testing)."""
        return self._data.goto(ii)

    def get_node_path(self, path: str, path_format: PathFormat) -> InstanceNode:
        """Return the node at ``path`` without evaluating NACM (for testing)."""
        ii = self.parse_ii(path, path_format)
        return self._data.goto(ii)

    def get_node_rpc(self, rpc: RpcInfo) -> InstanceNode:
        """Return the data node requested by ``rpc``, evaluating NACM if set.

        State data below the target is refreshed through the registered state
        data handlers first, and the refreshed tree replaces ``self._data``.

        Raises:
            NoHandlerForStateDataError: a state root has no registered handler.
            NacmForbiddenError: NACM denies read access to the target.
        """
        ii = self.parse_ii(rpc.path, rpc.path_format)
        root = self._data

        sn = self.get_schema_node_ii(ii)
        for state_node_pth in sn.state_roots():
            sn_pth_str = "".join(["/" + pth_seg for pth_seg in state_node_pth])
            sdh = STATE_DATA_HANDLES.get_handler(sn_pth_str)
            if sdh is not None:
                root = sdh.update_node(ii, root).top()
                # Stored immediately so updates applied so far are kept even
                # if a later state root has no handler.
                self._data = root
            else:
                raise NoHandlerForStateDataError()

        self._data = root
        n = self._data.goto(ii)

        if self.nacm:
            nrpc = self.nacm.get_user_nacm(rpc.username)
            if nrpc.check_data_node_path(ii, Permission.NACM_ACCESS_READ) == Action.DENY:
                raise NacmForbiddenError()
            else:
                # Prune subtree data the user is not allowed to read
                n = nrpc.check_data_read_path(ii)

        return n

    def create_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any, insert=None, point=None) -> InstanceNode:
        """Create a new data node below ``rpc.path`` (Restconf draft compliant).

        Args:
            root: Tree in which the node is created.
            rpc: Request info; ``path`` addresses the parent of the new node.
            value: Mapping with exactly one member: the new node's name mapped
                to its raw value.
            insert: For lists: ``"first"``, ``"last"``/``None``, ``"before"``
                or ``"after"``. For leaf-lists only ``"first"`` and
                ``"last"``/``None`` are handled. Any other value leaves the
                tree unchanged.
            point: Key value of the reference entry for ``"before"``/``"after"``.

        Returns:
            ``top()`` of the modified node.

        Raises:
            NacmForbiddenError: NACM denies create access.
            ValueError: ``value`` does not have exactly one member.
            InstanceAlreadyPresent: a list entry with the same key, or the
                non-list member itself, already exists.
        """
        ii = self.parse_ii(rpc.path, rpc.path_format)
        n = root.goto(ii)
        new_n = n

        if self.nacm:
            nrpc = self.nacm.get_user_nacm(rpc.username)
            if nrpc.check_data_node_path(ii, Permission.NACM_ACCESS_CREATE) == Action.DENY:
                raise NacmForbiddenError()

        # Get target member name
        input_member_name = tuple(value.keys())
        if len(input_member_name) != 1:
            raise ValueError("Received json object must contain exactly one member")
        else:
            input_member_name = input_member_name[0]

        input_member_value = value[input_member_name]

        # Check if target member already exists
        try:
            existing_member = n.member(input_member_name)
        except NonexistentInstance:
            existing_member = None

        # Get target schema node
        member_sn = self.get_schema_node_ii(ii + [MemberName(input_member_name)])

        if isinstance(member_sn, ListNode):
            # Append received node to list

            # Create list if necessary
            if existing_member is None:
                existing_member = n.new_member(input_member_name, ArrayValue([]))

            # Convert input data from List/Dict to ArrayValue/ObjectValue
            new_value_data = member_sn.from_raw([input_member_value])[0]

            # Only the first key of the list is used for the duplicate check
            # and for locating the insertion point.
            list_node_key = member_sn.keys[0][0]
            if new_value_data[list_node_key] in map(lambda x: x[list_node_key], existing_member.value):
                raise InstanceAlreadyPresent("Duplicate key")

            if insert == "first":
                new_n = existing_member.update(ArrayValue([new_value_data] + existing_member.value))
            elif (insert == "last") or insert is None:
                new_n = existing_member.update(ArrayValue(existing_member.value + [new_value_data]))
            elif insert == "before":
                entry_sel = EntryKeys({list_node_key: point})
                list_entry = entry_sel.goto_step(existing_member)
                new_n = list_entry.insert_before(new_value_data).up()
            elif insert == "after":
                entry_sel = EntryKeys({list_node_key: point})
                list_entry = entry_sel.goto_step(existing_member)
                new_n = list_entry.insert_after(new_value_data).up()
        elif isinstance(member_sn, LeafListNode):
            # Append received node to leaf list

            # Create leaf list if necessary
            if existing_member is None:
                existing_member = n.new_member(input_member_name, ArrayValue([]))

            # Convert input data from List/Dict to ArrayValue/ObjectValue
            new_value_data = member_sn.from_raw([input_member_value])[0]

            if insert == "first":
                new_n = existing_member.update(ArrayValue([new_value_data] + existing_member.value))
            elif (insert == "last") or insert is None:
                new_n = existing_member.update(ArrayValue(existing_member.value + [new_value_data]))
        else:
            if existing_member is None:
                # Create new data node

                # Convert input data from List/Dict to ArrayValue/ObjectValue
                new_value_data = member_sn.from_raw(input_member_value)

                # Create new node (object member)
                new_n = n.new_member(input_member_name, new_value_data)
            else:
                # Data node already exists
                raise InstanceAlreadyPresent("Member \"{}\" already present in \"{}\"".format(input_member_name, ii))

        self.notify_edit(ii)
        return new_n.top()

    def update_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> InstanceNode:
        """Replace the value of the already existing node at ``rpc.path``.

        Returns:
            ``top()`` of the updated node.

        Raises:
            NacmForbiddenError: NACM denies update access.
        """
        ii = self.parse_ii(rpc.path, rpc.path_format)
        n = root.goto(ii)

        if self.nacm:
            nrpc = self.nacm.get_user_nacm(rpc.username)
            if nrpc.check_data_node_path(ii, Permission.NACM_ACCESS_UPDATE) == Action.DENY:
                raise NacmForbiddenError()

        sn = self.get_schema_node_ii(ii)
        new_value = sn.from_raw(value)
        new_n = n.update(new_value)

        self.notify_edit(ii)
        return new_n.top()

    def delete_node_rpc(self, root: InstanceNode, rpc: RpcInfo) -> InstanceNode:
        """Delete the data node at ``rpc.path``.

        Returns:
            ``top()`` of the parent after removal. If the last path selector
            does not match the parent's value type, the parent is returned
            unchanged.

        Raises:
            NacmForbiddenError: NACM denies delete access.
            InstanceTypeError: the parent is neither an array nor an object.
        """
        # NOTE(review): unlike create/update, no notify_edit() is sent here -
        # confirm whether that is intentional.
        ii = self.parse_ii(rpc.path, rpc.path_format)
        n = root.goto(ii)
        n_parent = n.up()
        new_n = n_parent
        last_isel = ii[-1]

        if self.nacm:
            nrpc = self.nacm.get_user_nacm(rpc.username)
            if nrpc.check_data_node_path(ii, Permission.NACM_ACCESS_DELETE) == Action.DENY:
                raise NacmForbiddenError()

        if isinstance(n_parent.value, ArrayValue):
            if isinstance(last_isel, EntryIndex):
                new_n = n_parent.remove_entry(last_isel.index)
            elif isinstance(last_isel, EntryKeys):
                new_n = n_parent.remove_entry(n.crumb.pointer_fragment())
        elif isinstance(n_parent.value, ObjectValue):
            if isinstance(last_isel, MemberName):
                new_n = n_parent.remove_member(last_isel.name)
        else:
            raise InstanceTypeError(n, "Invalid target node type")

        return new_n.top()

    def invoke_op_rpc(self, rpc: RpcInfo) -> ObjectValue:
        """Invoke the operation named by ``rpc.op_name``.

        The built-in ``conf-start``, ``conf-list``, ``conf-drop`` and
        ``conf-commit`` operations manage the user's changelists; any other
        name is dispatched to the registered operation handler.

        Raises:
            NacmForbiddenError: NACM denies the operation (unless
                ``rpc.skip_nacm_check`` is set).
            NoHandlerForOpError: no handler is registered for the operation.
        """
        if self.nacm and (not rpc.skip_nacm_check):
            nrpc = self.nacm.get_user_nacm(rpc.username)
            if nrpc.check_rpc_name(rpc.op_name) == Action.DENY:
                raise NacmForbiddenError("Op \"{}\" invocation denied for user \"{}\"".format(rpc.op_name, rpc.username))

        ret_data = {}

        if rpc.op_name == "conf-start":
            chl = ChangeList(rpc.op_input_args["name"])
            self._usr_changelist.setdefault(rpc.username, []).append(chl)
            ret_data = {"status": "OK"}
        elif rpc.op_name == "conf-list":
            # A user without any changelist gets an empty listing instead of
            # failing on iteration over None.
            chls = self._usr_changelist.get(rpc.username) or []
            chl_json = {}
            for chl in chls:
                chl_json[chl.changelist_name] = [
                    [ch.change_type.name, ch.rpc_info.path] for ch in chl.journal
                ]
            ret_data = \
                {
                    "status": "OK",
                    "changelists": chl_json
                }
        elif rpc.op_name == "conf-drop":
            chls = self._usr_changelist.get(rpc.username)
            if chls is not None:
                # Drop the most recently started changelist
                chls.pop()
                if len(chls) == 0:
                    del self._usr_changelist[rpc.username]

            ret_data = {"status": "OK"}
        elif rpc.op_name == "conf-commit":
            # Not implemented: falls through and returns the empty result.
            pass
        else:
            op_handler = OP_HANDLERS.get_handler(rpc.op_name)
            if op_handler is None:
                raise NoHandlerForOpError()

            ret_data = op_handler(rpc.op_input_args)

        return ret_data

    def add_to_journal_rpc(self, type: ChangeType, rpc: RpcInfo, value: Any):
        """Record a change in the user's most recently started changelist.

        Raises:
            NoHandlerError: the user has no active changelist.
        """
        usr_chss = self._usr_changelist.get(rpc.username)
        if usr_chss is not None:
            usr_chs = usr_chss[-1]
            usr_chs.add(DataChange(type, rpc, value))
        else:
            raise NoHandlerError("No active changelist for user \"{}\"".format(rpc.username))

    def lock_data(self, username: str = None, blocking: bool=True):
        """Lock the datastore data.

        Args:
            username: Name recorded as the lock holder (``"(unknown)"`` if
                not given).
            blocking: Wait up to one second for the lock when True; try once
                without waiting when False.

        Raises:
            DataLockError: the lock could not be acquired.
        """
        # Lock.acquire() rejects a timeout on a non-blocking call with
        # ValueError, so the timeout is passed only when blocking.
        if blocking:
            ret = self._data_lock.acquire(blocking=True, timeout=1)
        else:
            ret = self._data_lock.acquire(blocking=False)

        if ret:
            self._lock_username = username or "(unknown)"
            debug("Acquired lock in datastore \"{}\" for user \"{}\"".format(self.name, username))
        else:
            raise DataLockError(
                "Failed to acquire lock in datastore \"{}\" for user \"{}\", already locked by \"{}\"".format(
                    self.name,
                    username,
                    self._lock_username
                )
            )

    def unlock_data(self):
        """Unlock the datastore data."""
        # Read and clear the holder name while the lock is still held, so the
        # name written by the next holder cannot be overwritten afterwards.
        holder = self._lock_username
        self._lock_username = None
        self._data_lock.release()
        debug("Released lock in datastore \"{}\" for user \"{}\"".format(self.name, holder))

    def load(self, filename: str):
        """Load data from persistent storage; implemented by subclasses."""
        raise NotImplementedError("Not implemented in base class")

    def save(self, filename: str):
        """Save data to persistent storage; implemented by subclasses."""
        raise NotImplementedError("Not implemented in base class")

Pavel Spirek's avatar
Pavel Spirek committed
429 430

class JsonDatastore(BaseDatastore):
    """Datastore persisted as a JSON file."""

    def load(self, filename: str):
        """Replace the data tree with the content of JSON file ``filename``.

        The current tree is dropped first, so it stays ``None`` if reading or
        parsing fails.
        """
        self._data = None
        with open(filename, "rt") as fp:
            self._data = self._dm.from_raw(json.load(fp))

    def save(self, filename: str):
        """Write the data tree to ``filename`` as JSON while holding the lock.

        Raises:
            DataLockError: the datastore lock could not be acquired.
        """
        # Take the lock before opening the file: opening with "w" truncates
        # it, so a failed lock must not leave an empty file behind.
        self.lock_data("json_save")
        try:
            with open(filename, "w") as jfd:
                # NOTE(review): other code serialises ``node.value`` rather
                # than the node itself - confirm ``self._data`` is JSON
                # serialisable as is.
                json.dump(self._data, jfd)
        finally:
            # Always release, even when serialisation or I/O fails.
            self.unlock_data()
Pavel Spirek's avatar
Pavel Spirek committed
441

Pavel Spirek's avatar
Pavel Spirek committed
442 443

def test():
    """Ad-hoc smoke test against the example data set.

    Loads the data model from ``./data`` and the data from
    ``jetconf/example-data.json``, then exercises one read and two create
    operations, logging ``OK`` or ``FAILED`` for each.
    """
    datamodel = DataHelpers.load_data_model("./data", "./data/yang-library-data.json")
    data = JsonDatastore(datamodel)
    data.load("jetconf/example-data.json")

    rpc = RpcInfo()
    rpc.username = "dominik"
    rpc.path = "/dns-server:dns-server/zones/zone[domain='example.com']/query-module"
    rpc.path_format = PathFormat.XPATH

    info("Testing read of " + rpc.path)
    n = data.get_node_rpc(rpc)
    info("Result =")
    print(n.value)
    expected_value = \
        [
            {'name': 'test1', 'type': 'knot-dns:synth-record'},
            {'name': 'test2', 'type': 'knot-dns:synth-record'}
        ]

    # Round-trip through JSON so the comparison is done on plain lists/dicts
    if json.loads(json.dumps(n.value)) == expected_value:
        info("OK")
    else:
        warn("FAILED")

    rpc.path = "/dns-server:dns-server/zones"
    rpc.path_format = PathFormat.URL
    info("Testing creation of new list item (zone myzone.com) in " + rpc.path)

    new_root = data.create_node_rpc(data.get_data_root(), rpc, {"zone": {"domain": "myzone.com"}})
    new_node_ii = data.parse_ii("/dns-server:dns-server/zones/zone", PathFormat.URL)
    new_node = new_root.goto(new_node_ii)
    info("Result =")
    print(json.dumps(new_node.value, indent=4))

    if "myzone.com" in map(lambda x: x.get("domain"), new_node.value):
        info("OK")
    else:
        warn("FAILED")

    rpc.path = "/dns-server:dns-server/zones/zone=myzone.com"
    rpc.path_format = PathFormat.URL
    info("Testing creation of new leaf-list inside object " + rpc.path)

    # Built on top of the tree returned by the previous step
    new_root2 = data.create_node_rpc(new_root, rpc, {"access-control-list": "acl-notify-pokus"})
    new_node_ii = data.parse_ii("/dns-server:dns-server/zones/zone=myzone.com", PathFormat.URL)
    new_node2 = new_root2.goto(new_node_ii)
    info("Result =")
    print(json.dumps(new_node2.value, indent=4))

    if "acl-notify-pokus" in new_node2.member("access-control-list").value:
        info("OK")
    else:
        warn("FAILED")


499
from .nacm import NacmConfig, Permission, Action
500
from .handler_list import OP_HANDLERS, STATE_DATA_HANDLES