data.py 30.9 KB
Newer Older
Pavel Spirek's avatar
Pavel Spirek committed
1
import json
2

3
from threading import Lock
Pavel Spirek's avatar
Pavel Spirek committed
4
from enum import Enum
5
from colorlog import error, warning as warn, info
6
from typing import List, Any, Dict, Callable, Optional
7

8
from yangson.datamodel import DataModel
9
from yangson.enumerations import ContentType, ValidationScope
10
from yangson.schemanode import (
11 12 13 14 15
    SchemaNode,
    ListNode,
    LeafListNode,
    SchemaError,
    SemanticError,
16 17
    InternalNode,
    ContainerNode)
18 19 20
from yangson.instance import (
    InstanceNode,
    NonexistentInstance,
21
    InstanceValueError,
22 23 24 25 26
    ArrayValue,
    ObjectValue,
    MemberName,
    EntryKeys,
    EntryIndex,
27
    InstanceRoute,
28
    ArrayEntry,
29 30
    RootNode,
    ObjectMember)
31

32
from .helpers import PathFormat, ErrorHelpers, LogHelpers, DataHelpers, JsonNodeT
33
from .config import CONFIG
34
from .nacm import NacmConfig, Permission, Action
35
from .handler_list import OP_HANDLERS, STATE_DATA_HANDLES, CONF_DATA_HANDLES, ConfDataObjectHandler, ConfDataListHandler
36
from .usr_state_data_handlers import ContainerNodeHandlerBase, ListNodeHandlerBase
37 38 39

epretty = ErrorHelpers.epretty
debug_data = LogHelpers.create_module_dbg_logger(__name__)
Pavel Spirek's avatar
Pavel Spirek committed
40 41


42 43 44 45 46 47
class ChangeType(Enum):
    CREATE = 0,
    REPLACE = 1,
    DELETE = 2


Pavel Spirek's avatar
Pavel Spirek committed
48
class NacmForbiddenError(Exception):
49
    def __init__(self, msg="Access to data node rejected by NACM", rule=None):
Pavel Spirek's avatar
Pavel Spirek committed
50
        self.msg = msg
51
        self.rulename = rule
Pavel Spirek's avatar
Pavel Spirek committed
52

53 54 55
    def __str__(self):
        return self.msg

Pavel Spirek's avatar
Pavel Spirek committed
56 57 58 59 60

class DataLockError(Exception):
    def __init__(self, msg=""):
        self.msg = msg

Pavel Spirek's avatar
Pavel Spirek committed
61 62
    def __str__(self):
        return self.msg
Pavel Spirek's avatar
Pavel Spirek committed
63 64


65
class InstanceAlreadyPresent(Exception):
66 67 68 69 70 71 72
    def __init__(self, msg=""):
        self.msg = msg

    def __str__(self):
        return self.msg


73
class HandlerError(Exception):
74 75 76 77 78 79 80
    def __init__(self, msg=""):
        self.msg = msg

    def __str__(self):
        return self.msg


81 82 83 84
class NoHandlerError(HandlerError):
    pass


85 86 87 88
class ConfHandlerFailedError(HandlerError):
    pass


89
class NoHandlerForOpError(NoHandlerError):
90 91 92 93 94
    def __init__(self, op_name: str):
        self.op_name = op_name

    def __str__(self):
        return "Nonexistent handler for operation \"{}\"".format(self.op_name)
95 96 97 98 99 100


class NoHandlerForStateDataError(NoHandlerError):
    pass


101
class RpcInfo:
Pavel Spirek's avatar
Pavel Spirek committed
102
    def __init__(self):
103 104
        self.username = None    # type: str
        self.path = None        # type: str
105
        self.qs = None          # type: Dict[str, List[str]]
106 107 108 109
        self.path_format = PathFormat.URL   # type: PathFormat
        self.skip_nacm_check = False        # type: bool
        self.op_name = None                 # type: str
        self.op_input_args = None           # type: ObjectValue
Pavel Spirek's avatar
Pavel Spirek committed
110 111


112 113 114 115 116 117 118 119
class DataChange:
    def __init__(self, change_type: ChangeType, rpc_info: RpcInfo, data: Any):
        self.change_type = change_type
        self.rpc_info = rpc_info
        self.data = data


class ChangeList:
Pavel Spirek's avatar
Pavel Spirek committed
120 121
    def __init__(self, root_origin_cl: InstanceNode, changelist_name: str):
        self.root_list = [root_origin_cl]
122 123 124
        self.changelist_name = changelist_name
        self.journal = []   # type: List[DataChange]

Pavel Spirek's avatar
Pavel Spirek committed
125
    def add(self, change: DataChange, root_after_change: InstanceNode):
126
        self.journal.append(change)
Pavel Spirek's avatar
Pavel Spirek committed
127 128 129 130
        self.root_list.append(root_after_change)


class UsrChangeJournal:
131
    def __init__(self, root_origin: InstanceNode, transaction_opts: Optional[JsonNodeT]):
Pavel Spirek's avatar
Pavel Spirek committed
132
        self.root_origin = root_origin
133
        self.transaction_opts = transaction_opts
Pavel Spirek's avatar
Pavel Spirek committed
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
        self.clists = []    # type: List[ChangeList]

    def cl_new(self, cl_name: str):
        self.clists.append(ChangeList(self.get_root_head(), cl_name))

    def cl_drop(self) -> bool:
        try:
            self.clists.pop()
            return True
        except IndexError:
            return False

    def get_root_head(self) -> InstanceNode:
        if len(self.clists) > 0:
            return self.clists[-1].root_list[-1]
        else:
            return self.root_origin

    def list(self) -> str:
        chl_json = {}
        for chl in self.clists:
            changes = []
            for ch in chl.journal:
                changes.append(
                    [ch.change_type.name, ch.rpc_info.path]
                )

            chl_json[chl.changelist_name] = changes

        return chl_json

    def commit(self, ds: "BaseDatastore"):
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
        if hash(ds.get_data_root()) == hash(self.root_origin):
            info("Commiting new configuration (swapping roots)")
            # Set new root
            nr = self.get_root_head()
        else:
            info("Commiting new configuration (re-applying changes)")
            nr = ds.get_data_root()
            for cl in self.clists:
                for change in cl.journal:
                    if change.change_type == ChangeType.CREATE:
                        nr = ds.create_node_rpc(nr, change.rpc_info, change.data)
                    elif change.change_type == ChangeType.REPLACE:
                        nr = ds.update_node_rpc(nr, change.rpc_info, change.data)
                    elif change.change_type == ChangeType.DELETE:
                        nr = ds.delete_node_rpc(nr, change.rpc_info)

Pavel Spirek's avatar
Pavel Spirek committed
182
        try:
183
            # Validate syntax and semantics of new data
184 185
            if CONFIG["GLOBAL"]["VALIDATE_TRANSACTIONS"] is True:
                nr.validate(ValidationScope.all, ContentType.config)
186 187 188 189 190 191 192
            new_data_valid = True
        except (SchemaError, SemanticError) as e:
            error("Data validation error:")
            error(epretty(e))
            new_data_valid = False

        if new_data_valid:
Pavel Spirek's avatar
Pavel Spirek committed
193
            # Set new data root
194
            ds.set_data_root(nr)
Pavel Spirek's avatar
Pavel Spirek committed
195

196 197 198 199 200 201 202 203
            # Call commit begin hook
            begin_hook_failed = False
            try:
                ds.commit_begin_callback(self.transaction_opts)
            except Exception as e:
                error("Exception occured in commit_begin handler: {}".format(epretty(e)))
                begin_hook_failed = True

204
            # Run schema node handlers
205 206 207 208 209
            conf_handler_failed = False
            if not begin_hook_failed:
                try:
                    for cl in self.clists:
                        for change in cl.journal:
210
                            ii = ds.parse_ii(change.rpc_info.path, change.rpc_info.path_format)
211
                            ds.run_conf_edit_handler(ii, change)
212
                except IndexError as e: ## Exception
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
                    error("Exception occured in edit handler: {}".format(epretty(e)))
                    conf_handler_failed = True

            # Call commit end hook
            end_hook_failed = False
            end_hook_abort_failed = False
            if not (begin_hook_failed or conf_handler_failed):
                try:
                    ds.commit_end_callback(self.transaction_opts, failed=False)
                except Exception as e:
                    error("Exception occured in commit_end handler: {}".format(epretty(e)))
                    end_hook_failed = True

            if begin_hook_failed or conf_handler_failed or end_hook_failed:
                try:
                    # Call commit_end callback again with "failed" argument set to True
                    ds.commit_end_callback(self.transaction_opts, failed=True)
                except Exception as e:
                    error("Exception occured in commit_end handler (abort): {}".format(epretty(e)))
                    end_hook_abort_failed = True
Pavel Spirek's avatar
Pavel Spirek committed
233 234 235

            # Clear user changelists
            self.clists.clear()
236

237 238 239 240
            # Return to previous version of data and raise an exception if something went wrong
            if begin_hook_failed or conf_handler_failed or end_hook_failed or end_hook_abort_failed:
                ds.data_root_rollback(history_steps=1, store_current=False)
                raise ConfHandlerFailedError("(see logged)")
241

242

Pavel Spirek's avatar
Pavel Spirek committed
243
class BaseDatastore:
244
    def __init__(self, dm: DataModel, name: str="", with_nacm: bool=False):
245 246 247
        def _blankfn(*args, **kwargs):
            pass

Pavel Spirek's avatar
Pavel Spirek committed
248
        self.name = name
Pavel Spirek's avatar
Pavel Spirek committed
249
        self.nacm = None    # type: NacmConfig
250
        self._data = None   # type: InstanceNode
251 252
        self._data_history = []     # type: List[InstanceNode]
        self._yang_lib_data = None  # type: InstanceNode
253
        self._dm = dm       # type: DataModel
Pavel Spirek's avatar
Pavel Spirek committed
254
        self._data_lock = Lock()
Pavel Spirek's avatar
Pavel Spirek committed
255
        self._lock_username = None  # type: str
256
        self._usr_journals = {}     # type: Dict[str, UsrChangeJournal]
257 258
        self.commit_begin_callback = _blankfn   # type: Callable[..., bool]
        self.commit_end_callback = _blankfn     # type: Callable[..., bool]
Pavel Spirek's avatar
Pavel Spirek committed
259

260
        if with_nacm:
261
            self.nacm = NacmConfig(self, self._dm)
Pavel Spirek's avatar
Pavel Spirek committed
262

Pavel Spirek's avatar
Pavel Spirek committed
263
    # Returns the root node of data tree
264 265 266 267 268
    def get_data_root(self, previous_version: int=0) -> InstanceNode:
        if previous_version > 0:
            return self._data_history[-previous_version]
        else:
            return self._data
Pavel Spirek's avatar
Pavel Spirek committed
269

270 271 272
    def get_yl_data_root(self) -> InstanceNode:
        return self._yang_lib_data

273 274 275 276 277 278 279 280 281
    # Returns the root node of data tree
    def get_data_root_staging(self, username: str) -> InstanceNode:
        usr_journal = self._usr_journals.get(username)
        if usr_journal is not None:
            root = usr_journal.get_root_head()
            return root
        else:
            raise NoHandlerError("No active changelist for user \"{}\"".format(username))

282
    # Set a new Instance node as data root, store old root to archive
Pavel Spirek's avatar
Pavel Spirek committed
283
    def set_data_root(self, new_root: InstanceNode):
284
        self._data_history.append(self._data)
Pavel Spirek's avatar
Pavel Spirek committed
285 286
        self._data = new_root

287 288 289 290 291 292
    def data_root_rollback(self, history_steps: int, store_current: bool):
        if store_current:
            self._data_history.append(self._data)

        self._data = self._data_history[-history_steps]

293 294 295 296 297 298 299 300
    def parse_ii(self, path: str, path_format: PathFormat) -> InstanceRoute:
        if path_format == PathFormat.URL:
            ii = self._dm.parse_resource_id(path)
        else:
            ii = self._dm.parse_instance_id(path)

        return ii

301 302
    # Get schema node with particular schema address
    def get_schema_node(self, sch_pth: str) -> SchemaNode:
303
        sn = self._dm.get_data_node(sch_pth)
304
        if sn is None:
305 306
            # raise NonexistentSchemaNode(sch_pth)
            debug_data("Cannot find schema node for " + sch_pth)
307 308 309
        return sn

    # Notify data observers about change in datastore
310
    def run_conf_edit_handler(self, ii: InstanceRoute, ch: DataChange):
311
        try:
312 313 314 315
            sch_pth_list = list(filter(lambda n: isinstance(n, MemberName), ii))

            if ch.change_type == ChangeType.CREATE:
                # Get target member name
316 317
                input_member_name_fq = tuple(ch.data.keys())[0]
                input_member_name_ns, input_member_name = input_member_name_fq.split(":", maxsplit=1)
318
                # Append it to ii
319
                sch_pth_list.append(MemberName(input_member_name, None))
320

321
            sch_pth = DataHelpers.ii2str(sch_pth_list)
322
            print(sch_pth)
323 324
            sn = self.get_schema_node(sch_pth)

325 326 327 328 329 330
            if sn is None:
                return

            h = CONF_DATA_HANDLES.get_handler(str(id(sn)))
            if h is not None:
                info("handler for actual data node triggered")
331 332 333 334 335 336 337 338 339 340 341 342 343 344
                if isinstance(h, ConfDataObjectHandler):
                    if ch.change_type == ChangeType.CREATE:
                        h.create(ii, ch)
                    elif ch.change_type == ChangeType.REPLACE:
                        h.replace(ii, ch)
                    elif ch.change_type == ChangeType.DELETE:
                        h.delete(ii, ch)
                if isinstance(h, ConfDataListHandler):
                    if ch.change_type == ChangeType.CREATE:
                        h.create_item(ii, ch)
                    elif ch.change_type == ChangeType.REPLACE:
                        h.replace_item(ii, ch)
                    elif ch.change_type == ChangeType.DELETE:
                        h.delete_item(ii, ch)
345
            else:
346
                sn = sn.parent
347 348 349 350
                while sn is not None:
                    h = CONF_DATA_HANDLES.get_handler(str(id(sn)))
                    if h is not None and isinstance(h, ConfDataObjectHandler):
                        info("handler for superior data node triggered, replace")
351 352
                        print(h.schema_path)
                        print(h.__class__.__name__)
353 354 355 356 357
                        h.replace(ii, ch)
                    if h is not None and isinstance(h, ConfDataListHandler):
                        info("handler for superior data node triggered, replace_item")
                        h.replace_item(ii, ch)
                    sn = sn.parent
358 359
        except NonexistentInstance:
            warn("Cannnot notify {}, parent container removed".format(ii))
360

Pavel Spirek's avatar
Pavel Spirek committed
361
    # Get data node, evaluate NACM if required
362
    def get_node_rpc(self, rpc: RpcInfo, yl_data=False, staging=False) -> InstanceNode:
363 364 365
        if rpc.path == "":
            ii = []
        else:
366
            ii = self.parse_ii(rpc.path, rpc.path_format)
367

368 369
        if yl_data:
            root = self._yang_lib_data
370 371
        elif staging:
            try:
372
                root = self.get_data_root_staging(rpc.username)
373
            except NoHandlerError:
374
                root = self._data
375 376
        else:
            root = self._data
377

378
        # Resolve schema node of the desired data node
379
        sch_pth_list = filter(lambda isel: isinstance(isel, MemberName), ii)
380
        sch_pth = DataHelpers.ii2str(sch_pth_list)
381
        sn = self.get_schema_node(sch_pth)
382

383 384
        state_roots = sn.state_roots()

385
        if state_roots and not yl_data:
386
            debug_data("State roots: {}".format(state_roots))
387 388 389 390 391 392 393
            for state_root_sch_pth in state_roots:
                state_root_sn = self._dm.get_data_node(state_root_sch_pth)

                # Check if desired node is child of state root
                sni = sn
                is_child = False
                while sni:
394
                    if sni is state_root_sn:
395 396 397 398 399 400 401 402 403
                        is_child = True
                        break
                    sni = sni.parent

                if is_child:
                    # Direct request for the state data
                    sdh = STATE_DATA_HANDLES.get_handler(state_root_sch_pth)
                    if sdh is not None:
                        if isinstance(sdh, ContainerNodeHandlerBase):
404
                            state_handler_val = sdh.generate_node(ii, rpc.username, staging)
405 406
                            state_root_n = sdh.schema_node.orphan_instance(state_handler_val)
                        elif isinstance(sdh, ListNodeHandlerBase):
407 408 409 410 411 412
                            if (sn is sdh.schema_node) and isinstance(ii[-1], MemberName):
                                state_handler_val = sdh.generate_list(ii, rpc.username, staging)
                                state_root_n = sdh.schema_node.orphan_instance(state_handler_val)
                            else:
                                state_handler_val = sdh.generate_item(ii, rpc.username, staging)
                                state_root_n = sdh.schema_node.orphan_entry(state_handler_val)
413 414 415 416 417 418

                        # Select desired subnode from handler-generated content
                        ii_prefix, ii_rel = sdh.schema_node.split_instance_route(ii)
                        n = state_root_n.goto(ii_rel)
                    else:
                        raise NoHandlerForStateDataError(rpc.path)
419
                else:
420
                    # Request for config data containing state data
421 422 423 424 425 426 427 428 429
                    n = root.goto(ii)

                    def _fill_state_roots(node: InstanceNode) -> InstanceNode:
                        if isinstance(node.value, ObjectValue):
                            if node.schema_node is state_root_sn.parent:
                                ii_gen = DataHelpers.node_get_ii(node)
                                sdh = STATE_DATA_HANDLES.get_handler(state_root_sch_pth)
                                if sdh is not None:
                                    if isinstance(sdh, ContainerNodeHandlerBase):
430
                                        state_handler_val = sdh.generate_node(ii_gen, rpc.username, staging)
431
                                    elif isinstance(sdh, ListNodeHandlerBase):
432 433 434
                                        print("node={}".format(node))
                                        print("iigen={}".format(ii_gen))
                                        state_handler_val = sdh.generate_item(ii_gen, rpc.username, staging)
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451

                                    nm_name = state_root_sn.qual_name[0]
                                    node = node.put_member(nm_name, state_handler_val, raw=True).up()
                            else:
                                for key in node:
                                    member = node[key]
                                    node = _fill_state_roots(member).up()
                        elif isinstance(node.value, ArrayValue):
                            i = 0
                            arr_len = len(node.value)
                            while i < arr_len:
                                node = _fill_state_roots(node[i]).up()
                                i += 1

                        return node

                    n = _fill_state_roots(n)
452 453
        else:
            n = root.goto(ii)
Pavel Spirek's avatar
Pavel Spirek committed
454

455 456
        try:
            with_defs = rpc.qs["with-defaults"][0]
457
        except (IndexError, KeyError):
458 459 460 461 462
            with_defs = None

        if with_defs == "report-all":
            n = n.add_defaults()

Pavel Spirek's avatar
Pavel Spirek committed
463 464
        if self.nacm:
            nrpc = self.nacm.get_user_nacm(rpc.username)
465
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_READ) == Action.DENY:
Pavel Spirek's avatar
Pavel Spirek committed
466 467
                raise NacmForbiddenError()
            else:
468 469
                # Prune nodes that should not be accessible to user
                n = nrpc.prune_data_tree(n, root, ii, Permission.NACM_ACCESS_READ)
Pavel Spirek's avatar
Pavel Spirek committed
470

471 472
        try:
            max_depth = int(rpc.qs["depth"][0])
473
        except (IndexError, KeyError):
474 475 476 477 478 479 480 481 482 483 484
            max_depth = None
        except ValueError:
            raise ValueError("Invalid value of query param \"depth\"")

        if max_depth is not None:
            def _tree_limit_depth(node: InstanceNode, depth: int) -> InstanceNode:
                if isinstance(node.value, ObjectValue):
                    if depth > max_depth:
                        node.value = ObjectValue({})
                    else:
                        for child_key in sorted(node.value.keys()):
485
                            m = node[child_key]
486 487 488 489 490 491
                            node = _tree_limit_depth(m, depth + 1).up()
                elif isinstance(node.value, ArrayValue):
                    if depth > max_depth:
                        node.value = ArrayValue([])
                    else:
                        for i in range(len(node.value)):
492
                            e = node[i]
493 494 495 496 497
                            node = _tree_limit_depth(e, depth + 1).up()

                return node
            n = _tree_limit_depth(n, 1)

Pavel Spirek's avatar
Pavel Spirek committed
498 499
        return n

500
    # Create new data node (Restconf draft compliant version)
501
    def create_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> InstanceNode:
502
        ii = self.parse_ii(rpc.path, rpc.path_format)
503
        n = root.goto(ii)
Pavel Spirek's avatar
Pavel Spirek committed
504

505 506 507
        insert = rpc.qs.get("insert", [None])[0]
        point = rpc.qs.get("point", [None])[0]

508
        if self.nacm:
509
            nrpc = self.nacm.get_user_nacm(rpc.username)
510
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_CREATE) == Action.DENY:
511 512
                raise NacmForbiddenError()

513
        # Get target member name
514 515 516 517
        input_member_name = tuple(value.keys())
        if len(input_member_name) != 1:
            raise ValueError("Received json object must contain exactly one member")
        else:
518 519
            input_member_name_fq = input_member_name[0]
            input_member_name = input_member_name_fq.split(":", maxsplit=1)[-1]
Pavel Spirek's avatar
Pavel Spirek committed
520

521
        input_member_value = value[input_member_name_fq]
Pavel Spirek's avatar
Pavel Spirek committed
522

523
        # Check if target member already exists
524
        try:
525
            existing_member = n[input_member_name]
526
        except NonexistentInstance:
527
            existing_member = None
528

529
        # Get target schema node
530 531
        n = root.goto(ii)

532
        sn = n.schema_node  # type: InternalNode
533 534 535
        # sch_member_name = sn._iname2qname(input_member_name)
        # member_sn = sn.get_data_child(*sch_member_name)
        member_sn = sn.get_child(input_member_name)
536

537
        if isinstance(member_sn, ListNode):
Pavel Spirek's avatar
Pavel Spirek committed
538
            # Append received node to list
539

540 541
            # Create list if necessary
            if existing_member is None:
542
                existing_member = n.put_member(input_member_name, ArrayValue([]))
543

544
            # Convert input data from List/Dict to ArrayValue/ObjectValue
545 546
            new_value_list = member_sn.from_raw([input_member_value])
            new_value_item = new_value_list[0]    # type: ObjectValue
547 548

            list_node_key = member_sn.keys[0][0]
549
            if new_value_item[list_node_key] in map(lambda x: x[list_node_key], existing_member.value):
550 551 552
                raise InstanceAlreadyPresent("Duplicate key")

            if insert == "first":
553
                # Optimization
554
                if len(existing_member.value) > 0:
555
                    list_entry_first = existing_member[0]   # type: ArrayEntry
556
                    new_member = list_entry_first.insert_before(new_value_item).up()
557 558
                else:
                    new_member = existing_member.update(new_value_list)
559
            elif (insert == "last") or insert is None:
560
                # Optimization
561
                if len(existing_member.value) > 0:
562 563 564 565
                    list_entry_last = existing_member[-1]   # type: ArrayEntry
                    new_member = list_entry_last.insert_after(new_value_item).up()
                else:
                    new_member = existing_member.update(new_value_list)
566 567
            elif insert == "before":
                entry_sel = EntryKeys({list_node_key: point})
568 569
                list_entry = entry_sel.goto_step(existing_member)   # type: ArrayEntry
                new_member = list_entry.insert_before(new_value_item).up()
570 571
            elif insert == "after":
                entry_sel = EntryKeys({list_node_key: point})
572 573
                list_entry = entry_sel.goto_step(existing_member)   # type: ArrayEntry
                new_member = list_entry.insert_after(new_value_item).up()
574 575
            else:
                raise ValueError("Invalid 'insert' value")
576 577 578 579 580
        elif isinstance(member_sn, LeafListNode):
            # Append received node to leaf list

            # Create leaf list if necessary
            if existing_member is None:
581
                existing_member = n.put_member(input_member_name, ArrayValue([]))
582

583
            # Convert input data from List/Dict to ArrayValue/ObjectValue
584
            new_value_item = member_sn.from_raw([input_member_value])[0]
585 586

            if insert == "first":
587
                new_member = existing_member.update(ArrayValue([new_value_item] + existing_member.value))
588
            elif (insert == "last") or insert is None:
589
                new_member = existing_member.update(ArrayValue(existing_member.value + [new_value_item]))
590 591
            else:
                raise ValueError("Invalid 'insert' value")
Pavel Spirek's avatar
Pavel Spirek committed
592
        else:
593 594 595 596
            if existing_member is None:
                # Create new data node

                # Convert input data from List/Dict to ArrayValue/ObjectValue
597
                new_value_item = member_sn.from_raw(input_member_value)
598 599

                # Create new node (object member)
600
                new_member = n.put_member(input_member_name, new_value_item)
601 602 603
            else:
                # Data node already exists
                raise InstanceAlreadyPresent("Member \"{}\" already present in \"{}\"".format(input_member_name, ii))
Pavel Spirek's avatar
Pavel Spirek committed
604

605
        print(json.dumps(new_member.top().value, indent=4))
606
        return new_member.top()
607

608
    # PUT data node
609
    def update_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> InstanceNode:
610
        ii = self.parse_ii(rpc.path, rpc.path_format)
611
        n = root.goto(ii)
612

613
        if self.nacm:
614
            nrpc = self.nacm.get_user_nacm(rpc.username)
615
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_UPDATE) == Action.DENY:
616
                raise NacmForbiddenError()
Pavel Spirek's avatar
Pavel Spirek committed
617

618 619 620 621 622 623 624 625 626
            sn = n.schema_node
            new_val = sn.from_raw([value])[0]
            new_node = RootNode(new_val, sn, new_val.timestamp)
            new_node_prunned = nrpc.prune_data_tree(new_node, self._data, ii, Permission.NACM_ACCESS_UPDATE).raw_value()
            print(json.dumps(new_node_prunned, indent=4))

            new_n = n.update(new_node_prunned, raw=True)
        else:
            new_n = n.update(value, raw=True)
627

628
        return new_n.top()
629

630
    # Delete data node
631
    def delete_node_rpc(self, root: InstanceNode, rpc: RpcInfo) -> InstanceNode:
632
        ii = self.parse_ii(rpc.path, rpc.path_format)
633
        n = root.goto(ii)
Pavel Spirek's avatar
Pavel Spirek committed
634 635 636
        n_parent = n.up()
        last_isel = ii[-1]

637
        if self.nacm:
638
            nrpc = self.nacm.get_user_nacm(rpc.username)
639
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_DELETE) == Action.DENY:
640
                raise NacmForbiddenError()
Pavel Spirek's avatar
Pavel Spirek committed
641

642
        new_n = n_parent
643 644
        if isinstance(n_parent.value, ArrayValue):
            if isinstance(last_isel, EntryIndex):
645
                new_n = n_parent.delete_item(last_isel.key)
646
            elif isinstance(last_isel, EntryKeys):
647
                new_n = n_parent.delete_item(n.index)
648 649
        elif isinstance(n_parent.value, ObjectValue):
            if isinstance(last_isel, MemberName):
650
                new_n = n_parent.delete_item(last_isel.key)
651
        else:
652
            raise InstanceValueError(n, "Invalid target node type")
Pavel Spirek's avatar
Pavel Spirek committed
653

654
        return new_n.top()
Pavel Spirek's avatar
Pavel Spirek committed
655

656
    # Invoke an operation
657
    def invoke_op_rpc(self, rpc: RpcInfo) -> ObjectValue:
658
        if rpc.op_name == "jetconf:conf-start":
Pavel Spirek's avatar
Pavel Spirek committed
659 660 661 662 663
            try:
                cl_name = rpc.op_input_args["name"]
            except (TypeError, KeyError):
                raise ValueError("This operation expects \"name\" input parameter")

664 665 666 667 668
            transaction_opts = rpc.op_input_args.get("options")

            if self._usr_journals.get(rpc.username) is None:
                self._usr_journals[rpc.username] = UsrChangeJournal(self._data, transaction_opts)

Pavel Spirek's avatar
Pavel Spirek committed
669
            self._usr_journals[rpc.username].cl_new(cl_name)
670
            ret_data = {"status": "OK"}
671
        elif rpc.op_name == "jetconf:conf-list":
Pavel Spirek's avatar
Pavel Spirek committed
672 673 674 675 676 677
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
                chl_json = usr_journal.list()
            else:
                chl_json = str(None)

678 679 680 681 682
            ret_data = \
                {
                    "status": "OK",
                    "changelists": chl_json
                }
683
        elif rpc.op_name == "jetconf:conf-drop":
Pavel Spirek's avatar
Pavel Spirek committed
684 685 686 687
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
                if not usr_journal.cl_drop():
                    del self._usr_journals[rpc.username]
688 689

            ret_data = {"status": "OK"}
690
        elif rpc.op_name == "jetconf:conf-commit":
Pavel Spirek's avatar
Pavel Spirek committed
691 692
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
693 694 695 696 697 698 699 700
                try:
                    self.lock_data(rpc.username)
                    usr_journal.commit(self)
                    if CONFIG["GLOBAL"]["PERSISTENT_CHANGES"] is True:
                        self.save()
                finally:
                    self.unlock_data()

Pavel Spirek's avatar
Pavel Spirek committed
701 702
                del self._usr_journals[rpc.username]
            else:
703
                info("[{}]: Nothing to commit".format(rpc.username))
Pavel Spirek's avatar
Pavel Spirek committed
704 705 706 707 708 709

            ret_data = \
                {
                    "status": "OK",
                    "conf-changed": True
                }
710
        elif rpc.op_name == "jetconf:get-schema-digest":
711
            ret_data = self._dm.schema_digest()
712
        else:
713
            # User-defined operation
714 715 716 717 718 719 720
            if self.nacm and (not rpc.skip_nacm_check):
                nrpc = self.nacm.get_user_nacm(rpc.username)
                if nrpc.check_rpc_name(rpc.op_name) == Action.DENY:
                    raise NacmForbiddenError(
                        "Op \"{}\" invocation denied for user \"{}\"".format(rpc.op_name, rpc.username)
                    )

721 722
            op_handler = OP_HANDLERS.get_handler(rpc.op_name)
            if op_handler is None:
723
                raise NoHandlerForOpError(rpc.op_name)
724 725

            # Print operation input schema
726 727
            sn = self._dm.get_schema_node(rpc.path)
            sn_input = sn.get_child("input")
728 729 730
            # if sn_input is not None:
            #     print("RPC input schema:")
            #     print(sn_input._ascii_tree(""))
731

732 733 734 735 736 737 738
            if sn_input.children:
                # Input arguments are expected
                op_input_args = sn_input.from_raw(rpc.op_input_args)
                ret_data = op_handler(op_input_args, rpc.username)
            else:
                # Operation with no input
                ret_data = op_handler(None, rpc.username)
739 740 741

        return ret_data

Pavel Spirek's avatar
Pavel Spirek committed
742 743 744 745 746
    def add_to_journal_rpc(self, ch_type: ChangeType, rpc: RpcInfo, value: Any, new_root: InstanceNode):
        usr_journal = self._usr_journals.get(rpc.username)
        if usr_journal is not None:
            usr_chs = usr_journal.clists[-1]
            usr_chs.add(DataChange(ch_type, rpc, value), new_root)
747 748 749
        else:
            raise NoHandlerError("No active changelist for user \"{}\"".format(rpc.username))

Pavel Spirek's avatar
Pavel Spirek committed
750
    # Locks datastore data
751 752
    def lock_data(self, username: str = None, blocking: bool=True):
        ret = self._data_lock.acquire(blocking=blocking, timeout=1)
Pavel Spirek's avatar
Pavel Spirek committed
753
        if ret:
Pavel Spirek's avatar
Pavel Spirek committed
754
            self._lock_username = username or "(unknown)"
755
            debug_data("Acquired lock in datastore \"{}\" for user \"{}\"".format(self.name, username))
Pavel Spirek's avatar
Pavel Spirek committed
756
        else:
Pavel Spirek's avatar
Pavel Spirek committed
757
            raise DataLockError(
758 759 760 761 762
                "Failed to acquire lock in datastore \"{}\" for user \"{}\", already locked by \"{}\"".format(
                    self.name,
                    username,
                    self._lock_username
                )
Pavel Spirek's avatar
Pavel Spirek committed
763 764
            )

765
    # Unlock datastore data
Pavel Spirek's avatar
Pavel Spirek committed
766 767
    def unlock_data(self):
        self._data_lock.release()
768
        debug_data("Released lock in datastore \"{}\" for user \"{}\"".format(self.name, self._lock_username))
Pavel Spirek's avatar
Pavel Spirek committed
769 770
        self._lock_username = None

771
    # Load data from persistent storage
772
    def load(self):
Pavel Spirek's avatar
Pavel Spirek committed
773 774
        raise NotImplementedError("Not implemented in base class")

775
    # Save data to persistent storage
776
    def save(self):
Pavel Spirek's avatar
Pavel Spirek committed
777 778
        raise NotImplementedError("Not implemented in base class")

Pavel Spirek's avatar
Pavel Spirek committed
779 780

class JsonDatastore(BaseDatastore):
781 782
    def __init__(self, dm: DataModel, json_file: str, name: str = "", with_nacm: bool=False):
        super().__init__(dm, name, with_nacm)
783 784 785
        self.json_file = json_file

    def load(self):
Pavel Spirek's avatar
Pavel Spirek committed
786
        self._data = None
787
        with open(self.json_file, "rt") as fp:
Pavel Spirek's avatar
Pavel Spirek committed
788
            self._data = self._dm.from_raw(json.load(fp))
Pavel Spirek's avatar
Pavel Spirek committed
789

790 791 792
        if self.nacm is not None:
            self.nacm.update()

793 794 795 796 797
    def load_yl_data(self, filename: str):
        self._yang_lib_data = None
        with open(filename, "rt") as fp:
            self._yang_lib_data = self._dm.from_raw(json.load(fp))

798 799 800
    def save(self):
        with open(self.json_file, "w") as jfd:
            json.dump(self._data.raw_value(), jfd, indent=4)