data.py 25.3 KB
Newer Older
Pavel Spirek's avatar
Pavel Spirek committed
1
import json
2

3
from threading import Lock
Pavel Spirek's avatar
Pavel Spirek committed
4
from enum import Enum
5
from colorlog import error, warning as warn, info
6
from typing import List, Any, Dict, Callable, Optional
7

8
from yangson.datamodel import DataModel
9 10 11 12 13 14 15 16 17 18
from yangson.enumerations import ContentType, ValidationScope
from yangson.schema import (
    SchemaNode,
    NonexistentSchemaNode,
    ListNode,
    LeafListNode,
    SchemaError,
    SemanticError,
    InternalNode
)
19 20 21
from yangson.instance import (
    InstanceNode,
    NonexistentInstance,
22
    InstanceValueError,
23 24 25 26 27
    ArrayValue,
    ObjectValue,
    MemberName,
    EntryKeys,
    EntryIndex,
28
    InstanceRoute,
29
    ArrayEntry,
30 31
    RootNode
)
32

33
from .helpers import PathFormat, ErrorHelpers, LogHelpers, DataHelpers, JsonNodeT
34
from .config import CONFIG
35 36
from .nacm import NacmConfig, Permission, Action
from .handler_list import OP_HANDLERS, STATE_DATA_HANDLES, CONF_DATA_HANDLES
37 38 39

# Short alias for the error pretty-printer; used below when logging exceptions
epretty = ErrorHelpers.epretty
# Debug logging callable created by LogHelpers for this module's name
debug_data = LogHelpers.create_module_dbg_logger(__name__)
Pavel Spirek's avatar
Pavel Spirek committed
40 41


42 43 44 45 46 47
class ChangeType(Enum):
    """Kind of edit recorded in a user's change journal."""

    # Plain integers. The trailing commas that used to follow the first two
    # members silently turned their values into one-element tuples.
    CREATE = 0
    REPLACE = 1
    DELETE = 2


Pavel Spirek's avatar
Pavel Spirek committed
48
class NacmForbiddenError(Exception):
    """Raised when NACM rejects access to a data node or an operation.

    Attributes:
        msg: Human-readable reason; this is what ``str()`` returns.
        rulename: The value passed as ``rule`` (``None`` when not given);
            presumably the name of the NACM rule that matched.
    """

    def __init__(self, msg="Access to data node rejected by NACM", rule=None):
        # Forward the message so that ``args`` is populated even when the
        # default message is used.
        super().__init__(msg)
        self.msg = msg
        self.rulename = rule

    def __str__(self):
        return self.msg

Pavel Spirek's avatar
Pavel Spirek committed
56 57 58 59 60

class DataLockError(Exception):
    """Raised when the datastore lock cannot be acquired.

    Attributes:
        msg: Human-readable reason; this is what ``str()`` returns.
    """

    def __init__(self, msg=""):
        # Forward the message so that ``args`` is always populated.
        super().__init__(msg)
        self.msg = msg

    def __str__(self):
        return self.msg
Pavel Spirek's avatar
Pavel Spirek committed
63 64


65
class InstanceAlreadyPresent(Exception):
    """Raised when a node to be created already exists in the data tree.

    Attributes:
        msg: Human-readable reason; this is what ``str()`` returns.
    """

    def __init__(self, msg=""):
        # Forward the message so that ``args`` is always populated.
        super().__init__(msg)
        self.msg = msg

    def __str__(self):
        return self.msg


73
class HandlerError(Exception):
    """Base class of the handler-related errors defined in this module.

    Attributes:
        msg: Human-readable reason; this is what ``str()`` returns.
    """

    def __init__(self, msg=""):
        # Forward the message so that ``args`` is always populated.
        super().__init__(msg)
        self.msg = msg

    def __str__(self):
        return self.msg


81 82 83 84
class NoHandlerError(HandlerError):
    """HandlerError subtype raised when something required is missing.

    Within this module it is raised when a user has no active changelist,
    and it is the base of the more specific "no handler" errors.
    """


85 86 87 88
class ConfHandlerFailedError(HandlerError):
    """HandlerError subtype raised when a commit hook or edit handler failed."""


89
class NoHandlerForOpError(NoHandlerError):
    """Raised when no handler is registered for a requested operation.

    Attributes:
        op_name: Name of the operation that has no handler.
    """

    def __init__(self, op_name: str):
        # Initialise the base class too, so the inherited ``msg`` attribute
        # exists on instances (it used to be missing).
        super().__init__("Nonexistent handler for operation \"{}\"".format(op_name))
        self.op_name = op_name

    def __str__(self):
        return "Nonexistent handler for operation \"{}\"".format(self.op_name)
95 96 97 98 99 100


class NoHandlerForStateDataError(NoHandlerError):
    """Raised when a state data root has no registered state data handler."""


101
class BaseDataListener:
    """Base class for objects that react to changes at one schema path.

    Attributes:
        ds: The datastore this listener was created for.
        schema_path: Schema path the listener is attached to.
        schema_node: Result of ``ds.get_schema_node(schema_path)``.
    """

    def __init__(self, ds: "BaseDatastore", sch_pth: str):
        self.ds = ds
        self.schema_path = sch_pth                          # type: str
        self.schema_node = ds.get_schema_node(sch_pth)      # type: SchemaNode

    def process(self, sn: SchemaNode, ii: InstanceRoute, ch: "DataChange"):
        """Handle one data change; must be overridden by subclasses.

        Args:
            sn: Schema node the handler was looked up for.
            ii: Instance route of the changed node.
            ch: The recorded change.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Not implemented in base class")

    def __str__(self):
        return self.__class__.__name__ + ": listening at " + self.schema_path
112 113


114
class RpcInfo:
    """Plain container describing one request made against the datastore.

    All fields start empty and are filled in by the caller.
    """

    def __init__(self):
        self.username = None    # type: str
        self.path = None        # type: str
        # Query string values, keyed by parameter name
        self.qs = None          # type: Dict[str, List[str]]
        # Format used when ``path`` is parsed into an instance route
        self.path_format = PathFormat.URL   # type: PathFormat
        # When True, the NACM check of user-defined operations is skipped
        self.skip_nacm_check = False        # type: bool
        self.op_name = None                 # type: str
        self.op_input_args = None           # type: ObjectValue
Pavel Spirek's avatar
Pavel Spirek committed
123 124


125 126 127 128 129 130 131 132
class DataChange:
    """One journaled edit: what kind it is, who asked for it, and its payload."""

    def __init__(self, change_type: ChangeType, rpc_info: RpcInfo, data: Any):
        # ``data`` is the value supplied with the edit request
        self.data = data
        self.rpc_info = rpc_info
        self.change_type = change_type


class ChangeList:
    """Named sequence of changes together with the data root after each one.

    Attributes:
        root_list: Data roots; index 0 is the root the changelist started
            from, every later entry is the root after one more change.
        changelist_name: Name given to this changelist.
        journal: The changes, in the order they were added.
    """

    def __init__(self, root_origin_cl: InstanceNode, changelist_name: str):
        self.root_list = [root_origin_cl]
        self.changelist_name = changelist_name
        self.journal = []   # type: List[DataChange]

    def add(self, change: DataChange, root_after_change: InstanceNode):
        """Record ``change`` and the data root that resulted from it."""
        self.journal.append(change)
        self.root_list.append(root_after_change)


class UsrChangeJournal:
144
    def __init__(self, root_origin: InstanceNode, transaction_opts: Optional[JsonNodeT]):
        """Create an empty journal.

        Args:
            root_origin: Data root that was current when the journal was opened.
            transaction_opts: Options handed to the commit callbacks on commit.
        """
        self.root_origin = root_origin
        self.transaction_opts = transaction_opts
        self.clists = []    # type: List[ChangeList]

    def cl_new(self, cl_name: str):
        """Open a changelist named ``cl_name`` starting at the current head root."""
        head_root = self.get_root_head()
        new_clist = ChangeList(head_root, cl_name)
        self.clists.append(new_clist)

    def cl_drop(self) -> bool:
        try:
            self.clists.pop()
            return True
        except IndexError:
            return False

    def get_root_head(self) -> InstanceNode:
        if len(self.clists) > 0:
            return self.clists[-1].root_list[-1]
        else:
            return self.root_origin

    def list(self) -> str:
        chl_json = {}
        for chl in self.clists:
            changes = []
            for ch in chl.journal:
                changes.append(
                    [ch.change_type.name, ch.rpc_info.path]
                )

            chl_json[chl.changelist_name] = changes

        return chl_json

    def commit(self, ds: "BaseDatastore"):
        """Commit all journaled changes of this user into ``ds``.

        Steps:
          1. Pick the new root: the staged head when the datastore root still
             hashes equal to ``root_origin``, otherwise the journal replayed
             on top of the current datastore root.
          2. Validate the new root if ``VALIDATE_TRANSACTIONS`` is enabled.
             On a validation error the error is logged and the method returns
             without changing the datastore or clearing the changelists.
          3. Install the new root, then run the commit-begin hook, the edit
             handlers of every change and the commit-end hook.
          4. If any of these raised, call the commit-end hook again with
             ``failed=True``, roll the datastore root back and raise.

        Args:
            ds: Datastore the changes are committed into.

        Raises:
            ConfHandlerFailedError: A hook or an edit handler raised; the
                details are only logged.
        """
        if hash(ds.get_data_root()) == hash(self.root_origin):
            info("Commiting new configuration (swapping roots)")
            # Set new root
            nr = self.get_root_head()
        else:
            info("Commiting new configuration (re-applying changes)")
            nr = ds.get_data_root()
            for cl in self.clists:
                for change in cl.journal:
                    if change.change_type == ChangeType.CREATE:
                        nr = ds.create_node_rpc(nr, change.rpc_info, change.data)
                    elif change.change_type == ChangeType.REPLACE:
                        nr = ds.update_node_rpc(nr, change.rpc_info, change.data)
                    elif change.change_type == ChangeType.DELETE:
                        nr = ds.delete_node_rpc(nr, change.rpc_info)

        try:
            # Validate syntax and semantics of new data
            if CONFIG["GLOBAL"]["VALIDATE_TRANSACTIONS"] is True:
                nr.validate(ValidationScope.all, ContentType.config)
            new_data_valid = True
        except (SchemaError, SemanticError) as e:
            error("Data validation error:")
            error(epretty(e))
            new_data_valid = False

        if new_data_valid:
            # Set new data root
            ds.set_data_root(nr)

            # Call commit begin hook
            begin_hook_failed = False
            try:
                ds.commit_begin_callback(self.transaction_opts)
            except Exception as e:
                error("Exception occured in commit_begin handler: {}".format(epretty(e)))
                begin_hook_failed = True

            # Run schema node handlers
            conf_handler_failed = False
            if not begin_hook_failed:
                try:
                    for cl in self.clists:
                        for change in cl.journal:
                            ii = DataHelpers.parse_ii(change.rpc_info.path, change.rpc_info.path_format)
                            ds.run_conf_edit_handler(ii, change)
                except Exception as e:
                    error("Exception occured in edit handler: {}".format(epretty(e)))
                    conf_handler_failed = True

            # Call commit end hook
            end_hook_failed = False
            end_hook_abort_failed = False
            if not (begin_hook_failed or conf_handler_failed):
                try:
                    ds.commit_end_callback(self.transaction_opts, failed=False)
                except Exception as e:
                    error("Exception occured in commit_end handler: {}".format(epretty(e)))
                    end_hook_failed = True

            if begin_hook_failed or conf_handler_failed or end_hook_failed:
                try:
                    # Call commit_end callback again with "failed" argument set to True
                    ds.commit_end_callback(self.transaction_opts, failed=True)
                except Exception as e:
                    error("Exception occured in commit_end handler (abort): {}".format(epretty(e)))
                    end_hook_abort_failed = True

            # Clear user changelists
            self.clists.clear()

            # Return to previous version of data and raise an exception if something went wrong
            if begin_hook_failed or conf_handler_failed or end_hook_failed or end_hook_abort_failed:
                ds.data_root_rollback(history_steps=1, store_current=False)
                raise ConfHandlerFailedError("(see logged)")
254

255

Pavel Spirek's avatar
Pavel Spirek committed
256
class BaseDatastore:
257
    def __init__(self, dm: DataModel, name: str="", with_nacm: bool=False):
        """Create an empty datastore bound to data model ``dm``.

        Args:
            dm: Data model used for schema lookups and parsing of raw data.
            name: Datastore name, used in log and error messages.
            with_nacm: When True, a ``NacmConfig`` is created for this
                datastore and access checks are applied by the RPC methods.
        """
        def _blankfn(*args, **kwargs):
            # Default no-op used until real commit callbacks are assigned
            pass

        self.name = name
        self.nacm = None    # type: NacmConfig
        self._data = None   # type: InstanceNode
        self._data_history = []     # type: List[InstanceNode]
        self._yang_lib_data = None  # type: InstanceNode
        self._dm = dm       # type: DataModel
        self._data_lock = Lock()
        self._lock_username = None  # type: str
        self._usr_journals = {}     # type: Dict[str, UsrChangeJournal]
        self.commit_begin_callback = _blankfn   # type: Callable[..., bool]
        self.commit_end_callback = _blankfn     # type: Callable[..., bool]

        if with_nacm:
            self.nacm = NacmConfig(self)
Pavel Spirek's avatar
Pavel Spirek committed
275

Pavel Spirek's avatar
Pavel Spirek committed
276
    # Returns the root node of data tree
277 278 279 280 281
    def get_data_root(self, previous_version: int=0) -> InstanceNode:
        if previous_version > 0:
            return self._data_history[-previous_version]
        else:
            return self._data
Pavel Spirek's avatar
Pavel Spirek committed
282

283 284 285
    def get_yl_data_root(self) -> InstanceNode:
        return self._yang_lib_data

286 287 288 289 290 291 292 293 294
    # Returns the root node of data tree
    def get_data_root_staging(self, username: str) -> InstanceNode:
        usr_journal = self._usr_journals.get(username)
        if usr_journal is not None:
            root = usr_journal.get_root_head()
            return root
        else:
            raise NoHandlerError("No active changelist for user \"{}\"".format(username))

295
    # Set a new Instance node as data root, store old root to archive
Pavel Spirek's avatar
Pavel Spirek committed
296
    def set_data_root(self, new_root: InstanceNode):
297
        self._data_history.append(self._data)
Pavel Spirek's avatar
Pavel Spirek committed
298 299
        self._data = new_root

300 301 302 303 304 305
    def data_root_rollback(self, history_steps: int, store_current: bool):
        if store_current:
            self._data_history.append(self._data)

        self._data = self._data_history[-history_steps]

306 307
    # Get schema node with particular schema address
    def get_schema_node(self, sch_pth: str) -> Optional[SchemaNode]:
        """Look up the schema node at schema path ``sch_pth``.

        Returns:
            The schema node, or None when the data model has no node at that
            path. A missing node is only reported via the debug log; no
            exception is raised.
        """
        sn = self._dm.get_schema_node(sch_pth)
        if sn is None:
            debug_data("Cannot find schema node for " + sch_pth)
        return sn

    # Notify data observers about change in datastore
315
    def run_conf_edit_handler(self, ii: InstanceRoute, ch: DataChange):
        """Run the configuration edit handlers affected by change ``ch``.

        The schema path is derived from the member-name selectors of ``ii``.
        Starting at that schema node and walking up through its parents,
        every handler registered in ``CONF_DATA_HANDLES`` for a visited node
        is invoked.

        Args:
            ii: Instance route of the changed node.
            ch: The change being committed.
        """
        try:
            # Only member names take part in the schema path; list entry
            # selectors (keys, indexes) are skipped.
            sch_pth_list = filter(lambda n: isinstance(n, MemberName), ii)
            sch_pth = DataHelpers.ii2str(sch_pth_list)
            sn = self.get_schema_node(sch_pth)

            while sn is not None:
                # Handlers are looked up by the id() of the schema node object
                h = CONF_DATA_HANDLES.get_handler(str(id(sn)))
                if h is not None:
                    h.process(sn, ii, ch)
                sn = sn.parent
        except NonexistentInstance:
            warn("Cannnot notify {}, parent container removed".format(ii))
328

Pavel Spirek's avatar
Pavel Spirek committed
329
    # Get data node, evaluate NACM if required
330
    def get_node_rpc(self, rpc: RpcInfo, yl_data=False, staging=False) -> InstanceNode:
        """Read the data node addressed by ``rpc.path``.

        Args:
            rpc: Request description. ``rpc.qs`` is consulted for the
                ``with-defaults`` and ``depth`` query parameters.
            yl_data: Read from the tree held in ``_yang_lib_data`` instead of
                the datastore data.
            staging: Read from the user's staged root instead of the
                committed data (ignored when ``yl_data`` is set).

        Returns:
            The addressed node, after optional default expansion, NACM
            pruning and depth limiting.

        Raises:
            NoHandlerForStateDataError: A state data root under the target
                has no registered handler.
            NacmForbiddenError: NACM denies read access to the node.
            ValueError: The ``depth`` query parameter is not an integer.
        """
        ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
        if yl_data:
            root = self._yang_lib_data
        elif staging:
            root = self.get_data_root_staging(rpc.username)
        else:
            root = self._data

        # Only member names take part in the schema path
        sch_pth_list = filter(lambda isel: isinstance(isel, MemberName), ii)
        sch_pth = DataHelpers.ii2str(sch_pth_list)
        sn = self.get_schema_node(sch_pth)
        state_roots = sn.state_roots()

        if not yl_data and state_roots:
            # Refresh state data through the registered handlers, bracketed
            # by the commit callbacks.
            self.commit_begin_callback()
            for state_node_pth in state_roots:
                sdh = STATE_DATA_HANDLES.get_handler(state_node_pth)
                if sdh is not None:
                    root_val = sdh.update_node(ii, root, True)
                    root = self._data.update(root_val, raw=True)
                else:
                    raise NoHandlerForStateDataError()
            self.commit_end_callback()

        n = root.goto(ii)

        try:
            with_defs = rpc.qs["with-defaults"][0]
        except (IndexError, KeyError):
            with_defs = None

        if with_defs == "report-all":
            n = n.add_defaults()

        if self.nacm:
            nrpc = self.nacm.get_user_nacm(rpc.username)
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_READ) == Action.DENY:
                raise NacmForbiddenError()
            else:
                # Prune nodes that should not be accessible to user
                n = nrpc.prune_data_tree(n, root, ii, Permission.NACM_ACCESS_READ)

        try:
            max_depth = int(rpc.qs["depth"][0])
        except (IndexError, KeyError):
            max_depth = None
        except ValueError:
            raise ValueError("Invalid value of query param \"depth\"")

        if max_depth is not None:
            def _tree_limit_depth(node: InstanceNode, depth: int) -> InstanceNode:
                # Empties every object/array found deeper than max_depth.
                # ``node`` is re-bound after each child so that the walk
                # continues from the updated parent.
                if isinstance(node.value, ObjectValue):
                    if depth > max_depth:
                        node.value = ObjectValue({})
                    else:
                        for child_key in sorted(node.value.keys()):
                            m = node[child_key]
                            node = _tree_limit_depth(m, depth + 1).up()
                elif isinstance(node.value, ArrayValue):
                    if depth > max_depth:
                        node.value = ArrayValue([])
                    else:
                        for i in range(len(node.value)):
                            e = node[i]
                            node = _tree_limit_depth(e, depth + 1).up()

                return node
            n = _tree_limit_depth(n, 1)

        return n

403
    # Create new data node (Restconf draft compliant version)
404 405
    def create_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> InstanceNode:
        """Create a new child under the node addressed by ``rpc.path``.

        ``value`` must be a JSON object with exactly one member. The member
        name selects the child to create and the member value is its content.
        For lists and leaf-lists the new entry is added to the existing
        array (created when missing); its position is controlled by the
        ``insert`` query parameter (``first``, ``last``, and for lists also
        ``before`` / ``after`` together with ``point``).

        Args:
            root: Data root the change is applied to.
            rpc: Request description (path, query string, username).
            value: Raw (parsed JSON) object holding the new member.

        Returns:
            The new data root containing the created node.

        Raises:
            NacmForbiddenError: NACM denies create access.
            ValueError: ``value`` does not have exactly one member, or the
                ``insert`` value is not supported for the target.
            InstanceAlreadyPresent: The member, or a list entry with the same
                key, already exists.
        """
        ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
        n = root.goto(ii)

        insert = rpc.qs.get("insert", [None])[0]
        point = rpc.qs.get("point", [None])[0]

        if self.nacm:
            nrpc = self.nacm.get_user_nacm(rpc.username)
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_CREATE) == Action.DENY:
                raise NacmForbiddenError()

        # Get target member name
        member_names = tuple(value.keys())
        if len(member_names) != 1:
            raise ValueError("Received json object must contain exactly one member")
        input_member_name = member_names[0]

        input_member_value = value[input_member_name]

        # Check if target member already exists
        try:
            existing_member = n[input_member_name]
        except NonexistentInstance:
            existing_member = None

        # Get target schema node. ``n`` still refers to the node found above,
        # so the second ``root.goto(ii)`` the code used to do here is not needed.
        sn = n.schema_node  # type: InternalNode
        sch_member_name = sn._iname2qname(input_member_name)
        member_sn = sn.get_data_child(*sch_member_name)

        if isinstance(member_sn, ListNode):
            # Append received node to list

            # Create list if necessary
            if existing_member is None:
                existing_member = n.put_member(input_member_name, ArrayValue([]))

            # Convert input data from List/Dict to ArrayValue/ObjectValue
            new_value_list = member_sn.from_raw([input_member_value])
            new_value_item = new_value_list[0]    # type: ObjectValue

            # Only the first list key is used for duplicate detection
            list_node_key = member_sn.keys[0][0]
            if new_value_item[list_node_key] in map(lambda x: x[list_node_key], existing_member.value):
                raise InstanceAlreadyPresent("Duplicate key")

            if insert == "first":
                # Optimization
                if len(existing_member.value) > 0:
                    list_entry_first = existing_member[0]   # type: ArrayEntry
                    new_member = list_entry_first.insert_before(new_value_item).up()
                else:
                    new_member = existing_member.update(new_value_list)
            elif (insert == "last") or insert is None:
                # Optimization
                if len(existing_member.value) > 0:
                    list_entry_last = existing_member[-1]   # type: ArrayEntry
                    new_member = list_entry_last.insert_after(new_value_item).up()
                else:
                    new_member = existing_member.update(new_value_list)
            elif insert == "before":
                entry_sel = EntryKeys({list_node_key: point})
                list_entry = entry_sel.goto_step(existing_member)   # type: ArrayEntry
                new_member = list_entry.insert_before(new_value_item).up()
            elif insert == "after":
                entry_sel = EntryKeys({list_node_key: point})
                list_entry = entry_sel.goto_step(existing_member)   # type: ArrayEntry
                new_member = list_entry.insert_after(new_value_item).up()
            else:
                raise ValueError("Invalid 'insert' value")
        elif isinstance(member_sn, LeafListNode):
            # Append received node to leaf list

            # Create leaf list if necessary
            if existing_member is None:
                existing_member = n.put_member(input_member_name, ArrayValue([]))

            # Convert input data from List/Dict to ArrayValue/ObjectValue
            new_value_item = member_sn.from_raw([input_member_value])[0]

            if insert == "first":
                new_member = existing_member.update(ArrayValue([new_value_item] + existing_member.value))
            elif (insert == "last") or insert is None:
                new_member = existing_member.update(ArrayValue(existing_member.value + [new_value_item]))
            else:
                raise ValueError("Invalid 'insert' value")
        else:
            if existing_member is None:
                # Create new data node

                # Convert input data from List/Dict to ArrayValue/ObjectValue
                new_value_item = member_sn.from_raw(input_member_value)

                # Create new node (object member)
                new_member = n.put_member(input_member_name, new_value_item)
            else:
                # Data node already exists
                raise InstanceAlreadyPresent("Member \"{}\" already present in \"{}\"".format(input_member_name, ii))

        return new_member.top()
507

508
    # PUT data node
509
    def update_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> InstanceNode:
        """Replace the value of the node addressed by ``rpc.path``.

        With NACM enabled, the new value is first pruned by NACM and only the
        pruned value is written.

        Args:
            root: Data root the change is applied to.
            rpc: Request description (path, username).
            value: Raw (parsed JSON) replacement value.

        Returns:
            The new data root containing the updated node.

        Raises:
            NacmForbiddenError: NACM denies update access.
        """
        ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
        n = root.goto(ii)

        if self.nacm:
            nrpc = self.nacm.get_user_nacm(rpc.username)
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_UPDATE) == Action.DENY:
                raise NacmForbiddenError()

            sn = n.schema_node
            new_val = sn.from_raw([value])[0]
            new_node = RootNode(new_val, sn, new_val.timestamp)
            # NOTE(review): pruning is evaluated against self._data, not the
            # ``root`` argument - confirm this is intended.
            new_node_prunned = nrpc.prune_data_tree(new_node, self._data, ii, Permission.NACM_ACCESS_UPDATE).raw_value()
            # Send the pruned value to the module debug log instead of
            # printing it to stdout.
            debug_data(json.dumps(new_node_prunned, indent=4))

            new_n = n.update(new_node_prunned, raw=True)
        else:
            new_n = n.update(value, raw=True)

        return new_n.top()
529

530
    # Delete data node
531
    def delete_node_rpc(self, root: InstanceNode, rpc: RpcInfo) -> InstanceNode:
        """Delete the node addressed by ``rpc.path``.

        Args:
            root: Data root the change is applied to.
            rpc: Request description (path, username).

        Returns:
            The new data root. When the last selector of the path does not
            match the parent's kind (for example a member name under an
            array), nothing is deleted and the parent's root is returned.

        Raises:
            NacmForbiddenError: NACM denies delete access.
            InstanceValueError: The parent is neither an array nor an object.
        """
        ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
        n = root.goto(ii)
        n_parent = n.up()
        last_isel = ii[-1]

        if self.nacm:
            nrpc = self.nacm.get_user_nacm(rpc.username)
            if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_DELETE) == Action.DENY:
                raise NacmForbiddenError()

        new_n = n_parent
        if isinstance(n_parent.value, ArrayValue):
            if isinstance(last_isel, EntryIndex):
                new_n = n_parent.delete_item(last_isel.key)
            elif isinstance(last_isel, EntryKeys):
                # Entry addressed by keys: delete it by its position in the array
                new_n = n_parent.delete_item(n.index)
        elif isinstance(n_parent.value, ObjectValue):
            if isinstance(last_isel, MemberName):
                new_n = n_parent.delete_item(last_isel.key)
        else:
            raise InstanceValueError(n, "Invalid target node type")

        return new_n.top()
Pavel Spirek's avatar
Pavel Spirek committed
555

556
    # Invoke an operation
557 558
    def invoke_op_rpc(self, rpc: RpcInfo) -> ObjectValue:
        """Invoke the operation named by ``rpc.op_name``.

        Built-in operations:
          * ``conf-start``  - open a changelist (input ``name``, optional
            ``options``), creating the user's journal when needed.
          * ``conf-list``   - list the user's changelists.
          * ``conf-drop``   - drop the newest changelist; the journal itself
            is removed when it had no changelist left to drop.
          * ``conf-commit`` - commit the user's journal under the data lock,
            optionally saving the data, then remove the journal.
          * ``get-schema-digest`` - return the data model's schema digest.

        Any other name is dispatched to the handler registered in
        ``OP_HANDLERS``, after a NACM check unless ``rpc.skip_nacm_check``.

        Returns:
            The operation's output data.

        Raises:
            ValueError: ``conf-start`` was called without ``name``.
            DataLockError: ``conf-commit`` could not acquire the data lock.
            NacmForbiddenError: NACM denies invoking the operation.
            NoHandlerForOpError: No handler is registered for the operation.
        """
        if rpc.op_name == "conf-start":
            try:
                cl_name = rpc.op_input_args["name"]
            except (TypeError, KeyError):
                raise ValueError("This operation expects \"name\" input parameter")

            transaction_opts = rpc.op_input_args.get("options")

            if self._usr_journals.get(rpc.username) is None:
                self._usr_journals[rpc.username] = UsrChangeJournal(self._data, transaction_opts)

            self._usr_journals[rpc.username].cl_new(cl_name)
            ret_data = {"status": "OK"}
        elif rpc.op_name == "conf-list":
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
                chl_json = usr_journal.list()
            else:
                chl_json = str(None)

            ret_data = \
                {
                    "status": "OK",
                    "changelists": chl_json
                }
        elif rpc.op_name == "conf-drop":
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
                if not usr_journal.cl_drop():
                    del self._usr_journals[rpc.username]

            ret_data = {"status": "OK"}
        elif rpc.op_name == "conf-commit":
            usr_journal = self._usr_journals.get(rpc.username)
            if usr_journal is not None:
                # Acquire the lock BEFORE entering the try block. If
                # acquisition fails, the finally clause must not run,
                # otherwise it would release a lock this call never held.
                self.lock_data(rpc.username)
                try:
                    usr_journal.commit(self)
                    if CONFIG["GLOBAL"]["PERSISTENT_CHANGES"] is True:
                        self.save()
                finally:
                    self.unlock_data()

                del self._usr_journals[rpc.username]
            else:
                info("[{}]: Nothing to commit".format(rpc.username))

            ret_data = \
                {
                    "status": "OK",
                    "conf-changed": True
                }
        elif rpc.op_name == "get-schema-digest":
            ret_data = self._dm.schema_digest()
        else:
            # User-defined operation
            if self.nacm and (not rpc.skip_nacm_check):
                nrpc = self.nacm.get_user_nacm(rpc.username)
                if nrpc.check_rpc_name(rpc.op_name) == Action.DENY:
                    raise NacmForbiddenError(
                        "Op \"{}\" invocation denied for user \"{}\"".format(rpc.op_name, rpc.username)
                    )

            op_handler = OP_HANDLERS.get_handler(rpc.op_name)
            if op_handler is None:
                raise NoHandlerForOpError(rpc.op_name)

            ret_data = op_handler(rpc.op_input_args)

        return ret_data

Pavel Spirek's avatar
Pavel Spirek committed
636 637 638 639 640
    def add_to_journal_rpc(self, ch_type: ChangeType, rpc: RpcInfo, value: Any, new_root: InstanceNode):
        """Record a change in the newest changelist of ``rpc.username``.

        Args:
            ch_type: Kind of change.
            rpc: Request that caused the change.
            value: Data supplied with the request.
            new_root: Data root resulting from the change.

        Raises:
            NoHandlerError: The user has no journal, or the journal has no
                changelist left (for example after dropping the last one).
        """
        usr_journal = self._usr_journals.get(rpc.username)
        # An existing journal with no changelist is treated like a missing
        # journal; indexing clists[-1] would otherwise raise IndexError.
        if usr_journal is None or not usr_journal.clists:
            raise NoHandlerError("No active changelist for user \"{}\"".format(rpc.username))

        usr_chs = usr_journal.clists[-1]
        usr_chs.add(DataChange(ch_type, rpc, value), new_root)

Pavel Spirek's avatar
Pavel Spirek committed
644
    # Locks datastore data
645 646
    def lock_data(self, username: str = None, blocking: bool=True):
        """Acquire the datastore lock on behalf of ``username``.

        Args:
            username: Name recorded as the lock holder ("(unknown)" if empty).
            blocking: When True, wait up to one second for the lock; when
                False, fail immediately if the lock is taken.

        Raises:
            DataLockError: The lock could not be acquired.
        """
        if blocking:
            ret = self._data_lock.acquire(blocking=True, timeout=1)
        else:
            # Lock.acquire() rejects a timeout on a non-blocking call
            # (ValueError), so it must not be passed here.
            ret = self._data_lock.acquire(blocking=False)

        if ret:
            self._lock_username = username or "(unknown)"
            debug_data("Acquired lock in datastore \"{}\" for user \"{}\"".format(self.name, username))
        else:
            raise DataLockError(
                "Failed to acquire lock in datastore \"{}\" for user \"{}\", already locked by \"{}\"".format(
                    self.name,
                    username,
                    self._lock_username
                )
            )

659
    # Unlock datastore data
Pavel Spirek's avatar
Pavel Spirek committed
660 661
    def unlock_data(self):
        """Release the datastore lock and forget the recorded holder."""
        # Clear the holder name while the lock is still held. Clearing it
        # after release() could overwrite the name that the next holder has
        # just recorded.
        released_for = self._lock_username
        self._lock_username = None
        self._data_lock.release()
        debug_data("Released lock in datastore \"{}\" for user \"{}\"".format(self.name, released_for))

665
    # Load data from persistent storage
666
    def load(self):
        """Load data from persistent storage; must be overridden by subclasses.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Not implemented in base class")

669
    # Save data to persistent storage
670
    def save(self):
        """Save data to persistent storage; must be overridden by subclasses.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Not implemented in base class")

Pavel Spirek's avatar
Pavel Spirek committed
673 674

class JsonDatastore(BaseDatastore):
    """Datastore whose data is loaded from and saved to a JSON file."""

    def __init__(self, dm: DataModel, json_file: str, name: str = "", with_nacm: bool=False):
        """Create the datastore; ``json_file`` is the path used by load() and save()."""
        super().__init__(dm, name, with_nacm)
        self.json_file = json_file

    def load(self):
        """Read ``json_file`` into the data root and refresh NACM if enabled.

        The data root is reset to None first, so it stays None if reading or
        parsing fails.
        """
        self._data = None
        # JSON text is read as UTF-8 rather than in the locale's encoding
        with open(self.json_file, "rt", encoding="utf-8") as fp:
            self._data = self._dm.from_raw(json.load(fp))

        if self.nacm is not None:
            self.nacm.update()

    def load_yl_data(self, filename: str):
        """Read the JSON file ``filename`` into ``_yang_lib_data``."""
        self._yang_lib_data = None
        with open(filename, "rt", encoding="utf-8") as fp:
            self._yang_lib_data = self._dm.from_raw(json.load(fp))

    def save(self):
        """Write the raw value of the data root to ``json_file`` as indented JSON."""
        with open(self.json_file, "w", encoding="utf-8") as jfd:
            json.dump(self._data.raw_value(), jfd, indent=4)