Commit 2dfed7f0 authored by Petr Špaček

domain2ipset: memory optimization

parent 7497fc2e
......@@ -13,7 +13,8 @@ import logging
import multiprocessing
import pickle
import random
from typing import Counter, Deque, Dict, Iterable, Set, Tuple
from typing import Counter, Deque, Dict, FrozenSet, Iterable, Set, Tuple
import weakref
import dns.message
import dns.query
......@@ -120,14 +121,18 @@ def retry_candidates(retry_queue: Deque[Tuple[int, dns.name.Name, AnIPAddress]],
logging.debug('retrying %s @%s (attempt no. %s)', domain, ip, attempt + 1)
yield (attempt + 1, domain, ip)
def process_reply(attempt, domain, ip, state, netstats, retry_queue, domain2ipset):
def process_reply(attempt, domain, ip, state, netstats, retry_queue,
domain2ipset: Dict[dns.name.Name, FrozenSet[AnIPAddress]],
ipsetcache: Dict[int, FrozenSet[AnIPAddress]]):
netstats.record_ip(ip, state)
if state == IP_state.notauth:
return # not authoritative, look for another domain on this NS
elif state == IP_state.timeout:
retry_queue.append((attempt, domain, ip))
elif state == IP_state.ok:
domain2ipset.setdefault(domain, set()).add(ip)
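# add IP address to set, re-use existing IP address sets to prevent memory bloat
# NOTE(review): the cache is keyed by hash(newset) only, so two different sets
# whose hashes collide would be treated as the same set -- confirm this is acceptable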
newset = frozenset((ip, )).union(domain2ipset.get(domain, ())) # type: FrozenSet[AnIPAddress]
domain2ipset[domain] = ipsetcache.setdefault(hash(newset), newset)
#if len(ip_done) % 100 == 0:
# logging.info('generated output for %s IP addresses', len(ip_done))
......@@ -151,7 +156,7 @@ def load():
logging.info('loading domain-IP mapping from previous run')
domain2ipset = pickle.load(domain2ipset_pickle)
except FileNotFoundError:
domain2ipset = {}
domain2ipset = {} # type: Dict[dns.name.Name, FrozenSet[AnIPAddress]]
return domain2nsset, nsname2ipset, netstats, domain2ipset
......@@ -190,13 +195,14 @@ def update_mapping(domain2nsset, nsname2ipset, netstats, domain2ipset):
#
# It could also help as a workaround to aggressive response rate limiting.
ipsetcache = weakref.WeakValueDictionary() # type: Dict[int, FrozenSet[AnIPAddress]]
candidates = netstats.skip_dead_ips(
randomize_iter(
gen_candidates(domain2nsset, nsname2ipset, netstats, retry_queue, domain2ipset),
100000))
with multiprocessing.Pool(processes = 256) as pool:
for attempt, domain, ip, state in pool.imap_unordered(check_availability, candidates):
process_reply(attempt, domain, ip, state, netstats, retry_queue, domain2ipset)
process_reply(attempt, domain, ip, state, netstats, retry_queue, domain2ipset, ipsetcache)
if len(domain2ipset) % 1000 == 0 and len(domain2ipset.get(domain, [])) == 1:
logging.info('%s domains out of %s have at least one working NS (%0.2f %%)',
len(domain2ipset), len(domain2nsset), len(domain2ipset)/len(domain2nsset)*100)
......@@ -205,7 +211,7 @@ def update_mapping(domain2nsset, nsname2ipset, netstats, domain2ipset):
for attempt, domain, ip, state in pool.imap_unordered(
check_availability,
netstats.skip_dead_ips(retry_candidates(retry_queue, netstats))):
process_reply(attempt, domain, ip, state, netstats, retry_queue, domain2ipset)
process_reply(attempt, domain, ip, state, netstats, retry_queue, domain2ipset, ipsetcache)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment