Transactions #2

parent 74daed17
Pipeline #426 skipped
......@@ -10,6 +10,7 @@ CONFIG_HTTP = {
"DOC_ROOT": "doc-root",
"DOC_DEFAULT_NAME": "index.html",
"API_ROOT": "/restconf",
"API_ROOT_STAGING": "/restconf_staging",
"SERVER_NAME": "hyper-h2",
"PORT": 8443,
......@@ -30,12 +31,14 @@ CONFIG = {
NACM_ADMINS = CONFIG["NACM"]["ALLOWED_USERS"]
API_ROOT_data = os.path.join(CONFIG_HTTP["API_ROOT"], "data")
API_ROOT_STAGING_data = os.path.join(CONFIG_HTTP["API_ROOT_STAGING"], "data")
API_ROOT_ops = os.path.join(CONFIG_HTTP["API_ROOT"], "operations")
def load_config(filename: str):
global NACM_ADMINS
global API_ROOT_data
global API_ROOT_STAGING_data
global API_ROOT_ops
try:
......@@ -53,6 +56,7 @@ def load_config(filename: str):
# Shortcuts
NACM_ADMINS = CONFIG["NACM"]["ALLOWED_USERS"]
API_ROOT_data = os.path.join(CONFIG_HTTP["API_ROOT"], "data")
API_ROOT_STAGING_data = os.path.join(CONFIG_HTTP["API_ROOT_STAGING"], "data")
API_ROOT_ops = os.path.join(CONFIG_HTTP["API_ROOT"], "operations")
......
This diff is collapsed.
......@@ -124,6 +124,7 @@ def create_get_api(ds: BaseDatastore):
def get_file(prot: "H2Protocol", stream_id: int, headers: OrderedDict):
# Ordinary file on filesystem
error("ordfile")
url_split = headers[":path"].split("?")
url_path = url_split[0]
......@@ -169,6 +170,84 @@ def get_file(prot: "H2Protocol", stream_id: int, headers: OrderedDict):
prot.conn.send_data(stream_id, bytes(), end_stream=True)
def _get_staging(prot: "H2Protocol", stream_id: int, ds: BaseDatastore, pth: str):
username = CertHelpers.get_field(prot.client_cert, "emailAddress")
url_split = pth.split("?")
url_path = url_split[0]
if len(url_split) > 1:
query_string = parse_qs(url_split[1])
else:
query_string = {}
rpc1 = RpcInfo()
rpc1.username = username
rpc1.path = url_path
rpc1.qs = query_string
try:
ds.lock_data(username)
n = ds.get_node_rpc(rpc1)
response = json.dumps(n.value, indent=4) + "\n"
response_bytes = response.encode()
response_headers = [
(":status", "200"),
("server", CONFIG_HTTP["SERVER_NAME"])
]
try:
lm_time = DateTimeHelpers.to_httpdate_str(n.value.last_modified, CONFIG_GLOBAL["TIMEZONE"])
response_headers.append(("Last-Modified", lm_time))
except AttributeError:
# Only arrays and objects have last_modified attribute
pass
response_headers.append(("ETag", hash(n.value)))
response_headers.append(("Content-Type", "application/yang.api+json"))
response_headers.append(("content-length", len(response_bytes)))
prot.conn.send_headers(stream_id, response_headers)
prot.conn.send_data(stream_id, response_bytes, end_stream=True)
except DataLockError as e:
warn(epretty(e))
prot.send_empty(stream_id, "500", "Internal Server Error")
except NacmForbiddenError as e:
warn(epretty(e))
prot.send_empty(stream_id, "403", "Forbidden")
except NonexistentSchemaNode as e:
warn(epretty(e))
prot.send_empty(stream_id, "404", "Not Found")
except NonexistentInstance as e:
warn(epretty(e))
prot.send_empty(stream_id, "404", "Not Found")
except InstanceTypeError as e:
warn(epretty(e))
prot.send_empty(stream_id, "400", "Bad Request")
finally:
ds.unlock_data()
def create_get_staging_api(ds: BaseDatastore):
def get_staging_api_closure(prot: "H2Protocol", stream_id: int, headers: OrderedDict):
# api request
info(("api_get: " + headers[":path"]))
api_pth = headers[":path"][len(API_ROOT_data):]
ns = DataHelpers.path_first_ns(api_pth)
if ns == "ietf-netconf-acm":
username = CertHelpers.get_field(prot.client_cert, "emailAddress")
if username not in NACM_ADMINS:
warn(username + " not allowed to access NACM data")
prot.send_empty(stream_id, "403", "Forbidden")
else:
_get_staging(prot, stream_id, ds.nacm.nacm_ds, api_pth)
else:
_get_staging(prot, stream_id, ds, api_pth)
return get_staging_api_closure
def _post(prot: "H2Protocol", data: bytes, stream_id: int, ds: BaseDatastore, pth: str):
data_str = data.decode("utf-8")
debug("HTTP data received: " + data_str)
......@@ -187,9 +266,9 @@ def _post(prot: "H2Protocol", data: bytes, stream_id: int, ds: BaseDatastore, pt
rpc1.path = url_path
try:
json_data = json.loads(data_str)
json_data = json.loads(data_str) if len(data_str) > 0 else {}
except ValueError as e:
error("Invalid HTTP data: " + str(e))
error("Failed to parse POST data: " + epretty(e))
prot.send_empty(stream_id, "400", "Bad Request")
return
......@@ -197,8 +276,8 @@ def _post(prot: "H2Protocol", data: bytes, stream_id: int, ds: BaseDatastore, pt
ds.lock_data(username)
ins_pos = (query_string.get("insert") or [None])[0]
point = (query_string.get("point") or [None])[0]
ds.create_node_rpc(ds.get_data_root(), rpc1, json_data, insert=ins_pos, point=point)
#ds.add_to_journal_rpc(ChangeType.CREATE, rpc1, json_data)
new_root = ds.create_node_rpc(ds.get_data_root(), rpc1, json_data, insert=ins_pos, point=point)
ds.add_to_journal_rpc(ChangeType.CREATE, rpc1, json_data, new_root)
prot.send_empty(stream_id, "201", "Created")
except DataLockError as e:
warn(epretty(e))
......@@ -259,13 +338,17 @@ def _put(prot: "H2Protocol", data: bytes, stream_id: int, ds: BaseDatastore, pth
rpc1.username = username
rpc1.path = url_path
json_data = json.loads(data_str)
try:
json_data = json.loads(data_str) if len(data_str) > 0 else {}
except ValueError as e:
error("Failed to parse PUT data: " + epretty(e))
prot.send_empty(stream_id, "400", "Bad Request")
return
try:
ds.lock_data(username)
nr = ds.update_node_rpc(ds.get_data_root(), rpc1, json_data)
ds._data = nr
#ds.add_to_journal_rpc(ChangeType.REPLACE, rpc1, json_data)
new_root = ds.update_node_rpc(ds.get_data_root(), rpc1, json_data)
ds.add_to_journal_rpc(ChangeType.REPLACE, rpc1, json_data, new_root)
prot.send_empty(stream_id, "204", "No Content", False)
except DataLockError as e:
warn(epretty(e))
......@@ -316,8 +399,8 @@ def _delete(prot: "H2Protocol", stream_id: int, ds: BaseDatastore, pth: str):
try:
ds.lock_data(username)
# ds.delete_node_rpc(rpc1)
ds.add_to_journal_rpc(ChangeType.DELETE, rpc1, None)
new_root = ds.delete_node_rpc(ds.get_data_root(), rpc1)
ds.add_to_journal_rpc(ChangeType.DELETE, rpc1, None, new_root)
prot.send_empty(stream_id, "204", "No Content", False)
except DataLockError as e:
warn(epretty(e))
......@@ -376,9 +459,9 @@ def create_api_op(ds: BaseDatastore):
username = CertHelpers.get_field(prot.client_cert, "emailAddress")
try:
json_data = json.loads(data_str)
json_data = json.loads(data_str) if len(data_str) > 0 else {}
except ValueError as e:
error("Invalid HTTP data: " + str(e))
error("Failed to parse POST data: " + epretty(e))
prot.send_empty(stream_id, "400", "Bad Request")
return
......
......@@ -140,7 +140,6 @@ class DataRuleTree:
ind_str += "+--"
# :o) :o) :o)
for vb in vbars:
isl = list(ind_str)
isl[vb * 3] = "|"
......@@ -316,12 +315,12 @@ class UserNacm:
# config.update() always creates new structures instead of modifying ones
config.internal_data_lock.release()
def check_data_node_path(self, ii: InstanceIdentifier, access: Permission, out_matching_rule: List[NacmRule]=None) -> Action:
def check_data_node_path(self, root: InstanceNode, ii: InstanceIdentifier, access: Permission, out_matching_rule: List[NacmRule]=None) -> Action:
if not self.nacm_enabled:
return Action.PERMIT
retval = None
data_node = self.data.get_data_root() # type: InstanceNode
data_node = root # type: InstanceNode
nl = self.rule_tree.root
for isel in ii:
......@@ -361,7 +360,7 @@ class UserNacm:
return retval
def _check_data_read_path(self, node: InstanceNode, ii: InstanceIdentifier) -> InstanceNode:
def _check_data_read_path(self, node: InstanceNode, root: InstanceNode, ii: InstanceIdentifier) -> InstanceNode:
# node = self.data.get_node(ii)
if isinstance(node.value, ObjectValue):
......@@ -374,12 +373,12 @@ class UserNacm:
mii.append(nsel)
debug("checking mii {}".format(mii))
if self.check_data_node_path(mii, Permission.NACM_ACCESS_READ) == Action.DENY:
if self.check_data_node_path(root, mii, Permission.NACM_ACCESS_READ) == Action.DENY:
# info("Pruning node {} {}".format(id(node.value[child_key]), node.value[child_key]))
debug("Pruning node {}".format(mii))
node = node.remove_member(child_key)
else:
node = self._check_data_read_path(m, mii).up()
node = self._check_data_read_path(m, root, mii).up()
elif isinstance(node.value, ArrayValue):
# print("array: {}".format(node.value))
i = 0
......@@ -391,22 +390,22 @@ class UserNacm:
eii.append(nsel)
debug("checking eii {}".format(eii))
if self.check_data_node_path(eii, Permission.NACM_ACCESS_READ) == Action.DENY:
if self.check_data_node_path(root, eii, Permission.NACM_ACCESS_READ) == Action.DENY:
debug("Pruning node {} {}".format(id(node.value[i]), node.value[i]))
node = node.remove_entry(i)
arr_len -= 1
else:
i += 1
node = self._check_data_read_path(e, eii).up()
node = self._check_data_read_path(e, root, eii).up()
return node
def check_data_read_path(self, ii: InstanceIdentifier) -> InstanceNode:
n = self.data.get_node(ii)
def check_data_read_path(self, root: InstanceNode, ii: InstanceIdentifier) -> InstanceNode:
n = self.data.get_node(root, ii)
if not self.nacm_enabled:
return n
else:
return self._check_data_read_path(n, ii)
return self._check_data_read_path(n, root, ii)
def check_rpc_name(self, rpc_name: str, out_matching_rule: List[NacmRule] = None) -> Action:
if not self.nacm_enabled:
......@@ -459,13 +458,14 @@ def test():
for test_path in test_paths:
info("Testing path \"{}\"".format(test_path[0]))
datanode = data.get_node_path(test_path[0], PathFormat.XPATH)
ii = data.parse_ii(test_path[0], PathFormat.XPATH)
datanode = data.get_node(data.get_data_root(), ii)
if datanode:
info("Node found")
debug("Node contents: {}".format(datanode.value))
test_ii = data.parse_ii(test_path[0], PathFormat.XPATH)
rule = []
action = nacm.get_user_nacm(test_user).check_data_node_path(test_ii, test_path[1], out_matching_rule=rule)
action = nacm.get_user_nacm(test_user).check_data_node_path(data.get_data_root(), test_ii, test_path[1], out_matching_rule=rule)
if action == test_path[2]:
info("Action = {}, OK ({})\n".format(action.name, rule[0].name if len(rule) > 0 else "default"))
else:
......@@ -476,7 +476,7 @@ def test():
test_ii2 = data.parse_ii("/dns-server:dns-server/zones/zone[domain='example.com']", PathFormat.XPATH)
info("Reading: " + str(test_ii2))
res = nacm.get_user_nacm(test_user).check_data_read_path(test_ii2)
res = nacm.get_user_nacm(test_user).check_data_read_path(data.get_data_root(), test_ii2)
res = json.dumps(res.value, indent=4, sort_keys=True)
print("Result =")
print(res)
......
......@@ -8,7 +8,7 @@ from h2.connection import H2Connection
from h2.events import DataReceived, RequestReceived, RemoteSettingsChanged
import jetconf.http_handlers as handlers
from .config import CONFIG_HTTP, API_ROOT_data, API_ROOT_ops
from .config import CONFIG_HTTP, API_ROOT_data, API_ROOT_STAGING_data, API_ROOT_ops
from .data import BaseDatastore
......@@ -74,8 +74,12 @@ class H2Protocol(asyncio.Protocol):
# Handle immediately, no need to wait for incoming data
self.handle_get_delete(headers, event.stream_id)
elif http_method in ("PUT", "POST"):
# Store headers and wait for data upload
self.reqs_waiting_upload[event.stream_id] = headers
if headers.get("content-length"):
# Store headers and wait for data upload
self.reqs_waiting_upload[event.stream_id] = headers
else:
# Handle immediately, incoming data empty
self.handle_put_post(headers, event.stream_id, bytes())
else:
warn("Unknown http method \"{}\"".format(headers[":method"]))
elif isinstance(event, DataReceived):
......@@ -89,6 +93,9 @@ class H2Protocol(asyncio.Protocol):
except KeyError:
return
self.handle_put_post(headers, stream_id, data)
def handle_put_post(self, headers: OrderedDict, stream_id: int, data: bytes):
# Handle PUT, POST
url_split = headers[":path"].split("?")
url_path = url_split[0]
......@@ -138,6 +145,7 @@ class RestServer:
# Register HTTP handlers
api_get_root = handlers.api_root_handler
api_get = handlers.create_get_api(datastore)
api_get_staging = handlers.create_get_staging_api(datastore)
api_post = handlers.create_post_api(datastore)
api_put = handlers.create_put_api(datastore)
api_delete = handlers.create_api_delete(datastore)
......@@ -145,6 +153,7 @@ class RestServer:
self.http_handlers.register_handler(lambda m, p: (m == "GET") and (p == CONFIG_HTTP["API_ROOT"]), api_get_root)
self.http_handlers.register_handler(lambda m, p: (m == "GET") and (p.startswith(API_ROOT_data)), api_get)
self.http_handlers.register_handler(lambda m, p: (m == "GET") and (p.startswith(API_ROOT_STAGING_data)), api_get_staging)
self.http_handlers.register_handler(lambda m, p: (m == "POST") and (p.startswith(API_ROOT_data)), api_post)
self.http_handlers.register_handler(lambda m, p: (m == "PUT") and (p.startswith(API_ROOT_data)), api_put)
self.http_handlers.register_handler(lambda m, p: (m == "DELETE") and (p.startswith(API_ROOT_data)), api_delete)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment