NACM is never evaluated for privileged users

parent 90d918d2
...@@ -502,7 +502,7 @@ class BaseDatastore: ...@@ -502,7 +502,7 @@ class BaseDatastore:
n = n.add_defaults() n = n.add_defaults()
# Evaluate NACM if required # Evaluate NACM if required
if self.nacm: if self.nacm and not rpc.skip_nacm_check:
nrpc = self.nacm.get_user_rules(rpc.username) nrpc = self.nacm.get_user_rules(rpc.username)
if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_READ) == Action.DENY: if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_READ) == Action.DENY:
raise NacmForbiddenError() raise NacmForbiddenError()
...@@ -578,7 +578,7 @@ class BaseDatastore: ...@@ -578,7 +578,7 @@ class BaseDatastore:
raise NacmForbiddenError(rpc.username + " not allowed to modify NACM data") raise NacmForbiddenError(rpc.username + " not allowed to modify NACM data")
# Evaluate NACM # Evaluate NACM
if self.nacm: if self.nacm and not rpc.skip_nacm_check:
nrpc = self.nacm.get_user_rules(rpc.username) nrpc = self.nacm.get_user_rules(rpc.username)
if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_CREATE) == Action.DENY: if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_CREATE) == Action.DENY:
raise NacmForbiddenError() raise NacmForbiddenError()
...@@ -713,7 +713,7 @@ class BaseDatastore: ...@@ -713,7 +713,7 @@ class BaseDatastore:
raise NacmForbiddenError(rpc.username + " not allowed to modify NACM data") raise NacmForbiddenError(rpc.username + " not allowed to modify NACM data")
# Evaluate NACM # Evaluate NACM
if self.nacm: if self.nacm and not rpc.skip_nacm_check:
nrpc = self.nacm.get_user_rules(rpc.username) nrpc = self.nacm.get_user_rules(rpc.username)
if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_UPDATE) == Action.DENY: if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_UPDATE) == Action.DENY:
raise NacmForbiddenError() raise NacmForbiddenError()
...@@ -746,7 +746,7 @@ class BaseDatastore: ...@@ -746,7 +746,7 @@ class BaseDatastore:
raise NacmForbiddenError(rpc.username + " not allowed to modify NACM data") raise NacmForbiddenError(rpc.username + " not allowed to modify NACM data")
# Evaluate NACM # Evaluate NACM
if self.nacm: if self.nacm and not rpc.skip_nacm_check:
nrpc = self.nacm.get_user_rules(rpc.username) nrpc = self.nacm.get_user_rules(rpc.username)
if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_DELETE) == Action.DENY: if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_DELETE) == Action.DENY:
raise NacmForbiddenError() raise NacmForbiddenError()
...@@ -854,7 +854,7 @@ class BaseDatastore: ...@@ -854,7 +854,7 @@ class BaseDatastore:
raise ValueError("Passed URI does not point to List") raise ValueError("Passed URI does not point to List")
else: else:
# External operation defined in data model # External operation defined in data model
if self.nacm and (not rpc.skip_nacm_check): if self.nacm and not rpc.skip_nacm_check:
nrpc = self.nacm.get_user_rules(rpc.username) nrpc = self.nacm.get_user_rules(rpc.username)
if nrpc.check_rpc_name(rpc.op_name) == Action.DENY: if nrpc.check_rpc_name(rpc.op_name) == Action.DENY:
raise NacmForbiddenError( raise NacmForbiddenError(
......
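Review bot: The five `if self.nacm and not rpc.skip_nacm_check:` guards (READ, CREATE, UPDATE, DELETE and the `check_rpc_name` branch) let a single mutable attribute on `rpc` switch off authorization, and nothing visible in these hunks records that a request skipped NACM. The class that defines `skip_nacm_check` is outside this diff, so confirm it initialises the flag to `False`; if it does not, a caller that builds an rpc object without setting it would raise `AttributeError` here instead of getting a clean deny. Consider putting the condition in one helper, e.g. `def _nacm_applies(self, rpc): return bool(self.nacm) and not rpc.skip_nacm_check`, and writing an audit log entry with `rpc.username` whenever the bypass is taken, so privileged reads and writes stay traceable. The enclosing methods start and end outside these hunks.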
...@@ -200,6 +200,10 @@ def _get(ds: BaseDatastore, req_headers: OrderedDict, pth: str, username: str, s ...@@ -200,6 +200,10 @@ def _get(ds: BaseDatastore, req_headers: OrderedDict, pth: str, username: str, s
rpc1.path = url_path.rstrip("/") rpc1.path = url_path.rstrip("/")
rpc1.qs = query_string rpc1.qs = query_string
# Skip NACM check for privileged users
if username in CONFIG_NACM["ALLOWED_USERS"]:
rpc1.skip_nacm_check = True
try: try:
ds.lock_data(username) ds.lock_data(username)
http_resp = None http_resp = None
...@@ -364,6 +368,10 @@ def _post(ds: BaseDatastore, pth: str, username: str, data: str) -> HttpResponse ...@@ -364,6 +368,10 @@ def _post(ds: BaseDatastore, pth: str, username: str, data: str) -> HttpResponse
rpc1.path = url_path.rstrip("/") rpc1.path = url_path.rstrip("/")
rpc1.qs = query_string rpc1.qs = query_string
# Skip NACM check for privileged users
if username in CONFIG_NACM["ALLOWED_USERS"]:
rpc1.skip_nacm_check = True
try: try:
json_data = json.loads(data) if len(data) > 0 else {} json_data = json.loads(data) if len(data) > 0 else {}
except ValueError as e: except ValueError as e:
...@@ -452,6 +460,10 @@ def _put(ds: BaseDatastore, pth: str, username: str, data: str) -> HttpResponse: ...@@ -452,6 +460,10 @@ def _put(ds: BaseDatastore, pth: str, username: str, data: str) -> HttpResponse:
rpc1.username = username rpc1.username = username
rpc1.path = url_path.rstrip("/") rpc1.path = url_path.rstrip("/")
# Skip NACM check for privileged users
if username in CONFIG_NACM["ALLOWED_USERS"]:
rpc1.skip_nacm_check = True
try: try:
json_data = json.loads(data) if len(data) > 0 else {} json_data = json.loads(data) if len(data) > 0 else {}
except ValueError as e: except ValueError as e:
...@@ -524,6 +536,10 @@ def _delete(ds: BaseDatastore, pth: str, username: str) -> HttpResponse: ...@@ -524,6 +536,10 @@ def _delete(ds: BaseDatastore, pth: str, username: str) -> HttpResponse:
rpc1.username = username rpc1.username = username
rpc1.path = url_path.rstrip("/") rpc1.path = url_path.rstrip("/")
# Skip NACM check for privileged users
if username in CONFIG_NACM["ALLOWED_USERS"]:
rpc1.skip_nacm_check = True
try: try:
ds.lock_data(username) ds.lock_data(username)
......
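Review bot: `username in CONFIG_NACM["ALLOWED_USERS"]` in `_get`, `_post`, `_put` and `_delete` is an exact-match test only if the config value is a list or set; if it can be loaded as a single string, `in` becomes a substring test and the bypass would apply more widely than intended. The type of `ALLOWED_USERS` is not visible in this diff, so this needs confirming; normalising it to a set once at config load would remove the doubt. The same three lines are repeated in all four handlers, so a helper such as `def _is_privileged(username): return username in CONFIG_NACM["ALLOWED_USERS"]`, called as `rpc1.skip_nacm_check = _is_privileged(username)`, would keep the bypass decision in one auditable place. The bypass is also only as trustworthy as the way `username` is derived from the request, which happens outside these hunks.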