Commit 909889e5 authored by Ales Mrazek's avatar Ales Mrazek

trust-anchor converter implements #4, strip strings fixes #8

parent 17914d82
Pipeline #43503 passed with stages
in 50 seconds
......@@ -41,3 +41,7 @@ with open(kresd_path, "w") as knot_file:
# write Unbound configuration
with open(unb_path, "w") as unb_file:
unb_file.write(unb_conf)
print("{0} {1} configuration files were successfully created.".format(kresd_path, unb_path))
......@@ -23,18 +23,15 @@ with open(unbconf_path, "r") as unb_file:
converter = Converter()
json_data = converter.from_unbound(unbconf_data)
if json_data:
# load data to DataModel
model_data = model.from_raw(json_data)
# load data to DataModel
model_data = model.from_raw(json_data)
# validate data against DataModel
# model_data.validate()
# validate data against DataModel
model_data.validate()
# save model_data to json
with open(data_json_path, 'w') as json_file:
dump(json_data, json_file, indent=2, sort_keys=False)
# save model_data to json
with open(data_json_path, 'w') as json_file:
dump(json_data, json_file, indent=2, sort_keys=False)
print("Data was successfully writted to json file named " + data_json_path)
print("Data was successfully writted to json file named " + data_json_path)
else:
print("Data cannot be written, because of Errors")
......@@ -2,12 +2,9 @@
and save as Json-Encoded file. """
from re import compile
class UnboundConfValueError(ValueError):
def __init__(self, arg):
self.strerror = arg
self.args = {arg}
from .parser import TrustAnchorRR as ta_parser
from .errors import UnboundConfValueError
from .regex import domain_name, ipv4_address, ipv6_address, ipv6_prefix
class Converter:
......@@ -34,34 +31,6 @@ class Converter:
self.do_ipv6 = True
self.write_data = True
# regex domains (".", "stub.example.", "stub.example.net)
domain = compile('^\.$|((([a-z0-9]+(-[a-z0-9]+)*\.)+([a-z]{2,}|)))$')
ipv4_addr = compile('^(([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])\.){3}'
+ '([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])$')
ipv6_addr = compile('^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}'
+ '|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}'
+ ':[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4})'
+ '{1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}'
+ ':){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}'
+ '|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:'
+ '(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|'
+ '(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|'
+ '([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}'
+ '(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))$')
ipv6_prefix = compile('^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}'
+ '|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}'
+ ':[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4})'
+ '{1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}'
+ ':){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}'
+ '|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:'
+ '(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|'
+ '(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|'
+ '([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}'
+ '(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))(/(([0-9])|([0-9]{2})|(1[0-1][0-9])|(12[0-8])))$')
@staticmethod
def _ignore_comments(line: str):
......@@ -77,10 +46,11 @@ class Converter:
else:
nocomment_line = line
return nocomment_line
return nocomment_line.strip()
@staticmethod
def _parse_line(line: str):
line = line.strip()
# remove comments ofter # or ;
line = Converter._ignore_comments(line)
......@@ -92,7 +62,7 @@ class Converter:
@staticmethod
def _parse_ipaddr(address: str):
address = address.strip()
if '@' in address:
addr, port = address.split("@")
......@@ -112,7 +82,10 @@ class Converter:
@staticmethod
def _check_ipaddress(ip_addr: str):
if Converter.ipv6_addr.match(ip_addr) or Converter.ipv4_addr.match(ip_addr):
ip_addr = ip_addr.strip()
if ipv6_address.match(ip_addr) or ipv4_address.match(ip_addr):
return True
else:
return False
......@@ -147,9 +120,9 @@ class Converter:
def _out_interface(self, key: str, value: str) -> None:
if Converter.ipv4_addr.match(value):
if ipv4_address.match(value):
self.network['source-address'] = {'ipv4': value}
elif Converter.ipv6_addr.match(value):
elif ipv6_address.match(value):
self.network['source-address'] = {'ipv6': value}
else:
raise UnboundConfValueError("Value: '{0}' of '{1}' option is not ipv4 or ipv6 address".format(value, key))
......@@ -222,19 +195,36 @@ class Converter:
raise UnboundConfValueError("Value: '{0}' of '{1}' option is not Valid: "
"Must be '0-5' number".format(value, key))
def _at_anchorfile(self, key: str, value: str) -> None:
def _trust_anchor(self, key: str, value: str) -> None:
if 'DS' in value:
ta = ta_parser.parse_ds(value)
rr_type = 'ds'
elif 'DNSKEY' in value:
ta = ta_parser.parse_dnskey(value)
rr_type = 'dnskey'
else:
return
domain = ta['owner']
if not any(item['domain'] == domain for item in self.dnssec['trust-anchors']):
self.dnssec['trust-anchors'].append({"domain": domain, "auto-update": True, "trust-anchor": []})
ta_index = 0
for tas in self.dnssec['trust-anchors']:
if tas['domain'] == domain:
break
else:
ta_index += 1
ta['id'] = len(self.dnssec['trust-anchors'][ta_index]['trust-anchor'])
if ' ' in value:
raise UnboundConfValueError("Value: '{0}' of '{1}' option is not Path string".format(value, key))
del ta['owner']
domain = "domain" + str(self.ta_domain_count)
self.dnssec['trust-anchors'].append({
'domain': domain,
'key-file': str(value)})
self.ta_domain_count += 1
self.dnssec['trust-anchors'][ta_index]['trust-anchor'].append(ta)
def _domain_insecure(self, key: str, value: str) -> None:
if Converter.domain.match(value):
if domain_name.match(value):
self.dnssec['negative-trust-anchors'].append(str(value))
else:
raise UnboundConfValueError("Value: '{0}' of '{1}' option is not Valid: "
......@@ -258,7 +248,7 @@ class Converter:
self.debugging['cznic-resolver-unbound:val-override-date'] = utc_datetime
def _dns64_prefix(self, key: str, value: str) -> None:
if Converter.ipv6_prefix.match(value):
if ipv6_prefix.match(value):
self.dns64['prefix'] = str(value)
else:
raise UnboundConfValueError("Value: '{0}' of '{1}' option is not Valid DNS64 prefix".format(value, key))
......@@ -267,7 +257,7 @@ class Converter:
def _stubname(self, key: str, value: str) -> None:
self.stub = {}
if Converter.domain.match(value):
if domain_name.match(value):
self.stub['domain'] = value
else:
raise UnboundConfValueError("Value: '{0}' of '{1}' option is not Valid domain name".format(value, key))
......@@ -301,7 +291,7 @@ class Converter:
self.stub['nameserver'], self.stub['port'] = self._parse_ipaddr(value)
if not Converter.domain.match(self.stub['nameserver']):
if not domain_name.match(self.stub['nameserver']):
raise UnboundConfValueError("Value: '{0}' of '{1}' option is not Valid domain name".format(value, key))
if self.stub['domain'] and self.stub['port']:
......@@ -425,7 +415,7 @@ class Converter:
"rrset-roundrobin": "_rrset_roundrobin",
"do-not-query-localhost": "_dnq_localhost",
"verbosity": "_verbosity",
"auto-trust-anchor-file": "_at_anchorfile",
"trust-anchor": "_trust_anchor",
"domain-insecure": "_domain_insecure",
"msg-cache-size": "_cache_size",
"cache-max-ttl": "_cache_maxttl",
......
class UnboundConfValueError(ValueError):
def __init__(self, arg):
self.strerror = arg
self.args = {arg}
......@@ -2,11 +2,10 @@
Module for generating Unbound and Knot Resolver configuration files from valid loaded Json.
"""
from re import compile, sub
from re import sub
from socket import gethostbyname
from .parser import TrustAnchorRR as ta_parser
ip_address = compile('^((\d{1,3}\.){3}\d{1,3})|(([\dA-Fa-f]{1,4})|(:)|(:[\dA-Fa-f]{1,4})){2,7}[\dA-Fa-f]$')
from .regex import ipv4_address, ipv6_address
class Generator:
......@@ -119,7 +118,8 @@ class Generator:
if 'port' in sz:
temp = "@" + str(sz["port"])
if ip_address.match(str(sz["nameserver"])):
# if ip_address.match(str(sz["nameserver"])):
if ipv4_address.match(str(sz["nameserver"])) or ipv6_address.match(str(sz["nameserver"])):
temp = str(sz["nameserver"]) + temp
stub_addr.append(temp)
......@@ -415,7 +415,8 @@ class Generator:
if 'port' in sz:
temp += "@" + str(sz["port"])
if ip_address.match(str(sz["nameserver"])):
#if ip_address.match(str(sz["nameserver"])):
if ipv4_address.match(str(sz["nameserver"])) or ipv6_address.match(str(sz["nameserver"])):
self.stub_unbound_conf += str("\tstub-addr: \"{}\"\n".format(temp))
else:
self.stub_unbound_conf += str("\tstub-host: \"{}\"\n".format(temp))
......
class TrustAnchorRR:
class TrustAnchorRR:
alg_enum = {
1: "RSAMD5",
2: "DH",
......@@ -27,12 +27,14 @@ class TrustAnchorRR:
digest_type_enum_inv = {v: k for k, v in digest_type_enum.items()}
@staticmethod
def parse_ds(ds_ta: str)-> dict:
def parse_ds(ds_ta: str) -> dict:
ds_ta.strip()
owner = ds_ta.split()[0]
i = 1
if ds_ta.split()[i] != "IN" and ds_ta.split()[i] != "DS":
i = i + 1
if ds_ta.split()[i] == "IN":
i = i + 1
if ds_ta.split()[i] == "DS":
......@@ -40,14 +42,14 @@ class TrustAnchorRR:
ta_id = 0
key_tag = ds_ta.split()[i]
algorithm = TrustAnchorRR.alg_enum.get(ds_ta.split()[i+1])
digest_type = TrustAnchorRR.digest_type_enum.get(int(ds_ta.split()[i+2]))
digest = ds_ta.split()[i+3]
algorithm = TrustAnchorRR.alg_enum.get(int(ds_ta.split()[i + 1]))
digest_type = TrustAnchorRR.digest_type_enum.get(int(ds_ta.split()[i + 2]))
digest = ds_ta.split()[i + 3]
ds_dict = {
"owner": owner,
"id": int(ta_id),
"ds": {
"id": ta_id,
"algorithm": algorithm,
"digest": digest,
"digest-type": digest_type,
......@@ -57,44 +59,53 @@ class TrustAnchorRR:
return ds_dict
@staticmethod
def parse_dnskey(dnskey_ta: str)-> dict:
def parse_dnskey(dnskey_ta: str) -> dict:
dnskey_ta.strip()
owner = dnskey_ta.split()[0]
i = 1
if dnskey_ta.split()[i] != "IN" and dnskey_ta.split()[i] != "DNSKEY":
i = i+1
i = i + 1
if dnskey_ta.split()[i] == "IN":
i = i + 1
if dnskey_ta.split()[i] == "DNSKEY":
i = i + 1
ta_id = 0
#flags = dnskey_ta.split()[i]
flags = "ZONE SEP"
protocol = dnskey_ta.split()[i+1]
algorithm = dnskey_ta.split()[i+2]
public_key = dnskey_ta.split()[i+3]
flags_num = int(dnskey_ta.split()[i])
protocol = dnskey_ta.split()[i + 1]
algorithm = TrustAnchorRR.alg_enum.get(int(dnskey_ta.split()[i + 2]))
public_key = dnskey_ta.split()[i + 3]
public_key = public_key.lstrip('(')
public_key = public_key.rstrip(')')
if flags_num == 257:
flags = "ZONE SEP"
elif flags_num == 256:
flags = "ZONE"
else:
flags = False
dnskey_dict = {
"owner": owner,
"id": int(ta_id),
"dnskey": {
"id": ta_id,
"algorithm": algorithm,
"flags": flags,
"protocol": protocol,
"protocol": int(protocol),
"public-key": public_key
}
}
if not flags:
del dnskey_dict['dnskey']['flags']
return dnskey_dict
@staticmethod
def create_ds_string(domain: str, ds_ta: dict)-> str:
def create_ds_string(domain: str, ds_ta: dict) -> str:
algorithm = TrustAnchorRR.alg_enum_inv.get(str(ds_ta['algorithm']))
digest_type = TrustAnchorRR.digest_type_enum_inv.get(str(ds_ta['digest-type']))
......@@ -108,19 +119,17 @@ class TrustAnchorRR:
return ds_str
@staticmethod
def create_dnskey_string(domain: str, dnskey_ta: dict)-> str:
def create_dnskey_string(domain: str, dnskey_ta: dict) -> str:
algorithm = TrustAnchorRR.alg_enum_inv.get(str(dnskey_ta['algorithm']))
flags = None
# 257 is KSK, 256 is ZSK
if 'flags' in dnskey_ta:
# TODO: dont know mapping between SEP, ZONE, REVOKE <-> 257, 256
if 'SEP' in str(dnskey_ta['flags']):
flags = 256
else:
if 'SEP' in str(dnskey_ta['flags']) and 'ZONE' in str(dnskey_ta['flags']):
flags = 257
elif 'ZONE' in str(dnskey_ta['flags']) and 'SEP' not in str(dnskey_ta['flags']):
flags = 256
if flags is not None:
dnskey_str = "{0} IN DNSKEY {1} {2} {3} {4}".format(domain,
......
from re import compile
# Domain format checker (".", "stub.example.", "stub.example.net)
domain_name = compile('^\.$|((([a-z0-9]+(-[a-z0-9]+)*\.)+([a-z]{2,}|)))$')
# IPv4 address format checker
ipv4_address = compile('^(([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])\.){3}'
+ '([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])$')
# IPv6 address format checker
ipv6_address = compile('^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}'
+ '|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}'
+ ':[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4})'
+ '{1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}'
+ ':){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}'
+ '|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:'
+ '(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|'
+ '(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|'
+ '([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}'
+ '(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))$')
# IPv6 prefix format checker
ipv6_prefix = compile('^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}'
+ '|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}'
+ ':[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4})'
+ '{1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}'
+ ':){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}'
+ '|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:'
+ '(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|'
+ '(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|'
+ '([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}'
+ '(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))(/(([0-9])|([0-9]{2})|(1[0-1][0-9])|(12[0-8])))$')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment