pydnstest/scenario: enable RAW answers in RANGE

parent cd98e35b
......@@ -158,7 +158,6 @@ class Entry:
try:
self.raw_data = binascii.unhexlify(node["/raw"].value)
self.is_raw_data_entry = True
return
except KeyError:
self.raw_data = None
self.is_raw_data_entry = False
......@@ -345,7 +344,7 @@ class Entry:
if expected != got:
raise ValueError("raw message comparsion failed: expected %s got %s" % (expected, got))
def adjust_reply(self, query):
def _adjust_reply(self, query):
""" Copy scripted reply and adjust to received query. """
answer = dns.message.from_wire(self.message.to_wire(),
xfr=self.message.xfr,
......@@ -367,6 +366,11 @@ class Entry:
assert len(answer.additional) == len(self.message.additional)
return answer
def reply(self, query):
if self.is_raw_data_entry:
return self.raw_data, True
return self._adjust_reply(query), False
def set_edns(self, fields):
""" Set EDNS version and bufsize. """
version = 0
......@@ -447,24 +451,26 @@ class Range:
"""
Get answer for given query (adjusted if needed).
Returns:
(DNS message object) or None if there is no candidate in this range
Returns: (answer, is_raw_data)
answer: DNS message object or wire-format (bytes) or None if there is no
candidate in this range
is_raw_data: True if response is wire-format bytes, instead of DNS message object
"""
self.received += 1
for candidate in self.stored:
try:
candidate.match(query)
resp = candidate.adjust_reply(query)
resp = candidate.reply(query)
# Probabilistic loss
if 'LOSS' in self.args:
if random.random() < float(self.args['LOSS']):
return None
return None, None
self.sent += 1
candidate.fired += 1
return resp
except ValueError:
pass
return None
return None, None
class StepLogger(logging.LoggerAdapter): # pylint: disable=too-few-public-methods
......@@ -741,21 +747,16 @@ class Scenario:
for rng in self.ranges:
if rng.eligible(current_step_id, address):
self.current_range = rng
return rng.reply(query), False
return rng.reply(query)
# Find any prescripted one-shot replies
for step in self.steps:
if step.id < current_step_id or step.type != 'REPLY':
continue
try:
candidate = step.data[0]
if candidate.is_raw_data_entry is False:
candidate.match(query)
step.data.remove(candidate)
answer = candidate.adjust_reply(query)
return answer, False
else:
answer = candidate.raw_data
return answer, True
candidate.match(query)
step.data.remove(candidate)
return candidate.reply(query)
except (IndexError, ValueError):
pass
return None, True
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment