Commit 1a41483a authored by Pavel Spirek's avatar Pavel Spirek

Another code refactor, version bump

parent 24596fb2
PROJECT = jetconf
VERSION = 0.3.1
VERSION = 0.3.2
.PHONY = tags deps install-deps test
tags:
......
import json
from threading import Lock
from enum import Enum
from colorlog import error, warning as warn, info
from typing import List, Any, Dict, Callable, Optional, Tuple
from datetime import datetime
from yangson.datamodel import DataModel
from yangson.enumerations import ContentType, ValidationScope
from yangson.schemanode import (
SchemaNode,
ListNode,
LeafListNode,
SchemaError,
SemanticError,
InternalNode,
ContainerNode
)
from yangson.enumerations import ValidationScope
from yangson.schemanode import SchemaNode, ListNode, LeafListNode, InternalNode
from yangson.instvalue import ArrayValue, ObjectValue
from yangson.instance import (
InstanceNode,
......@@ -27,24 +18,16 @@ from yangson.instance import (
EntryIndex,
InstanceRoute,
ArrayEntry,
RootNode,
ObjectMember
RootNode
)
from . import config
from .helpers import PathFormat, ErrorHelpers, LogHelpers, DataHelpers, JsonNodeT
from .nacm import NacmConfig, Permission, Action
from .handler_list import (
OP_HANDLERS,
STATE_DATA_HANDLES,
CONF_DATA_HANDLES,
ConfDataObjectHandler,
ConfDataListHandler,
StateDataContainerHandler,
StateDataListHandler
)
from .journal import ChangeType, UsrChangeJournal, RpcInfo, DataChange
from .handler_base import ConfDataObjectHandler, ConfDataListHandler, StateDataContainerHandler, StateDataListHandler
from .handler_list import ConfDataHandlerList, StateDataHandlerList, OpHandlerList
from .errors import (
ConfHandlerFailedError,
StagingDataException,
NoHandlerForStateDataError,
NoHandlerForOpError,
......@@ -59,170 +42,31 @@ epretty = ErrorHelpers.epretty
debug_data = LogHelpers.create_module_dbg_logger(__name__)
class ChangeType(Enum):
CREATE = 0,
REPLACE = 1,
DELETE = 2
class RpcInfo:
class BackendHandlers:
def __init__(self):
self.username = None # type: str
self.path = None # type: str
self.qs = None # type: Dict[str, List[str]]
self.path_format = PathFormat.URL # type: PathFormat
self.skip_nacm_check = False # type: bool
self.op_name = None # type: str
self.op_input_args = None # type: ObjectValue
class DataChange:
def __init__(self, change_type: ChangeType, rpc_info: RpcInfo, input_data: JsonNodeT, root_after_change: InstanceNode, nacm_modified: bool):
self.change_type = change_type
self.rpc_info = rpc_info
self.input_data = input_data
self.root_after_change = root_after_change
self.nacm_modified = nacm_modified
class UsrChangeJournal:
def __init__(self, root_origin: InstanceNode):
self._root_origin = root_origin
self._journal = [] # type: List[DataChange]
def get_root_head(self) -> InstanceNode:
if len(self._journal) > 0:
return self._journal[-1].root_after_change
else:
return self._root_origin
def get_root_origin(self) -> InstanceNode:
return self._root_origin
def add(self, change: DataChange):
self._journal.append(change)
def list(self) -> JsonNodeT:
changes_info = []
for ch in self._journal:
changes_info.append([ch.change_type.name, ch.rpc_info.path])
return changes_info
def commit(self, ds: "BaseDatastore") -> bool:
nacm_modified = False
if len(self._journal) == 0:
return False
if hash(ds.get_data_root()) == hash(self._root_origin):
info("Commiting new configuration (swapping roots)")
# Set new root
nr = self.get_root_head()
for change in self._journal:
nacm_modified = nacm_modified or change.nacm_modified
else:
info("Commiting new configuration (re-applying changes)")
nr = ds.get_data_root()
for change in self._journal:
nacm_modified = nacm_modified or change.nacm_modified
if change.change_type == ChangeType.CREATE:
nr = ds.create_node_rpc(nr, change.rpc_info, change.input_data)[0]
elif change.change_type == ChangeType.REPLACE:
nr = ds.update_node_rpc(nr, change.rpc_info, change.input_data)[0]
elif change.change_type == ChangeType.DELETE:
nr = ds.delete_node_rpc(nr, change.rpc_info)[0]
try:
# Validate syntax and semantics of new data
if config.CFG.glob["VALIDATE_TRANSACTIONS"] is True:
nr.validate(ValidationScope.all, ContentType.config)
except (SchemaError, SemanticError) as e:
error("Data validation error:")
error(epretty(e))
raise e
# Set new data root
ds.set_data_root(nr)
# Update NACM if NACM data has been affected by any edit
if nacm_modified and ds.nacm is not None:
ds.nacm.update()
# Call commit begin hook
begin_hook_failed = False
try:
ds.commit_begin_callback()
except Exception as e:
error("Exception occured in commit_begin handler: {}".format(epretty(e)))
begin_hook_failed = True
# Run schema node handlers
conf_handler_failed = False
if not begin_hook_failed:
try:
for change in self._journal:
ii = ds.parse_ii(change.rpc_info.path, change.rpc_info.path_format)
ds.run_conf_edit_handler(ii, change)
except Exception as e:
error("Exception occured in edit handler: {}".format(epretty(e)))
conf_handler_failed = True
self.conf = ConfDataHandlerList()
self.state = StateDataHandlerList()
self.op = OpHandlerList()
# Call commit end hook
end_hook_failed = False
end_hook_abort_failed = False
if not (begin_hook_failed or conf_handler_failed):
try:
ds.commit_end_callback(failed=False)
except Exception as e:
error("Exception occured in commit_end handler: {}".format(epretty(e)))
end_hook_failed = True
if begin_hook_failed or conf_handler_failed or end_hook_failed:
try:
# Call commit_end callback again with "failed" argument set to True
ds.commit_end_callback(failed=True)
except Exception as e:
error("Exception occured in commit_end handler (abort): {}".format(epretty(e)))
end_hook_abort_failed = True
# Return to previous version of data and raise an exception if something went wrong
if begin_hook_failed or conf_handler_failed or end_hook_failed or end_hook_abort_failed:
ds.data_root_rollback(history_steps=1, store_current=False)
# Update NACM again after rollback
if nacm_modified and ds.nacm is not None:
ds.nacm.update()
raise ConfHandlerFailedError("(see logged)")
def _blankfn(*args, **kwargs):
pass
return True
self.commit_begin = _blankfn # type: Callable[[], None]
self.commit_end = _blankfn # type: Callable[[bool], None]
class BaseDatastore:
def __init__(self, dm: DataModel, with_nacm: bool=False):
self.nacm = None # type: NacmConfig
self._dm = dm # type: DataModel
self._data = None # type: InstanceNode
self._yang_lib_data = self._dm.from_raw(self._dm.yang_library) # type: InstanceNode
self._data_history = [] # type: List[InstanceNode]
self._yang_lib_data = None # type: InstanceNode
self._dm = dm # type: DataModel
self._data_lock = Lock()
self._lock_username = None # type: str
self._usr_journals = {} # type: Dict[str, UsrChangeJournal]
def _blankfn(*args, **kwargs):
pass
self.commit_begin_callback = _blankfn # type: Callable[..., bool]
self.commit_end_callback = _blankfn # type: Callable[..., bool]
if with_nacm:
self.nacm = NacmConfig(self, self._dm)
self._yang_lib_data = self._dm.from_raw(self._dm.yang_library)
self.nacm = None # type: NacmConfig
self.handlers = BackendHandlers()
self.nacm = NacmConfig(self, self._dm) if with_nacm else None
# Returns DataModel object
def get_dm(self) -> DataModel:
......@@ -321,7 +165,7 @@ class BaseDatastore:
if sn is None:
return
h = CONF_DATA_HANDLES.get_handler(id(sn))
h = self.handlers.conf.get_handler(id(sn))
if h is not None:
info("handler for actual data node triggered")
if isinstance(h, ConfDataObjectHandler):
......@@ -341,7 +185,7 @@ class BaseDatastore:
else:
sn = sn.parent
while sn is not None:
h = CONF_DATA_HANDLES.get_handler(id(sn))
h = self.handlers.conf.get_handler(id(sn))
if h is not None and isinstance(h, ConfDataObjectHandler):
info("handler for superior data node triggered, replace")
# print(h.schema_path)
......@@ -409,7 +253,7 @@ class BaseDatastore:
if is_child:
# Direct request for the state data
sdh = STATE_DATA_HANDLES.get_handler(state_root_sch_pth)
sdh = self.handlers.state.get_handler(state_root_sch_pth)
if sdh is not None:
if isinstance(sdh, StateDataContainerHandler):
state_handler_val = sdh.generate_node(ii, rpc.username, staging)
......@@ -442,7 +286,7 @@ class BaseDatastore:
if isinstance(node.value, ObjectValue):
if node.schema_node is state_root_sn.parent:
ii_gen = DataHelpers.node_get_ii(node)
_sdh = STATE_DATA_HANDLES.get_handler(state_root_sch_pth)
_sdh = self.handlers.state.get_handler(state_root_sch_pth)
if _sdh is not None:
try:
if isinstance(_sdh, StateDataContainerHandler):
......@@ -779,7 +623,7 @@ class BaseDatastore:
def invoke_op_rpc(self, rpc: RpcInfo) -> JsonNodeT:
if rpc.op_name.startswith("jetconf:"):
# Jetconf internal operation
op_handler = OP_HANDLERS.get_handler(rpc.op_name)
op_handler = self.handlers.op.get_handler(rpc.op_name)
if op_handler is None:
raise NoHandlerForOpError(rpc.op_name)
......@@ -793,7 +637,7 @@ class BaseDatastore:
"Invocation of \"{}\" operation denied for user \"{}\"".format(rpc.op_name, rpc.username)
)
op_handler = OP_HANDLERS.get_handler(rpc.op_name)
op_handler = self.handlers.op.get_handler(rpc.op_name)
if op_handler is None:
raise NoHandlerForOpError(rpc.op_name)
......
from typing import Callable, Union
from yangson.schemanode import SchemaNode
from yangson.instance import InstanceRoute
from .journal import DataChange, RpcInfo
from .helpers import JsonNodeT
# ---------- Base classes for conf data handlers ----------
class ConfDataHandlerBase:
def __init__(self, ds: "BaseDatastore", sch_pth: str):
self.ds = ds
self.schema_path = sch_pth # type: str
self.schema_node = ds.get_schema_node(sch_pth) # type: SchemaNode
class ConfDataObjectHandler(ConfDataHandlerBase):
def create(self, ii: InstanceRoute, ch: DataChange):
pass
def replace(self, ii: InstanceRoute, ch: DataChange):
pass
def delete(self, ii: InstanceRoute, ch: DataChange):
pass
def __str__(self):
return self.__class__.__name__ + ": listening at " + self.schema_path
class ConfDataListHandler(ConfDataHandlerBase):
def create_item(self, ii: InstanceRoute, ch: DataChange):
pass
def replace_item(self, ii: InstanceRoute, ch: DataChange):
pass
def delete_item(self, ii: InstanceRoute, ch: DataChange):
pass
def create_list(self, ii: InstanceRoute, ch: DataChange):
pass
def replace_list(self, ii: InstanceRoute, ch: DataChange):
pass
def delete_list(self, ii: InstanceRoute, ch: DataChange):
pass
def __str__(self):
return self.__class__.__name__ + ": listening at " + self.schema_path
# ---------- Base classes for state data handlers ----------
class StateDataHandlerBase:
def __init__(self, datastore: "BaseDatastore", schema_path: str):
self.ds = datastore
self.data_model = datastore.get_dm()
self.sch_pth = schema_path
self.schema_node = self.data_model.get_data_node(self.sch_pth)
class StateDataContainerHandler(StateDataHandlerBase):
def generate_node(self, node_ii: InstanceRoute, username: str, staging: bool) -> JsonNodeT:
pass
class StateDataListHandler(StateDataHandlerBase):
def generate_list(self, node_ii: InstanceRoute, username: str, staging: bool) -> JsonNodeT:
pass
def generate_item(self, node_ii: InstanceRoute, username: str, staging: bool) -> JsonNodeT:
pass
# ---------- Types ----------
ConfDataHandler = Union[ConfDataObjectHandler, ConfDataListHandler]
StateDataHandler = Union[StateDataContainerHandler, StateDataListHandler]
OpHandler = Callable[[RpcInfo], JsonNodeT]
from typing import Dict, Callable
from typing import List, Dict, Tuple, Callable
from yangson.schemanode import SchemaNode
from yangson.schemadata import SchemaData
from yangson.instance import InstanceRoute
from yangson.typealiases import SchemaRoute
from .helpers import JsonNodeT
# ---------- Base classes for conf data handlers ----------
class ConfDataHandlerBase:
def __init__(self, ds: "BaseDatastore", sch_pth: str):
self.ds = ds
self.schema_path = sch_pth # type: str
self.schema_node = ds.get_schema_node(sch_pth) # type: SchemaNode
class ConfDataObjectHandler(ConfDataHandlerBase):
def create(self, ii: InstanceRoute, ch: "DataChange"):
pass
def replace(self, ii: InstanceRoute, ch: "DataChange"):
pass
def delete(self, ii: InstanceRoute, ch: "DataChange"):
pass
def __str__(self):
return self.__class__.__name__ + ": listening at " + self.schema_path
class ConfDataListHandler(ConfDataHandlerBase):
def create_item(self, ii: InstanceRoute, ch: "DataChange"):
pass
def replace_item(self, ii: InstanceRoute, ch: "DataChange"):
pass
def delete_item(self, ii: InstanceRoute, ch: "DataChange"):
pass
def create_list(self, ii: InstanceRoute, ch: "DataChange"):
pass
def replace_list(self, ii: InstanceRoute, ch: "DataChange"):
pass
def delete_list(self, ii: InstanceRoute, ch: "DataChange"):
pass
def __str__(self):
return self.__class__.__name__ + ": listening at " + self.schema_path
# ---------- Base classes for state data handlers ----------
class StateDataHandlerBase:
def __init__(self, datastore: "BaseDatastore", schema_path: str):
self.ds = datastore
self.data_model = datastore.get_dm()
self.sch_pth = schema_path
self.schema_node = self.data_model.get_data_node(self.sch_pth)
class StateDataContainerHandler(StateDataHandlerBase):
def generate_node(self, node_ii: InstanceRoute, username: str, staging: bool) -> JsonNodeT:
pass
class StateDataListHandler(StateDataHandlerBase):
def generate_list(self, node_ii: InstanceRoute, username: str, staging: bool) -> JsonNodeT:
pass
def generate_item(self, node_ii: InstanceRoute, username: str, staging: bool) -> JsonNodeT:
pass
from .handler_base import ConfDataHandlerBase, StateDataHandlerBase, ConfDataHandler, StateDataHandler, OpHandler
# ---------- Handler lists ----------
......@@ -85,22 +17,22 @@ class ConfDataHandlerList:
self.handlers[sch_node_id] = handler
self.handlers_pth[handler.schema_path] = handler
def get_handler(self, sch_node_id: int) -> ConfDataHandlerBase:
def get_handler(self, sch_node_id: int) -> ConfDataHandler:
return self.handlers.get(sch_node_id)
def get_handler_by_pth(self, sch_pth: str) -> ConfDataHandlerBase:
def get_handler_by_pth(self, sch_pth: str) -> ConfDataHandler:
return self.handlers_pth.get(sch_pth)
class StateDataHandlerList:
def __init__(self):
self.handlers = []
self.handlers = [] # type: List[Tuple[SchemaRoute, StateDataHandlerBase]]
def register(self, handler: "StateDataHandlerBase"):
def register(self, handler: StateDataHandlerBase):
saddr = SchemaData.path2route(handler.sch_pth)
self.handlers.append((saddr, handler))
def get_handler(self, sch_pth: str, allow_superior: bool = True) -> Callable:
def get_handler(self, sch_pth: str, allow_superior: bool = True) -> StateDataHandler:
saddr = SchemaData.path2route(sch_pth)
if allow_superior:
while saddr:
......@@ -118,20 +50,10 @@ class StateDataHandlerList:
class OpHandlerList:
def __init__(self):
self.handlers = {} # type: Dict[str, Callable]
self.default_handler = None # type: Callable
self.handlers = {} # type: Dict[str, OpHandler]
def register(self, handler: Callable, op_name: str):
self.handlers[op_name] = handler
def register_default(self, handler: Callable):
self.default_handler = handler
def get_handler(self, op_name: str) -> Callable:
return self.handlers.get(op_name, self.default_handler)
# ---------- Handler list globals ----------
OP_HANDLERS = OpHandlerList()
STATE_DATA_HANDLES = StateDataHandlerList()
CONF_DATA_HANDLES = ConfDataHandlerList()
def get_handler(self, op_name: str) -> OpHandler:
return self.handlers.get(op_name)
......@@ -7,7 +7,7 @@ from enum import Enum
from colorlog import error, warning as warn, info
from urllib.parse import parse_qs
from datetime import datetime
from typing import Dict, List, Optional
from typing import Dict, List, Tuple, Any, Optional, Callable
from yangson.exceptions import YangsonException, NonexistentSchemaNode, SchemaError, SemanticError
from yangson.schemanode import ContainerNode, ListNode, GroupNode, LeafNode
......@@ -15,12 +15,8 @@ from yangson.instance import NonexistentInstance, InstanceValueError, RootNode
from . import config
from .helpers import CertHelpers, DateTimeHelpers, ErrorHelpers, LogHelpers, SSLCertT
from .handler_list import OP_HANDLERS
from .data import (
BaseDatastore,
RpcInfo,
ChangeType,
)
from .journal import RpcInfo
from .data import BaseDatastore, ChangeType
from .errors import (
BackendError,
ConfHandlerFailedError,
......@@ -34,12 +30,22 @@ from .errors import (
)
QueryStrT = Dict[str, List[str]]
HandlerConditionT = Callable[[str, str], bool] # Function(method, path) -> bool
epretty = ErrorHelpers.epretty
debug_httph = LogHelpers.create_module_dbg_logger(__name__)
CT_PLAIN = "text/plain"
CT_YANG_JSON = "application/yang.api+json"
CTYPE_PLAIN = "text/plain"
CTYPE_YANG_JSON = "application/yang.api+json"
ERRTAG_MALFORMED = "malformed-message"
ERRTAG_REQLARGE = "request-too-large"
ERRTAG_OPNOTSUPPORTED = "operation-not-supported"
ERRTAG_OPFAILED = "operation-failed"
ERRTAG_ACCDENIED = "access-denied"
ERRTAG_LOCKDENIED = "lock-denied"
ERRTAG_INVVALUE = "invalid-value"
ERRTAG_EXISTS = "data-exists"
class HttpRequestError(Exception):
......@@ -69,16 +75,6 @@ class HttpStatus(Enum):
return self.value[1]
ERRTAG_MALFORMED = "malformed-message"
ERRTAG_REQLARGE = "request-too-large"
ERRTAG_OPNOTSUPPORTED = "operation-not-supported"
ERRTAG_OPFAILED = "operation-failed"
ERRTAG_ACCDENIED = "access-denied"
ERRTAG_LOCKDENIED = "lock-denied"
ERRTAG_INVVALUE = "invalid-value"
ERRTAG_EXISTS = "data-exists"
class RestconfErrType(Enum):
Transport = "transport"
Rpc = "rpc"
......@@ -101,7 +97,7 @@ class HttpResponse:
else:
response = ""
return cls(status, response.encode(), CT_PLAIN)
return cls(status, response.encode(), CTYPE_PLAIN)
@classmethod
def error(cls, status: HttpStatus, err_type: RestconfErrType, err_tag: str, err_apptag: str=None,
......@@ -146,403 +142,398 @@ class HttpResponse:
}
response = json.dumps(err_template, indent=4)
return cls(status, response.encode(), CT_YANG_JSON)
return cls(status, response.encode(), CTYPE_YANG_JSON)
def unknown_req_handler(headers: OrderedDict, data: Optional[str], client_cert: SSLCertT) -> HttpResponse:
return HttpResponse.error(
HttpStatus.BadRequest,
RestconfErrType.Transport,
ERRTAG_MALFORMED,
"unknown_req_handler"
)
HttpHandlerT = Callable[[Any, OrderedDict, Optional[str], SSLCertT], HttpResponse]
def _get_yl_date() -> str:
try:
yang_lib_date_ts = os.path.getmtime(os.path.join(config.CFG.glob["YANG_LIB_DIR"], "yang-library-data.json"))
yang_lib_date = datetime.fromtimestamp(yang_lib_date_ts).strftime("%Y-%m-%d")
except OSError:
yang_lib_date = None
class HttpHandlerList:
def __init__(self):
self.handlers = [] # type: List[Tuple[HandlerConditionT, HttpHandlerT]]
self.default_handler = None # type: HttpHandlerT
return yang_lib_date
def reg(self, condition: HandlerConditionT, handler: HttpHandlerT):
self.handlers.append((condition, handler))
def reg_default(self, handler: HttpHandlerT):
self.default_handler = handler
def api_root_handler(headers: OrderedDict, data: Optional[str], client_cert: SSLCertT):
# Top level api resource (appendix B.1.1)
def get(self, method: str, path: str) -> HttpHandlerT:
for h in self.handlers:
if h[0](method, path):
return h[1]
top_res = {
"ietf-restconf:restconf": {
"data": {},
"operations": {},
"yang-library-version": _get_yl_date()
}
}
return self.default_handler
response = json.dumps(top_res, indent=4)
return HttpResponse(HttpStatus.Ok, response.encode(), CT_YANG_JSON)
class HttpHandlersImpl:
def __init__(self, ds: BaseDatastore):
self.ds = ds
self.list = HttpHandlerList()
def api_ylv_handler(headers: OrderedDict, data: Optional[str], client_cert: SSLCertT):
ylv = {
"ietf-restconf:yang-library-version": _get_yl_date()
}
api_root = config.CFG.http["API_ROOT"]
api_root_data = config.CFG.api_root_data
api_root_running_data = config.CFG.api_root_running_data
api_root_ylv = config.CFG.api_root_ylv
api_root_ops = config.CFG.api_root_ops
response = json.dumps(ylv, indent=4)
return HttpResponse(HttpStatus.Ok, response.encode(), CT_YANG_JSON)
# RESTCONF API handlers
self.list.reg(lambda m, p: (m == "GET") and (p.startswith(api_root_data)), self.get_api_staging)
self.list.reg(lambda m, p: (m == "GET") and (p.startswith(api_root_running_data)), self.get_api_running)
self.list.reg(lambda m, p: (m == "GET") and (p == api_root_ylv), self.get_api_yl_version)
self.list.reg(lambda m, p: (m == "GET") and (p == api_root), self.get_api_root)
self.list.reg(lambda m, p: (m == "POST") and (p.startswith(api_root_data)), self.post_api)
self.list.reg(lambda m, p: (m == "PUT") and (p.startswith(api_root_data)), self.put_api)
self.list.reg(lambda m, p: (m == "DELETE") and (p.startswith(api_root_data)), self.delete_api)
self.list.reg(lambda m, p: (m == "GET") and (p.startswith(api_root_ops)), self.get_api_op)
self.list.reg(lambda m, p: (m == "POST") and (p.startswith(api_root_ops)), self.post_api_op_call)
self.list.reg(lambda m, p: m == "OPTIONS", self.options_api)
# Static handlers
self.list.reg(lambda m, p: (m == "GET") and not (p.startswith(api_root)), self.get_file)
self.list.reg_default(self.unknown_request)
def _get(ds: BaseDatastore, req_headers: OrderedDict, pth: str, username: str, staging: bool=False) -> HttpResponse:
url_split = pth.split("?")
url_path = url_split[0]
if len(url_split) > 1:
query_string = parse_qs(url_split[1])
else:
query_string = {}
def unknown_request(self, headers: OrderedDict, data: Optional[str], client_cert: SSLCertT) -> HttpResponse:
return HttpResponse.error(
HttpStatus.BadRequest,
RestconfErrType.Transport,
ERRTAG_MALFORMED,
"unknown_request"
)
rpc1 = RpcInfo()
rpc1.username = username
rpc1.path = url_path.rstrip("/")
rpc1.qs = query_string
@staticmethod
def _get_yl_date() -> str:
try:
yang_lib_date_ts = os.path.getmtime(os.path.join(config.CFG.glob["YANG_LIB_DIR"], "yang-library-data.json"))
yang_lib_date = datetime.fromtimestamp(yang_lib_date_ts).strftime("%Y-%m-%d")
except OSError:
yang_lib_date = None
# Skip NACM check for privileged users
if username in config.CFG.nacm["ALLOWED_USERS"]:
rpc1.skip_nacm_check = True
return yang_lib_date
try:
ds.lock_data(username)
http_resp = None
def get_api_root(self, headers: OrderedDict, data: Optional[str], client_cert: SSLCertT):
# Top level api resource (appendix B.1.1)
try:
n = ds.get_node_rpc(rpc1, staging)
except NacmForbiddenError as e:
http_resp = HttpResponse.error(
HttpStatus.Forbidden,
RestconfErrType.Protocol,
ERRTAG_ACCDENIED,
exception=e
)