Experimental transactions implementation

parent 27e7a4e3
......@@ -27,6 +27,12 @@ class PathFormat(Enum):
XPATH = 1
class ChangeType(Enum):
CREATE = 0,
REPLACE = 1,
DELETE = 2
class NacmForbiddenError(Exception):
def __init__(self, msg="Access to data node rejected by NACM", rule=None):
self.msg = msg
......@@ -85,7 +91,7 @@ class BaseDataListener:
return self.__class__.__name__ + ": listening at " + str(self.schema_paths)
class Rpc:
class RpcInfo:
def __init__(self):
self.username = None # type: str
self.path = None # type: str
......@@ -96,6 +102,22 @@ class Rpc:
self.op_input_args = None # type: ObjectValue
class DataChange:
def __init__(self, change_type: ChangeType, rpc_info: RpcInfo, data: Any):
self.change_type = change_type
self.rpc_info = rpc_info
self.data = data
class ChangeList:
def __init__(self, changelist_name: str):
self.changelist_name = changelist_name
self.journal = [] # type: List[DataChange]
def add(self, change: DataChange):
self.journal.append(change)
class BaseDatastore:
def __init__(self, dm: DataModel, name: str=""):
self.name = name
......@@ -104,6 +126,7 @@ class BaseDatastore:
self._dm = dm # type: DataModel
self._data_lock = Lock()
self._lock_username = None # type: str
self._usr_changelist = {} # type: Dict[str, List[ChangeList]]
# Register NACM module to datastore
def register_nacm(self, nacm_config: "NacmConfig"):
......@@ -153,7 +176,7 @@ class BaseDatastore:
return n
# Get data node, evaluate NACM if required
def get_node_rpc(self, rpc: Rpc) -> Instance:
def get_node_rpc(self, rpc: RpcInfo) -> Instance:
ii = self.parse_ii(rpc.path, rpc.path_format)
root = self._data
......@@ -182,7 +205,7 @@ class BaseDatastore:
return n
# Create new data node
def create_node_rpc(self, rpc: Rpc, value: Any, insert=None, point=None):
def create_node_rpc(self, root: Instance, rpc: RpcInfo, value: Any, insert=None, point=None) -> Instance:
# Rest-like version
# ii = self.parse_ii(rpc.path, rpc.path_format)
# n = self._data.goto(ii)
......@@ -238,7 +261,7 @@ class BaseDatastore:
# Restconf draft compliant version
ii = self.parse_ii(rpc.path, rpc.path_format)
n = self._data.goto(ii)
n = root.goto(ii)
new_n = n
if self.nacm:
......@@ -311,13 +334,13 @@ class BaseDatastore:
else:
raise InstanceAlreadyPresent()
self._data = new_n.top()
self.notify_edit(ii)
return new_n.top()
# Update already existing data node
def update_node_rpc(self, rpc: Rpc, value: Any):
def update_node_rpc(self, root: Instance, rpc: RpcInfo, value: Any) -> Instance:
ii = self.parse_ii(rpc.path, rpc.path_format)
n = self._data.goto(ii)
n = root.goto(ii)
if self.nacm:
nrpc = self.nacm.get_user_nacm(rpc.username)
......@@ -329,14 +352,14 @@ class BaseDatastore:
new_value = data_doc_inst.goto(ii).value
new_n = n.update(new_value)
self._data = new_n.top()
self.notify_edit(ii)
return new_n.top()
# Delete data node
def delete_node_rpc(self, rpc: Rpc, insert=None, point=None):
def delete_node_rpc(self, root: Instance, rpc: RpcInfo) -> Instance:
ii = self.parse_ii(rpc.path, rpc.path_format)
n = self._data.goto(ii)
n = root.goto(ii)
n_parent = n.up()
new_n = n_parent
last_isel = ii[-1]
......@@ -357,29 +380,74 @@ class BaseDatastore:
else:
raise InstanceTypeError(n, "Invalid target node type")
self._data = new_n.top()
return new_n.top()
# Invoke an operation
def invoke_op_rpc(self, rpc: Rpc) -> ObjectValue:
def invoke_op_rpc(self, rpc: RpcInfo) -> ObjectValue:
if self.nacm and (not rpc.skip_nacm_check):
nrpc = self.nacm.get_user_nacm(rpc.username)
if nrpc.check_rpc_name(rpc.op_name) == Action.DENY:
raise NacmForbiddenError("Op \"{}\" invocation denied for user \"{}\"".format(rpc.op_name, rpc.username))
op_handler = OP_HANDLERS.get_handler(rpc.op_name)
if op_handler is None:
raise NoHandlerForOpError()
ret_data = {}
if rpc.op_name == "conf-start":
chl = ChangeList(rpc.op_input_args["name"])
if self._usr_changelist.get(rpc.username) is None:
self._usr_changelist[rpc.username] = []
self._usr_changelist[rpc.username].append(chl)
ret_data = {"status": "OK"}
elif rpc.op_name == "conf-list":
chls = self._usr_changelist.get(rpc.username)
chl_json = {}
for chl in chls:
changes = []
for ch in chl.journal:
changes.append(
[ch.change_type.name, ch.rpc_info.path]
)
chl_json[chl.changelist_name] = changes
ret_data = \
{
"status": "OK",
"changelists": chl_json
}
elif rpc.op_name == "conf-drop":
chls = self._usr_changelist.get(rpc.username)
if chls is not None:
chls.pop()
if len(chls) == 0:
del self._usr_changelist[rpc.username]
ret_data = {"status": "OK"}
elif rpc.op_name == "conf-commit":
pass
else:
op_handler = OP_HANDLERS.get_handler(rpc.op_name)
if op_handler is None:
raise NoHandlerForOpError()
# Print operation input schema
# sn = self.get_schema_node(rpc.path)
# sn_input = sn.get_child("input")
# if sn_input is not None:
# print("RPC input schema:")
# print(sn_input._ascii_tree(""))
# Print operation input schema
# sn = self.get_schema_node(rpc.path)
# sn_input = sn.get_child("input")
# if sn_input is not None:
# print("RPC input schema:")
# print(sn_input._ascii_tree(""))
ret_data = op_handler(rpc.op_input_args)
ret_data = op_handler(rpc.op_input_args)
return ret_data
def add_to_journal_rpc(self, type: ChangeType, rpc: RpcInfo, value: Any):
usr_chss = self._usr_changelist.get(rpc.username)
if usr_chss is not None:
usr_chs = usr_chss[-1]
usr_chs.add(DataChange(type, rpc, value))
else:
raise NoHandlerError("No active changelist for user \"{}\"".format(rpc.username))
# Locks datastore data
def lock_data(self, username: str = None, blocking: bool=True):
ret = self._data_lock.acquire(blocking=blocking, timeout=1)
......@@ -428,7 +496,7 @@ def test():
data = JsonDatastore(datamodel)
data.load("jetconf/example-data.json")
rpc = Rpc()
rpc = RpcInfo()
rpc.username = "dominik"
rpc.path = "/dns-server:dns-server/zones/zone[domain='example.com']/query-module"
rpc.path_format = PathFormat.XPATH
......
......@@ -11,7 +11,7 @@ from yangson.instance import NonexistentInstance, InstanceTypeError, DuplicateMe
from .config import CONFIG_GLOBAL, CONFIG_HTTP, NACM_ADMINS, API_ROOT_data, API_ROOT_ops
from .helpers import CertHelpers, DataHelpers, DateTimeHelpers, ErrorHelpers
from .data import BaseDatastore, Rpc, DataLockError, NacmForbiddenError, NoHandlerForOpError, InstanceAlreadyPresent
from .data import BaseDatastore, RpcInfo, DataLockError, NacmForbiddenError, NoHandlerForOpError, InstanceAlreadyPresent, ChangeType
QueryStrT = Dict[str, List[str]]
epretty = ErrorHelpers.epretty
......@@ -53,7 +53,7 @@ def _get(prot: "H2Protocol", stream_id: int, ds: BaseDatastore, pth: str):
else:
query_string = {}
rpc1 = Rpc()
rpc1 = RpcInfo()
rpc1.username = username
rpc1.path = url_path
rpc1.qs = query_string
......@@ -182,7 +182,7 @@ def _post(prot: "H2Protocol", data: bytes, stream_id: int, ds: BaseDatastore, pt
username = CertHelpers.get_field(prot.client_cert, "emailAddress")
rpc1 = Rpc()
rpc1 = RpcInfo()
rpc1.username = username
rpc1.path = url_path
......@@ -197,7 +197,8 @@ def _post(prot: "H2Protocol", data: bytes, stream_id: int, ds: BaseDatastore, pt
ds.lock_data(username)
ins_pos = (query_string.get("insert") or [None])[0]
point = (query_string.get("point") or [None])[0]
ds.create_node_rpc(rpc1, json_data, insert=ins_pos, point=point)
# ds.create_node_rpc(rpc1, json_data, insert=ins_pos, point=point)
ds.add_to_journal_rpc(ChangeType.CREATE, rpc1, json_data)
prot.send_empty(stream_id, "201", "Created")
except DataLockError as e:
warn(epretty(e))
......@@ -254,7 +255,7 @@ def _put(prot: "H2Protocol", data: bytes, stream_id: int, ds: BaseDatastore, pth
username = CertHelpers.get_field(prot.client_cert, "emailAddress")
rpc1 = Rpc()
rpc1 = RpcInfo()
rpc1.username = username
rpc1.path = url_path
......@@ -262,7 +263,8 @@ def _put(prot: "H2Protocol", data: bytes, stream_id: int, ds: BaseDatastore, pth
try:
ds.lock_data(username)
ds.update_node_rpc(rpc1, json_data)
# ds.update_node_rpc(rpc1, json_data)
ds.add_to_journal_rpc(ChangeType.REPLACE, rpc1, json_data)
prot.send_empty(stream_id, "204", "No Content", False)
except DataLockError as e:
warn(epretty(e))
......@@ -307,13 +309,14 @@ def _delete(prot: "H2Protocol", stream_id: int, ds: BaseDatastore, pth: str):
username = CertHelpers.get_field(prot.client_cert, "emailAddress")
rpc1 = Rpc()
rpc1 = RpcInfo()
rpc1.username = username
rpc1.path = url_path
try:
ds.lock_data(username)
ds.delete_node_rpc(rpc1)
# ds.delete_node_rpc(rpc1)
ds.add_to_journal_rpc(ChangeType.DELETE, rpc1, None)
prot.send_empty(stream_id, "204", "No Content", False)
except DataLockError as e:
warn(epretty(e))
......@@ -380,7 +383,7 @@ def create_api_op(ds: BaseDatastore):
input_args = json_data.get(ns + ":input")
rpc1 = Rpc()
rpc1 = RpcInfo()
rpc1.username = username
rpc1.path = api_pth
rpc1.op_name = op_name
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment