data.py 33.4 KB
Newer Older
Pavel Spirek's avatar
Pavel Spirek committed
1
import json
2

3
from threading import Lock
Pavel Spirek's avatar
Pavel Spirek committed
4
from enum import Enum
5
from colorlog import error, warning as warn, info
6
from typing import List, Any, Dict, Callable, Optional
7

8
from yangson.datamodel import DataModel
9
from yangson.enumerations import ContentType, ValidationScope
10
from yangson.schemanode import (
11 12 13 14 15
    SchemaNode,
    ListNode,
    LeafListNode,
    SchemaError,
    SemanticError,
16
    InternalNode,
Pavel Spirek's avatar
Pavel Spirek committed
17 18 19
    ContainerNode
)
from yangson.instvalue import ArrayValue, ObjectValue
20 21 22
from yangson.instance import (
    InstanceNode,
    NonexistentInstance,
23
    InstanceValueError,
24 25 26
    MemberName,
    EntryKeys,
    EntryIndex,
27
    InstanceRoute,
28
    ArrayEntry,
29
    RootNode,
Pavel Spirek's avatar
Pavel Spirek committed
30 31
    ObjectMember
)
32

33
from .helpers import PathFormat, ErrorHelpers, LogHelpers, DataHelpers, JsonNodeT
Pavel Spirek's avatar
Pavel Spirek committed
34 35
from .config import CONFIG, CONFIG_NACM, CONFIG_HTTP
from .nacm import NacmConfig, Permission, Action, NacmForbiddenError
36
from .handler_list import OP_HANDLERS, STATE_DATA_HANDLES, CONF_DATA_HANDLES, ConfDataObjectHandler, ConfDataListHandler
37
from .usr_state_data_handlers import ContainerNodeHandlerBase, ListNodeHandlerBase
Pavel Spirek's avatar
Pavel Spirek committed
38
from .errors import JetconfError
39 40 41

epretty = ErrorHelpers.epretty
debug_data = LogHelpers.create_module_dbg_logger(__name__)
Pavel Spirek's avatar
Pavel Spirek committed
42 43


44 45 46 47 48 49
class ChangeType(Enum):
    """Kind of edit recorded in a changelist journal entry."""

    # Plain integer values. Stray trailing commas used to turn the first two
    # members into the 1-tuples (0,) and (1,) while DELETE stayed an int.
    CREATE = 0
    REPLACE = 1
    DELETE = 2


Pavel Spirek's avatar
Pavel Spirek committed
50 51
class DataLockError(JetconfError):
    """Datastore-lock error type (its raise sites are outside the code visible here)."""
Pavel Spirek's avatar
Pavel Spirek committed
52 53


Pavel Spirek's avatar
Pavel Spirek committed
54 55
class NoStagingDataException(JetconfError):
    """Raised when staging data is requested for a user with no change journal."""
56 57


Pavel Spirek's avatar
Pavel Spirek committed
58 59
class InstanceAlreadyPresent(JetconfError):
    """Raised when a create request targets a member or list key that already exists."""
60

61

Pavel Spirek's avatar
Pavel Spirek committed
62 63
class HandlerError(JetconfError):
    """Common base class for handler-related errors."""
64 65


66 67 68 69
class NoHandlerError(HandlerError):
    """Base class for errors reporting that a required handler is missing."""


70 71 72 73
class ConfHandlerFailedError(HandlerError):
    """Raised by a commit when a commit hook or configuration edit handler failed."""


74
class NoHandlerForOpError(NoHandlerError):
    """Reports that the named operation has no handler.

    The operation name is kept in ``op_name`` and embedded in ``str(exc)``.
    """

    def __init__(self, op_name: str):
        self.op_name = op_name

    def __str__(self):
        template = 'Nonexistent handler for operation "{}"'
        return template.format(self.op_name)
80 81 82 83 84 85


class NoHandlerForStateDataError(NoHandlerError):
    """Raised when state data is requested directly but no state data handler is found."""


86
class RpcInfo:
    """Plain record describing one request passed to the datastore methods.

    Every field starts at the default assigned below; presumably the request
    layer fills them in before the object is used (TODO confirm with callers).
    """

    def __init__(self):
        # Requesting user and the addressed resource
        self.username = None                # type: str
        self.path = None                    # type: str
        # Parsed query string: parameter name -> list of values
        self.qs = None                      # type: Dict[str, List[str]]
        # Selects how `path` is parsed (see BaseDatastore.parse_ii)
        self.path_format = PathFormat.URL   # type: PathFormat
        self.skip_nacm_check = False        # type: bool
        # Operation name and input arguments, read by invoke_op_rpc
        self.op_name = None                 # type: str
        self.op_input_args = None           # type: ObjectValue
Pavel Spirek's avatar
Pavel Spirek committed
95 96


97 98 99 100 101 102 103 104
class DataChange:
    """One journaled edit: its kind, the request that caused it and the payload."""

    def __init__(self, change_type: ChangeType, rpc_info: RpcInfo, data: Any):
        self.change_type, self.rpc_info, self.data = change_type, rpc_info, data


class ChangeList:
    """Named list of changes together with the root snapshot after each one.

    ``root_list[0]`` is the root the changelist started from; each call to
    ``add`` appends one journal entry and the root that resulted from it.
    """

    def __init__(self, root_origin_cl: InstanceNode, changelist_name: str):
        initial_roots = [root_origin_cl]
        self.root_list = initial_roots
        self.changelist_name = changelist_name
        self.journal = []   # type: List[DataChange]

    def add(self, change: DataChange, root_after_change: InstanceNode):
        """Record `change` and the data root obtained after applying it."""
        self.root_list.append(root_after_change)
        self.journal.append(change)


class UsrChangeJournal:
    """Journal of one user's staged changelists on top of a fixed origin root.

    ``root_origin`` is the data root the journal was opened against,
    ``transaction_opts`` is passed unchanged to the datastore's commit hooks
    and ``clists`` is the stack of ChangeList objects (newest last).
    """

    def __init__(self, root_origin: InstanceNode, transaction_opts: Optional[JsonNodeT]):
        self.root_origin = root_origin
        self.transaction_opts = transaction_opts
        self.clists = []    # type: List[ChangeList]

    def cl_new(self, cl_name: str):
        """Push a new changelist named `cl_name`, based on the current head root."""
        self.clists.append(ChangeList(self.get_root_head(), cl_name))

    def cl_drop(self) -> bool:
        """Pop the newest changelist; return False if there was none to drop."""
        try:
            self.clists.pop()
            return True
        except IndexError:
            return False

    def get_root_head(self) -> InstanceNode:
        """Return the newest staged root, or the origin root when nothing is staged."""
        if self.clists:
            return self.clists[-1].root_list[-1]
        return self.root_origin

    def list(self) -> Dict[str, List[List[str]]]:
        """Summarise the journal as {changelist name: [[change type name, path], ...]}."""
        return {
            chl.changelist_name: [[ch.change_type.name, ch.rpc_info.path] for ch in chl.journal]
            for chl in self.clists
        }

    def commit(self, ds: "BaseDatastore"):
        """Apply the journaled changes to datastore `ds`.

        If the datastore root still hashes equal to ``root_origin`` the staged
        head root is used directly; otherwise every journaled change is
        re-applied on top of the datastore's current root.  The resulting root
        is validated (when CONFIG["GLOBAL"]["VALIDATE_TRANSACTIONS"] is True),
        installed, and then the commit_begin hook, the per-change edit
        handlers and the commit_end hook are run.

        A validation failure is only logged: the method returns without
        touching the datastore or the journal and without raising.

        Raises:
            ConfHandlerFailedError: a hook or edit handler raised; the
                datastore root has been rolled back one step beforehand.
        """
        if hash(ds.get_data_root()) == hash(self.root_origin):
            info("Commiting new configuration (swapping roots)")
            # Set new root
            nr = self.get_root_head()
        else:
            info("Commiting new configuration (re-applying changes)")
            nr = ds.get_data_root()
            for cl in self.clists:
                for change in cl.journal:
                    if change.change_type == ChangeType.CREATE:
                        nr = ds.create_node_rpc(nr, change.rpc_info, change.data)
                    elif change.change_type == ChangeType.REPLACE:
                        nr = ds.update_node_rpc(nr, change.rpc_info, change.data)
                    elif change.change_type == ChangeType.DELETE:
                        nr = ds.delete_node_rpc(nr, change.rpc_info)

        try:
            # Validate syntax and semantics of new data
            if CONFIG["GLOBAL"]["VALIDATE_TRANSACTIONS"] is True:
                nr.validate(ValidationScope.all, ContentType.config)
        except (SchemaError, SemanticError) as e:
            error("Data validation error:")
            error(epretty(e))
            return

        # Set new data root
        ds.set_data_root(nr)

        # Call commit begin hook
        begin_hook_failed = False
        try:
            ds.commit_begin_callback(self.transaction_opts)
        except Exception as e:
            error("Exception occured in commit_begin handler: {}".format(epretty(e)))
            begin_hook_failed = True

        # Run schema node handlers
        conf_handler_failed = False
        if not begin_hook_failed:
            try:
                for cl in self.clists:
                    for change in cl.journal:
                        ii = ds.parse_ii(change.rpc_info.path, change.rpc_info.path_format)
                        ds.run_conf_edit_handler(ii, change)
            except Exception as e:
                # Any handler failure must lead to the abort hook and rollback
                # below; catching only IndexError let other exceptions escape
                # with the new root already installed.
                error("Exception occured in edit handler: {}".format(epretty(e)))
                conf_handler_failed = True

        # Call commit end hook
        end_hook_failed = False
        end_hook_abort_failed = False
        if not (begin_hook_failed or conf_handler_failed):
            try:
                ds.commit_end_callback(self.transaction_opts, failed=False)
            except Exception as e:
                error("Exception occured in commit_end handler: {}".format(epretty(e)))
                end_hook_failed = True

        if begin_hook_failed or conf_handler_failed or end_hook_failed:
            try:
                # Call commit_end callback again with "failed" argument set to True
                ds.commit_end_callback(self.transaction_opts, failed=True)
            except Exception as e:
                error("Exception occured in commit_end handler (abort): {}".format(epretty(e)))
                end_hook_abort_failed = True

        # Clear user changelists
        self.clists.clear()

        # Return to previous version of data and raise an exception if something went wrong
        if begin_hook_failed or conf_handler_failed or end_hook_failed or end_hook_abort_failed:
            ds.data_root_rollback(history_steps=1, store_current=False)
            raise ConfHandlerFailedError("(see logged)")
226

227

Pavel Spirek's avatar
Pavel Spirek committed
228
class BaseDatastore:
229
    def __init__(self, dm: DataModel, name: str="", with_nacm: bool=False):
        """Set up an empty datastore bound to data model `dm`.

        The commit hooks default to a do-nothing callable.  When `with_nacm`
        is true a NacmConfig is created last, once all other attributes exist.
        """
        def _noop(*args, **kwargs):
            return None

        self.name = name
        self.nacm = None    # type: NacmConfig
        self._data = None   # type: InstanceNode
        self._data_history = []     # type: List[InstanceNode]
        self._yang_lib_data = None  # type: InstanceNode
        self._dm = dm       # type: DataModel
        self._data_lock = Lock()
        self._lock_username = None  # type: str
        self._usr_journals = {}     # type: Dict[str, UsrChangeJournal]
        # Both hooks share the same placeholder until replaced
        self.commit_begin_callback = self.commit_end_callback = _noop   # type: Callable[..., bool]

        if with_nacm:
            self.nacm = NacmConfig(self, self._dm)
Pavel Spirek's avatar
Pavel Spirek committed
247

Pavel Spirek's avatar
Pavel Spirek committed
248
    # Returns the root node of data tree
249 250 251 252 253
    def get_data_root(self, previous_version: int=0) -> InstanceNode:
        """Return the current data root, or an archived one.

        With `previous_version` > 0 the root comes from the history list,
        counted back from its newest entry (1 is the most recently archived).
        """
        return self._data_history[-previous_version] if previous_version > 0 else self._data
Pavel Spirek's avatar
Pavel Spirek committed
254

255 256 257
    def get_yl_data_root(self) -> InstanceNode:
        """Return the root of the YANG library data tree."""
        yl_root = self._yang_lib_data
        return yl_root

258 259 260 261 262 263 264
    # Returns the root node of data tree
    def get_data_root_staging(self, username: str) -> InstanceNode:
        """Return the head root of `username`'s change journal.

        Raises:
            NoStagingDataException: the user has no journal.
        """
        journal = self._usr_journals.get(username)
        if journal is None:
            raise NoStagingDataException("No active changelist for user \"{}\"".format(username))
        return journal.get_root_head()
266

267
    # Set a new Instance node as data root, store old root to archive
Pavel Spirek's avatar
Pavel Spirek committed
268
    def set_data_root(self, new_root: InstanceNode):
        """Install `new_root` as the data root after archiving the previous one."""
        previous_root = self._data
        self._data_history.append(previous_root)
        self._data = new_root

272 273 274 275 276 277
    def data_root_rollback(self, history_steps: int, store_current: bool):
        """Restore the root found `history_steps` entries from the end of the history.

        When `store_current` is true the current root is appended to the
        history first, so it takes part in the counting.  The history list
        itself is not shortened.
        """
        history = self._data_history
        if store_current:
            history.append(self._data)

        restored_root = history[-history_steps]
        self._data = restored_root

278 279 280 281 282 283 284 285
    def parse_ii(self, path: str, path_format: PathFormat) -> InstanceRoute:
        """Parse `path` into an instance route.

        PathFormat.URL paths are parsed as resource identifiers, every other
        format as an instance identifier.
        """
        if path_format == PathFormat.URL:
            return self._dm.parse_resource_id(path)
        return self._dm.parse_instance_id(path)

286 287
    # Get schema node with particular schema address
    def get_schema_node(self, sch_pth: str) -> SchemaNode:
        """Look up the schema node at schema path `sch_pth`.

        Returns None (after a debug log entry) when the data model has no such
        node; no exception is raised.
        """
        schema_node = self._dm.get_data_node(sch_pth)
        if schema_node is not None:
            return schema_node

        debug_data("Cannot find schema node for " + sch_pth)
        return None

    # Notify data observers about change in datastore
295
    def run_conf_edit_handler(self, ii: InstanceRoute, ch: DataChange):
        """Notify configuration-data handlers about the change `ch` made at `ii`.

        The schema path is built from the member-name selectors of `ii`; for a
        CREATE the name of the created member (first key of `ch.data`, with any
        "module:" prefix stripped) is appended.  If a handler is registered for
        that exact schema node, its create/replace/delete method (or the
        *_item variant for list handlers) is called.  Otherwise every ancestor
        schema node is visited and each registered handler gets a
        replace / replace_item call.

        Returns silently when the schema node cannot be found.  A
        NonexistentInstance raised along the way is logged as a warning and
        swallowed.
        """
        try:
            sch_pth_list = [isel for isel in ii if isinstance(isel, MemberName)]

            if ch.change_type == ChangeType.CREATE:
                # Get target member name; it may come without a namespace prefix,
                # so take whatever follows the first colon (if any)
                input_member_name_fq = tuple(ch.data.keys())[0]
                input_member_name = input_member_name_fq.split(":", maxsplit=1)[-1]
                # Append it to ii
                sch_pth_list.append(MemberName(input_member_name, None))

            sch_pth = DataHelpers.ii2str(sch_pth_list)
            debug_data("Running conf edit handler for {}".format(sch_pth))
            sn = self.get_schema_node(sch_pth)

            if sn is None:
                return

            h = CONF_DATA_HANDLES.get_handler(str(id(sn)))
            if h is not None:
                info("handler for actual data node triggered")
                if isinstance(h, ConfDataObjectHandler):
                    if ch.change_type == ChangeType.CREATE:
                        h.create(ii, ch)
                    elif ch.change_type == ChangeType.REPLACE:
                        h.replace(ii, ch)
                    elif ch.change_type == ChangeType.DELETE:
                        h.delete(ii, ch)
                if isinstance(h, ConfDataListHandler):
                    if ch.change_type == ChangeType.CREATE:
                        h.create_item(ii, ch)
                    elif ch.change_type == ChangeType.REPLACE:
                        h.replace_item(ii, ch)
                    elif ch.change_type == ChangeType.DELETE:
                        h.delete_item(ii, ch)
            else:
                # No handler on the node itself: walk up to the schema root and
                # report the change as a replace to every handler on the way
                sn = sn.parent
                while sn is not None:
                    h = CONF_DATA_HANDLES.get_handler(str(id(sn)))
                    if h is not None and isinstance(h, ConfDataObjectHandler):
                        info("handler for superior data node triggered, replace")
                        h.replace(ii, ch)
                    if h is not None and isinstance(h, ConfDataListHandler):
                        info("handler for superior data node triggered, replace_item")
                        h.replace_item(ii, ch)
                    sn = sn.parent
        except NonexistentInstance:
            warn("Cannnot notify {}, parent container removed".format(ii))
345

Pavel Spirek's avatar
Pavel Spirek committed
346
    # Get data node, evaluate NACM if required
Pavel Spirek's avatar
Pavel Spirek committed
347 348
    def get_node_rpc(self, rpc: RpcInfo, staging=False) -> InstanceNode:
        """Read the data node addressed by `rpc.path` and return it.

        Processing order:
          1. Pick the root: the user's staging head when `staging` is true and
             a journal exists, otherwise the committed data root.
          2. Guard NACM data and merge in YANG library data (see below).
          3. If the target schema node has state roots, obtain state data from
             the registered state data handlers.
          4. Apply the "with-defaults" query parameter ("report-all" only).
          5. If NACM is enabled, check read permission and prune the result.
          6. Apply the "depth" query parameter.

        Raises:
            NacmForbiddenError: NACM data requested by a user not listed in
                CONFIG_NACM["ALLOWED_USERS"], or read access denied by NACM.
            NoHandlerForStateDataError: state data requested directly but no
                handler is registered for its state root.
            ValueError: "depth" is neither "unbounded" nor an integer in
                the range 1..65536.
        """
        ii = self.parse_ii(rpc.path, rpc.path_format)

        if staging:
            try:
                root = self.get_data_root_staging(rpc.username)
            except NoStagingDataException:
                # No journal for this user: fall back to committed data
                root = self._data
        else:
            root = self._data

        if (len(ii) > 0) and (isinstance(ii[0], MemberName)):
            # Not getting root
            ns_first = ii[0].namespace
            if (ns_first == "ietf-netconf-acm") and (rpc.username not in CONFIG_NACM["ALLOWED_USERS"]):
                raise NacmForbiddenError(rpc.username + " not allowed to access NACM data")
            elif ns_first == "ietf-yang-library":
                # YANG library data lives in its own tree
                root = self._yang_lib_data
        else:
            # Root node requested
            # Remove NACM data if user is not NACM privileged
            if rpc.username not in CONFIG_NACM["ALLOWED_USERS"]:
                try:
                    root = root.delete_item("ietf-netconf-acm:nacm")
                except NonexistentInstance:
                    # Nothing to hide
                    pass

            # Append YANG library data
            for member_name, member_val in self._yang_lib_data.value.items():
                root = root.put_member(member_name, member_val)

        # Resolve schema node of the desired data node
        sch_pth_list = filter(lambda isel: isinstance(isel, MemberName), ii)
        sch_pth = DataHelpers.ii2str(sch_pth_list)
        sn = self.get_schema_node(sch_pth)

        # NOTE(review): get_schema_node returns None for an unknown schema path,
        # in which case the next line fails with AttributeError - confirm intent
        state_roots = sn.state_roots()

        # Check if URL points to state data or node that contains state data
        if state_roots:
            debug_data("State roots: {}".format(state_roots))

            for state_root_sch_pth in state_roots:
                state_root_sn = self._dm.get_data_node(state_root_sch_pth)

                # Check if the desired node is child of the state root
                # (walk from the target schema node up through its parents)
                sni = sn
                is_child = False
                while sni:
                    if sni is state_root_sn:
                        is_child = True
                        break
                    sni = sni.parent

                if is_child:
                    # Direct request for the state data
                    sdh = STATE_DATA_HANDLES.get_handler(state_root_sch_pth)
                    if sdh is not None:
                        # NOTE(review): state_root_n stays unbound if the handler
                        # is neither of the two base types checked here
                        if isinstance(sdh, ContainerNodeHandlerBase):
                            state_handler_val = sdh.generate_node(ii, rpc.username, staging)
                            state_root_n = sdh.schema_node.orphan_instance(state_handler_val)
                        elif isinstance(sdh, ListNodeHandlerBase):
                            if (sn is sdh.schema_node) and isinstance(ii[-1], MemberName):
                                # Whole list addressed (route ends with a member name)
                                state_handler_val = sdh.generate_list(ii, rpc.username, staging)
                                state_root_n = sdh.schema_node.orphan_instance(state_handler_val)
                            else:
                                # A list entry, or something below it, addressed
                                state_handler_val = sdh.generate_item(ii, rpc.username, staging)
                                state_root_n = sdh.schema_node.orphan_entry(state_handler_val)

                        # Select desired subnode from handler-generated content
                        ii_prefix, ii_rel = sdh.schema_node.split_instance_route(ii)
                        n = state_root_n.goto(ii_rel)

                        # There should be only one state root, no need to continue
                        if len(state_roots) != 1:
                            warn("URI points to directly to state data, but more state roots found")
                        break
                    else:
                        raise NoHandlerForStateDataError(rpc.path)
                else:
                    # Request for config data containing state data
                    n = root.goto(ii)

                    # Recursively walks `node` and, at every object whose schema
                    # node is the parent of the current state root, inserts the
                    # handler-generated state value as a member.  Returns the
                    # (possibly updated) node at the same position.
                    def _fill_state_roots(node: InstanceNode) -> InstanceNode:
                        if isinstance(node.value, ObjectValue):
                            if node.schema_node is state_root_sn.parent:
                                ii_gen = DataHelpers.node_get_ii(node)
                                sdh = STATE_DATA_HANDLES.get_handler(state_root_sch_pth)
                                if sdh is not None:
                                    try:
                                        if isinstance(sdh, ContainerNodeHandlerBase):
                                            state_handler_val = sdh.generate_node(ii_gen, rpc.username, staging)
                                        elif isinstance(sdh, ListNodeHandlerBase):
                                            state_handler_val = sdh.generate_list(ii_gen, rpc.username, staging)
                                    except Exception as e:
                                        # Best effort: a failing generator only drops its own node
                                        error("Error occured in state data generator (sn: {})".format(state_root_sch_pth))
                                        error(epretty(e))
                                        error("This state node will be omitted.")
                                    else:
                                        # Member name is namespace-qualified only when the
                                        # state root's namespace differs from its parent's
                                        if state_root_sn.ns == state_root_sn.parent.ns:
                                            nm_name = state_root_sn.qual_name[0]
                                        else:
                                            nm_name = state_root_sn.qual_name[1] + ":" + state_root_sn.qual_name[0]

                                        # print("nm={}".format(nm_name))
                                        node = node.put_member(nm_name, state_handler_val, raw=True).up()
                            else:
                                for key in node:
                                    member = node[key]
                                    node = _fill_state_roots(member).up()
                        elif isinstance(node.value, ArrayValue):
                            i = 0
                            arr_len = len(node.value)
                            while i < arr_len:
                                node = _fill_state_roots(node[i]).up()
                                i += 1

                        return node

                    n = _fill_state_roots(n)
                    # Keep `root` in sync with the tree that now carries state data
                    root = n.top()
        else:
            # No state data in requested node
            n = root.goto(ii)

        # Process "with-defaults" query parameter
        try:
            with_defs = rpc.qs["with-defaults"][0]
        except (IndexError, KeyError):
            with_defs = None

        if with_defs == "report-all":
            n = n.add_defaults()

        # Evaluate NACM if required
        if self.nacm:
            nrpc = self.nacm.get_user_rules(rpc.username)
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_READ) == Action.DENY:
                raise NacmForbiddenError()
            else:
                # Prune nodes that should not be accessible to user
                n = nrpc.prune_data_tree(n, root, ii, Permission.NACM_ACCESS_READ)

        # Process "depth" query parameter
        try:
            max_depth_str = rpc.qs["depth"][0]
            if max_depth_str == "unbounded":
                max_depth = None
            else:
                # Stored zero-based: a "depth" of 1 becomes max_depth 0
                max_depth = int(max_depth_str) - 1
                if (max_depth < 0) or (max_depth > 65535):
                    raise ValueError()
        except (IndexError, KeyError):
            # Parameter absent: no limit
            max_depth = None
        except ValueError:
            # Covers both a non-integer string and the range check above
            raise ValueError("Invalid value of query param \"depth\"")

        if max_depth is not None:
            # Replaces every object/array found deeper than max_depth with an
            # empty one; `depth` is the level of `node`, starting at 1
            def _tree_limit_depth(node: InstanceNode, depth: int) -> InstanceNode:
                if isinstance(node.value, ObjectValue):
                    if depth > max_depth:
                        node.value = ObjectValue({})
                    else:
                        for child_key in sorted(node.value.keys()):
                            m = node[child_key]
                            node = _tree_limit_depth(m, depth + 1).up()
                elif isinstance(node.value, ArrayValue):
                    if depth > max_depth:
                        node.value = ArrayValue([])
                    else:
                        for i in range(len(node.value)):
                            e = node[i]
                            node = _tree_limit_depth(e, depth + 1).up()

                return node
            n = _tree_limit_depth(n, 1)

        # Return result
        return n

527
    # Create new data node (Restconf draft compliant version)
528
    def create_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> InstanceNode:
        """Create a new child under the node addressed by `rpc.path`.

        `value` must be an object with exactly one member, named after the
        child to create (an optional "module:" prefix is stripped).  Depending
        on the child's schema node:
          * list      - the value becomes a new entry; a duplicate of the first
                        list key is rejected; position follows the "insert" /
                        "point" query parameters (first, last, before, after).
          * leaf-list - the value is added first or last ("insert").
          * otherwise - the member is created and must not exist yet.

        Returns the root of the tree that contains the new node; `root` itself
        is not modified.

        Raises:
            NacmForbiddenError: NACM denies create access.
            ValueError: `value` does not have exactly one member, or "insert"
                has an unsupported value.
            InstanceAlreadyPresent: duplicate list key or already existing member.
        """
        ii = self.parse_ii(rpc.path, rpc.path_format)
        n = root.goto(ii)

        insert = rpc.qs.get("insert", [None])[0]
        point = rpc.qs.get("point", [None])[0]

        if self.nacm:
            nrpc = self.nacm.get_user_rules(rpc.username)
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_CREATE) == Action.DENY:
                raise NacmForbiddenError()

        # Get target member name
        member_names = tuple(value.keys())
        if len(member_names) != 1:
            raise ValueError("Received json object must contain exactly one member")

        input_member_name_fq = member_names[0]
        input_member_name = input_member_name_fq.split(":", maxsplit=1)[-1]
        input_member_value = value[input_member_name_fq]

        # Check if target member already exists
        try:
            existing_member = n[input_member_name]
        except NonexistentInstance:
            existing_member = None

        # Get target schema node
        sn = n.schema_node  # type: InternalNode
        member_sn = sn.get_child(input_member_name)

        if isinstance(member_sn, ListNode):
            # Append received node to list

            # Create list if necessary
            if existing_member is None:
                existing_member = n.put_member(input_member_name, ArrayValue([]))

            # Convert input data from List/Dict to ArrayValue/ObjectValue
            new_value_list = member_sn.from_raw([input_member_value])
            new_value_item = new_value_list[0]    # type: ObjectValue

            # Only the first list key is checked for duplicates
            list_node_key = member_sn.keys[0][0]
            if new_value_item[list_node_key] in map(lambda x: x[list_node_key], existing_member.value):
                raise InstanceAlreadyPresent("Duplicate key")

            if insert == "first":
                if len(existing_member.value) > 0:
                    list_entry_first = existing_member[0]   # type: ArrayEntry
                    new_member = list_entry_first.insert_before(new_value_item).up()
                else:
                    # Empty list: the one-element array is the whole new value
                    new_member = existing_member.update(new_value_list)
            elif (insert == "last") or insert is None:
                if len(existing_member.value) > 0:
                    list_entry_last = existing_member[-1]   # type: ArrayEntry
                    new_member = list_entry_last.insert_after(new_value_item).up()
                else:
                    new_member = existing_member.update(new_value_list)
            elif insert == "before":
                entry_sel = EntryKeys({list_node_key: point})
                list_entry = entry_sel.goto_step(existing_member)   # type: ArrayEntry
                new_member = list_entry.insert_before(new_value_item).up()
            elif insert == "after":
                entry_sel = EntryKeys({list_node_key: point})
                list_entry = entry_sel.goto_step(existing_member)   # type: ArrayEntry
                new_member = list_entry.insert_after(new_value_item).up()
            else:
                raise ValueError("Invalid 'insert' value")
        elif isinstance(member_sn, LeafListNode):
            # Append received node to leaf list

            # Create leaf list if necessary
            if existing_member is None:
                existing_member = n.put_member(input_member_name, ArrayValue([]))

            # Convert input data from List/Dict to ArrayValue/ObjectValue
            new_value_item = member_sn.from_raw([input_member_value])[0]

            if insert == "first":
                new_member = existing_member.update(ArrayValue([new_value_item] + existing_member.value))
            elif (insert == "last") or insert is None:
                new_member = existing_member.update(ArrayValue(existing_member.value + [new_value_item]))
            else:
                raise ValueError("Invalid 'insert' value")
        else:
            if existing_member is None:
                # Create new data node

                # Convert input data from List/Dict to ArrayValue/ObjectValue
                new_value_item = member_sn.from_raw(input_member_value)

                # Create new node (object member)
                new_member = n.put_member(input_member_name, new_value_item)
            else:
                # Data node already exists
                raise InstanceAlreadyPresent("Member \"{}\" already present in \"{}\"".format(input_member_name, ii))

        return new_member.top()
634

635
    # PUT data node
636
    def update_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> InstanceNode:
        """Replace the value of the node addressed by `rpc.path` with `value`.

        Without NACM the raw `value` is written as is.  With NACM enabled,
        update access is checked first and the new value is pruned by the
        user's rules before it is written.

        Returns the root of the updated tree; `root` itself is not modified.

        Raises:
            NacmForbiddenError: NACM denies update access.
        """
        ii = self.parse_ii(rpc.path, rpc.path_format)
        n = root.goto(ii)

        if not self.nacm:
            return n.update(value, raw=True).top()

        nrpc = self.nacm.get_user_rules(rpc.username)
        if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_UPDATE) == Action.DENY:
            raise NacmForbiddenError()

        # Build a standalone node from the new value so it can be pruned
        sn = n.schema_node
        new_val = sn.from_raw([value])[0]
        new_node = RootNode(new_val, sn, new_val.timestamp)
        # NOTE(review): pruning is evaluated against self._data rather than
        # the `root` argument - confirm this is intended for staged roots
        new_node_prunned = nrpc.prune_data_tree(new_node, self._data, ii, Permission.NACM_ACCESS_UPDATE).raw_value()

        new_n = n.update(new_node_prunned, raw=True)
        return new_n.top()
656

657
    # Delete data node
658
    def delete_node_rpc(self, root: InstanceNode, rpc: RpcInfo) -> InstanceNode:
        """Remove the data node addressed by ``rpc.path`` from its parent.

        :param root: root of the data tree the deletion is applied to
        :param rpc: request info; ``path``, ``path_format`` and ``username`` are read
        :return: top of the resulting tree; when the last selector of the path
                 does not match the parent's value type, the parent's tree is
                 returned unchanged
        :raises NacmForbiddenError: if NACM is active and denies delete access
        :raises InstanceValueError: if the parent value is neither an array nor an object
        """
        ii = self.parse_ii(rpc.path, rpc.path_format)
        target = root.goto(ii)
        parent = target.up()
        selector = ii[-1]

        if self.nacm:
            user_rules = self.nacm.get_user_rules(rpc.username)
            verdict = user_rules.check_data_node_permission(root, ii, Permission.NACM_ACCESS_DELETE)
            if verdict == Action.DENY:
                raise NacmForbiddenError()

        if isinstance(parent.value, ArrayValue):
            # List / leaf-list entry, addressed either by position or by keys
            if isinstance(selector, EntryIndex):
                return parent.delete_item(selector.key).top()
            if isinstance(selector, EntryKeys):
                # Key-addressed entry: use the index of the node already found
                return parent.delete_item(target.index).top()
            return parent.top()

        if isinstance(parent.value, ObjectValue):
            # Member of a container
            if isinstance(selector, MemberName):
                return parent.delete_item(selector.key).top()
            return parent.top()

        raise InstanceValueError(target, "Invalid target node type")
Pavel Spirek's avatar
Pavel Spirek committed
682

683
    # Invoke an operation
684
    def invoke_op_rpc(self, rpc: RpcInfo) -> ObjectValue:
        """Invoke the operation named by ``rpc.op_name`` and return its output.

        The ``jetconf:conf-start``, ``conf-list``, ``conf-drop``, ``conf-commit``,
        ``get-schema-digest`` and ``get-list-length`` operations are handled
        here; any other name is dispatched to a handler from ``OP_HANDLERS``.

        :param rpc: request info; ``op_name``, ``op_input_args``, ``username``,
                    ``path`` and ``skip_nacm_check`` are read
        :return: output data of the operation
        :raises ValueError: if a required input parameter is missing or the
                            URL given to ``get-list-length`` is not a list
        :raises NacmForbiddenError: if NACM denies a user-defined operation
        :raises NoHandlerForOpError: if no handler is registered for the operation
        """
        if rpc.op_name == "jetconf:conf-start":
            try:
                cl_name = rpc.op_input_args["name"]
            except (TypeError, KeyError):
                raise ValueError("This operation expects \"name\" input parameter")

            transaction_opts = rpc.op_input_args.get("options")

            # The first changelist of a user also creates the user's journal
            if self._usr_journals.get(rpc.username) is None:
                self._usr_journals[rpc.username] = UsrChangeJournal(self._data, transaction_opts)

            self._usr_journals[rpc.username].cl_new(cl_name)
            ret_data = {"status": "OK"}
        elif rpc.op_name == "jetconf:conf-list":
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
                chl_json = usr_journal.list()
            else:
                chl_json = str(None)

            ret_data = \
                {
                    "status": "OK",
                    "changelists": chl_json
                }
        elif rpc.op_name == "jetconf:conf-drop":
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
                # The journal is discarded once cl_drop() returns a falsy value
                if not usr_journal.cl_drop():
                    del self._usr_journals[rpc.username]

            ret_data = {"status": "OK"}
        elif rpc.op_name == "jetconf:conf-commit":
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
                # Take the lock BEFORE entering the try block: if acquisition
                # fails, the finally clause must not release a lock which this
                # call never obtained (and which another user may be holding)
                self.lock_data(rpc.username)
                try:
                    usr_journal.commit(self)
                    if CONFIG["GLOBAL"]["PERSISTENT_CHANGES"] is True:
                        self.save()
                finally:
                    self.unlock_data()

                del self._usr_journals[rpc.username]
            else:
                info("[{}]: Nothing to commit".format(rpc.username))

            ret_data = \
                {
                    "status": "OK",
                    "conf-changed": True
                }
        elif rpc.op_name == "jetconf:get-schema-digest":
            ret_data = self._dm.schema_digest()
        elif rpc.op_name == "jetconf:get-list-length":
            try:
                list_url = rpc.op_input_args["url"]     # type: str
            except (TypeError, KeyError):
                raise ValueError("This operation expects \"url\" input parameter")

            # An empty URL addresses the root of the data tree
            if list_url == "":
                list_ii = []
            else:
                list_ii = self.parse_ii(list_url, PathFormat.URL)

            ln_val = self._data.goto(list_ii).value
            if isinstance(ln_val, list):
                ret_data = {"jetconf:list-length": len(ln_val)}
            else:
                raise ValueError("Passed URI does not point to List")
        else:
            # User-defined operation
            if self.nacm and (not rpc.skip_nacm_check):
                nrpc = self.nacm.get_user_rules(rpc.username)
                if nrpc.check_rpc_name(rpc.op_name) == Action.DENY:
                    raise NacmForbiddenError(
                        "Op \"{}\" invocation denied for user \"{}\"".format(rpc.op_name, rpc.username)
                    )

            op_handler = OP_HANDLERS.get_handler(rpc.op_name)
            if op_handler is None:
                raise NoHandlerForOpError(rpc.op_name)

            # Look up the schema of the operation's input
            sn = self._dm.get_schema_node(rpc.path)
            sn_input = sn.get_child("input")

            if sn_input.children:
                # Input arguments are expected
                op_input_args = sn_input.from_raw(rpc.op_input_args)
                ret_data = op_handler(op_input_args, rpc.username)
            else:
                # Operation with no input
                ret_data = op_handler(None, rpc.username)

        return ret_data

Pavel Spirek's avatar
Pavel Spirek committed
785 786 787 788 789
    def add_to_journal_rpc(self, ch_type: ChangeType, rpc: RpcInfo, value: Any, new_root: InstanceNode):
        """Record a data change in the last changelist of the user's journal.

        :param ch_type: kind of the change
        :param rpc: request that caused the change; ``username`` selects the journal
        :param value: value associated with the change
        :param new_root: data tree root passed to the changelist along with the change
        :raises NoHandlerError: if the user has no journal
        """
        journal = self._usr_journals.get(rpc.username)
        if journal is None:
            raise NoHandlerError("No active changelist for user \"{}\"".format(rpc.username))

        # Changes always go to the most recently opened changelist
        current_changelist = journal.clists[-1]
        change = DataChange(ch_type, rpc, value)
        current_changelist.add(change, new_root)

Pavel Spirek's avatar
Pavel Spirek committed
793
    # Locks datastore data
794 795
    def lock_data(self, username: str = None, blocking: bool=True):
        """Acquire the datastore lock on behalf of ``username``.

        :param username: name recorded as the lock holder; "(unknown)" is
                         recorded when it is empty or None
        :param blocking: if True, wait up to 1 second for the lock; if False,
                         fail immediately when the lock is taken
        :raises DataLockError: if the lock could not be acquired
        """
        if blocking:
            acquired = self._data_lock.acquire(timeout=1)
        else:
            # A timeout must not be combined with a non-blocking acquire,
            # threading.Lock.acquire() raises ValueError in that case
            acquired = self._data_lock.acquire(blocking=False)

        if not acquired:
            raise DataLockError(
                "Failed to acquire lock in datastore \"{}\" for user \"{}\", already locked by \"{}\"".format(
                    self.name,
                    username,
                    self._lock_username
                )
            )

        self._lock_username = username or "(unknown)"
        debug_data("Acquired lock in datastore \"{}\" for user \"{}\"".format(self.name, username))

808
    # Unlock datastore data
Pavel Spirek's avatar
Pavel Spirek committed
809 810
    def unlock_data(self):
        """Release the datastore lock and forget the recorded lock holder."""
        # Read and clear the holder name while the lock is still held. Doing it
        # after release() could overwrite the name stored by whoever acquires
        # the lock next.
        holder = self._lock_username
        self._lock_username = None
        self._data_lock.release()
        debug_data("Released lock in datastore \"{}\" for user \"{}\"".format(self.name, holder))

814
    # Load data from persistent storage
815
    def load(self):
        """Load data from persistent storage.

        The base class provides no storage backend.

        :raises NotImplementedError: always
        """
        raise NotImplementedError("Not implemented in base class")

818
    # Save data to persistent storage
819
    def save(self):
        """Save data to persistent storage.

        The base class provides no storage backend.

        :raises NotImplementedError: always
        """
        raise NotImplementedError("Not implemented in base class")

Pavel Spirek's avatar
Pavel Spirek committed
822 823

class JsonDatastore(BaseDatastore):
    """Datastore whose content is read from and written to a JSON file."""

    def __init__(self, dm: DataModel, json_file: str, name: str = "", with_nacm: bool=False):
        """Create the datastore.

        :param dm: data model, passed on to the base class
        :param json_file: path of the JSON file used by ``load()`` and ``save()``
        :param name: datastore name, passed on to the base class
        :param with_nacm: passed on to the base class
        """
        super().__init__(dm, name, with_nacm)
        self.json_file = json_file

    def load(self):
        """Load datastore content from ``self.json_file``.

        When a NACM object is present, its ``update()`` is called afterwards.
        """
        # Reset first, so that a failed load leaves no previous data behind
        self._data = None
        with open(self.json_file, "rt") as fp:
            self._data = self._dm.from_raw(json.load(fp))

        if self.nacm is not None:
            self.nacm.update()

    def load_yl_data(self, filename: str):
        """Load the data stored in ``self._yang_lib_data`` from a JSON file.

        :param filename: path of the JSON file to read
        """
        # Reset first, so that a failed load leaves no previous data behind
        self._yang_lib_data = None
        with open(filename, "rt") as fp:
            self._yang_lib_data = self._dm.from_raw(json.load(fp))

    def save(self):
        """Write datastore content to ``self.json_file`` as indented JSON."""
        # Serialize BEFORE opening the file: opening with "w" truncates it, so
        # a failure in raw_value() or in JSON encoding would otherwise leave
        # the persistent data empty or only partially written
        serialized = json.dumps(self._data.raw_value(), indent=4)
        with open(self.json_file, "w") as jfd:
            jfd.write(serialized)