Commit d5f7b69a authored by Štěpán Balážik's avatar Štěpán Balážik Committed by Petr Špaček

scenario: remove defaults while handling MATCH and ADJUST fields, change...

scenario: remove defaults while handling MATCH and ADJUST fields, change `MATCH all` and `MATCH question`

see docs for details
parent 81fe7c84
......@@ -223,14 +223,14 @@ rcode extended response code (``RCODE`` value
- *expected* message ``RCODE`` is defined by ``REPLY`` keyword
question whole QUESTION section [sectmatch]_
question equivalent to ``qtype qname``
answer whole ANSWER section [sectmatch]_
authority whole AUTHORITY section [sectmatch]_
additional whole ADDITIONAL section [sectmatch]_
edns EDNS `version <https://tools.ietf.org/html/rfc6891#section-6.1.3>`_ and
EDNS `payload <https://tools.ietf.org/html/rfc6891#section-6.1.2>`_ size
nsid `NSID <https://tools.ietf.org/html/rfc5001>`_ presence and value
all equivalent to ``flags`` + ``rcode`` + all sections explicitly defined in the ``ENTRY``
all equivalent to ``opcode qtype qname flags rcode answer authority additional``
- sections present in the *received* message but not explicitly defined in the *expected* entry are ignored
============ =========================================================================================
......@@ -313,9 +313,7 @@ Default values for DNS messages
========== ===========================================================================================
feature default value
========== ===========================================================================================
ADJUST copy_id
EDNS version 0 with buffer size 4096 B
MATCH opcode, qtype, qname
REPLY QUERY, NOERROR
========== ===========================================================================================
......
......@@ -32,7 +32,7 @@ let ip_re = /[0-9a-f.:]+/
let hex_re = /[0-9a-fA-F]+/
let match_option = "opcode" | "qtype" | "qcase" | "qname" | "subdomain" | "flags" | "rcode" | "question" | "answer" | "authority" | "additional" | "all" | "TCP" | "ttl"
let match_option = "opcode" | "qtype" | "qcase" | "qname" | "subdomain" | "flags" | "rcode" | "question" | "answer" | "authority" | "additional" | "all" | "edns"
let adjust_option = "copy_id" | "copy_query" | "raw_id"
let reply_option = "QR" | "TC" | "AA" | "AD" | "RD" | "RA" | "CD" | "DO" | "NOERROR" | "FORMERR" | "SERVFAIL" | "NXDOMAIN" | "NOTIMP" | "REFUSED" | "YXDOMAIN" | "YXRRSET" | "NXRRSET" | "NOTAUTH" | "NOTZONE" | "BADVERS" | "BADSIG" | "BADKEY" | "BADTIME" | "BADMODE" | "BADNAME" | "BADALG" | "BADTRUNC" | "BADCOOKIE"
let step_option = "REPLY" | "QUERY" | "CHECK_ANSWER" | "CHECK_OUT_QUERY" | /TIME_PASSES[ \t]+ELAPSE/
......@@ -40,7 +40,7 @@ let step_option = "REPLY" | "QUERY" | "CHECK_ANSWER" | "CHECK_OUT_QUERY" | /TIME
let mandatory = [del_str "MANDATORY" . label "mandatory" . value "true" . comment_or_eol]
let tsig = [del_str "TSIG" . label "tsig" . space . [label "keyname" . store word] . space . [label "secret" . store word] . comment_or_eol]
let match = (mandatory | tsig)* . del_str "MATCH" . [space . label "match" . store match_option ]+ . comment_or_eol
let match = (mandatory | tsig)* . [ label "match_present" . value "true" . del_str "MATCH" ] . [space . label "match" . store match_option ]+ . comment_or_eol
let adjust = (mandatory | tsig)* . del_str "ADJUST" . [space . label "adjust" . store adjust_option ]+ . comment_or_eol
let reply = (mandatory | tsig)* . del ("REPLY" | "FLAGS") "REPLY" . [space . label "reply" . store reply_option ]+ . comment_or_eol
......
......@@ -141,21 +141,11 @@ def match_rcode(exp, got):
dns.rcode.to_text(got.rcode()))
def match_question(exp, got):
return compare_rrs(exp.question,
got.question)
def match_answer(exp, got):
return compare_rrs(exp.answer,
got.answer)
def match_ttl(exp, got):
return compare_rrs(exp.answer,
got.answer)
def match_answertypes(exp, got):
return compare_rrs_types(exp.answer,
got.answer, skip_rrsigs=True)
......@@ -207,9 +197,9 @@ def match_nsid(exp, got):
MATCH = {"opcode": match_opcode, "qtype": match_qtype, "qname": match_qname, "qcase": match_qcase,
"subdomain": match_subdomain, "flags": match_flags, "rcode": match_rcode,
"question": match_question, "answer": match_answer, "ttl": match_ttl,
"answertypes": match_answertypes, "answerrrsigs": match_answerrrsigs,
"authority": match_authority, "additional": match_additional, "edns": match_edns,
"answer": match_answer, "answertypes": match_answertypes,
"answerrrsigs": match_answerrrsigs, "authority": match_authority,
"additional": match_additional, "edns": match_edns,
"nsid": match_nsid}
......
......@@ -155,26 +155,17 @@ class Entry:
self.fired = 0
# RAW
try:
self.raw_data = binascii.unhexlify(node["/raw"].value)
self.is_raw_data_entry = True
except KeyError:
self.raw_data = None
self.is_raw_data_entry = False
self.raw_data = None
self.is_raw_data_entry = self.process_raw()
# MATCH
self.match_fields = [m.value for m in node.match("/match")]
if not self.match_fields:
self.match_fields = ['opcode', 'qtype', 'qname']
self.match_fields = self.process_match()
# FLAGS
self.process_reply_line(node)
self.process_reply_line()
# ADJUST
self.adjust_fields = [m.value for m in node.match("/adjust")]
if not self.adjust_fields:
self.adjust_fields = ['copy_id']
self.adjust_fields = {m.value for m in node.match("/adjust")}
# MANDATORY
try:
......@@ -183,8 +174,53 @@ class Entry:
self.mandatory = None
# TSIG
self.process_tsig()
# SECTIONS & RECORDS
self.sections = self.process_sections()
def process_raw(self):
try:
tsig = list(node.match("/tsig"))[0]
self.raw_data = binascii.unhexlify(self.node["/raw"].value)
return True
except KeyError:
return False
def process_match(self):
try:
self.node["/match_present"]
except KeyError:
return None
fields = set(m.value for m in self.node.match("/match"))
if 'all' in fields:
fields.remove("all")
fields |= set(["opcode", "qtype", "qname", "flags",
"rcode", "answer", "authority", "additional"])
if 'question' in fields:
fields.remove("question")
fields |= set(["qtype", "qname"])
return fields
def process_reply_line(self):
"""Extracts flags, rcode and opcode from given node and adjust dns message accordingly"""
self.fields = [f.value for f in self.node.match("/reply")]
if 'DO' in self.fields:
self.message.want_dnssec(True)
opcode = self.get_opcode(fields=self.fields)
rcode = self.get_rcode(fields=self.fields)
self.message.flags = self.get_flags(fields=self.fields)
if rcode is not None:
self.message.set_rcode(rcode)
if opcode is not None:
self.message.set_opcode(opcode)
def process_tsig(self):
try:
tsig = list(self.node.match("/tsig"))[0]
tsig_keyname = tsig["/keyname"].value
tsig_secret = tsig["/secret"].value
keyring = dns.tsigkeyring.from_text({tsig_keyname: tsig_secret})
......@@ -192,11 +228,11 @@ class Entry:
except (KeyError, IndexError):
pass
# SECTIONS & RECORDS
self.sections = []
for section in node.match("/section/*"):
def process_sections(self):
sections = set()
for section in self.node.match("/section/*"):
section_name = posixpath.basename(section.path)
self.sections.append(section_name)
sections.add(section_name)
for record in section.match("/record"):
owner = record['/domain'].value
if not owner.endswith("."):
......@@ -229,6 +265,7 @@ class Entry:
self.message.authority.append(rr)
elif section_name == 'additional':
self.message.additional.append(rr)
return sections
def __str__(self):
txt = 'ENTRY_BEGIN\n'
......@@ -258,19 +295,6 @@ class Entry:
txt += 'ENTRY_END\n'
return txt
def process_reply_line(self, node):
"""Extracts flags, rcode and opcode from given node and adjust dns message accordingly"""
self.fields = [f.value for f in node.match("/reply")]
if 'DO' in self.fields:
self.message.want_dnssec(True)
opcode = self.get_opcode(fields=self.fields)
rcode = self.get_rcode(fields=self.fields)
self.message.flags = self.get_flags(fields=self.fields)
if rcode is not None:
self.message.set_rcode(rcode)
if opcode is not None:
self.message.set_opcode(opcode)
@classmethod
def get_flags(cls, fields):
"""From `fields` extracts and returns flags"""
......@@ -321,11 +345,7 @@ class Entry:
def match(self, msg):
""" Compare scripted reply to given message based on match criteria. """
match_fields = self.match_fields
if 'all' in match_fields:
match_fields.remove('all')
match_fields += ['flags'] + ['rcode'] + self.sections
for code in match_fields:
for code in self.match_fields:
try:
pydnstest.matchpart.match_part(self.message, msg, code)
except pydnstest.matchpart.DataMismatch as ex:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment