module.py 3.28 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
#!/usr/bin/env python3

import re
from subprocess import Popen, PIPE, check_call, CalledProcessError
from dnstest.utils import *
import dnstest.config
import dnstest.params as params
import dnstest.server

class KnotModule(object):
    '''Query module configuration'''

    # Instance counter.
    count = 1
    # Module callback name in the source.
    src_name = None
    # Module name in the configuration.
    conf_name = None

    def __init__(self):
        self.conf_id = "id%s" % type(self).count
        type(self).count += 1

    @classmethod
    def check(self):
        '''Checks the server binary for the module code'''

        try:
            proc = Popen(["objdump", "-t", params.knot_bin],
                         stdout=PIPE, stderr=PIPE, universal_newlines=True)
            (out, err) = proc.communicate()

            if re.search(self.src_name, out):
                return

            raise Skip()
        except:
            raise Skip("Module '%s' not detected" % self.conf_name)

    def get_conf_ref(self):
        return "%s/%s" % (self.conf_name, self.conf_id)

    def get_conf(self): pass

class ModSynthRecord(KnotModule):
    '''Automatic forward/reverse records module'''

    src_name = "synth_record_load"
    conf_name = "mod-synth-record"

51
    def __init__(self, mtype, prefix, ttl, network, origin=None):
52 53 54 55
        super().__init__()
        self.mtype = mtype
        self.prefix = prefix
        self.ttl = ttl
56 57
        self.network = network
        self.origin = origin
58 59 60 61 62 63 64 65

    def get_conf(self, conf=None):
        if not conf:
            conf = dnstest.config.KnotConf()

        conf.begin(self.conf_name)
        conf.id_item("id", self.conf_id)
        conf.item_str("type", self.mtype)
66 67 68 69
        if (self.prefix):
            conf.item_str("prefix", self.prefix)
        if (self.ttl):
            conf.item_str("ttl", self.ttl)
70 71 72
        conf.item_str("network", self.network)
        if (self.origin):
            conf.item_str("origin", self.origin)
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
        conf.end()

        return conf

class ModDnstap(KnotModule):
    '''Dnstap module'''

    src_name = "dnstap_load"
    conf_name = "mod-dnstap"

    def __init__(self, sink):
        super().__init__()
        self.sink = sink

    def get_conf(self, conf=None):
        if not conf:
            conf = dnstest.config.KnotConf()

        conf.begin(self.conf_name)
        conf.id_item("id", self.conf_id)
        conf.item_str("sink", self.sink)
        conf.end()

        return conf
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126

class ModDnsproxy(KnotModule):
    '''Dnsproxy module'''

    src_name = "dnsproxy_load"
    conf_name = "mod-dnsproxy"

    def __init__(self, addr, port=53, catch_nxdomain=False):
        super().__init__()
        self.addr = addr
        self.port = port
        self.catch_nxdomain = catch_nxdomain

    def get_conf(self, conf=None):
        if not conf:
            conf = dnstest.config.KnotConf()

        conf.begin("remote")
        conf.id_item("id", "%s_%s" % (self.conf_name, self.conf_id))
        conf.item_str("address", "%s@%s" % (self.addr, self.port))
        conf.end()

        conf.begin(self.conf_name)
        conf.id_item("id", self.conf_id)
        conf.item_str("remote", "%s_%s" % (self.conf_name, self.conf_id))
        if (self.catch_nxdomain):
            conf.item_str("catch-nxdomain", "on")
        conf.end()

        return conf