utils.py 1.74 KB
Newer Older
Tomas Krizek's avatar
Tomas Krizek committed
1 2 3 4
import struct
import random

import dns
5
import dns.message
Tomas Krizek's avatar
Tomas Krizek committed
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36


def _recv_exact(sock, count):
    """Read exactly ``count`` bytes from ``sock``.

    Returns the bytes read, or ``None`` if ``sock.recv`` returns an empty
    result before ``count`` bytes have arrived.
    """
    chunks = []
    remaining = count
    while remaining > 0:
        # Ask only for what is still missing so that bytes belonging to a
        # following message are left in the socket buffer.
        chunk = sock.recv(remaining)
        if not chunk:
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def receive_answer(sock):
    """Receive one length-prefixed message from ``sock``.

    The message is expected to start with a two-byte, network-order
    (``"!H"``) length field followed by that many payload bytes.

    Returns the payload as ``bytes`` (``b''`` for a zero-length message), or
    ``None`` if ``sock.recv`` reports end of stream before the prefix or the
    payload is complete.
    """
    # The prefix itself may arrive split across reads, so it is read with the
    # same exact-length loop as the payload.
    header = _recv_exact(sock, 2)
    if header is None:
        return None
    (answer_total_len,) = struct.unpack("!H", header)
    return _recv_exact(sock, answer_total_len)


def receive_parse_answer(sock):
    """Receive one message from ``sock`` and parse it as a DNS message.

    The raw bytes come from ``receive_answer``; they are parsed with
    ``dns.message.from_wire`` using ``one_rr_per_rrset=True``.

    Raises:
        RuntimeError: if ``receive_answer`` returns ``None``.
    """
    raw = receive_answer(sock)
    if raw is None:
        raise RuntimeError("Kresd closed connection")
    return dns.message.from_wire(raw, one_rr_per_rrset=True)


37 38 39
def prepare_wire(
        qname='localhost.',
        qtype=dns.rdatatype.A,
        qclass=dns.rdataclass.IN,
        msgid=None):
    """Utility function to generate DNS wire format message

    A query for ``qname``/``qtype``/``qclass`` is built with
    ``dns.message.make_query``. When ``msgid`` is not ``None`` it replaces
    the id of the generated message; otherwise the id assigned by
    ``make_query`` is kept.

    Returns:
        A ``(wire, msgid)`` tuple: the result of ``msg.to_wire()`` and the
        message id actually used.
    """
    msg = dns.message.make_query(qname, qtype, qclass)
    if msgid is not None:
        msg.id = msgid
    return msg.to_wire(), msg.id
47 48 49 50 51 52 53 54 55 56


def prepare_buffer(wire, datalen=None):
    """Prefix ``wire`` with a two-byte network-order length field.

    Args:
        wire: message payload; must be ``bytes``.
        datalen: value to write into the length field. When ``None`` the
            actual ``len(wire)`` is used; any other value is written as
            given, even if it does not match the payload size.

    Returns:
        ``bytes`` consisting of the packed length followed by ``wire``.
    """
    assert isinstance(wire, bytes)
    length_field = len(wire) if datalen is None else datalen
    header = struct.pack("!H", length_field)
    return header + wire


57 58 59 60
def get_msgbuff(qname='localhost.', qtype=dns.rdatatype.A, msgid=None):
    """Build a length-prefixed buffer holding a DNS query.

    The query is produced by ``prepare_wire`` and wrapped by
    ``prepare_buffer``.

    Returns:
        A ``(buffer, msgid)`` tuple, where ``msgid`` is the id reported by
        ``prepare_wire``.
    """
    wire, used_id = prepare_wire(qname, qtype, msgid=msgid)
    return prepare_buffer(wire), used_id
Tomas Krizek's avatar
Tomas Krizek committed
61 62 63


def get_garbage(length):
    """Return ``length`` random bytes.

    Each byte is drawn with ``random.getrandbits(8)``, i.e. from the
    ``random`` module's generator, so the output is reproducible after
    ``random.seed`` and is not suitable for security purposes.
    """
    return bytes(random.getrandbits(8) for _ in range(length))
Tomas Krizek's avatar
Tomas Krizek committed
65 66 67 68


def get_prefixed_garbage(length):
    """Return ``length`` random bytes preceded by their two-byte length.

    The payload comes from ``get_garbage`` and the prefix is added by
    ``prepare_buffer``.
    """
    data = get_garbage(length)
    return prepare_buffer(data)