validators: Add "action" distinction

parent b123e7bf
from .crypto import AVAIL_HASHES, get_common_names, csr_from_str
from .exceptions import RequestConsistencyError, InvalidRedisDataError
AVAIL_FLAGS = {"renew"}
# Available flags for get (cert) request from clients
AVAIL_CERTS_FLAGS = {"renew"}
AVAIL_STATES = {"ok", "fail", "error"}
# Params of auth session saved in Redis, general
SESSION_PARAMS = {
"auth_type",
"nonce",
"digest",
"csr_str",
"action",
"flags",
}
# Params of auth session saved in Redis, action-specific
ACTION_SPEC_SESSION_PARAMS = {
"certs": {"csr_str"},
}
# Params of auth state saved in redis
AUTH_STATE_PARAMS = {
"status",
"message",
}
# Params of request send by clients, general
GENERAL_REQ_PARAMS = {
"type",
"auth_type",
"sid",
"sn",
}
# Params of request send by clients, get
GET_REQ_PARAMS = {
"flags",
}
# Params of request send by clients, get - cert
GET_CERT_REQ_PARAMS = {
"csr",
"flags",
}
# Params of request send by clients, auth
AUTH_REQ_PARAMS = {
"digest",
}
......@@ -76,9 +92,9 @@ def validate_csr(csr, sn):
validate_csr_signature(csr)
def validate_flags(flags):
def validate_certs_flags(flags):
for flag in flags:
if flag not in AVAIL_FLAGS:
if flag not in AVAIL_CERTS_FLAGS:
raise RequestConsistencyError("Flag not available: {}".format(flag))
......@@ -110,9 +126,11 @@ def validate_auth_type(auth_type):
def check_session(session):
if type(session) is not dict:
raise InvalidRedisDataError("Must be a dict!")
for param in SESSION_PARAMS:
for param in SESSION_PARAMS | ACTION_SPEC_SESSION_PARAMS.get(session.get("action"), set()):
if param not in session:
raise InvalidRedisDataError("Parameter {} missing".format(param))
raise InvalidRedisDataError("Parameter {} missing in session".format(param))
if session["action"] not in ACTION_SPEC_SESSION_PARAMS:
raise InvalidRedisDataError("Invalid action '{}' in session".format(param))
def validate_auth_state(auth_state):
......@@ -131,7 +149,7 @@ def check_params_exist(req, params):
raise RequestConsistencyError("'{}' is missing in the request".format(param))
def check_request(req):
def check_request(req, action):
if type(req) is not dict:
raise RequestConsistencyError("Request not a valid JSON with correct content type")
check_params_exist(req, GENERAL_REQ_PARAMS)
......@@ -140,13 +158,15 @@ def check_request(req):
validate_sn(req["sn"])
validate_sid(req["sid"])
if req["type"] == "get_cert" or req["type"] == "get":
check_params_exist(req, GET_CERT_REQ_PARAMS)
validate_csr(req["csr"], req["sn"])
validate_flags(req["flags"])
if req["type"] == "get" or req["type"] == "get_cert":
check_params_exist(req, GET_REQ_PARAMS)
if action == "certs":
check_params_exist(req, GET_CERT_REQ_PARAMS)
validate_csr(req["csr"], req["sn"])
validate_certs_flags(req["flags"])
if "renew" in req["flags"] and req["sid"]:
raise RequestConsistencyError("Renew allowed only in the first request")
if "renew" in req["flags"] and req["sid"]:
raise RequestConsistencyError("Renew allowed only in the first request")
elif req["type"] == "auth":
check_params_exist(req, AUTH_REQ_PARAMS)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment