Messages - Distinguish fail and error states

parent 350bdd03
This diff is collapsed.
......@@ -2,13 +2,29 @@ class CertAPIError(Exception):
pass
class InvalidParamError(CertAPIError):
class CertAPIClientError(CertAPIError):
pass
class InvalidSessionError(CertAPIError):
class ClientDataError(CertAPIClientError):
pass
class InvalidAuthStateError(CertAPIError):
class ClientAuthError(CertAPIClientError):
pass
class InvalidParamError(ClientDataError):
pass
class CertAPISystemError(CertAPIError):
pass
class InvalidSessionError(CertAPISystemError):
pass
class InvalidAuthStateError(CertAPISystemError):
pass
from .crypto import AVAIL_HASHES, get_common_names, csr_from_str
from .exceptions import InvalidParamError, InvalidAuthStateError, InvalidSessionError
from .exceptions import InvalidParamError, InvalidAuthStateError, InvalidSessionError, ClientDataError
AVAIL_REQUEST_TYPES = {"get_cert", "auth"}
AVAIL_FLAGS = {"renew"}
AVAIL_STATES = {"ok", "fail", "error"}
SESSION_PARAMS = {
"auth_type",
......@@ -13,6 +14,20 @@ SESSION_PARAMS = {
}
AUTH_STATE_PARAMS = {
"status",
"message",
}
GENERAL_REQ_PARAMS = {
"type",
"auth_type",
"sid",
"sn",
}
GET_CERT_REQ_PARAMS = {
"csr",
"flags",
}
AUTH_REQ_PARAMS = {
"digest",
}
......@@ -99,14 +114,49 @@ def validate_auth_type(auth_type):
def check_session(session):
if type(session) is not dict:
raise InvalidSessionError("Must be a dict!")
for param in SESSION_PARAMS:
if param not in session:
raise InvalidSessionError(param)
raise InvalidSessionError("{} missing in session".format(param))
def check_auth_state(auth_state):
if len(auth_state) != len(AUTH_STATE_PARAMS):
raise InvalidAuthStateError()
for param in auth_state:
if param not in AUTH_STATE_PARAMS:
raise InvalidAuthStateError()
for param in AUTH_STATE_PARAMS:
if param not in auth_state:
raise InvalidAuthStateError("{} missing in auth_state".format(param))
if auth_state["status"] not in AVAIL_STATES:
raise InvalidAuthStateError("Invalid status ({}) in auth_state".format(auth_state["status"]))
def check_params_exist(req, params):
for param in GENERAL_REQ_PARAMS:
if param not in req:
raise ClientDataError("'{}' is missing in the request".format(param))
def check_request(req):
if type(req) is not dict:
raise ClientDataError("Request not a valid JSON with correct content type")
check_params_exist(req, GENERAL_REQ_PARAMS)
validate_req_type(req["type"])
validate_auth_type(req["auth_type"])
validate_sn = sn_validators[req["auth_type"]]
validate_sn(req["sn"])
validate_sid(req["sid"])
if req["type"] == "get_cert":
if len(req) > (len(GENERAL_REQ_PARAMS) + len(GET_CERT_REQ_PARAMS)):
raise ClientDataError("Too much parameters in request")
check_params_exist(req, AUTH_REQ_PARAMS)
validate_csr(req["csr"], req["sn"])
validate_flags(req["flags"])
if "renew" in req["flags"] and req["sid"]:
raise InvalidParamError("Renew allowed only in the first request")
elif req["type"] == "auth":
if len(req) > (len(GENERAL_REQ_PARAMS) + len(AUTH_REQ_PARAMS)):
raise ClientDataError("Too much parameters in request")
check_params_exist(req, GET_CERT_REQ_PARAMS)
validate_digest(req["digest"])
......@@ -84,8 +84,8 @@ def test_good_sid_set_auth_in_progress(client, good_data, redis_mock):
def test_good_sid_set_auth_failed(client, good_data, redis_mock):
redis_mock().exists.return_value = True # Session exists
redis_mock().get.return_value = b'{"status": "failed"}' # Auth failed
# Now the client gets response "failed"
redis_mock().get.return_value = b'{"status": "fail", "message": "fail"}' # Auth failed
# Now the client gets response "fail"
rv = client.post("/v1", json=good_data[0])
assert redis_mock().exists.call_count == 1 # Session exists?
......@@ -94,13 +94,13 @@ def test_good_sid_set_auth_failed(client, good_data, redis_mock):
assert rv.status_code == 200
resp_data = rv.get_json()
assert resp_data["status"] == "auth_failed"
assert resp_data["status"] == "fail"
def test_good_sid_set_auth_ok_cert_missing(client, good_data, redis_mock):
def redis_get(key):
if "auth_state:{}:".format(good_data[0]["sn"]) in key:
return b'{"status": "ok"}'
return b'{"status": "ok", "message": null}'
if key == "certificate:{}".format(good_data[0]["sn"]):
return None
assert False
......@@ -124,7 +124,7 @@ def test_good_sid_set_auth_ok_cert_missing(client, good_data, redis_mock):
def test_good_sid_set_auth_ok_cert_ok(client, good_data, redis_mock):
def redis_get(key):
if "auth_state:{}:".format(good_data[0]["sn"]) in key:
return b'{"status": "ok"}'
return b'{"status": "ok", "message": null}'
if key == "certificate:{}".format(good_data[0]["sn"]):
return good_data[1].encode("utf-8")
assert False
......
......@@ -135,11 +135,11 @@ def bad_sid(request):
@pytest.fixture(params=[
{
"auth_type",
"nonce",
"digest",
"csr_str",
"flags",
"auth_type": "",
"nonce": "",
"digest": "",
"csr_str": "",
"flags": "",
}
])
def good_sessions(request):
......@@ -174,12 +174,10 @@ def bad_sessions(request):
@pytest.fixture(params=[
(
"status",
),
(
"status",
)
{
"status": "ok",
"message": None,
},
])
def good_auth_state(request):
return request.param
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment