Commit 158650b2 authored by Ales Mrazek's avatar Ales Mrazek

new converter, added comments parser, stub-zone ipv6 correction

parent 0a16649d
......@@ -2,7 +2,7 @@ import sys
from json import dump
from yangson.datamodel import DataModel
from resolvers_yang.converter import from_unbound
from resolvers_yang.converter import Converter
unbconf_path = sys.argv[1]
......@@ -17,19 +17,24 @@ model = DataModel.from_file(yangdir + "/yanglib.json", [yangdir])
# load unbound.conf file
with open(unbconf_path, "r") as unb_file:
unbconf_data = unb_file.readlines()
#unbconf_data = unb_file.read()
unbconf_data = unb_file.read()
# call of conversion function unbconf_data >> Dictionary
json_data = from_unbound(unbconf_data)
converter = Converter()
json_data = converter.from_unbound(unbconf_data)
# load data to DataModel
model_data = model.from_raw(json_data)
if json_data:
# load data to DataModel
model_data = model.from_raw(json_data)
# validate data against DataModel
model_data.validate()
# validate data against DataModel
model_data.validate()
# save model_data to json
with open(data_json_path, 'w') as json_file:
dump(json_data, json_file, indent=2, sort_keys=False)
# save model_data to json
with open(data_json_path, 'w') as json_file:
dump(json_data, json_file, indent=2, sort_keys=False)
print("Data was successfully writted to json file named " + data_json_path)
else:
print("Data cannot be written, because of Errors")
This diff is collapsed.
"""Module for converting Unbound configuration string to dictionary structure, which can be validate with data model
and save as Json-Encoded file. """
from re import compile
class Converter:
def __init__(self):
self.__reset()
def __reset(self):
self.server = {}
self.network = {'listen-interfaces': []}
self.resolver = {'stub-zones': [], 'options': {}}
self.logging = {}
self.dnssec = {'trust-anchors': {}, 'negative-trust-anchors': []}
self.cache = {}
self.dns64 = {}
self.stub = {}
self.interface_count = 0
self.ta_domain_count = 0
self.do_ipv4 = True
self.do_ipv6 = True
# ipv4 address regex
ipv4_regex = compile('^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
@staticmethod
def _parse_line(line: str):
(key, separator, value) = line.partition(":")
return key.strip(), value.strip(' "')
@staticmethod
def _parse_ipaddr(address: str):
if '@' in address:
(addr, port) = address.split("@")
return str(addr), int(port)
else:
return str(address), None
######################### SERVER ###########################
def _username(self, value: str) -> None:
self.server['user-name'] = str(value)
def _roothints(self, value: str) -> None:
self.resolver['hints'] = {'root-zone-file': str(value)}
def _interface(self, value: str) -> None:
name = "interface" + str(self.interface_count)
adress, port = self._parse_ipaddr(value)
self.interface_count += 1
self.network['listen-interfaces'].append({
'name': str(name),
'ip-address': str(adress),
'port': int(port)
})
def _out_interface(self, value: str) -> None:
if Converter.ipv4_regex.match(value):
self.network['source-address'] = {'ipv4': value}
else:
self.network['source-address'] = {'ipv6': value}
def _do_ipv4(self, value: str) -> None:
if value == 'no':
self.do_ipv4 = False
def _do_ipv6(self, value: str) -> None:
if value == 'no':
self.do_ipv6 = False
def _edns_buffer_size(self, value: str) -> None:
self.network['udp-payload-size'] = int(value)
def _harden_glue(self, value: str) -> None:
if value == 'yes':
self.resolver['options']['glue-checking'] = "strict"
else:
self.resolver['options']['glue-checking'] = "normal"
def _qname_minimisation(self, value: str) -> None:
if value == 'yes':
self.resolver['options']['qname-minimisation'] = True
else:
self.resolver['options']['qname-minimisation'] = False
def _rrset_roundrobin(self, value: str) -> None:
if value == 'yes':
self.resolver['options']['reorder-rrset'] = True
else:
self.resolver['options']['reorder-rrset'] = False
def _dnq_localhost(self, value: str) -> None:
if value == 'yes':
self.resolver['options']['query-loopback'] = False
else:
self.resolver['options']['query-loopback'] = True
def _verbosity(self, value: str) -> None:
self.logging['verbosity'] = int(value)
def _at_anchorfile(self, value: str) -> None:
self.dnssec['trust-anchors']['key-files'] = []
domain = "domain" + str(self.ta_domain_count)
self.dnssec['trust-anchors']['key-files'].append({
'domain': domain,
'file': str(value)})
self.ta_domain_count += 1
def _domain_insecure(self, value: str) -> None:
self.dnssec['negative-trust-anchors'].append(str(value))
def _cache_size(self, value: str) -> None:
self.cache['max-size'] = int(value)
def _cache_minttl(self, value: str) -> None:
self.cache['min-ttl'] = int(value)
def _cache_maxttl(self, value: str) -> None:
self.cache['max-ttl'] = int(value)
def _dns64_prefix(self, value: str) -> None:
self.dns64['prefix'] = str(value)
######################## STUB-ZONE ########################
def _stubname(self, value: str) -> None:
self.stub['domain'] = value
def _stubaddr(self, value: str) -> None:
self.stub['nameserver'], self.stub['port'] = self._parse_ipaddr(value)
if self.stub['domain'] and self.stub['port']:
self.resolver['stub-zones'].append({
"domain": self.stub['domain'],
"nameserver": self.stub['nameserver'],
"port": self.stub['port']
})
else:
self.resolver['stub-zones'].append({
"domain": self.stub['domain'],
"nameserver": self.stub['nameserver']
})
def _stubhost(self, value: str) -> None:
self.stub['nameserver'], self.stub['port'] = self._parse_ipaddr(value)
if self.stub['domain'] and self.stub['port']:
self.resolver['stub-zones'].append({
"domain": self.stub['domain'],
"nameserver": self.stub['nameserver'],
"port": self.stub['port']
})
else:
self.resolver['stub-zones'].append({
"domain": self.stub['domain'],
"nameserver": self.stub['nameserver']
})
##########################################################
def _noop(self, value: str) -> None:
pass
def _recursion_transport(self):
l2_protocols = 'ipv4 ipv6'
if self.do_ipv4 and not self.do_ipv6:
l2_protocols = 'ipv4'
elif self.do_ipv6 and not self.do_ipv4:
l2_protocols = 'ipv6'
self.network['recursion-transport'] = {'l2-protocols': l2_protocols}
def from_unbound(self, unbound_data: str):
"""
:param unbound_data: data loaded from Unbound configuration file
:type string:
:return: Dictionary structure of common-resolver configuration
:rtype: dictionary
"""
unbound_data = unbound_data.splitlines()
data = {}
self.__reset()
_server = False
_stub_zone = False
for line in unbound_data:
# parse key and value from line
key, value = Converter._parse_line(line)
if key == 'server':
_server = True
_stub_zone = False
elif key == 'stub-zone':
_stub_zone = True
_server = False
elif key == 'CONFIG_END':
break
else:
if _server:
mname = Converter._serverconf_switcher.get(key, "_noop")
method = getattr(self, mname)
method(value)
elif _stub_zone:
mname = Converter._stubzone_switcher.get(key, "_noop")
method = getattr(self, mname)
method(value)
else:
pass
# set l2-protocols for recursion-transport
self._recursion_transport()
# completing data
if self.server:
data['server'] = self.server
if self.network:
data['network'] = self.network
if self.resolver:
data['resolver'] = self.resolver
if self.logging:
data['logging'] = self.logging
if self.dnssec:
data['dnssec'] = self.dnssec
if self.cache:
data['cache'] = self.cache
if self.dns64:
data['dns64'] = self.dns64
data = {'cznic-resolver-common:dns-resolver': data}
return data
_serverconf_switcher = {
"username": "_username",
"root-hints": "_roothints",
"interface": "_interface",
"outgoing-interface": "_out_interface",
"do-ip4": "_do_ipv4",
"do-ip6": "_do_ipv6",
"edns-buffer-size": "_edns_buffer_size",
"harden-glue": "_harden_glue",
"qname-minimisation": "_qname_minimisation",
"rrset-roundrobin": "_rrset_roundrobin",
"do-not-query-localhost": "_dnq_localhost",
"verbosity": "_verbosity",
"auto-trust-anchor-file": "_at_anchorfile",
"domain-insecure": "_domain_insecure",
"msg-cache-size": "_cache_size",
"cache-max-ttl": "_cache_maxttl",
"cache-min-ttl": "_cache_minttl",
"dns64-prefix": "_dns64_prefix",
}
_stubzone_switcher = {
"name": "_stubname",
"stub-addr": "_stubaddr",
"stub-host": "_stubhost",
}
......@@ -9,7 +9,7 @@ from json import load, dump
from yangson.datamodel import DataModel
from resolvers_yang.generator import gen_unbound, gen_kresd
from resolvers_yang.converter import from_unbound
from resolvers_yang.converter import Converter
examample_path = "../examples/example-data.json"
test_path = "test-data.json"
......@@ -53,10 +53,11 @@ with open(unb_path, "w") as unb_file:
# load unbound.conf file
with open(unb_path, "r") as unb_file:
unbconf_data = unb_file.readlines()
unbconf_data = unb_file.read()
# call of conversion function unbconf_data >> Dictionary
json_data = from_unbound(unbconf_data)
converter = Converter()
json_data = converter.from_unbound(unbconf_data)
# load data to DataModel
model_data = model.from_raw(json_data)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment