Commit 611f231d authored by Edvard Rejthar

#29 HUGE server development

Signed-off-by: Edvard Rejthar <edvard.rejthar@nic.cz>
parent ebce69dd
......@@ -78,6 +78,7 @@ user_pref("extensions.lastPlatformVersion", "51.0.1");
user_pref("extensions.pendingOperations", false);
user_pref("extensions.systemAddonSet", "{\"schema\":1,\"addons\":{}}");
user_pref("extensions.xpiState", "{\"app-system-defaults\":{\"firefox@getpocket.com\":{\"d\":\"/usr/lib/firefox/browser/features/firefox@getpocket.com.xpi\",\"e\":true,\"v\":\"1.0.5\",\"st\":1485985013000},\"e10srollout@mozilla.org\":{\"d\":\"/usr/lib/firefox/browser/features/e10srollout@mozilla.org.xpi\",\"e\":true,\"v\":\"1.7\",\"st\":1485985012000},\"webcompat@mozilla.org\":{\"d\":\"/usr/lib/firefox/browser/features/webcompat@mozilla.org.xpi\",\"e\":true,\"v\":\"1.0\",\"st\":1485985013000},\"aushelper@mozilla.org\":{\"d\":\"/usr/lib/firefox/browser/features/aushelper@mozilla.org.xpi\",\"e\":true,\"v\":\"1.0\",\"st\":1485985012000}},\"app-system-user\":{\"cinetickets@jetpack\":{\"d\":\"/home/edvard/.mozilla/extensions/{ec8030f7-c20a-464f-9b0e-13a3a9e97384}/cinetickets@jetpack\",\"e\":false,\"v\":\"0.1\",\"st\":1468510464000,\"mt\":1457095010000},\"mdmaug@jetpack\":{\"d\":\"/home/edvard/.mozilla/extensions/{ec8030f7-c20a-464f-9b0e-13a3a9e97384}/mdmaug@jetpack\",\"e\":false,\"v\":\"0.1\",\"st\":1457095010000,\"mt\":1457095010000}},\"app-global\":{\"{972ce4c6-7e08-4474-a285-3208198ce6fd}\":{\"d\":\"/usr/lib/firefox/browser/extensions/{972ce4c6-7e08-4474-a285-3208198ce6fd}.xpi\",\"e\":true,\"v\":\"51.0.1\",\"st\":1485985012000},\"langpack-cs@firefox.mozilla.org\":{\"d\":\"/usr/lib/firefox/browser/extensions/langpack-cs@firefox.mozilla.org.xpi\",\"e\":true,\"v\":\"51.0.1\",\"st\":1485988894000},\"langpack-en-GB@firefox.mozilla.org\":{\"d\":\"/usr/lib/firefox/browser/extensions/langpack-en-GB@firefox.mozilla.org.xpi\",\"e\":true,\"v\":\"51.0.1\",\"st\":1485988909000},\"langpack-en-ZA@firefox.mozilla.org\":{\"d\":\"/usr/lib/firefox/browser/extensions/langpack-en-ZA@firefox.mozilla.org.xpi\",\"e\":true,\"v\":\"51.0.1\",\"st\":1485988909000}},\"app-system-share\":{\"ubufox@ubuntu.com\":{\"d\":\"/usr/share/mozilla/extensions/{ec8030f7-c20a-464f-9b0e-13a3a9e97384}/ubufox@ubuntu.com.xpi\",\"e\":true,\"v\":\"3.2\",\"st\":1442597402000}}}");
user_pref("security.insecure_field_warning.contextual.enabled", false); // wrong https certificate
user_pref("media.gmp-gmpopenh264.abi", "x86_64-gcc3");
user_pref("media.gmp-gmpopenh264.lastUpdate", 1486578560);
user_pref("media.gmp-gmpopenh264.version", "1.6");
......
......@@ -10,7 +10,7 @@ apt install software-properties-common
add-apt-repository "deb http://archive.ubuntu.com/ubuntu $(lsb_release -sc) main universe restricted multiverse"
apt update
apt install firefox python3 mariadb-server xvfb
pip3 install xvfbwrapper pymysql peewee flask wtforms pyyaml bs4 pygments pillow requests humanize filelock
pip3 install xvfbwrapper pymysql peewee flask wtforms jsonpickle bs4 pygments pillow requests humanize filelock
# current dir
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
......
......@@ -6,7 +6,7 @@ Scans a website for a sign of a parasite hosts or commands.
1. Download ```git clone git@gitlab.labs.nic.cz:csirt/mdmaug.git /tmp/mdmaug```
2. Edit mdmaug/lib/config.py
3. You should generate a certificate to `mdmaug/cert-mdmaug.pem`, at least a self-signed one (non recommended): `openssl req -new -x509 -keyout cert-mdmaug.pem -out cert-mdmaug.pem -days 365 -nodes`
3. You should generate a certificate at `mdmaug/cert-mdmaug.pem`, at least a self-signed one (not recommended): `openssl req -x509 -newkey rsa:4096 -nodes -out cert-mdmaug.pem -keyout key-mdmaug.pem`
4. Perform installation: ```/tmp/mdmaug/INSTALL```
5. Everything should be located in `/opt/mdmaug`.
6. Launch under newly created `mdmaug` user: `su - mdmaug -c 'python3 -m mdmaug'`
......@@ -20,6 +20,10 @@ Scans a website for a sign of a parasite hosts or commands.
* You may put ```03 1,7,13,19 * * * ~/mdmaug-launch``` into the crontab of the mdmaug user (```crontab -e```).
* We are using Python3.6+
## Tips
* You may use /static/demopage.html as a testing page.
### Troubleshooting
* Analysis stopped working after a restart? You may need to run `xhost +local:mdmaug` from a regular user's shell after every system restart :( I'm not sure.
......
......@@ -11,7 +11,7 @@ CREATE TABLE `export` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='Dates of realized Turris export';
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='Dates of realized Encounter export';
DROP TABLE IF EXISTS `status`;
......@@ -27,21 +27,18 @@ INSERT INTO `status` (`id`, `name`) VALUES
(2, 'log'),
(3, 'block');
DROP TABLE IF EXISTS `turris`;
CREATE TABLE `turris` (
DROP TABLE IF EXISTS `encounter`;
CREATE TABLE `encounter` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'When we encountered the IP on a page.',
`status` int(11) NOT NULL DEFAULT '0' COMMENT 'n/a, allow, log, block',
`date` int(11) NOT NULL,
`host` varchar(255) COLLATE utf8_bin NOT NULL,
`ip` varchar(45) COLLATE utf8_bin DEFAULT NULL,
`port` int(11) NOT NULL,
`url` varchar(255) COLLATE utf8_bin NOT NULL,
`block` int(11) NOT NULL,
`remoteHost` varchar(255) COLLATE utf8_bin NOT NULL,
`otherDetails` varchar(255) COLLATE utf8_bin NOT NULL,
PRIMARY KEY (`id`),
KEY `status` (`status`),
CONSTRAINT `turris_ibfk_1` FOREIGN KEY (`status`) REFERENCES `status` (`id`) ON UPDATE CASCADE
CONSTRAINT `encounter_ibfk_1` FOREIGN KEY (`status`) REFERENCES `status` (`id`) ON UPDATE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
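For illustration only, a row in the renamed table might look like this (hypothetical values; `status` 3 refers to the 'block' row inserted above):
INSERT INTO `encounter` (`status`, `date`, `host`, `ip`, `port`, `url`, `block`, `remoteHost`, `otherDetails`)
VALUES (3, 20170410, 'example.com', '203.0.113.5', 80, 'http://example.com/ad.js', 1, 'tracker.example.net', '');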
......
#!/usr/bin/env python3
import datetime
import logging
import os
......@@ -31,6 +32,14 @@ app.secret_key = b'as8!r"afERaa5'
# "geoip": False
# }
@app.template_filter('format_datetime')
def format_datetime(time, format='%y%m%d%H%M%S', target='%d.%m.%Y %H:%M'):
return datetime.datetime.strptime(time, format).strftime(target)
#app.jinja_env.filters['datetime'] = format_datetime
"""
httpd = HTTPServer((address, Config.APP_PORT), Server)
httpd.socket = ssl.wrap_socket(httpd.socket,
......
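The `format_datetime` template filter added above converts the compact scan timestamp into a human-readable date. A minimal usage sketch (the value is illustrative):

format_datetime("170410153000")  # → "10.04.2017 15:30"
# in a Jinja template: {{ some_timestamp | format_datetime }}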
......@@ -8,8 +8,8 @@ logger = logging.getLogger("mdmaug")
class Pref:
safebrowsing = True
pdns = True
safebrowsing = False # True
pdns = False # True
geoip = False
@classmethod
......@@ -18,29 +18,31 @@ class Pref:
class Config:
profile_count = 2 # number of Firefox profiles. Its name is just a number – 0,1...
profile_count = 21 # number of Firefox profiles. Its name is just a number – 0,1...
browser = 'firefox' # iceweasel, firefox. What browser gets launched.
config_file = '/opt/mdmaug/.cache/mdmaug-scans/_tmp/queue.cache' # RAM disk was too small: '/tmp/mdm/queue.cache'
APP_PORT = 5000
APP_IP = "127.0.0.1"
APP_DOMAIN = 'https://217.31.202.41:' + str(APP_PORT) # csirt.csirt.office.nic.cz
APP_IP = "127.0.0.1" # 217.31.202.41
APP_HOST = f'http://{APP_IP}:{APP_PORT}' # Xhttps
LOG_DIR = "/opt/mdmaug/.cache/mdmaug-scans/_tmp/"
CACHE_DIR = "/opt/mdmaug/.cache/mdmaug-scans/"
ALLOWED_DESTINATION = {"mdm.nic.cz": "https://mdm.nic.cz",
# "hostname": "scheme://hostname(:port)"; allowed destinations for iframe-postMessage communication between MDMaug and MDM host
APP_IP: APP_HOST} # in every case we allow MDMaug to communicate with itself
DIR = os.path.dirname(os.path.realpath(__file__)) + "/../"
myDB: None
lock = threading.RLock() # I hope the lock works when placed here. Closure...? XX test it somehow
db: MySQLDatabase = None
lock = threading.RLock()
THUMBNAIL_SIZE = 640, 640
MAX_WHOIS_DOMAIN_THREADS = 10 # launches at most 10 threads at once; it overflowed on me once (each domain launches as many threads as it has IPs, but those are usually just a few)
MAX_BROWSER_RUN_TIME = 25 # maximum time for a browser to run
MAX_BROWSER_EXPIRATION = 15 # seconds that we wait before killing the browser (waiting for the files to be written)
EXPORT_NOTBLOCK_TLD = ".cz" # lowercase; this TLD is ignored in the export
CRAWL_FILE = "analysis.json"
@staticmethod
def connect():
# XX dealing with peewee.OperationalError: (2006, "MySQL server has gone away (BrokenPipeError(32, 'Broken pipe'))") after the 7-hour timeout
# XX oddly, the connection cannot live in dbp DBModel.connect. Even though the type is then correct (MySQLDatabase), nothing can be done with it and select attempts end with NoneType.
logger.debug("Connecting to DB.")
Config.myDB = MySQLDatabase("mdmaug", host='localhost', port=3306, user="mdmaug",
passwd="fidFDSs676") # XX dal jsem pryc: , threadlocals=False
Config.connect()
# XX removed: , threadlocals=False
Config.db = MySQLDatabase("mdmaug", host='localhost', port=3306, user="mdmaug", passwd="fidFDSs676")
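The comments above mention peewee's "MySQL server has gone away" error once the connection idles past the server timeout. One possible mitigation (not part of this commit) is to reopen the connection before use; a minimal sketch assuming peewee 3 and the `Config.db`/`Config.connect()` names above, with `ensure_connected` being a hypothetical helper:

from peewee import OperationalError

def ensure_connected():
    """ Hypothetical helper: reopen Config.db if the idle timeout dropped it. """
    if Config.db is None or Config.db.is_closed():
        Config.connect()
        return
    try:
        Config.db.execute_sql("SELECT 1")  # cheap liveness probe
    except OperationalError:
        Config.db.close()
        Config.connect()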
import json
import logging
import os
import subprocess
from urllib.parse import urlparse
from builtins import FileNotFoundError
from glob import escape
from os import walk
from peewee import IntegrityError
from ..domains import domain2dir
from mdmaug.lib.model.crawl import Crawl
from .scan_controller import ScanController
from ..config import Config
from ..model.dbp import Turris, Whitelist
from ..domains import domain2dir
from ..model.dbp import Encounter, Whitelist
from ..parser.traffic_log_parser import TrafficLogParser
from ...templates.crawl_view import CrawlView
logger = logging.getLogger("mdmaug")
class Api:
website = "" # http://site.cz
websiteDomain = "" # site.cz
def run(self, request):
""" Accept command
......@@ -32,21 +35,25 @@ class Api:
else:
days = int(request["analyze"])
crawl = ScanController().launch(request["page"], days)
elif "inspect" in request:
# XXX → migrate to dbp
output = []
for row in Config.db.execute_sql("SELECT url from encounter where host = %s", request["inspect"]):
output.append(row[0])
return "<br>".join(output)
elif "aggregate" in request:
crawl = self.aggregate(request)
elif "decide" in request: # XX deprecated?
return self.get_undecided()
elif "nicify" in request:
return TrafficLogParser.getStylesheet() + TrafficLogParser.nicify_file(request["page"])
elif "vote" in request: # /api/vote/block/example.org/10.0.0.1
logger.debug("vote cmd")
return Turris.vote(request["vote"], request["page"])
elif "scans" in request:
if "url" in request: # /api/scans/url/http://example.com
domain = domain2dir(request["page"])
if not domain:
return "Wrong domain"
return ScanController().get_domain_snapdirs(domain, full_dirs=False)
else:
return "Not implemented"
return Encounter.vote(request["vote"], request["page"])
elif "scan" in request:
if "date" not in request:
request["date"] = ""
crawl = ScanController().get_scan(escape(request["scan"]), date=escape(request["date"]))
elif "whitelist" in request: # XXX not implemented yet
"""url = path.split("/", 3)
if len(url) > 3:
......@@ -55,7 +62,7 @@ class Api:
logger.debug(url) # XXX
quit() # XXX
logger.debug(self.website)
logger.debug(self.websiteDomain)
logger.debug(self.origin_domain)
return self.whitelist()"""
return "Implement first if needed."
elif "reset" in request:
......@@ -65,15 +72,46 @@ class Api:
return "Unknown API method."
if crawl:
if type(crawl) is str: # probably an error
return crawl
if type(crawl) is str:
return crawl # containing an error message
elif request["api"] == "json":
return CrawlView.output_json(crawl)
elif request["api"] == "mdmaug":
return CrawlView.output_mdmaug(crawl)
#elif request["api"] == "mdmaug":
# return CrawlView.output_mdmaug(crawl)
else:
return CrawlView.output_html(crawl)
@staticmethod
def aggregate(request):
date_from = int(request["from"])
date_to = int(request["to"])
crawl = Crawl()
cwd = os.getcwd()
os.chdir(Config.CACHE_DIR)
scan_count = set()
domain_count = set()
for domain, scans, _ in walklevel(".", 1):
try:
for scan in scans:
if date_from < int(scan) < date_to:
print("importing", domain)
try:
scan_count.add(scan)
domain_count.add(domain)
crawl += Crawl.load_from_file("/".join([domain, scan, Config.CRAWL_FILE]))
except FileNotFoundError:
logger.warning("Wrong analysis stored at %s/%s", domain, scan)
pass
# print(roots, dirs)
except ValueError:
pass
crawl.title = f"Merged {len(scan_count)} scans from {len(domain_count)} domains"
if not crawl:
crawl = "No scan between these dates."
os.chdir(cwd)
return crawl
@staticmethod
def reset():
logger.debug("resetting running browsers")
......@@ -83,17 +121,29 @@ class Api:
# adds the 2nd-level domain to the whitelist
def whitelist(self):
logger.debug("whitelistuju")
logger.debug("whitelisting")
# Db.cur = Db.connection.cursor()
# self._logger.debug(Db.cur.execute("""REPLACE INTO whitelist set domain = %s""", (self.websiteDomain, )))
# self._logger.debug(Db.cur.execute("""REPLACE INTO whitelist set domain = %s""", (self.origin_domain, )))
# Db.connection.commit()
# Db.cur.close()
return # not yet implemented
try:
Whitelist.insert(domain=self.websiteDomain).execute()
Whitelist.insert(domain=self.origin_domain).execute()
except IntegrityError:
pass # jiz je vlozeno
pass # already inserted
@staticmethod
def get_undecided():
logger.debug("XXX jeste jsem neudelal - ma vylezt tabulka vsech nerozhodlych domen od posledniho exportu")
logger.debug("XXX not implemented yet - table of undecideds domain since last export")
pass
def walklevel(some_dir, level=1):
some_dir = some_dir.rstrip(os.path.sep)
assert os.path.isdir(some_dir)
num_sep = some_dir.count(os.path.sep)
for root, dirs, files in os.walk(some_dir):
yield root, dirs, files
num_sep_this = root.count(os.path.sep)
if num_sep + level <= num_sep_this:
del dirs[:]
\ No newline at end of file
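For context, `walklevel` is the usual depth-limited `os.walk` recipe; with the cache layout that `aggregate()` reads (`<domain>/<scan timestamp>/analysis.json`) it behaves roughly like this (paths illustrative):

for root, dirs, files in walklevel(Config.CACHE_DIR, 1):
    print(root, dirs)  # e.g. "/opt/mdmaug/.cache/mdmaug-scans/example.com ['170410153000', ...]"
    # dirs is pruned in place, so os.walk never descends into the scan directories themselves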
......@@ -26,18 +26,41 @@ logger = logging.getLogger("mdmaug")
class ScanController:
FF_INFO_FILE = "cache.dir"
CRAWL_FILE = "crawlSave.yaml"
profile = "-1" # booked browser profile
url = None
def __init__(self):
self.lock = None
@staticmethod
def get_scan(domain, date=""):
"""
:param domain: hostname
:param date: time of scan, if not specified, we return the last
:return: Crawl object
"""
scans = ScanController().get_domain_snapdirs(domain, full_dirs=True)
if not scans:
return False
else:
if date:
if date in scans:
scan = date
else:
return "Scan wasn't performed at this time."
else:
scan = scans[0]
return Crawl.load_from_file(scan+"/"+Config.CRAWL_FILE)
@staticmethod
def get_domain_snapdirs(domain, full_dirs=True):
d = Config.CACHE_DIR + domain + "/"
if os.path.isdir(d):
return [str(d + subdir) if full_dirs else str(subdir) for subdir in os.listdir(d) # all possible snapshot directories
if os.path.isdir(str(d + subdir)) and os.path.isfile(d + subdir + "/" + ScanController.CRAWL_FILE)]
if os.path.isdir(str(d + subdir)) and os.path.isfile(d + subdir + "/" + Config.CRAWL_FILE)]
# Xfor i in os.scandir(Config.CACHE_DIR + ):
def launch(self, url, cached=None):
"""
......@@ -51,8 +74,8 @@ class ScanController:
cache_dir = max(snapdirs, key=os.path.getmtime) + "/"
if cached is True or os.path.getmtime(cache_dir) > time.time() - (3600 * 24 * cached):
try:
logger.debug(f"Returning a previous crawl from: {cache_dir + ScanController.CRAWL_FILE}")
crawl = Crawl.load_from_file(cache_dir + ScanController.CRAWL_FILE)
logger.debug(f"Returning a previous crawl from: {cache_dir + Config.CRAWL_FILE}")
crawl = Crawl.load_from_file(cache_dir + Config.CRAWL_FILE)
return crawl
except ValueError:
pass
......@@ -77,12 +100,12 @@ class ScanController:
# XX If the problem is that JS kills FF, one may experiment with ulimit -Sv 500000;
return f"PROFILE EXCEPTION ({self.profile}) {e} See logs, i.e. mdmaug/nohup.out. "
crawl.save_to_file(crawl.cacheDir + ScanController.CRAWL_FILE) # save search results
crawl.save_to_file(crawl.cache_dir + Config.CRAWL_FILE) # save search results
return crawl
else:
logger.debug("(-) no free slots")
result = f"Scanning {self.url} failed – no free slots. <a href='{Config.APP_DOMAIN}/reset'>Reset</a>"
result = f"Scanning {self.url} failed – no free slots. <a href='{Config.APP_HOST}/reset'>Reset</a>"
return f"<div id='analysis-results'>{result}</div>"
def analyze(self):
......@@ -153,26 +176,26 @@ class ScanController:
Config.CACHE_DIR + domain2dir(self.url) + "/" + ScanController._get_cache_dir_stamp() + "/")
# info for FF
with open(log_dir + ScanController.FF_INFO_FILE, "w") as f: # v logDiru mu dame odkaz do cacheDiru
with open(log_dir + ScanController.FF_INFO_FILE, "w") as f: # v logDiru mu dame odkaz do cache_diru
f.write(cache_dir) # hint where FF will find the cache dir (because FF locates the log dir by the profile name)
return log_dir, cache_dir
def _load_profile_queue(self):
@staticmethod
def _load_profile_queue():
# load queue from config file
try:
with open(Config.config_file, 'r') as f:
queue = json.load(f)
print(f"*** Loaded {self.profile} {queue}")
except (IOError, JSONDecodeError):
with open(Config.config_file, 'w'):
pass
queue = {}
return queue
def _save_profile_queue(self, queue):
@staticmethod
def _save_profile_queue(queue):
with open(Config.config_file, 'w') as f:
print(f"*** Saving {self.profile} {queue}")
json.dump(queue, f)
def dequeue(self):
......@@ -208,7 +231,7 @@ class ScanController:
for i in range(Config.profile_count):
if queue.get(str(i)) is None:
self.profile = i
queue[self.profile] = url # X"loading"
queue[self.profile] = url
self._save_profile_queue(queue)
return True # we found a free slot, let's proceed
if self.profile == -1:
......
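For reference, the profile queue persisted in `Config.config_file` (queue.cache) is just a JSON object mapping a booked profile number to the URL currently being scanned in that profile, e.g. (values illustrative):

{"0": "http://example.com", "3": "https://example.org"}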
import datetime
import json
import logging
import re
from html import escape
from flask import Blueprint, send_from_directory, render_template, request, make_response
from jinja2 import Environment, FileSystemLoader
from wtforms import Form
from wtforms.fields import BooleanField
from ..config import Config, Pref
from ..controller.api import Api
from ..model.dbp import DbModel
from ..model.dbp import Export
# env = Environment()
......@@ -30,6 +31,8 @@ def update_preferences():
# current_app.config["preferences"][k] = v
@app.route('/test')
def index():
resp = make_response("fds")
......@@ -54,7 +57,7 @@ def homepage():
return 'Hello ' + name
form = OptionsForm()
return render_template("homepage.html", form=form)
return render_template("homepage.html", form=form, APP_HOST=Config.APP_HOST)
@app.route('/favicon.ico')
......@@ -74,13 +77,14 @@ def controller(request_url):
/api/reset
/export/(days) → CSV of last X days
"""
Config.connect()
path, *_ = request_url.split("?", 1)
DbModel.assureConnection()
#DbModel.assure_connection()
# parse the request url into a friendly dictionary
keywords = {"safebrowsing", "geoip", "api", "destination", "analyze", "pdns", "export"}
request = {"page": ""}
page = False
for l in request_url.split("/"):
......@@ -116,12 +120,14 @@ def controller(request_url):
output = json.dumps(output)
if "destination" in request:
if request["destination"] in Config.ALLOWED_DESTINATION:
# send everything up, we are in an iframe
render_template("_message.html", contents=output, url=request_url, destination=f"https://{request['destination']}/")
return render_template("_message.html", contents=output, url=request_url, destination=Config.ALLOWED_DESTINATION[request['destination']])
else:
return "Wrong destination: " + escape(request["destination"])
else:
return output
elif "export" in request: # /export/{days} - csv of last 7 days
url = request_url.split("/", 1)
return Export.export_view(days=url[2])
return Export.export_view(days=request["days"] if "days" in request else 7)
else:
return "MDMaug: Page not found", 404
......@@ -2,6 +2,7 @@ import logging
import re
import socket
from urllib.parse import urlparse
import requests
from .config import Pref
......@@ -13,10 +14,8 @@ def url2domain(url):
""" http://www.seznam.cz/url -> www.seznam.cz """
try:
return re.search('(([\w\-_]+(?:(?:\.[\w\-_]+)+)))', url).group(0)
except TypeError:
logger.debug("Domains/url2domain type error")
logger.debug(url)
raise # return ""
except (TypeError, AttributeError):
return url
def url2path(url):
......@@ -93,6 +92,9 @@ def get_pdns_link(ip):
return 'http://pdns.cert.at/p/dns?qry=' + ip
suspicious_list = {}
def is_suspicious(domain, output='bool'):
"""
Scrape the Safebrowsing service web page and try to work out whether the site is considered dangerous to visit.
......@@ -102,17 +104,27 @@ def is_suspicious(domain, output='bool'):
"""
if not Pref.safebrowsing:
return None
if domain in suspicious_list:
result = suspicious_list[domain]
else:
# contents = urllib.request.urlopen('http://www.google.com/safebrowsing/diagnostic?site=' + domain).read().decode("utf-8")
# with open("debugsf.tmp","a") as f:
# f.write(contents + "\n\n")
# if "Site is listed as suspicious" in contents:
# elif "This site is not currently listed as suspicious." in contents:
import requests
logger.debug("Safebrowsing %s", domain)
result = None
r = requests.get("http://www.google.com/safebrowsing/diagnostic?output=jsonp&site=" + domain, timeout=5)
if '"listed"' in r.text:
return True if output == 'bool' else "1"
result = True
if '"unlisted"' in r.text: # vratilo to alespon neco rozumneho
result = False
suspicious_list[domain] = result
if result is True:
return True if output == 'bool' else "1"
elif result is False:
return False if output == 'bool' else "0"
else:
return None if output == 'bool' else ""
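Illustrative return values of the reworked `is_suspicious` (they depend on `Pref.safebrowsing` being enabled and on what the Safebrowsing diagnostic endpoint returns):

is_suspicious("example.com")                 # → True / False, or None when the lookup was inconclusive
is_suspicious("example.com", output="text")  # any non-'bool' output mode (name hypothetical) → "1" / "0" / ""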
......
from yaml import load, dump
import jsonpickle
#from yaml import load, dump
from ..config import Config
from ..parser.spy_parser import SpyParser
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
#try:
# from yaml import CLoader as Loader, CDumper as Dumper
#except ImportError:
# from yaml import Loader, Dumper
from collections import defaultdict
class Crawl(defaultdict):
""" Objekt Crawl udružuje výsledky vyhledávání """
""" Analysis results data model"""
title: None
def __add__(self, other):
c = Crawl("merged")
for domain_name, domain in self.items():
c[domain_name] = domain
for domain_name, domain in other.items():
if domain_name not in c:
c[domain_name] = domain
else:
c[domain_name] = domain + c[domain_name]
return c
def __str__(self):
r = "Výsledky vyhledávání - navštíveno {} domén".format(len(self))
for key in self.keys():
r += "\n* " + key + " " + str(self[key])
return r
def save_to_file(self, filename):
with open(filename, "w") as f:
f.write(dump(self.__getstate__(), Dumper=Dumper))
f.write(jsonpickle.encode(self))
# YAML is slower and half the size and needs more coding (a much uglier model with all those getstate methods)
#f.write(dump(self, Dumper=Dumper))
@staticmethod
def load_from_file(filename):
with open(filename, 'r') as f:
return Crawl(state=load(f.read(), Loader=Loader))
#return Crawl(state=load(f.read(), Loader=Loader))
return jsonpickle.decode(f.read(), keys=True)
def __init__(self, host=None, state=None, log_dir=None, cache_dir=None, profile=None):
""" State muze obsahovat vystup __getstate__() (serializace YAMLem) """
def __init__(self, host=None, log_dir=None, cache_dir=None, profile=None):
self.default_factory = _Domain
self.screenfile = None # HTML output XXX
# if host:
self.host = host
# if log_dir:
self.logDir = log_dir
# if cache_dir:
self.cacheDir = cache_dir
self.log_dir = log_dir
self.cache_dir = cache_dir
self.profile = profile
if state:
self.__setstate__(state)
def __getstate__(self):
state = self.__dict__.copy()
state["keys"] = [[x for x in (key, self[key].__getstate__())] for key in self.keys()]
return state
def __setstate__(self, state):
for tup in state["keys"]:
key, val = tup
self[key].__setstate__(val)
del state["keys"]
self.__dict__ = state
self.screenfile = None
self.title = None # if not set, we take host for the title
class _Domain(defaultdict):
""" Navstivena domena behem crawlu """
def __str__(self):
r = "{} adres a {} url".format(len(self.addresses), len(self.urls))
for key in self.urls.keys():
r += "\n " + key + " " + str(self.urls[key])
for key in self.addresses.keys():
r += "\n " + key + " " + str(self.addresses[key])
if self.pdns:
r += "Informace z PDNS:\n"
for key in self.pdns:
r += key + " "
else:
r += "\n Žádné informace z PDNS."
return r
""" Domain visited while crawling """
def __add__(self, other):
o = _Domain()
o.pdns = set.union(self.pdns, other.pdns)
o.urls = {**self.urls, **other.urls}
o.addresses = {**self.addresses, **other.addresses}
return o
def __init__(self):
# self.vote = None
# self.urls = set()
self.pdns = set()
self.urls = defaultdict(_Url)
self.addresses = defaultdict(_Address)
self.pdns = set()
def __getstate__(self):
state = self.__dict__.copy()
state["addresses"] = [[x for x in (key, self.addresses[key].__dict__)] for key in self.addresses]
state["urls"] = [[x for x in (key, self.urls[key].__dict__)] for key in self.urls]
return state
def __setstate__(self, state):
for tup in state["addresses"]:
key, val = tup
self.addresses[key].__dict__ = val
del state["addresses"]
for tup in state["urls"]:
key, val = tup
self.urls[key].__dict__ = val
del state["urls"]
for key in state:
self.__dict__[key] = state[key]
return
class _Url(set):
""" Unikatni navstivena url """
def __str__(self):
return str(self.__dict__)
""" A unique URL visited """
def __init__(self):
super().__init__()
# paths to files with suspicious code run by the inspected page
self.spyfiles = []
# paths to files with source code.
# Both 'http://example.com/index.htm' and 'https://example.com/index.htm' fall under the example.com domain.
self.sourcefiles = []
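A minimal round-trip sketch of the new jsonpickle persistence, assuming the defaults above (paths and values illustrative):

crawl = Crawl(host="example.com", cache_dir="/opt/mdmaug/.cache/mdmaug-scans/example.com/170410153000/")
crawl["example.com"].urls["http://example.com/"].sourcefiles.append("source.html")
crawl.save_to_file(crawl.cache_dir + Config.CRAWL_FILE)               # analysis.json via jsonpickle.encode
restored = Crawl.load_from_file(crawl.cache_dir + Config.CRAWL_FILE)  # jsonpickle.decode round trip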