Commit 0a16649d authored by Ales Mrazek's avatar Ales Mrazek

new converter module

parent cc0192a3
Pipeline #41081 passed with stages
in 1 minute and 3 seconds
......@@ -18,6 +18,7 @@ model = DataModel.from_file(yangdir + "/yanglib.json", [yangdir])
# load unbound.conf file
with open(unbconf_path, "r") as unb_file:
unbconf_data = unb_file.readlines()
#unbconf_data = unb_file.read()
# call of conversion function unbconf_data >> Dictionary
json_data = from_unbound(unbconf_data)
......
"""Module for converting Unbound configuration string to dictionary structure, which can be validate with data model
and save as Json-Encoded file. """
from re import compile
class Converter:
def __init__(self):
self.__reset()
def __reset(self):
self.server = {}
self.network = {'listen-interfaces': []}
self.resolver = {'stub-zones': [], 'options': {}}
self.logging = {}
self.dnssec = {'trust-anchors': {}, 'negative-trust-anchors': []}
self.cache = {}
self.dns64 = {}
self.stub = {}
self.interface_count = 0
self.ta_domain_count = 0
self.do_ipv4 = True
self.do_ipv6 = True
# ipv4 address regex
ipv4_regex = compile('^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
@staticmethod
def _parse_line(line: str):
(key, separator, value) = line.partition(":")
return key.strip(), value.strip(' "')
@staticmethod
def _parse_ipaddr(address: str):
if '@' in address:
(addr, port) = address.split("@")
return str(addr), int(port)
else:
return str(address), None
######################### SERVER ###########################
def _username(self, value: str) -> None:
self.server['user-name'] = str(value)
def _roothints(self, value: str) -> None:
self.resolver['hints'] = {'root-zone-file': str(value)}
def _interface(self, value: str) -> None:
name = "interface" + str(self.interface_count)
adress, port = self._parse_ipaddr(value)
self.interface_count += 1
self.network['listen-interfaces'].append({
'name': str(name),
'ip-address': str(adress),
'port': int(port)
})
def _out_interface(self, value: str) -> None:
if Converter.ipv4_regex.match(value):
self.network['source-address'] = {'ipv4': value}
else:
self.network['source-address'] = {'ipv6': value}
def _do_ipv4(self, value: str) -> None:
if value == 'no':
self.do_ipv4 = False
def _do_ipv6(self, value: str) -> None:
if value == 'no':
self.do_ipv6 = False
def _edns_buffer_size(self, value: str) -> None:
self.network['udp-payload-size'] = int(value)
def _harden_glue(self, value: str) -> None:
if value == 'yes':
self.resolver['options']['glue-checking'] = "strict"
else:
self.resolver['options']['glue-checking'] = "normal"
def _qname_minimisation(self, value: str) -> None:
if value == 'yes':
self.resolver['options']['qname-minimisation'] = True
else:
self.resolver['options']['qname-minimisation'] = False
def _rrset_roundrobin(self, value: str) -> None:
if value == 'yes':
self.resolver['options']['reorder-rrset'] = True
else:
self.resolver['options']['reorder-rrset'] = False
def _dnq_localhost(self, value: str) -> None:
if value == 'yes':
self.resolver['options']['query-loopback'] = False
else:
self.resolver['options']['query-loopback'] = True
def _verbosity(self, value: str) -> None:
self.logging['verbosity'] = int(value)
def _at_anchorfile(self, value: str) -> None:
self.dnssec['trust-anchors']['key-files'] = []
domain = "domain" + str(self.ta_domain_count)
self.dnssec['trust-anchors']['key-files'].append({
'domain': domain,
'file': str(value)})
self.ta_domain_count += 1
def _domain_insecure(self, value: str) -> None:
self.dnssec['negative-trust-anchors'].append(str(value))
def _cache_size(self, value: str) -> None:
self.cache['max-size'] = int(value)
def _cache_minttl(self, value: str) -> None:
self.cache['min-ttl'] = int(value)
def _cache_maxttl(self, value: str) -> None:
self.cache['max-ttl'] = int(value)
def _dns64_prefix(self, value: str) -> None:
self.dns64['prefix'] = str(value)
######################## STUB-ZONE ########################
def _stubname(self, value: str) -> None:
self.stub['domain'] = value
def _stubaddr(self, value: str) -> None:
self.stub['nameserver'], self.stub['port'] = self._parse_ipaddr(value)
if self.stub['domain'] and self.stub['port']:
self.resolver['stub-zones'].append({
"domain": self.stub['domain'],
"nameserver": self.stub['nameserver'],
"port": self.stub['port']
})
else:
self.resolver['stub-zones'].append({
"domain": self.stub['domain'],
"nameserver": self.stub['nameserver']
})
def _stubhost(self, value: str) -> None:
self.stub['nameserver'], self.stub['port'] = self._parse_ipaddr(value)
if self.stub['domain'] and self.stub['port']:
self.resolver['stub-zones'].append({
"domain": self.stub['domain'],
"nameserver": self.stub['nameserver'],
"port": self.stub['port']
})
else:
self.resolver['stub-zones'].append({
"domain": self.stub['domain'],
"nameserver": self.stub['nameserver']
})
##########################################################
def _noop(self, value: str) -> None:
pass
def _recursion_transport(self):
l2_protocols = 'ipv4 ipv6'
if self.do_ipv4 and not self.do_ipv6:
l2_protocols = 'ipv4'
elif self.do_ipv6 and not self.do_ipv4:
l2_protocols = 'ipv6'
self.network['recursion-transport'] = {'l2-protocols': l2_protocols}
def from_unbound(self, unbound_data: str):
"""
:param unbound_data: data loaded from Unbound configuration file
:type string:
:return: Dictionary structure of common-resolver configuration
:rtype: dictionary
"""
unbound_data = unbound_data.splitlines()
data = {}
self.__reset()
_server = False
_stub_zone = False
for line in unbound_data:
# parse key and value from line
key, value = Converter._parse_line(line)
if key == 'server':
_server = True
_stub_zone = False
elif key == 'stub-zone':
_stub_zone = True
_server = False
elif key == 'CONFIG_END':
break
else:
if _server:
mname = Converter._serverconf_switcher.get(key, "_noop")
  • I suppose we should not pass unknown keys in unbound configuration silently… This lets typos in config files slip silently.

    Throw an UnimplementedError or SyntaxError maybe?

Please register or sign in to reply
method = getattr(self, mname)
method(value)
elif _stub_zone:
mname = Converter._stubzone_switcher.get(key, "_noop")
method = getattr(self, mname)
method(value)
else:
pass
# set l2-protocols for recursion-transport
self._recursion_transport()
# completing data
if self.server:
data['server'] = self.server
if self.network:
data['network'] = self.network
if self.resolver:
data['resolver'] = self.resolver
if self.logging:
data['logging'] = self.logging
if self.dnssec:
data['dnssec'] = self.dnssec
if self.cache:
data['cache'] = self.cache
if self.dns64:
data['dns64'] = self.dns64
data = {'cznic-resolver-common:dns-resolver': data}
return data
_serverconf_switcher = {
"username": "_username",
"root-hints": "_roothints",
"interface": "_interface",
"outgoing-interface": "_out_interface",
"do-ip4": "_do_ipv4",
"do-ip6": "_do_ipv6",
"edns-buffer-size": "_edns_buffer_size",
"harden-glue": "_harden_glue",
"qname-minimisation": "_qname_minimisation",
"rrset-roundrobin": "_rrset_roundrobin",
"do-not-query-localhost": "_dnq_localhost",
"verbosity": "_verbosity",
"auto-trust-anchor-file": "_at_anchorfile",
"domain-insecure": "_domain_insecure",
"msg-cache-size": "_cache_size",
"cache-max-ttl": "_cache_maxttl",
"cache-min-ttl": "_cache_minttl",
"dns64-prefix": "_dns64_prefix",
}
_stubzone_switcher = {
"name": "_stubname",
"stub-addr": "_stubaddr",
"stub-host": "_stubhost",
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment