Commit c52dfba4 authored by Tomas Krizek's avatar Tomas Krizek

tests: add lmdb 2018-05-21 format compatibility test

parent df4679c5
......@@ -17,3 +17,6 @@ coverage.xml
# mypy
.mypy_cache/
# test data
lock.mdb
#!/usr/bin/env python3
"""
Create testing LMDB data
"""
import os
import struct
import lmdb
VERSION = "2018-05-21"
CREATE_ENVS = {
"2018-05-21": [
'answers_single_server',
'answers_multiple_servers',
],
}
BIN_INT_3000000000 = b'\x00^\xd0\xb2'
class LMDBExistsError(Exception):
pass
def open_env(version, name):
path = os.path.join(version, name)
if os.path.exists(os.path.join(path, 'data.mdb')):
raise LMDBExistsError
if not os.path.exists(path):
os.makedirs(path)
return lmdb.Environment(
path=path,
max_dbs=5,
create=True)
def create_answers_single_server():
env = open_env(VERSION, 'answers_single_server')
mdb = env.open_db(key=b'meta', create=True)
with env.begin(mdb, write=True) as txn:
txn.put(b'version', VERSION.encode('ascii'))
txn.put(b'start_time', BIN_INT_3000000000)
txn.put(b'end_time', BIN_INT_3000000000)
txn.put(b'servers', struct.pack('<I', 1))
txn.put(b'name0', 'kresd'.encode('ascii'))
adb = env.open_db(key=b'answers', create=True)
with env.begin(adb, write=True) as txn:
answer = BIN_INT_3000000000
answer += struct.pack('<H', 1)
answer += b'a'
txn.put(BIN_INT_3000000000, answer)
def create_answers_multiple_servers():
env = open_env(VERSION, 'answers_multiple_servers')
mdb = env.open_db(key=b'meta', create=True)
with env.begin(mdb, write=True) as txn:
txn.put(b'version', VERSION.encode('ascii'))
txn.put(b'servers', struct.pack('<I', 3))
txn.put(b'name0', 'kresd'.encode('ascii'))
txn.put(b'name1', 'bind'.encode('ascii'))
txn.put(b'name2', 'unbound'.encode('ascii'))
adb = env.open_db(key=b'answers', create=True)
with env.begin(adb, write=True) as txn:
# kresd
answer = BIN_INT_3000000000
answer += struct.pack('<H', 0)
# bind
answer += BIN_INT_3000000000
answer += struct.pack('<H', 2)
answer += b'ab'
# unbound
answer += BIN_INT_3000000000
answer += struct.pack('<H', 1)
answer += b'a'
txn.put(BIN_INT_3000000000, answer)
def main():
for env_name in CREATE_ENVS[VERSION]:
try:
globals()['create_{}'.format(env_name)]()
except LMDBExistsError:
print('{} exists, skipping'.format(env_name))
continue
if __name__ == '__main__':
main()
import os
import pytest
from respdiff.dbhelper import DNSReply, DNSRepliesFactory
from respdiff.dbhelper import (
DNSReply, DNSRepliesFactory, LMDB, MetaDatabase, VERSION, qid2key)
LMDB_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'lmdb', VERSION)
def create_reply(wire, time):
......@@ -98,3 +103,53 @@ def test_dns_replies_factory_init():
with pytest.raises(ValueError):
rf2.parse(DR_A_0_BIN + b'a')
INT_3M = 3000000000
TIME_3M = 3000.0
def test_lmdb_answers_single_server():
envdir = os.path.join(LMDB_DIR, 'answers_single_server')
with LMDB(envdir) as lmdb:
adb = lmdb.open_db(LMDB.ANSWERS)
meta = MetaDatabase(lmdb)
assert meta.read_start_time() == INT_3M
assert meta.read_end_time() == INT_3M
servers = meta.read_servers()
assert len(servers) == 1
assert servers[0] == 'kresd'
with lmdb.env.begin(adb) as txn:
data = txn.get(qid2key(INT_3M))
df = DNSRepliesFactory(servers)
replies = df.parse(data)
assert len(replies) == 1
assert replies[servers[0]] == DNSReply(b'a', TIME_3M)
def test_lmdb_answers_multiple_servers():
envdir = os.path.join(LMDB_DIR, 'answers_multiple_servers')
with LMDB(envdir) as lmdb:
adb = lmdb.open_db(LMDB.ANSWERS)
meta = MetaDatabase(lmdb)
assert meta.read_start_time() is None
assert meta.read_end_time() is None
servers = meta.read_servers()
assert len(servers) == 3
assert servers[0] == 'kresd'
assert servers[1] == 'bind'
assert servers[2] == 'unbound'
df = DNSRepliesFactory(servers)
with lmdb.env.begin(adb) as txn:
data = txn.get(qid2key(INT_3M))
replies = df.parse(data)
assert len(replies) == 3
assert replies[servers[0]] == DNSReply(b'', TIME_3M)
assert replies[servers[1]] == DNSReply(b'ab', TIME_3M)
assert replies[servers[2]] == DNSReply(b'a', TIME_3M)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment