data.py 36.2 KB
Newer Older
Pavel Spirek's avatar
Pavel Spirek committed
1
import json
2

3
from threading import Lock
Pavel Spirek's avatar
Pavel Spirek committed
4
from enum import Enum
5
from colorlog import error, warning as warn, info
6
from typing import List, Any, Dict, Callable, Optional, Tuple
7
from datetime import datetime
8

9
from yangson.datamodel import DataModel
10
from yangson.enumerations import ContentType, ValidationScope
11
from yangson.schemanode import (
12 13 14 15 16
    SchemaNode,
    ListNode,
    LeafListNode,
    SchemaError,
    SemanticError,
17
    InternalNode,
Pavel Spirek's avatar
Pavel Spirek committed
18 19 20
    ContainerNode
)
from yangson.instvalue import ArrayValue, ObjectValue
21 22 23
from yangson.instance import (
    InstanceNode,
    NonexistentInstance,
24
    InstanceValueError,
25 26 27
    MemberName,
    EntryKeys,
    EntryIndex,
28
    InstanceRoute,
29
    ArrayEntry,
30
    RootNode,
Pavel Spirek's avatar
Pavel Spirek committed
31 32
    ObjectMember
)
33

34
from .helpers import PathFormat, ErrorHelpers, LogHelpers, DataHelpers, JsonNodeT
35
from .config import CONFIG, CONFIG_NACM
Pavel Spirek's avatar
Pavel Spirek committed
36
from .nacm import NacmConfig, Permission, Action, NacmForbiddenError
37 38 39 40 41 42 43 44 45
from .handler_list import (
    OP_HANDLERS,
    STATE_DATA_HANDLES,
    CONF_DATA_HANDLES,
    ConfDataObjectHandler,
    ConfDataListHandler,
    StateDataContainerHandler,
    StateDataListHandler
)
Pavel Spirek's avatar
Pavel Spirek committed
46
from .errors import JetconfError
47 48 49

# Shortcut to the exception pretty-printer used when logging errors below
epretty = ErrorHelpers.epretty
# Debug logging callable for this module, created by LogHelpers for __name__
debug_data = LogHelpers.create_module_dbg_logger(__name__)
Pavel Spirek's avatar
Pavel Spirek committed
50 51


52 53 54 55 56 57
class ChangeType(Enum):
    """Kind of edit recorded in a user's change journal."""

    # NOTE: values are plain ints. A trailing comma after a value
    # (e.g. ``CREATE = 0,``) would silently make it a 1-tuple.
    CREATE = 0
    REPLACE = 1
    DELETE = 2


Pavel Spirek's avatar
Pavel Spirek committed
58 59
class DataLockError(JetconfError):
    """Datastore lock error (presumably raised by locking code; verify against callers)."""
Pavel Spirek's avatar
Pavel Spirek committed
60 61


Pavel Spirek's avatar
Pavel Spirek committed
62
class StagingDataException(JetconfError):
    """Raised when a user's transaction is already opened, or is not opened at all."""
64 65


Pavel Spirek's avatar
Pavel Spirek committed
66 67
class InstanceAlreadyPresent(JetconfError):
    """Presumably signals that an instance to be created already exists (verify against callers)."""
68

69

Pavel Spirek's avatar
Pavel Spirek committed
70 71
class HandlerError(JetconfError):
    """Common base class of the handler-related errors in this module."""
72 73


74 75 76 77
class NoHandlerError(HandlerError):
    """Base class of errors reporting that a required handler is missing."""


78 79 80
class ConfHandlerFailedError(HandlerError):
    """Raised by a journal commit when a commit hook or an edit handler fails."""

81

82 83
class OpHandlerFailedError(HandlerError):
    """Presumably signals a failed operation handler (verify against callers)."""
84

85

86
class NoHandlerForOpError(NoHandlerError):
    """Error reporting that the named operation has no handler.

    The operation name is kept in ``op_name`` and embedded in ``str(exc)``.
    """

    def __init__(self, op_name: str):
        self.op_name = op_name

    def __str__(self):
        template = 'Nonexistent handler for operation "{}"'
        return template.format(self.op_name)
92 93 94 95 96 97


class NoHandlerForStateDataError(NoHandlerError):
    """Raised when requested state data has no registered state data handler."""


98
class RpcInfo:
    """Plain record describing one request against the datastore.

    Attributes (all start unset/``None`` unless noted):
        username (str): name of the requesting user.
        path (str): addressed data path.
        qs (Dict[str, List[str]]): query parameters, name -> list of values.
        path_format (PathFormat): syntax of ``path``; defaults to PathFormat.URL.
        skip_nacm_check (bool): when True, NACM evaluation is skipped; defaults to False.
        op_name (str): operation name.
        op_input_args (ObjectValue): operation input arguments.
    """

    def __init__(self):
        # Chained assignments keep the original attribute creation order
        self.username = self.path = self.qs = None
        self.path_format = PathFormat.URL   # type: PathFormat
        self.skip_nacm_check = False        # type: bool
        self.op_name = self.op_input_args = None
Pavel Spirek's avatar
Pavel Spirek committed
107 108


109
class DataChange:
    """One journal entry: a single edit and the data root it produced.

    Attributes:
        change_type: kind of edit (create / replace / delete).
        rpc_info: the request that caused the edit.
        input_data: data supplied with the request.
        root_after_change: data root as it looks after applying this edit.
        nacm_modified: whether the edit touched NACM data.
    """

    def __init__(
        self,
        change_type: ChangeType,
        rpc_info: RpcInfo,
        input_data: JsonNodeT,
        root_after_change: InstanceNode,
        nacm_modified: bool
    ):
        self.change_type, self.rpc_info = change_type, rpc_info
        self.input_data, self.root_after_change = input_data, root_after_change
        self.nacm_modified = nacm_modified
116 117


Pavel Spirek's avatar
Pavel Spirek committed
118
class UsrChangeJournal:
    """Ordered list of one user's uncommitted edits on top of an origin data root."""

    def __init__(self, root_origin: InstanceNode, transaction_opts: Optional[JsonNodeT]):
        self._root_origin = root_origin
        self._transaction_opts = transaction_opts
        self._journal = []  # type: List[DataChange]

    def get_root_head(self) -> InstanceNode:
        """Return the root after the newest change, or the origin root if there is none."""
        if not self._journal:
            return self._root_origin
        return self._journal[-1].root_after_change

    def get_root_origin(self) -> InstanceNode:
        """Return the data root this journal was started from."""
        return self._root_origin

    def add(self, change: DataChange):
        """Append a change to the end of the journal."""
        self._journal.append(change)

    def list(self) -> JsonNodeT:
        """Return ``[change type name, request path]`` pairs for all journal entries."""
        return [[entry.change_type.name, entry.rpc_info.path] for entry in self._journal]

    def commit(self, ds: "BaseDatastore") -> bool:
        """Apply the journal to datastore ``ds``.

        Returns False when the journal is empty, True after a successful commit.

        Raises:
            SchemaError, SemanticError: the new data did not validate
                (only checked when VALIDATE_TRANSACTIONS is enabled).
            ConfHandlerFailedError: a commit hook or an edit handler raised;
                the datastore is rolled back to the previous root first.
        """
        if not self._journal:
            return False

        nacm_modified = any(entry.nacm_modified for entry in self._journal)

        if hash(ds.get_data_root()) == hash(self._root_origin):
            # Datastore unchanged since the journal was opened: the journal
            # head can be used directly as the new root
            info("Commiting new configuration (swapping roots)")
            new_root = self.get_root_head()
        else:
            # Datastore moved on in the meantime: replay every edit on top
            # of the current root
            info("Commiting new configuration (re-applying changes)")
            new_root = ds.get_data_root()

            for entry in self._journal:
                kind = entry.change_type
                if kind == ChangeType.CREATE:
                    new_root = ds.create_node_rpc(new_root, entry.rpc_info, entry.input_data)[0]
                elif kind == ChangeType.REPLACE:
                    new_root = ds.update_node_rpc(new_root, entry.rpc_info, entry.input_data)[0]
                elif kind == ChangeType.DELETE:
                    new_root = ds.delete_node_rpc(new_root, entry.rpc_info)[0]

        try:
            # Validate syntax and semantics of new data
            if CONFIG["GLOBAL"]["VALIDATE_TRANSACTIONS"] is True:
                new_root.validate(ValidationScope.all, ContentType.config)
        except (SchemaError, SemanticError) as e:
            error("Data validation error:")
            error(epretty(e))
            raise e

        # Install the new root before any handler runs
        ds.set_data_root(new_root)

        # Update NACM if NACM data has been affected by any edit
        if nacm_modified and ds.nacm is not None:
            ds.nacm.update()

        # Each stage below runs only if all previous stages succeeded
        failed = False

        # Stage 1: commit begin hook
        try:
            ds.commit_begin_callback(self._transaction_opts)
        except Exception as e:
            error("Exception occured in commit_begin handler: {}".format(epretty(e)))
            failed = True

        # Stage 2: schema node handlers, one call per journal entry
        if not failed:
            try:
                for entry in self._journal:
                    route = ds.parse_ii(entry.rpc_info.path, entry.rpc_info.path_format)
                    ds.run_conf_edit_handler(route, entry)
            except Exception as e:
                error("Exception occured in edit handler: {}".format(epretty(e)))
                failed = True

        # Stage 3: commit end hook
        if not failed:
            try:
                ds.commit_end_callback(self._transaction_opts, failed=False)
            except Exception as e:
                error("Exception occured in commit_end handler: {}".format(epretty(e)))
                failed = True

        if not failed:
            return True

        # Something went wrong: call commit_end again with "failed" set to True
        try:
            ds.commit_end_callback(self._transaction_opts, failed=True)
        except Exception as e:
            error("Exception occured in commit_end handler (abort): {}".format(epretty(e)))

        # Return to previous version of data
        ds.data_root_rollback(history_steps=1, store_current=False)

        # Update NACM again after rollback
        if nacm_modified and ds.nacm is not None:
            ds.nacm.update()

        raise ConfHandlerFailedError("(see logged)")

235

Pavel Spirek's avatar
Pavel Spirek committed
236
class BaseDatastore:
237
    def __init__(self, dm: DataModel, with_nacm: bool=False):
        """Set up an empty datastore bound to data model ``dm``.

        Args:
            dm: data model the datastore works with.
            with_nacm: when True, a NacmConfig bound to this datastore is created.
        """
        def _noop(*args, **kwargs):
            return None

        self.name = ""
        self.nacm = None    # type: NacmConfig
        self._data = None   # type: InstanceNode
        self._data_history = []     # type: List[InstanceNode]
        self._yang_lib_data = None  # type: InstanceNode
        self._dm = dm       # type: DataModel
        self._data_lock = Lock()
        self._lock_username = None  # type: str
        self._usr_journals = {}     # type: Dict[str, UsrChangeJournal]

        # Commit hooks do nothing until replaced
        self.commit_begin_callback = _noop   # type: Callable[..., bool]
        self.commit_end_callback = _noop     # type: Callable[..., bool]

        # NacmConfig receives this object, so it is created only after
        # all attributes above exist
        if with_nacm:
            self.nacm = NacmConfig(self, self._dm)

        yang_library = self._dm.yang_library
        self._yang_lib_data = self._dm.from_raw(yang_library)
257

258 259 260 261
    # Returns DataModel object
    def get_dm(self) -> DataModel:
        """Return the DataModel object this datastore was constructed with."""
        return self._dm

Pavel Spirek's avatar
Pavel Spirek committed
262
    # Returns the root node of data tree
263 264 265 266 267
    def get_data_root(self, previous_version: int=0) -> InstanceNode:
        """Return the current data root, or an archived one.

        A positive ``previous_version`` selects the entry that many steps
        from the end of the history; any other value yields the current root.
        """
        if previous_version <= 0:
            return self._data
        return self._data_history[-previous_version]
Pavel Spirek's avatar
Pavel Spirek committed
268

269 270 271
    def get_yl_data_root(self) -> InstanceNode:
        """Return the YANG library data root built from the data model in ``__init__``."""
        return self._yang_lib_data

272 273 274
    def make_user_journal(self, username: str, transaction_opts: Optional[JsonNodeT]):
        """Open a transaction (change journal) for ``username`` based on the current data root.

        Raises:
            StagingDataException: the user already has an opened transaction.
        """
        if self._usr_journals.get(username) is not None:
            raise StagingDataException("Transaction for user \"{}\" already opened".format(username))

        self._usr_journals[username] = UsrChangeJournal(self._data, transaction_opts)

    def get_user_journal(self, username: str):
        """Return the opened change journal of ``username``.

        Raises:
            StagingDataException: the user has no opened transaction.
        """
        journal = self._usr_journals.get(username)
        if journal is None:
            raise StagingDataException("Transaction for user \"{}\" not opened".format(username))
        return journal
285 286

    def drop_user_journal(self, username: str):
        """Discard the opened change journal of ``username``.

        Raises:
            StagingDataException: the user has no opened transaction.
        """
        if self._usr_journals.get(username) is None:
            raise StagingDataException("Transaction for user \"{}\" not opened".format(username))

        del self._usr_journals[username]
292

293 294 295 296 297 298
    # Returns the root node of data tree
    def get_data_root_staging(self, username: str) -> InstanceNode:
        """Return the head root of the user's journal (data including uncommitted edits).

        Propagates StagingDataException from ``get_user_journal`` when the
        user has no opened transaction.
        """
        return self.get_user_journal(username).get_root_head()

299
    # Set a new Instance node as data root, store old root to archive
Pavel Spirek's avatar
Pavel Spirek committed
300
    def set_data_root(self, new_root: InstanceNode):
        """Make ``new_root`` the current data root and archive the replaced root in the history."""
        replaced_root, self._data = self._data, new_root
        self._data_history.append(replaced_root)

304 305 306 307 308 309
    def data_root_rollback(self, history_steps: int, store_current: bool):
        """Restore an archived data root.

        Args:
            history_steps: how many entries back from the end of the history
                to go; 1 selects the most recently archived root. Must be >= 1.
            store_current: when True, the current root is appended to the
                history first (and then counts as the most recent entry).

        Raises:
            ValueError: ``history_steps`` is less than 1. Without this check
                0 would index ``[-0]``, i.e. the *oldest* archived root, and
                negative values would index from the front of the history.
            IndexError: the history holds fewer than ``history_steps`` entries.
        """
        # Validate before touching the history so a bad call changes nothing
        if history_steps < 1:
            raise ValueError("history_steps must be a positive integer")

        if store_current:
            self._data_history.append(self._data)

        self._data = self._data_history[-history_steps]

310 311 312 313 314 315 316 317
    def parse_ii(self, path: str, path_format: PathFormat) -> InstanceRoute:
        """Parse ``path`` into an instance route.

        PathFormat.URL paths go through the data model's resource-id parser,
        every other format through its instance-id parser.
        """
        if path_format == PathFormat.URL:
            return self._dm.parse_resource_id(path)
        return self._dm.parse_instance_id(path)

318 319
    # Get schema node with particular schema address
    def get_schema_node(self, sch_pth: str) -> SchemaNode:
        """Look up the schema node at schema address ``sch_pth``.

        Returns None (after a debug log message) when the data model has no
        such node; no exception is raised in that case.
        """
        schema_node = self._dm.get_data_node(sch_pth)
        if schema_node is not None:
            return schema_node

        debug_data("Cannot find schema node for " + sch_pth)
        return None

    # Notify data observers about change in datastore
327
    def run_conf_edit_handler(self, ii: InstanceRoute, ch: DataChange):
        """Notify the registered configuration handler about change ``ch`` at route ``ii``.

        If the affected schema node has its own handler, the handler method
        matching the change type is called. Otherwise every ancestor schema
        node that has a handler is notified with a "replace" call. Nothing
        happens when the schema node cannot be resolved. A NonexistentInstance
        raised along the way is logged as a warning and swallowed.
        """
        # Handler method to call for each change type
        object_methods = {
            ChangeType.CREATE: "create",
            ChangeType.REPLACE: "replace",
            ChangeType.DELETE: "delete",
        }
        list_methods = {
            ChangeType.CREATE: "create_item",
            ChangeType.REPLACE: "replace_item",
            ChangeType.DELETE: "delete_item",
        }

        try:
            schema_route = [sel for sel in ii if isinstance(sel, MemberName)]

            if ch.change_type == ChangeType.CREATE:
                # The created member is named in the input data, not in ii:
                # take its local name and append it to the schema route
                member_name_fq = tuple(ch.input_data.keys())[0]
                _member_ns, member_name = member_name_fq.split(":", maxsplit=1)
                schema_route.append(MemberName(member_name, None))

            sn = self.get_schema_node(DataHelpers.ii2str(schema_route))
            if sn is None:
                return

            handler = CONF_DATA_HANDLES.get_handler(str(id(sn)))

            if handler is None:
                # No handler for the node itself: walk up and notify every
                # ancestor that has one
                ancestor = sn.parent
                while ancestor is not None:
                    handler = CONF_DATA_HANDLES.get_handler(str(id(ancestor)))
                    if isinstance(handler, ConfDataObjectHandler):
                        info("handler for superior data node triggered, replace")
                        handler.replace(ii, ch)
                    if isinstance(handler, ConfDataListHandler):
                        info("handler for superior data node triggered, replace_item")
                        handler.replace_item(ii, ch)
                    ancestor = ancestor.parent
                return

            info("handler for actual data node triggered")
            if isinstance(handler, ConfDataObjectHandler):
                method_name = object_methods.get(ch.change_type)
                if method_name is not None:
                    getattr(handler, method_name)(ii, ch)
            if isinstance(handler, ConfDataListHandler):
                method_name = list_methods.get(ch.change_type)
                if method_name is not None:
                    getattr(handler, method_name)(ii, ch)
        except NonexistentInstance:
            warn("Cannnot notify {}, parent container removed".format(ii))
376

Pavel Spirek's avatar
Pavel Spirek committed
377
    # Get data node, evaluate NACM if required
Pavel Spirek's avatar
Pavel Spirek committed
378 379
    def get_node_rpc(self, rpc: RpcInfo, staging=False) -> InstanceNode:
        """Return the instance node addressed by ``rpc.path``.

        Steps, in order: pick the data root (committed, staging or YANG
        library), merge in state data generated by state data handlers,
        apply the "with-defaults" query parameter, evaluate NACM read access
        and finally apply the "depth" query parameter.

        Args:
            rpc: request description (path, path format, username, query string).
            staging: when True, read from the user's journal head instead of
                the committed data; a journal is opened implicitly if missing.

        Raises:
            NacmForbiddenError: the user may not access NACM data, or NACM
                denies read access to the node.
            NoHandlerForStateDataError: state data is requested directly but
                no handler is registered for its state root.
            ValueError: invalid value of the "depth" query parameter.
        """
        ii = self.parse_ii(rpc.path, rpc.path_format)

        if staging:
            try:
                root = self.get_data_root_staging(rpc.username)
            except StagingDataException:
                # No journal opened for this user yet: open one implicitly
                # root = self._data
                info("Starting transaction for user \"{}\"".format(rpc.username))
                self.make_user_journal(rpc.username, None)
                root = self.get_data_root_staging(rpc.username)
        else:
            root = self._data

        yl_data_request = False
        if (len(ii) > 0) and (isinstance(ii[0], MemberName)):
            # Not getting root
            ns_first = ii[0].namespace
            if (ns_first == "ietf-netconf-acm") and (rpc.username not in CONFIG_NACM["ALLOWED_USERS"]):
                raise NacmForbiddenError(rpc.username + " not allowed to access NACM data")
            elif ns_first == "ietf-yang-library":
                # YANG library data live in a separate tree
                root = self._yang_lib_data
                yl_data_request = True
        else:
            # Root node requested
            # Remove NACM data if user is not NACM privileged
            if rpc.username not in CONFIG_NACM["ALLOWED_USERS"]:
                try:
                    root = root.delete_item("ietf-netconf-acm:nacm")
                except NonexistentInstance:
                    # No NACM data present, nothing to hide
                    pass

            # Append YANG library data
            for member_name, member_val in self._yang_lib_data.value.items():
                root = root.put_member(member_name, member_val).top()

        # Resolve schema node of the desired data node
        sch_pth_list = filter(lambda isel: isinstance(isel, MemberName), ii)
        sch_pth = DataHelpers.ii2str(sch_pth_list)
        sn = self.get_schema_node(sch_pth)

        # NOTE(review): get_schema_node may return None, in which case the
        # next line raises AttributeError - confirm this cannot happen here
        state_roots = sn.state_roots()

        # Check if URL points to state data or node that contains state data
        if state_roots and not yl_data_request:
            debug_data("State roots: {}".format(state_roots))
            n = None

            for state_root_sch_pth in state_roots:
                state_root_sn = self._dm.get_data_node(state_root_sch_pth)

                # Check if the desired node is child of the state root
                # (walk from the requested schema node up to the schema root)
                sni = sn
                is_child = False
                while sni:
                    if sni is state_root_sn:
                        is_child = True
                        break
                    sni = sni.parent

                if is_child:
                    # Direct request for the state data
                    sdh = STATE_DATA_HANDLES.get_handler(state_root_sch_pth)
                    if sdh is not None:
                        if isinstance(sdh, StateDataContainerHandler):
                            state_handler_val = sdh.generate_node(ii, rpc.username, staging)
                            state_root_n = sdh.schema_node.orphan_instance(state_handler_val)
                        elif isinstance(sdh, StateDataListHandler):
                            # Whole list requested (route ends with the list's
                            # member name) vs. a single list entry
                            if (sn is sdh.schema_node) and isinstance(ii[-1], MemberName):
                                state_handler_val = sdh.generate_list(ii, rpc.username, staging)
                                state_root_n = sdh.schema_node.orphan_instance(state_handler_val)
                            else:
                                state_handler_val = sdh.generate_item(ii, rpc.username, staging)
                                state_root_n = sdh.schema_node.orphan_entry(state_handler_val)

                        # Select desired subnode from handler-generated content
                        ii_prefix, ii_rel = sdh.schema_node.split_instance_route(ii)
                        n = state_root_n.goto(ii_rel)

                        # There should be only one state root, no need to continue
                        if len(state_roots) != 1:
                            warn("URI points to directly to state data, but more state roots found")
                        break
                    else:
                        raise NoHandlerForStateDataError(rpc.path)
                else:
                    # Request for config data containing state data
                    n = root.goto(ii)

                    # Recursively walks the subtree and inserts generated state
                    # data wherever the parent of the current state root occurs;
                    # called right below, within the same loop iteration
                    def _fill_state_roots(node: InstanceNode) -> InstanceNode:
                        if isinstance(node.value, ObjectValue):
                            if node.schema_node is state_root_sn.parent:
                                ii_gen = DataHelpers.node_get_ii(node)
                                sdh = STATE_DATA_HANDLES.get_handler(state_root_sch_pth)
                                if sdh is not None:
                                    try:
                                        if isinstance(sdh, StateDataContainerHandler):
                                            state_handler_val = sdh.generate_node(ii_gen, rpc.username, staging)
                                        elif isinstance(sdh, StateDataListHandler):
                                            state_handler_val = sdh.generate_list(ii_gen, rpc.username, staging)
                                    except Exception as e:
                                        # Best effort: a failing generator only
                                        # leaves its state node out of the reply
                                        error("Error occured in state data generator (sn: {})".format(state_root_sch_pth))
                                        error(epretty(e))
                                        error("This state node will be omitted.")
                                    else:
                                        # Member name is namespace-qualified only
                                        # when it differs from the parent's namespace
                                        if state_root_sn.ns == state_root_sn.parent.ns:
                                            nm_name = state_root_sn.qual_name[0]
                                        else:
                                            nm_name = state_root_sn.qual_name[1] + ":" + state_root_sn.qual_name[0]

                                        # print("nm={}".format(nm_name))
                                        node = node.put_member(nm_name, state_handler_val, raw=True).up()
                            else:
                                # Descend into every member of the object
                                for key in node:
                                    member = node[key]
                                    node = _fill_state_roots(member).up()
                        elif isinstance(node.value, ArrayValue):
                            # Descend into every entry of the array
                            i = 0
                            arr_len = len(node.value)
                            while i < arr_len:
                                node = _fill_state_roots(node[i]).up()
                                i += 1

                        return node

                    n = _fill_state_roots(n)
                    # Keep root in sync with the filled subtree
                    root = n.top()
        else:
            # No state data in requested node
            n = root.goto(ii)

        # Process "with-defaults" query parameter
        try:
            with_defs = rpc.qs["with-defaults"][0]
        except (IndexError, KeyError):
            with_defs = None

        if with_defs == "report-all":
            n = n.add_defaults()

        # Evaluate NACM if required
        if self.nacm and not rpc.skip_nacm_check:
            nrpc = self.nacm.get_user_rules(rpc.username)
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_READ) == Action.DENY:
                raise NacmForbiddenError()
            else:
                # Prune nodes that should not be accessible to user
                n = nrpc.prune_data_tree(n, root, ii, Permission.NACM_ACCESS_READ)

        # Process "depth" query parameter
        try:
            max_depth_str = rpc.qs["depth"][0]
            if max_depth_str == "unbounded":
                max_depth = None
            else:
                # Stored as (requested depth - 1); valid stored range is 0..65535
                max_depth = int(max_depth_str) - 1
                if (max_depth < 0) or (max_depth > 65535):
                    raise ValueError()
        except (IndexError, KeyError):
            # Parameter not given: no depth limit
            max_depth = None
        except ValueError:
            # Covers both a non-integer value and the range check above
            raise ValueError("Invalid value of query param \"depth\"")

        if max_depth is not None:
            # Replaces every object deeper than max_depth with an empty object
            def _tree_limit_depth(node: InstanceNode, depth: int) -> InstanceNode:
                if isinstance(node.value, ObjectValue):
                    if depth > max_depth:
                        node.value = ObjectValue({})
                    else:
                        for child_key in sorted(node.value.keys()):
                            m = node[child_key]
                            node = _tree_limit_depth(m, depth + 1).up()
                elif isinstance(node.value, ArrayValue):
                    # An array does not add a level: its entries are visited
                    # with the same depth as the array itself
                    depth -= 1
                    for i in range(len(node.value)):
                        e = node[i]
                        node = _tree_limit_depth(e, depth + 1).up()

                return node
            n = _tree_limit_depth(n, 1)

        # Return result
        return n

562
    # Create new data node
563
    def create_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> Tuple[InstanceNode, bool]:
564
        ii = self.parse_ii(rpc.path, rpc.path_format)
565

566
        # Get target member name
567 568
        input_member_keys = tuple(value.keys())
        if len(input_member_keys) != 1:
569
            raise ValueError("Received json object must contain exactly one member")
Pavel Spirek's avatar
Pavel Spirek committed
570

571 572 573 574 575
        input_member_name_fq = input_member_keys[0]
        try:
            input_member_ns, input_member_name = input_member_name_fq.split(":", maxsplit=1)
        except ValueError:
            raise ValueError("Input object name must me in fully-qualified format")
576
        input_member_value = value[input_member_name_fq]
Pavel Spirek's avatar
Pavel Spirek committed
577

578
        # Deny any changes of NACM data for non-privileged users
579
        nacm_changed = False
580 581 582
        if (len(ii) > 0) and (isinstance(ii[0], MemberName)):
            # Not getting root
            ns_first = ii[0].namespace
583 584 585 586
            if ns_first == "ietf-netconf-acm":
                nacm_changed = True
                if rpc.username not in CONFIG_NACM["ALLOWED_USERS"]:
                    raise NacmForbiddenError(rpc.username + " not allowed to modify NACM data")
587 588
        else:
            # Editing root node
589 590 591 592
            if input_member_ns == "ietf-netconf-acm":
                nacm_changed = True
                if rpc.username not in CONFIG_NACM["ALLOWED_USERS"]:
                    raise NacmForbiddenError(rpc.username + " not allowed to modify NACM data")
593 594

        # Evaluate NACM
595
        if self.nacm and not rpc.skip_nacm_check:
596 597 598
            nrpc = self.nacm.get_user_rules(rpc.username)
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_CREATE) == Action.DENY:
                raise NacmForbiddenError()
599

600 601
        n = root.goto(ii)

602
        # Get target schema node
603
        sn = n.schema_node  # type: InternalNode
604
        member_sn = sn.get_child(input_member_name, input_member_ns)
605

606 607 608
        if member_sn is None:
            raise ValueError("Received json object contains unknown member")

609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624
        # Check if target member already exists
        if sn.ns == member_sn.ns:
            try:
                existing_member = n[input_member_name]
            except NonexistentInstance:
                existing_member = None
        else:
            try:
                existing_member = n[input_member_name_fq]
            except NonexistentInstance:
                existing_member = None

        # Get query parameters
        insert = rpc.qs.get("insert", [None])[0]
        point = rpc.qs.get("point", [None])[0]

625
        if isinstance(member_sn, ListNode):
Pavel Spirek's avatar
Pavel Spirek committed
626
            # Append received node to list
627

628 629
            # Create list if necessary
            if existing_member is None:
630 631
                new_member_name = input_member_name if n.namespace == input_member_ns else input_member_name_fq
                existing_member = n.put_member(new_member_name, ArrayValue([]))
632

633 634
            # Get ListNode key names
            list_node_keys = member_sn.keys     # Key names in the form [(key, ns), ]
635 636

            if insert == "first":
637
                # Optimization
638
                if len(existing_member.value) > 0:
639
                    list_entry_first = existing_member[0]   # type: ArrayEntry
640
                    new_member = list_entry_first.insert_before(input_member_value, raw=True).up()
641
                else:
642
                    new_member = existing_member.update([input_member_value], raw=True)
643
            elif (insert == "last") or (insert is None):
644
                # Optimization
645
                if len(existing_member.value) > 0:
646
                    list_entry_last = existing_member[-1]   # type: ArrayEntry
647
                    new_member = list_entry_last.insert_after(input_member_value, raw=True).up()
648
                else:
649
                    new_member = existing_member.update([input_member_value], raw=True)
650 651 652 653 654 655 656 657 658 659 660
            elif (insert == "before") and (point is not None):
                point_keys_val = point.split(",")  # List key values passed in the "point" query argument
                if len(list_node_keys) != len(point_keys_val):
                    raise ValueError(
                        "Invalid number of keys passed in 'point' query: {} ({} expected)".format(
                            len(point_keys_val), len(list_node_keys)
                        )
                    )
                entry_keys = dict(map(lambda i: (list_node_keys[i], point_keys_val[i]), range(len(list_node_keys))))
                entry_sel = EntryKeys(entry_keys)
                point_list_entry = entry_sel.goto_step(existing_member)   # type: ArrayEntry
661
                new_member = point_list_entry.insert_before(input_member_value, raw=True).up()
662 663 664 665 666 667 668 669 670 671 672
            elif (insert == "after") and (point is not None):
                point_keys_val = point.split(",")  # List key values passed in the "point" query argument
                if len(list_node_keys) != len(point_keys_val):
                    raise ValueError(
                        "Invalid number of keys passed in 'point' query: {} ({} expected)".format(
                            len(point_keys_val), len(list_node_keys)
                        )
                    )
                entry_keys = dict(map(lambda i: (list_node_keys[i], point_keys_val[i]), range(len(list_node_keys))))
                entry_sel = EntryKeys(entry_keys)
                point_list_entry = entry_sel.goto_step(existing_member)   # type: ArrayEntry
673
                new_member = point_list_entry.insert_after(input_member_value, raw=True).up()
674
            else:
675
                raise ValueError("Invalid 'insert'/'point' query values")
676 677 678 679 680
        elif isinstance(member_sn, LeafListNode):
            # Append received node to leaf list

            # Create leaf list if necessary
            if existing_member is None:
681 682
                new_member_name = input_member_name if n.namespace == input_member_ns else input_member_name_fq
                existing_member = n.put_member(new_member_name, ArrayValue([]))
683

684
            # Convert input data from List/Dict to ArrayValue/ObjectValue
685
            new_value_item = member_sn.entry_from_raw(input_member_value)
686 687

            if insert == "first":
688
                new_member = existing_member.update(ArrayValue([new_value_item] + existing_member.value))
689
            elif (insert == "last") or (insert is None):
690
                new_member = existing_member.update(ArrayValue(existing_member.value + [new_value_item]))
691
            else:
692
                raise ValueError("Invalid 'insert' query value")
Pavel Spirek's avatar
Pavel Spirek committed
693
        else:
694
            # Create new container member
695

696
            if existing_member is None:
697
                # Create new node (object member)
698 699
                new_member_name = input_member_name if n.namespace == input_member_ns else input_member_name_fq
                new_member = n.put_member(new_member_name, input_member_value, raw=True)
700 701 702
            else:
                # Data node already exists
                raise InstanceAlreadyPresent("Member \"{}\" already present in \"{}\"".format(input_member_name, ii))
Pavel Spirek's avatar
Pavel Spirek committed
703

704
        return new_member.top(), nacm_changed
705

706
    # PUT data node
707
    def update_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> Tuple[InstanceNode, bool]:
        """Replace the data node addressed by ``rpc.path`` (PUT semantics).

        ``value`` must be an object with exactly one member whose name is
        fully qualified (``module:name``); that member's value becomes the
        new raw value of the target node.

        Returns a tuple of (top of the updated instance tree, flag telling
        whether NACM data are affected by this change).

        Raises ``ValueError`` for a malformed input object and
        ``NacmForbiddenError`` when the user may not perform the update.
        """
        ii = self.parse_ii(rpc.path, rpc.path_format)

        # The received object has to wrap exactly one member
        member_names = tuple(value.keys())
        if len(member_names) != 1:
            raise ValueError("Received json object must contain exactly one member")

        fq_name = member_names[0]
        try:
            # Unpacking fails with ValueError when the name has no "module:" prefix;
            # the two parts themselves are not needed any further
            _member_ns, _member_name = fq_name.split(":", maxsplit=1)
        except ValueError:
            raise ValueError("Input object name must me in fully-qualified format")
        input_member_value = value[fq_name]

        n = root.goto(ii)

        # Find out whether this request touches NACM data
        addresses_member = (len(ii) > 0) and (isinstance(ii[0], MemberName))
        if addresses_member:
            # Not replacing root: decide by the namespace of the first path segment
            nacm_changed = ii[0].namespace == "ietf-netconf-acm"
        else:
            # Replacing root node: NACM is affected if it is present in the datastore
            nacm_changed = n.value.get("ietf-netconf-acm:nacm") is not None

        # Deny any changes of NACM data for non-privileged users
        if nacm_changed and (rpc.username not in CONFIG_NACM["ALLOWED_USERS"]):
            raise NacmForbiddenError(rpc.username + " not allowed to modify NACM data")

        # Evaluate NACM
        if self.nacm and not rpc.skip_nacm_check:
            user_rules = self.nacm.get_user_rules(rpc.username)
            verdict = user_rules.check_data_node_permission(root, ii, Permission.NACM_ACCESS_UPDATE)
            if verdict == Action.DENY:
                raise NacmForbiddenError()

        updated = n.update(input_member_value, raw=True)
        updated.validate(ValidationScope.syntax)

        return updated.top(), nacm_changed
752

753
    # Delete data node
754
    def delete_node_rpc(self, root: InstanceNode, rpc: RpcInfo) -> Tuple[InstanceNode, bool]:
        """Delete the data node addressed by ``rpc.path``.

        An empty path deletes the entire datastore content, i.e. a fresh
        empty root node is created.

        Returns a tuple of (top of the resulting instance tree, flag telling
        whether NACM data are affected by this change).

        Raises ``NacmForbiddenError`` when the user may not delete the node,
        ``ValueError`` for an unknown list entry selector and
        ``InstanceValueError`` when the parent is neither an array nor an object.
        """
        ii = self.parse_ii(rpc.path, rpc.path_format)
        n = root.goto(ii)

        # Find out whether this request touches NACM data
        addresses_member = (len(ii) > 0) and (isinstance(ii[0], MemberName))
        if addresses_member:
            # Not deleting root: decide by the namespace of the first path segment
            nacm_changed = ii[0].namespace == "ietf-netconf-acm"
        else:
            # Deleting root node: NACM is affected if it is present in the datastore
            nacm_changed = n.value.get("ietf-netconf-acm:nacm") is not None

        # Deny any changes of NACM data for non-privileged users
        if nacm_changed and (rpc.username not in CONFIG_NACM["ALLOWED_USERS"]):
            raise NacmForbiddenError(rpc.username + " not allowed to modify NACM data")

        # Evaluate NACM
        if self.nacm and not rpc.skip_nacm_check:
            user_rules = self.nacm.get_user_rules(rpc.username)
            verdict = user_rules.check_data_node_permission(root, ii, Permission.NACM_ACCESS_DELETE)
            if verdict == Action.DENY:
                raise NacmForbiddenError()

        if len(ii) == 0:
            # Deleting entire datastore
            emptied_root = RootNode(ObjectValue({}), root.schema_node, datetime.now())
            return emptied_root.top(), nacm_changed

        parent = n.up()
        last_selector = ii[-1]

        if isinstance(parent.value, ArrayValue):
            # List / leaf-list entry: remove it by its position in the array
            if isinstance(last_selector, EntryIndex):
                position = last_selector.index
            elif isinstance(last_selector, EntryKeys):
                position = n.index
            else:
                raise ValueError("Unknown node selector")
            remaining = parent.delete_item(position)
        elif isinstance(parent.value, ObjectValue):
            # Object member: remove it by its (possibly namespace-qualified) name
            if last_selector.namespace:
                member_key = last_selector.namespace + ":" + last_selector.name
            else:
                member_key = last_selector.name
            remaining = parent.delete_item(member_key)
        else:
            raise InstanceValueError(rpc.path, "Invalid target node type")

        return remaining.top(), nacm_changed
Pavel Spirek's avatar
Pavel Spirek committed
801

802
    # Invoke an operation
803
    def invoke_op_rpc(self, rpc: RpcInfo) -> JsonNodeT:
        """Invoke the operation named by ``rpc.op_name`` and return its result.

        Operations whose name starts with ``jetconf:`` are internal: their
        handler receives the whole ``rpc`` object. Any other operation is
        treated as defined in the data model: NACM is consulted first (unless
        disabled), the input arguments are converted using the operation's
        "input" schema node, and the handler receives the converted arguments
        together with the username.

        Raises ``NoHandlerForOpError`` when no handler is registered,
        ``NacmForbiddenError`` when NACM denies the invocation and
        ``OpHandlerFailedError`` when an external handler raises.
        """
        if rpc.op_name.startswith("jetconf:"):
            # Jetconf internal operation
            internal_handler = OP_HANDLERS.get_handler(rpc.op_name)
            if internal_handler is None:
                raise NoHandlerForOpError(rpc.op_name)

            return internal_handler(rpc)

        # External operation defined in data model
        if self.nacm and not rpc.skip_nacm_check:
            user_rules = self.nacm.get_user_rules(rpc.username)
            if user_rules.check_rpc_name(rpc.op_name) == Action.DENY:
                denial_msg = "Invocation of \"{}\" operation denied for user \"{}\"".format(rpc.op_name, rpc.username)
                raise NacmForbiddenError(denial_msg)

        external_handler = OP_HANDLERS.get_handler(rpc.op_name)
        if external_handler is None:
            raise NoHandlerForOpError(rpc.op_name)

        # Get operation input schema
        op_schema = self._dm.get_schema_node(rpc.path)
        input_schema = op_schema.get_child("input")

        # Input arguments are expected, this will validate them
        if input_schema.children:
            op_input_args = input_schema.from_raw(rpc.op_input_args)
        else:
            op_input_args = None

        try:
            return external_handler(op_input_args, rpc.username)
        except Exception as e:
            raise OpHandlerFailedError(epretty(e))

838
    def add_to_journal_rpc(self, ch_type: ChangeType, rpc: RpcInfo, value: Optional[JsonNodeT], new_root: InstanceNode, nacm_modified: bool):
        """Record a data change in the journal of the user who issued ``rpc``.

        Raises ``NoHandlerError`` when no journal is registered for
        ``rpc.username``.
        """
        journal = self._usr_journals.get(rpc.username)
        if journal is None:
            raise NoHandlerError("No active changelist for user \"{}\"".format(rpc.username))

        change = DataChange(ch_type, rpc, value, new_root, nacm_modified)
        journal.add(change)

Pavel Spirek's avatar
Pavel Spirek committed
845
    # Locks datastore data
846 847
    def lock_data(self, username: str = None, blocking: bool=True):
        """Acquire the datastore lock on behalf of ``username``.

        With ``blocking=True`` the call waits up to one second for the lock;
        with ``blocking=False`` it returns (or fails) immediately.

        Raises ``DataLockError`` when the lock could not be acquired.
        """
        # The standard library lock API forbids combining a timeout with a
        # non-blocking acquire (it raises ValueError), so the timeout is only
        # passed for blocking calls.
        # NOTE(review): assumes self._data_lock is a threading.Lock / RLock
        # (Lock is imported at the top of the file) - confirm in __init__.
        if blocking:
            ret = self._data_lock.acquire(blocking=True, timeout=1)
        else:
            ret = self._data_lock.acquire(blocking=False)

        if ret:
            self._lock_username = username or "(unknown)"
            debug_data("Acquired lock in datastore \"{}\" for user \"{}\"".format(self.name, username))
        else:
            raise DataLockError(
                "Failed to acquire lock in datastore \"{}\" for user \"{}\", already locked by \"{}\"".format(
                    self.name,
                    username,
                    self._lock_username
                )
            )

860
    # Unlock datastore data
Pavel Spirek's avatar
Pavel Spirek committed
861 862
    def unlock_data(self):
        """Release the datastore lock and forget who was holding it."""
        # Read and clear the owner while the lock is still held. Doing this
        # after release() could wipe (or log) the name just stored by another
        # thread that acquired the lock in the meantime.
        released_for = self._lock_username
        self._lock_username = None
        self._data_lock.release()
        debug_data("Released lock in datastore \"{}\" for user \"{}\"".format(self.name, released_for))

866
    # Load data from persistent storage
867
    def load(self):
        """Load data from persistent storage; this base version always raises NotImplementedError."""
        reason = "Not implemented in base class"
        raise NotImplementedError(reason)

870
    # Save data to persistent storage
871
    def save(self):
        """Save data to persistent storage; this base version always raises NotImplementedError."""
        reason = "Not implemented in base class"
        raise NotImplementedError(reason)

Pavel Spirek's avatar
Pavel Spirek committed
874 875

class JsonDatastore(BaseDatastore):
    """Datastore whose content is persisted in a single JSON file."""

    def __init__(self, dm: DataModel, json_file: str, with_nacm: bool=False):
        """Create the datastore.

        ``dm`` and ``with_nacm`` are passed to the base class constructor;
        ``json_file`` is the path of the file used by ``load()`` and ``save()``.
        """
        super().__init__(dm, with_nacm)
        self.json_file = json_file

    def load(self):
        """Read ``json_file`` and build the instance data from it.

        The current data are dropped first, so a failed load leaves the
        datastore empty (``_data`` is ``None``) rather than stale.
        """
        self._data = None
        # JSON text is UTF-8 by specification; do not depend on the locale encoding
        with open(self.json_file, "rt", encoding="utf-8") as fp:
            self._data = self._dm.from_raw(json.load(fp))

        # Let NACM pick up the freshly loaded data
        if self.nacm is not None:
            self.nacm.update()

    def save(self):
        """Write the current instance data to ``json_file`` as indented JSON."""
        # Serialize BEFORE opening the file: opening with "w" truncates it, so a
        # failure while producing the JSON text (e.g. no data loaded yet) must
        # happen first, otherwise the persistent storage would be wiped.
        serialized = json.dumps(self._data.raw_value(), indent=4)
        with open(self.json_file, "w", encoding="utf-8") as jfd:
            jfd.write(serialized)