Split process_req_get function

parent 824bae6a
......@@ -28,6 +28,9 @@ SESSION_PARAMS = {
"csr_str",
"flags",
}
AUTH_STATE_PARAMS = {
"status",
}
class InvalidParamError(Exception):
......@@ -38,6 +41,10 @@ class InvalidSessionError(Exception):
pass
class InvalidAuthStateError(Exception):
pass
def validate_sn_atsha(sn):
if len(sn) != 16:
raise InvalidParamError("SN has invalid length.")
......@@ -160,6 +167,14 @@ def check_session(session):
raise InvalidSessionError(param)
def check_auth_state(auth_state):
if len(auth_state) != len(AUTH_STATE_PARAMS):
raise InvalidAuthStateError()
for param in auth_state:
if param not in AUTH_STATE_PARAMS:
raise InvalidAuthStateError()
def generate_nonce(sn, sid, csr_str, flags, auth_type, r):
""" Certificate with matching private key not found in redis
"""
......@@ -192,57 +207,79 @@ def key_match(cert_bytes, csr_bytes):
return cert.public_key().public_numbers() == csr.public_key().public_numbers()
def get_auth_state(sn, sid, r):
""" Get state of client authentication from Redis. If the state is failed
or missing, return fail info.
"""
auth_state = r.get(get_auth_state_key(sn, sid))
if not auth_state:
app.logger.debug("Certificate creation in progress, sn=%s, sid=%s", sn, sid)
status = {
"status": "wait",
"delay": DELAY_GET_SESSION_EXISTS
}
return (False, status)
try:
auth_state = json.loads(auth_state.decode("utf-8"))
except UnicodeDecodeError:
app.logger.error("UnicodeDecodeError for auth_state sn=%s, sid=%s", sn, sid)
return (False, {"status": "error"})
except json.decoder.JSONDecodeError:
app.logger.error("JSONDecodeError for auth_state sn=%s, sid=%s", sn, sid)
return (False, {"status": "error"})
try:
check_auth_state(auth_state)
except InvalidAuthStateError:
app.logger.error("Auth state ivalid for sn=%s, sid=%s", sn, sid)
return (False, {"status": "error"})
if auth_state["status"] == "ok":
return (True, None)
elif auth_state["status"] == "failed":
return (False, {"status": "auth_failed"})
else:
app.logger.error("auth_state invalid value in session sn=%s, sid=%s", sn, sid)
return (False, {"status": "error"})
def process_req_get(sn, sid, csr_str, flags, auth_type, r):
app.logger.debug("Processing GET request, sn=%s, sid=%s", sn, sid)
if "renew" in flags: # when renew is flagged we ignore cert in redis
return generate_nonce(sn, sid, csr_str, flags, auth_type, r)
authenticated = False
# We care about authentication only when session exists
if r.exists(get_session_key(sn, sid)):
auth_state = r.get(get_auth_state_key(sn, sid))
if auth_state is not None:
auth_state = json.loads(auth_state.decode("utf-8"))
try:
if auth_state["status"] == "ok":
authenticated = True
elif auth_state["status"] == "failed":
return {"status": "auth_failed"}
else:
app.logger.error("auth_state invalid value in session sn=%s, sid=%s", sn, sid)
return {"status": "error"}
except KeyError as e:
app.logger.error("Value missing in Redis session (%s) sn=%s, sid=%s", e, sn, sid)
return {"status": "error"}
else:
app.logger.debug("Certificate creation in progress, sn=%s, sid=%s", sn, sid)
return {
"status": "wait",
"delay": DELAY_GET_SESSION_EXISTS
}
(authenticated, status) = get_auth_state(sn, sid, r)
if not authenticated:
return status
cert_bytes = r.get(get_cert_key(sn))
if cert_bytes is not None:
app.logger.debug("Certificate found in redis, sn=%s", sn)
# cert and csr public key match
if key_match(cert_bytes, csr_str.encode("utf-8")):
app.logger.debug("Certificate restored from redis, sn=%s", sn)
return {
"status": "ok",
"cert": cert_bytes.decode("utf-8")
}
else:
if authenticated:
app.logger.warning("Auth OK but certificate key does not match, sn=%s", sn)
else:
app.logger.debug("Certificate key does not match, sn=%s", sn)
return generate_nonce(sn, sid, csr_str, flags, auth_type, r)
else:
if not cert_bytes:
if authenticated:
app.logger.warning("Auth OK but certificate not in redis, sn=%s", sn)
else:
app.logger.debug("Certificate not in redis, sn=%s", sn)
return generate_nonce(sn, sid, csr_str, flags, auth_type, r)
app.logger.debug("Certificate found in redis, sn=%s", sn)
# cert and csr public key match
if not key_match(cert_bytes, csr_str.encode("utf-8")):
if authenticated:
app.logger.warning("Auth OK but certificate key does not match, sn=%s", sn)
else:
app.logger.debug("Certificate key does not match, sn=%s", sn)
return generate_nonce(sn, sid, csr_str, flags, auth_type, r)
app.logger.debug("Certificate restored from redis, sn=%s", sn)
return {
"status": "ok",
"cert": cert_bytes.decode("utf-8")
}
def get_auth_session(sn, sid, r):
""" Get state of client session from Redis. If the session is broken
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment