data.py 25.3 KB
Newer Older
Pavel Spirek's avatar
Pavel Spirek committed
1
import json
2

3
from threading import Lock
Pavel Spirek's avatar
Pavel Spirek committed
4
from enum import Enum
5
from colorlog import error, warning as warn, info
6
from typing import List, Any, Dict, Callable, Optional
7

8
from yangson.datamodel import DataModel
9 10 11 12 13 14 15 16 17 18
from yangson.enumerations import ContentType, ValidationScope
from yangson.schema import (
    SchemaNode,
    NonexistentSchemaNode,
    ListNode,
    LeafListNode,
    SchemaError,
    SemanticError,
    InternalNode
)
19 20 21
from yangson.instance import (
    InstanceNode,
    NonexistentInstance,
22
    InstanceValueError,
23 24 25 26 27
    ArrayValue,
    ObjectValue,
    MemberName,
    EntryKeys,
    EntryIndex,
28
    InstanceRoute,
29 30
    ArrayEntry,
    RootNode)
31

32
from .helpers import PathFormat, ErrorHelpers, LogHelpers, DataHelpers, JsonNodeT
33
from .config import CONFIG
34 35
from .nacm import NacmConfig, Permission, Action
from .handler_list import OP_HANDLERS, STATE_DATA_HANDLES, CONF_DATA_HANDLES
36 37 38

epretty = ErrorHelpers.epretty
debug_data = LogHelpers.create_module_dbg_logger(__name__)
Pavel Spirek's avatar
Pavel Spirek committed
39 40


41 42 43 44 45 46
class ChangeType(Enum):
    CREATE = 0,
    REPLACE = 1,
    DELETE = 2


Pavel Spirek's avatar
Pavel Spirek committed
47
class NacmForbiddenError(Exception):
48
    def __init__(self, msg="Access to data node rejected by NACM", rule=None):
Pavel Spirek's avatar
Pavel Spirek committed
49
        self.msg = msg
50
        self.rulename = rule
Pavel Spirek's avatar
Pavel Spirek committed
51

52 53 54
    def __str__(self):
        return self.msg

Pavel Spirek's avatar
Pavel Spirek committed
55 56 57 58 59

class DataLockError(Exception):
    def __init__(self, msg=""):
        self.msg = msg

Pavel Spirek's avatar
Pavel Spirek committed
60 61
    def __str__(self):
        return self.msg
Pavel Spirek's avatar
Pavel Spirek committed
62 63


64
class InstanceAlreadyPresent(Exception):
65 66 67 68 69 70 71
    def __init__(self, msg=""):
        self.msg = msg

    def __str__(self):
        return self.msg


72
class HandlerError(Exception):
73 74 75 76 77 78 79
    def __init__(self, msg=""):
        self.msg = msg

    def __str__(self):
        return self.msg


80 81 82 83
class NoHandlerError(HandlerError):
    pass


84 85 86 87
class ConfHandlerFailedError(HandlerError):
    pass


88
class NoHandlerForOpError(NoHandlerError):
89 90 91 92 93
    def __init__(self, op_name: str):
        self.op_name = op_name

    def __str__(self):
        return "Nonexistent handler for operation \"{}\"".format(self.op_name)
94 95 96 97 98 99


class NoHandlerForStateDataError(NoHandlerError):
    pass


100
class BaseDataListener:
101
    def __init__(self, ds: "BaseDatastore", sch_pth: str):
102
        self.ds = ds
103 104
        self.schema_path = sch_pth                          # type: str
        self.schema_node = ds.get_schema_node(sch_pth)      # type: SchemaNode
105

106
    def process(self, sn: SchemaNode, ii: InstanceRoute, ch: "DataChange"):
107 108 109
        raise NotImplementedError("Not implemented in base class")

    def __str__(self):
110
        return self.__class__.__name__ + ": listening at " + self.schema_path
111 112


113
class RpcInfo:
Pavel Spirek's avatar
Pavel Spirek committed
114
    def __init__(self):
115 116
        self.username = None    # type: str
        self.path = None        # type: str
117
        self.qs = None          # type: Dict[str, List[str]]
118 119 120 121
        self.path_format = PathFormat.URL   # type: PathFormat
        self.skip_nacm_check = False        # type: bool
        self.op_name = None                 # type: str
        self.op_input_args = None           # type: ObjectValue
Pavel Spirek's avatar
Pavel Spirek committed
122 123


124 125 126 127 128 129 130 131
class DataChange:
    def __init__(self, change_type: ChangeType, rpc_info: RpcInfo, data: Any):
        self.change_type = change_type
        self.rpc_info = rpc_info
        self.data = data


class ChangeList:
Pavel Spirek's avatar
Pavel Spirek committed
132 133
    def __init__(self, root_origin_cl: InstanceNode, changelist_name: str):
        self.root_list = [root_origin_cl]
134 135 136
        self.changelist_name = changelist_name
        self.journal = []   # type: List[DataChange]

Pavel Spirek's avatar
Pavel Spirek committed
137
    def add(self, change: DataChange, root_after_change: InstanceNode):
138
        self.journal.append(change)
Pavel Spirek's avatar
Pavel Spirek committed
139 140 141 142
        self.root_list.append(root_after_change)


class UsrChangeJournal:
143
    def __init__(self, root_origin: InstanceNode, transaction_opts: Optional[JsonNodeT]):
Pavel Spirek's avatar
Pavel Spirek committed
144
        self.root_origin = root_origin
145
        self.transaction_opts = transaction_opts
Pavel Spirek's avatar
Pavel Spirek committed
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
        self.clists = []    # type: List[ChangeList]

    def cl_new(self, cl_name: str):
        self.clists.append(ChangeList(self.get_root_head(), cl_name))

    def cl_drop(self) -> bool:
        try:
            self.clists.pop()
            return True
        except IndexError:
            return False

    def get_root_head(self) -> InstanceNode:
        if len(self.clists) > 0:
            return self.clists[-1].root_list[-1]
        else:
            return self.root_origin

    def list(self) -> str:
        chl_json = {}
        for chl in self.clists:
            changes = []
            for ch in chl.journal:
                changes.append(
                    [ch.change_type.name, ch.rpc_info.path]
                )

            chl_json[chl.changelist_name] = changes

        return chl_json

    def commit(self, ds: "BaseDatastore"):
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
        if hash(ds.get_data_root()) == hash(self.root_origin):
            info("Commiting new configuration (swapping roots)")
            # Set new root
            nr = self.get_root_head()
        else:
            info("Commiting new configuration (re-applying changes)")
            nr = ds.get_data_root()
            for cl in self.clists:
                for change in cl.journal:
                    if change.change_type == ChangeType.CREATE:
                        nr = ds.create_node_rpc(nr, change.rpc_info, change.data)
                    elif change.change_type == ChangeType.REPLACE:
                        nr = ds.update_node_rpc(nr, change.rpc_info, change.data)
                    elif change.change_type == ChangeType.DELETE:
                        nr = ds.delete_node_rpc(nr, change.rpc_info)

Pavel Spirek's avatar
Pavel Spirek committed
194
        try:
195
            # Validate syntax and semantics of new data
196 197
            if CONFIG["GLOBAL"]["VALIDATE_TRANSACTIONS"] is True:
                nr.validate(ValidationScope.all, ContentType.config)
198 199 200 201 202 203 204
            new_data_valid = True
        except (SchemaError, SemanticError) as e:
            error("Data validation error:")
            error(epretty(e))
            new_data_valid = False

        if new_data_valid:
Pavel Spirek's avatar
Pavel Spirek committed
205
            # Set new data root
206
            ds.set_data_root(nr)
Pavel Spirek's avatar
Pavel Spirek committed
207

208 209 210 211 212 213 214 215
            # Call commit begin hook
            begin_hook_failed = False
            try:
                ds.commit_begin_callback(self.transaction_opts)
            except Exception as e:
                error("Exception occured in commit_begin handler: {}".format(epretty(e)))
                begin_hook_failed = True

216
            # Run schema node handlers
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
            conf_handler_failed = False
            if not begin_hook_failed:
                try:
                    for cl in self.clists:
                        for change in cl.journal:
                            ii = DataHelpers.parse_ii(change.rpc_info.path, change.rpc_info.path_format)
                            ds.run_conf_edit_handler(ii, change)
                except Exception as e:
                    error("Exception occured in edit handler: {}".format(epretty(e)))
                    conf_handler_failed = True

            # Call commit end hook
            end_hook_failed = False
            end_hook_abort_failed = False
            if not (begin_hook_failed or conf_handler_failed):
                try:
                    ds.commit_end_callback(self.transaction_opts, failed=False)
                except Exception as e:
                    error("Exception occured in commit_end handler: {}".format(epretty(e)))
                    end_hook_failed = True

            if begin_hook_failed or conf_handler_failed or end_hook_failed:
                try:
                    # Call commit_end callback again with "failed" argument set to True
                    ds.commit_end_callback(self.transaction_opts, failed=True)
                except Exception as e:
                    error("Exception occured in commit_end handler (abort): {}".format(epretty(e)))
                    end_hook_abort_failed = True
Pavel Spirek's avatar
Pavel Spirek committed
245 246 247

            # Clear user changelists
            self.clists.clear()
248

249 250 251 252
            # Return to previous version of data and raise an exception if something went wrong
            if begin_hook_failed or conf_handler_failed or end_hook_failed or end_hook_abort_failed:
                ds.data_root_rollback(history_steps=1, store_current=False)
                raise ConfHandlerFailedError("(see logged)")
253

254

Pavel Spirek's avatar
Pavel Spirek committed
255
class BaseDatastore:
256
    def __init__(self, dm: DataModel, name: str="", with_nacm: bool=False):
257 258 259
        def _blankfn(*args, **kwargs):
            pass

Pavel Spirek's avatar
Pavel Spirek committed
260
        self.name = name
Pavel Spirek's avatar
Pavel Spirek committed
261
        self.nacm = None    # type: NacmConfig
262
        self._data = None   # type: InstanceNode
263 264
        self._data_history = []     # type: List[InstanceNode]
        self._yang_lib_data = None  # type: InstanceNode
265
        self._dm = dm       # type: DataModel
Pavel Spirek's avatar
Pavel Spirek committed
266
        self._data_lock = Lock()
Pavel Spirek's avatar
Pavel Spirek committed
267
        self._lock_username = None  # type: str
268
        self._usr_journals = {}     # type: Dict[str, UsrChangeJournal]
269 270
        self.commit_begin_callback = _blankfn   # type: Callable[..., bool]
        self.commit_end_callback = _blankfn     # type: Callable[..., bool]
Pavel Spirek's avatar
Pavel Spirek committed
271

272 273
        if with_nacm:
            self.nacm = NacmConfig(self)
Pavel Spirek's avatar
Pavel Spirek committed
274

Pavel Spirek's avatar
Pavel Spirek committed
275
    # Returns the root node of data tree
276 277 278 279 280
    def get_data_root(self, previous_version: int=0) -> InstanceNode:
        if previous_version > 0:
            return self._data_history[-previous_version]
        else:
            return self._data
Pavel Spirek's avatar
Pavel Spirek committed
281

282 283 284
    def get_yl_data_root(self) -> InstanceNode:
        return self._yang_lib_data

285 286 287 288 289 290 291 292 293
    # Returns the root node of data tree
    def get_data_root_staging(self, username: str) -> InstanceNode:
        usr_journal = self._usr_journals.get(username)
        if usr_journal is not None:
            root = usr_journal.get_root_head()
            return root
        else:
            raise NoHandlerError("No active changelist for user \"{}\"".format(username))

294
    # Set a new Instance node as data root, store old root to archive
Pavel Spirek's avatar
Pavel Spirek committed
295
    def set_data_root(self, new_root: InstanceNode):
296
        self._data_history.append(self._data)
Pavel Spirek's avatar
Pavel Spirek committed
297 298
        self._data = new_root

299 300 301 302 303 304
    def data_root_rollback(self, history_steps: int, store_current: bool):
        if store_current:
            self._data_history.append(self._data)

        self._data = self._data_history[-history_steps]

305 306
    # Get schema node with particular schema address
    def get_schema_node(self, sch_pth: str) -> SchemaNode:
307
        sn = self._dm.get_schema_node(sch_pth)
308
        if sn is None:
309 310
            # raise NonexistentSchemaNode(sch_pth)
            debug_data("Cannot find schema node for " + sch_pth)
311 312 313
        return sn

    # Notify data observers about change in datastore
314
    def run_conf_edit_handler(self, ii: InstanceRoute, ch: DataChange):
315
        try:
Pavel Spirek's avatar
Pavel Spirek committed
316
            sch_pth_list = filter(lambda n: isinstance(n, MemberName), ii)
317
            sch_pth = DataHelpers.ii2str(sch_pth_list)
318 319 320
            sn = self.get_schema_node(sch_pth)

            while sn is not None:
321 322
                h = CONF_DATA_HANDLES.get_handler(str(id(sn)))
                if h is not None:
323
                    h.process(sn, ii, ch)
324 325 326
                sn = sn.parent
        except NonexistentInstance:
            warn("Cannnot notify {}, parent container removed".format(ii))
327

Pavel Spirek's avatar
Pavel Spirek committed
328
    # Get data node, evaluate NACM if required
329
    def get_node_rpc(self, rpc: RpcInfo, yl_data=False, staging=False) -> InstanceNode:
330
        ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
331 332 333
        if yl_data:
            root = self._yang_lib_data
        else:
334 335 336 337
            if staging:
                root = self.get_data_root_staging(rpc.username)
            else:
                root = self._data
338

339
        sch_pth_list = filter(lambda isel: isinstance(isel, MemberName), ii)
340
        sch_pth = DataHelpers.ii2str(sch_pth_list)
341
        sn = self.get_schema_node(sch_pth)
342 343 344 345 346 347 348 349 350 351 352 353
        state_roots = sn.state_roots()

        if not yl_data and state_roots:
            self.commit_begin_callback()
            for state_node_pth in state_roots:
                sdh = STATE_DATA_HANDLES.get_handler(state_node_pth)
                if sdh is not None:
                    root_val = sdh.update_node(ii, root, True)
                    root = self._data.update(root_val, raw=True)
                else:
                    raise NoHandlerForStateDataError()
            self.commit_end_callback()
Pavel Spirek's avatar
Pavel Spirek committed
354

355
        n = root.goto(ii)
Pavel Spirek's avatar
Pavel Spirek committed
356

357 358
        try:
            with_defs = rpc.qs["with-defaults"][0]
359
        except (IndexError, KeyError):
360 361 362 363 364
            with_defs = None

        if with_defs == "report-all":
            n = n.add_defaults()

Pavel Spirek's avatar
Pavel Spirek committed
365 366
        if self.nacm:
            nrpc = self.nacm.get_user_nacm(rpc.username)
367
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_READ) == Action.DENY:
Pavel Spirek's avatar
Pavel Spirek committed
368 369
                raise NacmForbiddenError()
            else:
370 371
                # Prune nodes that should not be accessible to user
                n = nrpc.prune_data_tree(n, root, ii, Permission.NACM_ACCESS_READ)
Pavel Spirek's avatar
Pavel Spirek committed
372

373 374
        try:
            max_depth = int(rpc.qs["depth"][0])
375
        except (IndexError, KeyError):
376 377 378 379 380 381 382 383 384 385 386
            max_depth = None
        except ValueError:
            raise ValueError("Invalid value of query param \"depth\"")

        if max_depth is not None:
            def _tree_limit_depth(node: InstanceNode, depth: int) -> InstanceNode:
                if isinstance(node.value, ObjectValue):
                    if depth > max_depth:
                        node.value = ObjectValue({})
                    else:
                        for child_key in sorted(node.value.keys()):
387
                            m = node[child_key]
388 389 390 391 392 393
                            node = _tree_limit_depth(m, depth + 1).up()
                elif isinstance(node.value, ArrayValue):
                    if depth > max_depth:
                        node.value = ArrayValue([])
                    else:
                        for i in range(len(node.value)):
394
                            e = node[i]
395 396 397 398 399
                            node = _tree_limit_depth(e, depth + 1).up()

                return node
            n = _tree_limit_depth(n, 1)

Pavel Spirek's avatar
Pavel Spirek committed
400 401
        return n

402
    # Create new data node (Restconf draft compliant version)
403 404
    def create_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> InstanceNode:
        ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
405
        n = root.goto(ii)
Pavel Spirek's avatar
Pavel Spirek committed
406

407 408 409
        insert = rpc.qs.get("insert", [None])[0]
        point = rpc.qs.get("point", [None])[0]

410
        if self.nacm:
411
            nrpc = self.nacm.get_user_nacm(rpc.username)
412
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_CREATE) == Action.DENY:
413 414
                raise NacmForbiddenError()

415
        # Get target member name
416 417 418 419 420
        input_member_name = tuple(value.keys())
        if len(input_member_name) != 1:
            raise ValueError("Received json object must contain exactly one member")
        else:
            input_member_name = input_member_name[0]
Pavel Spirek's avatar
Pavel Spirek committed
421

422
        input_member_value = value[input_member_name]
Pavel Spirek's avatar
Pavel Spirek committed
423

424
        # Check if target member already exists
425
        try:
426
            existing_member = n[input_member_name]
427
        except NonexistentInstance:
428
            existing_member = None
429

430
        # Get target schema node
431 432
        n = root.goto(ii)

433
        sn = n.schema_node  # type: InternalNode
434
        sch_member_name = sn._iname2qname(input_member_name)
435
        member_sn = sn.get_data_child(*sch_member_name)
436

437
        if isinstance(member_sn, ListNode):
Pavel Spirek's avatar
Pavel Spirek committed
438
            # Append received node to list
439

440 441
            # Create list if necessary
            if existing_member is None:
442
                existing_member = n.put_member(input_member_name, ArrayValue([]))
443

444
            # Convert input data from List/Dict to ArrayValue/ObjectValue
445 446
            new_value_list = member_sn.from_raw([input_member_value])
            new_value_item = new_value_list[0]    # type: ObjectValue
447 448

            list_node_key = member_sn.keys[0][0]
449
            if new_value_item[list_node_key] in map(lambda x: x[list_node_key], existing_member.value):
450 451 452
                raise InstanceAlreadyPresent("Duplicate key")

            if insert == "first":
453
                # Optimization
454
                if len(existing_member.value) > 0:
455
                    list_entry_first = existing_member[0]   # type: ArrayEntry
456
                    new_member = list_entry_first.insert_before(new_value_item).up()
457 458
                else:
                    new_member = existing_member.update(new_value_list)
459
            elif (insert == "last") or insert is None:
460
                # Optimization
461
                if len(existing_member.value) > 0:
462 463 464 465
                    list_entry_last = existing_member[-1]   # type: ArrayEntry
                    new_member = list_entry_last.insert_after(new_value_item).up()
                else:
                    new_member = existing_member.update(new_value_list)
466 467
            elif insert == "before":
                entry_sel = EntryKeys({list_node_key: point})
468 469
                list_entry = entry_sel.goto_step(existing_member)   # type: ArrayEntry
                new_member = list_entry.insert_before(new_value_item).up()
470 471
            elif insert == "after":
                entry_sel = EntryKeys({list_node_key: point})
472 473
                list_entry = entry_sel.goto_step(existing_member)   # type: ArrayEntry
                new_member = list_entry.insert_after(new_value_item).up()
474 475
            else:
                raise ValueError("Invalid 'insert' value")
476 477 478 479 480
        elif isinstance(member_sn, LeafListNode):
            # Append received node to leaf list

            # Create leaf list if necessary
            if existing_member is None:
481
                existing_member = n.put_member(input_member_name, ArrayValue([]))
482

483
            # Convert input data from List/Dict to ArrayValue/ObjectValue
484
            new_value_item = member_sn.from_raw([input_member_value])[0]
485 486

            if insert == "first":
487
                new_member = existing_member.update(ArrayValue([new_value_item] + existing_member.value))
488
            elif (insert == "last") or insert is None:
489
                new_member = existing_member.update(ArrayValue(existing_member.value + [new_value_item]))
490 491
            else:
                raise ValueError("Invalid 'insert' value")
Pavel Spirek's avatar
Pavel Spirek committed
492
        else:
493 494 495 496
            if existing_member is None:
                # Create new data node

                # Convert input data from List/Dict to ArrayValue/ObjectValue
497
                new_value_item = member_sn.from_raw(input_member_value)
498 499

                # Create new node (object member)
500
                new_member = n.put_member(input_member_name, new_value_item)
501 502 503
            else:
                # Data node already exists
                raise InstanceAlreadyPresent("Member \"{}\" already present in \"{}\"".format(input_member_name, ii))
Pavel Spirek's avatar
Pavel Spirek committed
504

505
        return new_member.top()
506

507
    # PUT data node
508
    def update_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> InstanceNode:
509
        ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
510
        n = root.goto(ii)
511

512
        if self.nacm:
513
            nrpc = self.nacm.get_user_nacm(rpc.username)
514
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_UPDATE) == Action.DENY:
515
                raise NacmForbiddenError()
Pavel Spirek's avatar
Pavel Spirek committed
516

517 518 519 520 521 522 523 524 525
            sn = n.schema_node
            new_val = sn.from_raw([value])[0]
            new_node = RootNode(new_val, sn, new_val.timestamp)
            new_node_prunned = nrpc.prune_data_tree(new_node, self._data, ii, Permission.NACM_ACCESS_UPDATE).raw_value()
            print(json.dumps(new_node_prunned, indent=4))

            new_n = n.update(new_node_prunned, raw=True)
        else:
            new_n = n.update(value, raw=True)
526

527
        return new_n.top()
528

529
    # Delete data node
530
    def delete_node_rpc(self, root: InstanceNode, rpc: RpcInfo) -> InstanceNode:
531
        ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
532
        n = root.goto(ii)
Pavel Spirek's avatar
Pavel Spirek committed
533 534 535
        n_parent = n.up()
        last_isel = ii[-1]

536
        if self.nacm:
537
            nrpc = self.nacm.get_user_nacm(rpc.username)
538
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_DELETE) == Action.DENY:
539
                raise NacmForbiddenError()
Pavel Spirek's avatar
Pavel Spirek committed
540

541
        new_n = n_parent
542 543
        if isinstance(n_parent.value, ArrayValue):
            if isinstance(last_isel, EntryIndex):
544
                new_n = n_parent.delete_item(last_isel.key)
545
            elif isinstance(last_isel, EntryKeys):
546
                new_n = n_parent.delete_item(n.index)
547 548
        elif isinstance(n_parent.value, ObjectValue):
            if isinstance(last_isel, MemberName):
549
                new_n = n_parent.delete_item(last_isel.key)
550
        else:
551
            raise InstanceValueError(n, "Invalid target node type")
Pavel Spirek's avatar
Pavel Spirek committed
552

553
        return new_n.top()
Pavel Spirek's avatar
Pavel Spirek committed
554

555
    # Invoke an operation
556 557
    def invoke_op_rpc(self, rpc: RpcInfo) -> ObjectValue:
        if rpc.op_name == "conf-start":
Pavel Spirek's avatar
Pavel Spirek committed
558 559 560 561 562
            try:
                cl_name = rpc.op_input_args["name"]
            except (TypeError, KeyError):
                raise ValueError("This operation expects \"name\" input parameter")

563 564 565 566 567
            transaction_opts = rpc.op_input_args.get("options")

            if self._usr_journals.get(rpc.username) is None:
                self._usr_journals[rpc.username] = UsrChangeJournal(self._data, transaction_opts)

Pavel Spirek's avatar
Pavel Spirek committed
568
            self._usr_journals[rpc.username].cl_new(cl_name)
569 570
            ret_data = {"status": "OK"}
        elif rpc.op_name == "conf-list":
Pavel Spirek's avatar
Pavel Spirek committed
571 572 573 574 575 576
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
                chl_json = usr_journal.list()
            else:
                chl_json = str(None)

577 578 579 580 581 582
            ret_data = \
                {
                    "status": "OK",
                    "changelists": chl_json
                }
        elif rpc.op_name == "conf-drop":
Pavel Spirek's avatar
Pavel Spirek committed
583 584 585 586
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
                if not usr_journal.cl_drop():
                    del self._usr_journals[rpc.username]
587 588 589

            ret_data = {"status": "OK"}
        elif rpc.op_name == "conf-commit":
Pavel Spirek's avatar
Pavel Spirek committed
590 591
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
592 593 594 595 596 597 598 599
                try:
                    self.lock_data(rpc.username)
                    usr_journal.commit(self)
                    if CONFIG["GLOBAL"]["PERSISTENT_CHANGES"] is True:
                        self.save()
                finally:
                    self.unlock_data()

Pavel Spirek's avatar
Pavel Spirek committed
600 601
                del self._usr_journals[rpc.username]
            else:
602
                info("[{}]: Nothing to commit".format(rpc.username))
Pavel Spirek's avatar
Pavel Spirek committed
603 604 605 606 607 608

            ret_data = \
                {
                    "status": "OK",
                    "conf-changed": True
                }
609
        elif rpc.op_name == "get-schema-digest":
610
            ret_data = self._dm.schema_digest()
611
        else:
612
            # User-defined operation
613 614 615 616 617 618 619
            if self.nacm and (not rpc.skip_nacm_check):
                nrpc = self.nacm.get_user_nacm(rpc.username)
                if nrpc.check_rpc_name(rpc.op_name) == Action.DENY:
                    raise NacmForbiddenError(
                        "Op \"{}\" invocation denied for user \"{}\"".format(rpc.op_name, rpc.username)
                    )

620 621
            op_handler = OP_HANDLERS.get_handler(rpc.op_name)
            if op_handler is None:
622
                raise NoHandlerForOpError(rpc.op_name)
623 624 625 626 627 628 629

            # Print operation input schema
            # sn = self.get_schema_node(rpc.path)
            # sn_input = sn.get_child("input")
            # if sn_input is not None:
            #     print("RPC input schema:")
            #     print(sn_input._ascii_tree(""))
630

631
            ret_data = op_handler(rpc.op_input_args)
632 633 634

        return ret_data

Pavel Spirek's avatar
Pavel Spirek committed
635 636 637 638 639
    def add_to_journal_rpc(self, ch_type: ChangeType, rpc: RpcInfo, value: Any, new_root: InstanceNode):
        usr_journal = self._usr_journals.get(rpc.username)
        if usr_journal is not None:
            usr_chs = usr_journal.clists[-1]
            usr_chs.add(DataChange(ch_type, rpc, value), new_root)
640 641 642
        else:
            raise NoHandlerError("No active changelist for user \"{}\"".format(rpc.username))

Pavel Spirek's avatar
Pavel Spirek committed
643
    # Locks datastore data
644 645
    def lock_data(self, username: str = None, blocking: bool=True):
        ret = self._data_lock.acquire(blocking=blocking, timeout=1)
Pavel Spirek's avatar
Pavel Spirek committed
646
        if ret:
Pavel Spirek's avatar
Pavel Spirek committed
647
            self._lock_username = username or "(unknown)"
648
            debug_data("Acquired lock in datastore \"{}\" for user \"{}\"".format(self.name, username))
Pavel Spirek's avatar
Pavel Spirek committed
649
        else:
Pavel Spirek's avatar
Pavel Spirek committed
650
            raise DataLockError(
651 652 653 654 655
                "Failed to acquire lock in datastore \"{}\" for user \"{}\", already locked by \"{}\"".format(
                    self.name,
                    username,
                    self._lock_username
                )
Pavel Spirek's avatar
Pavel Spirek committed
656 657
            )

658
    # Unlock datastore data
Pavel Spirek's avatar
Pavel Spirek committed
659 660
    def unlock_data(self):
        self._data_lock.release()
661
        debug_data("Released lock in datastore \"{}\" for user \"{}\"".format(self.name, self._lock_username))
Pavel Spirek's avatar
Pavel Spirek committed
662 663
        self._lock_username = None

664
    # Load data from persistent storage
665
    def load(self):
Pavel Spirek's avatar
Pavel Spirek committed
666 667
        raise NotImplementedError("Not implemented in base class")

668
    # Save data to persistent storage
669
    def save(self):
Pavel Spirek's avatar
Pavel Spirek committed
670 671
        raise NotImplementedError("Not implemented in base class")

Pavel Spirek's avatar
Pavel Spirek committed
672 673

class JsonDatastore(BaseDatastore):
674 675
    def __init__(self, dm: DataModel, json_file: str, name: str = "", with_nacm: bool=False):
        super().__init__(dm, name, with_nacm)
676 677 678
        self.json_file = json_file

    def load(self):
Pavel Spirek's avatar
Pavel Spirek committed
679
        self._data = None
680
        with open(self.json_file, "rt") as fp:
Pavel Spirek's avatar
Pavel Spirek committed
681
            self._data = self._dm.from_raw(json.load(fp))
Pavel Spirek's avatar
Pavel Spirek committed
682

683 684 685
        if self.nacm is not None:
            self.nacm.update()

686 687 688 689 690
    def load_yl_data(self, filename: str):
        self._yang_lib_data = None
        with open(filename, "rt") as fp:
            self._yang_lib_data = self._dm.from_raw(json.load(fp))

691 692 693
    def save(self):
        with open(self.json_file, "w") as jfd:
            json.dump(self._data.raw_value(), jfd, indent=4)