Commit 85b4355c authored by Marek Vavrusa's avatar Marek Vavrusa

modules: http, graphite, policy, daf support map()

All relevant modules now support running in
forked mode and polling workers for information.
For example, the graphite module can poll stats from
all workers and aggregate them before sending,
and the HTTP module can run only on the process-group
leader and poll the other workers for information.
parent 93303da5
......@@ -271,12 +271,12 @@ $(function() {
}
/* Rule builder submit */
$('#daf-add').click(function () {
const form = $('#daf-builder-form');
const form = $('#daf-builder-form').parent();
if (dafBuilder.items.length == 0 || form.hasClass('has-error')) {
return;
}
/* Clear previous errors and resubmit. */
form.find('.alert').remove();
form.parent().find('.alert').remove();
$.post('daf', dafBuilder.items.join(' '))
.done(function (data) {
dafBuilder.clear();
......@@ -284,7 +284,7 @@ $(function() {
})
.fail(function (data) {
const reason = data.responseText.length > 0 ? data.responseText : 'internal error';
form.append(
form.after(
'<div class="alert alert-danger" role="alert">'+
'Couldn\'t add rule (code: '+data.status+', reason: '+reason+').'+
'</div>'
......
local cqueues = require('cqueues')
-- Load dependent modules
if not view then modules.load('view') end
if not policy then modules.load('policy') end
......@@ -60,7 +58,8 @@ local filters = {
end,
}
local function parse_filter(tok, g)
local function parse_filter(tok, g, prev)
if not tok then error(string.format('expected filter after "%s"', prev)) end
local filter = filters[tok:lower()]
if not filter then error(string.format('invalid filter "%s"', tok)) end
return filter(g)
......@@ -77,11 +76,11 @@ local function parse_rule(g)
-- or terminate filter chain and return
tok = g()
while tok do
if tok == 'AND' then
local fa, fb = f, parse_filter(g(), g)
if tok:lower() == 'and' then
local fa, fb = f, parse_filter(g(), g, tok)
f = function (req, qry) return fa(req, qry) and fb(req, qry) end
elseif tok == 'OR' then
local fa, fb = f, parse_filter(g(), g)
elseif tok:lower() == 'or' then
local fa, fb = f, parse_filter(g(), g, tok)
f = function (req, qry) return fa(req, qry) or fb(req, qry) end
else
break
......@@ -131,7 +130,7 @@ local M = {
-- @function Cleanup module
function M.deinit()
if http then
if http and http.endpoints then
http.endpoints['/daf'] = nil
http.endpoints['/daf.js'] = nil
http.snippets['/daf'] = nil
......@@ -140,6 +139,10 @@ end
-- @function Add rule
function M.add(rule)
-- Ignore duplicates
for _, r in ipairs(M.rules) do
if r.info == rule then return r end
end
local id, action, filter = compile(rule)
if not id then error(action) end
-- Combine filter and action into policy
......@@ -202,6 +205,15 @@ function M.enable(id, val)
return M.toggle(id, true)
end
-- Broadcast a formatted command to every worker via map() and AND the
-- replies together: truthy only when every worker reported success.
local function consensus(op, ...)
	local verdict = true
	local command = string.format(op, ...)
	for _, answer in ipairs(map(command)) do
		verdict = verdict and answer
	end
	return verdict
end
-- @function Public-facing API
local function api(h, stream)
local m = h:get(':method')
......@@ -227,7 +239,7 @@ local function api(h, stream)
local path = h:get(':path')
local id = tonumber(path:match '/([^/]*)$')
if id then
if M.del(id) then
if consensus('daf.del "%s"', id) then
return tojson(true)
end
return 404, '"No such rule"' -- Not found
......@@ -237,8 +249,10 @@ local function api(h, stream)
elseif m == 'POST' then
local query = stream:get_body_as_string()
if query then
local ok, r, err = pcall(M.add, query)
if not ok then return 500, string.format('"%s"', r) end
local ok, r = pcall(M.add, query)
if not ok then return 500, string.format('"%s"', r:match('/([^/]+)$')) end
-- Dispatch to all other workers
consensus('daf.add "%s"', query)
return rule_info(r)
end
return 400
......@@ -252,7 +266,7 @@ local function api(h, stream)
end
-- We do not support more actions
if action == 'active' then
if M.toggle(id, val == 'true') then
if consensus('daf.toggle(%d, %s)', id, val == 'true' or 'false') then
return tojson(true)
else
return 404, '"No such rule"'
......@@ -263,23 +277,39 @@ local function api(h, stream)
end
end
-- Sum rule match counters across all workers, keyed by rule id.
local function getmatches()
	local totals = {}
	for _, worker_rules in ipairs(map 'daf.rules') do
		for _, entry in ipairs(worker_rules) do
			-- Keys must be strings so the result serialises as a JSON object,
			-- not an array.
			local key = tostring(entry.rule.id)
			totals[key] = (totals[key] or 0) + entry.rule.count
		end
	end
	return totals
end
-- @function Publish DAF statistics
local function publish(h, ws)
local ok, counters = true, {}
local cqueues = require('cqueues')
local ok, last = true, nil
while ok do
-- Check if we have new rule matches
local update = {}
for _, r in ipairs(M.rules) do
local id = r.rule.id
if counters[id] ~= r.rule.count then
-- Must have string keys for JSON object and not an array
update[tostring(id)] = r.rule.count
counters[id] = r.rule.count
local diff = {}
local has_update, update = pcall(getmatches)
if has_update then
if last then
for id, count in pairs(update) do
if not last[id] or last[id] < count then
diff[id] = count
end
end
end
last = update
end
-- Update counters when there is a new data
if next(update) ~= nil then
ok = ws:send(tojson(update))
if next(diff) ~= nil then
ok = ws:send(tojson(diff))
else
ok = ws:send_ping()
end
......@@ -289,7 +319,7 @@ end
-- @function Configure module
function M.config(conf)
if not http then error('"http" module is not loaded, cannot load DAF') end
if not http or not http.endpoints then return end
-- Export API and data publisher
http.endpoints['/daf.js'] = http.page('daf.js', 'daf')
http.endpoints['/daf'] = {'application/json', api, publish}
......@@ -301,13 +331,17 @@ function M.config(conf)
<div class="col-md-11">
<input type="text" id="daf-builder" class="form-control" aria-label="..." />
</div>
<button type="button" id="daf-add" class="btn btn-default btn-sm">Add</button>
<div class="col-md-1">
<button type="button" id="daf-add" class="btn btn-default btn-sm">Add</button>
</div>
</form>
</div>
<div class="row">
<table id="daf-rules" class="table table-striped table-responsive">
<th><td>No rules here yet.</td></th>
</table>
<div class="col-md-12">
<table id="daf-rules" class="table table-striped table-responsive">
<th><td>No rules here yet.</td></th>
</table>
</div>
</div>
]]}
end
......
--- @module graphite
local graphite = {}
-- Load dependent modules
if not stats then modules.load('stats') end
-- This is leader-only module
if worker.id > 0 then return {} end
local M = {}
local socket = require('socket')
-- Create connected UDP socket
......@@ -38,6 +42,16 @@ local function make_tcp(host, port)
return s
end
-- Sum a list of per-worker metric tables into one aggregate table.
local function merge(results)
	local merged = {}
	for i = 1, #results do
		for key, value in pairs(results[i]) do
			local current = merged[key] or 0
			merged[key] = current + value
		end
	end
	return merged
end
-- Send the metrics in a table to multiple Graphite consumers
local function publish_table(metrics, prefix, now)
for key,val in pairs(metrics) do
......@@ -45,16 +59,16 @@ local function publish_table(metrics, prefix, now)
if prefix then
msg = prefix..'.'..msg
end
for i in ipairs(graphite.cli) do
local ok, err = graphite.cli[i]:send(msg)
for i in ipairs(M.cli) do
local ok, err = M.cli[i]:send(msg)
if not ok then
-- Best-effort reconnect once per two tries
local tcp = graphite.cli[i]['connect'] ~= nil
local host = graphite.info[i]
if tcp and host.seen + 2 * graphite.interval / 1000 <= now then
local tcp = M.cli[i]['connect'] ~= nil
local host = M.info[i]
if tcp and host.seen + 2 * M.interval / 1000 <= now then
print(string.format('[graphite] reconnecting: %s#%d reason: %s',
host.addr, host.port, err))
graphite.cli[i] = make_tcp(host.addr, host.port)
M.cli[i] = make_tcp(host.addr, host.port)
host.seen = now
end
end
......@@ -62,56 +76,49 @@ local function publish_table(metrics, prefix, now)
end
end
-- @function Initialize module state; no Graphite servers are connected yet.
-- The span previously contained both the pre-rename (graphite.*) and the
-- post-rename (M.*) bodies from the diff; this is the merged new version.
function M.init(module)
	M.ev = nil                        -- recurrent publish event handle
	M.cli = {}                        -- connected client sockets
	M.info = {}                       -- per-client {addr, port, seen} records
	M.interval = 5 * sec              -- publish period
	M.prefix = 'kresd.' .. hostname() -- metric key prefix
	return 0
end
-- @function Cancel the recurrent publish event on module shutdown.
-- Merged from the interleaved old (graphite.*) and new (M.*) diff lines.
function M.deinit(module)
	if M.ev then event.cancel(M.ev) end
	return 0
end
-- @function Publish results to the Graphite server(s)
-- @function Publish aggregated results to the Graphite server(s).
-- Polls every worker via map(), merges the per-worker tables and sends
-- them with the configured key prefix. Merged from the interleaved
-- old/new diff lines of this hunk.
function M.publish()
	local now = os.time()
	-- Publish built-in statistics
	-- NOTE(review): M.init sets M.cli = {} so this check only fires if
	-- publish() runs before init — an empty table is still truthy; confirm
	-- whether "no server configured" should also cover the empty case.
	if not M.cli then error("no graphite server configured") end
	publish_table(merge(map 'cache.stats()'), M.prefix..'.cache', now)
	publish_table(merge(map 'worker.stats()'), M.prefix..'.worker', now)
	-- Publish extended statistics if available
	if not stats then
		return 0
	end
	publish_table(merge(map 'stats.list()'), M.prefix, now)
	return 0
end
-- @function Make connection to Graphite server.
-- @function Make a connection to a Graphite server and register it.
-- Called method-style (graphite:add_server(host, port, tcp)), so the first
-- parameter receives the module table itself and is intentionally unused;
-- the body stores into the shared M.cli/M.info lists instead.
function M.add_server(graphite, host, port, tcp)
	local s, err = tcp and make_tcp(host, port) or make_udp(host, port)
	if not s then
		error(err)
	end
	table.insert(M.cli, s)
	-- seen = 0 forces the first failed TCP send to attempt a reconnect
	table.insert(M.info, {addr = host, port = port, seen = 0})
	return 0
end
function graphite.config(conf)
function M.config(conf)
-- config defaults
if not conf then return 0 end
if not conf.port then conf.port = 2003 end
if conf.interval then graphite.interval = conf.interval end
if conf.prefix then graphite.prefix = conf.prefix end
if conf.interval then M.interval = conf.interval end
if conf.prefix then M.prefix = conf.prefix end
-- connect to host(s)
if type(conf.host) == 'table' then
for key, val in pairs(conf.host) do
......@@ -121,9 +128,9 @@ function graphite.config(conf)
graphite:add_server(conf.host, conf.port, conf.tcp)
end
-- start publishing stats
if graphite.ev then event.cancel(graphite.ev) end
graphite.ev = event.recurrent(graphite.interval, graphite.publish)
if M.ev then event.cancel(M.ev) end
M.ev = event.recurrent(M.interval, M.publish)
return 0
end
return graphite
return M
-- Load dependent modules
if not stats then modules.load('stats') end
-- This is leader-only module
if worker.id > 0 then return {} end
-- This is a module that does the heavy lifting to provide an HTTP/2 enabled
-- server that supports TLS by default and provides endpoint for other modules
-- in order to enable them to export restful APIs and websocket streams.
......@@ -318,40 +324,40 @@ end
-- @function Configure module
-- @function Configure the HTTP module: open the listening interface and
-- start driving the cqueues event loop from the daemon's event framework.
-- The diff hunk contained both the old and new bodies interleaved; this is
-- the merged post-change version.
function M.config(conf)
	conf = conf or {}
	-- `http = true` in the config shorthand means "use all defaults"
	if conf == true then conf = {} end
	assert(type(conf) == 'table', 'config { host = "...", port = 443, cert = "...", key = "..." }')
	-- Configure web interface for resolver
	if not conf.port then conf.port = 8053 end
	if not conf.host then conf.host = 'localhost' end
	if conf.geoip and has_mmdb then M.geoip = mmdb.open(conf.geoip) end
	M.interface(conf.host, conf.port, M.endpoints, conf.cert, conf.key)
	-- TODO: configure DNS/HTTP(s) interface
	-- Already scheduled: do not install a second poller
	if M.ev then return end
	-- Schedule both I/O activity notification and timeouts
	local poll_step
	poll_step = function ()
		local ok, err, _, co = cq:step(0)
		if not ok then warn('[http] error: %s %s', err, debug.traceback(co)) end
		-- Reschedule timeout or create new one
		local timeout = cq:timeout()
		if timeout then
			-- Throttle web requests
			if timeout == 0 then timeout = 0.001 end
			-- Convert from seconds to duration
			timeout = timeout * sec
			if not M.timeout then
				M.timeout = event.after(timeout, poll_step)
			else
				event.reschedule(M.timeout, timeout)
			end
		else -- Cancel running timeout when there is no next deadline
			if M.timeout then
				event.cancel(M.timeout)
				M.timeout = nil
			end
		end
	end
	-- Wake up whenever the cqueues pollfd becomes readable
	M.ev = event.socket(cq:pollfd(), poll_step)
end
return M
......@@ -4,15 +4,25 @@ local snapshots, snapshots_count = {}, 120
-- Gauge metrics
-- Metric names treated as instantaneous gauges (plotted as-is,
-- not converted to rates like the counter metrics).
local gauges = {
	['worker.concurrent'] = true, -- queries outstanding right now
	['worker.rss'] = true,        -- resident set size (rendered with a 'B' suffix by the UI)
}
-- Load dependent modules
if not stats then modules.load('stats') end
-- Accumulate per-worker stat tables into t, prefixing every key.
-- Entries that are not tables (e.g. a worker that failed to answer)
-- are skipped.
local function merge(t, results, prefix)
	for _, result in pairs(results) do
		if type(result) == 'table' then
			for key, value in pairs(result) do
				local full = prefix .. key
				t[full] = (t[full] or 0) + value
			end
		end
	end
end
-- @function Aggregate statistics from all workers into one flat table.
-- Keys from cache.stats() and worker.stats() are namespaced with
-- 'cache.' and 'worker.' prefixes. Merged from the interleaved old
-- (single-process) and new (map-based) diff lines.
local function getstats()
	local t = {}
	merge(t, map 'stats.list()', '')
	merge(t, map 'cache.stats()', 'cache.')
	merge(t, map 'worker.stats()', 'worker.')
	return t
end
......@@ -52,9 +62,16 @@ local function snapshot_start(h, ws)
end
end
end
-- Aggregate per-worker metrics
local wdata = {}
for i, info in pairs(map 'worker.info()') do
if type(info) == 'table' then
wdata[tostring(info.pid)] = {rss=info.rss, usertime=info.usertime, systime=info.systime, pagefaults=info.pagefaults, queries=info.queries}
end
end
-- Publish stats updates periodically
if not is_empty then
local update = {time=os.time(), stats=stats_dt, upstreams=upstreams or {}}
local update = {time=os.time(), stats=stats_dt, upstreams=upstreams, workers=wdata}
table.insert(snapshots, update)
if #snapshots > snapshots_count then
table.remove(snapshots, 1)
......
......@@ -29,3 +29,11 @@ body {
border-color: #4cae4c !important;
color: #fff !important;
}
/* Sparkline graphs and their legends flow inline with surrounding text. */
.spark,
.spark-legend {
	display: inline-block;
}
\ No newline at end of file
......@@ -44,7 +44,7 @@ $(function() {
/* Render other interesting metrics as lines (hidden by default) */
var data = [];
var last_metric = 15;
var last_metric = 17;
var metrics = {
'answer.noerror': [0, 'NOERROR', null, 'By RCODE'],
'answer.nodata': [1, 'NODATA', null, 'By RCODE'],
......@@ -62,6 +62,8 @@ $(function() {
'worker.concurrent': [13, 'Queries outstanding'],
'worker.queries': [14, 'Queries received/s'],
'worker.dropped': [15, 'Queries dropped'],
'worker.usertime': [16, 'CPU (user)', null, 'Workers'],
'worker.systime': [17, 'CPU (sys)', null, 'Workers'],
};
/* Render latency metrics as sort of a heatmap */
......@@ -182,7 +184,10 @@ $(function() {
var bubblemap = {};
function pushUpstreams(resp) {
if (resp == null) {
$('#map-container').hide();
return;
} else {
$('#map-container').show();
}
/* Get current maximum number of queries for bubble diameter adjustment */
var maxQueries = 1;
......@@ -235,6 +240,90 @@ $(function() {
age = age + 1;
}
/* Per-worker information */
/* Rate of change between two counter samples x (newer) and y (older)
 * over dt seconds, as a percentage string with one decimal place. */
function updateRate(x, y, dt) {
	const delta = x - y;
	const rate = 100.0 * (delta / dt);
	return rate.toFixed(1);
}
function updateWorker(row, next, data, timestamp, buffer) {
const dt = timestamp - data.timestamp;
const cell = row.find('td');
/* Update spark lines and CPU times first */
if (dt > 0.0) {
const utimeRate = updateRate(next.usertime, data.last.usertime, dt);
const stimeRate = updateRate(next.systime, data.last.systime, dt);
cell.eq(1).find('span').text(utimeRate + '% / ' + stimeRate + '%');
/* Update sparkline graph */
data.data.push([new Date(timestamp * 1000), utimeRate, stimeRate]);
if (data.data.length > 60) {
data.data.shift();
}
if (!buffer) {
data.graph.updateOptions( { 'file': data.data } );
}
}
/* Update other fields */
if (!buffer) {
cell.eq(2).text(formatNumber(next.rss) + 'B');
cell.eq(3).text(next.pagefaults);
cell.eq(4).text('Healthy').addClass('text-success');
}
}
/* Per-PID accumulated state: {timestamp, last, data, graph}. */
var workerData = {};
/* Render one table row per worker PID with a CPU sparkline plus
 * RSS / page-fault / health columns.
 * resp maps PID -> worker snapshot; timestamp is in seconds;
 * buffer defers redraws while replaying buffered history. */
function pushWorkers(resp, timestamp, buffer) {
	if (resp == null) {
		return;
	}
	const workerTable = $('#workers');
	for (var pid in resp) {
		var row = workerTable.find('tr[data-pid='+pid+']');
		if (row.length == 0) {
			/* BUGFIX: jQuery .append() returns the PARENT (the table), not the
			 * appended element, so `row` used to be the whole table and the
			 * first updateWorker() call wrote into the first row's cells.
			 * Build the row and use .appendTo() so `row` is the new <tr>. */
			row = $('<tr data-pid='+pid+'><td>'+pid+'</td>'+
				'<td><div class="spark" id="spark-'+pid+'" /><span /></td><td></td><td></td><td></td>'+
				'</tr>').appendTo(workerTable);
			/* Create sparkline visualisation */
			const spark = row.find('#spark-'+pid);
			spark.css({'margin-right': '1em', width: '80px', height: '1.4em'});
			workerData[pid] = {timestamp: timestamp, data: [[new Date(timestamp * 1000),0,0]], last: resp[pid]};
			const workerGraph = new Dygraph(spark[0],
				workerData[pid].data, {
					valueRange: [0, 100],
					legend: 'never',
					axes : {
						x : {
							drawGrid: false,
							drawAxis : false,
						},
						y : {
							drawGrid: false,
							drawAxis : false,
						}
					},
					labels: ['x', '%user', '%sys'],
					labelsDiv: '',
				}
			);
			workerData[pid].graph = workerGraph;
		}
		updateWorker(row, resp[pid], workerData[pid], timestamp, buffer);
		/* Track last datapoint for rate computation on the next tick */
		workerData[pid].last = resp[pid];
		workerData[pid].timestamp = timestamp;
	}
	/* Prune unhealthy PIDs: rows whose PID vanished from the snapshot.
	 * The header row has no data-pid and no <td>, so marking it is a no-op. */
	if (!buffer) {
		workerTable.find('tr').each(function () {
			const e = $(this);
			if (!(e.data('pid') in resp)) {
				const healthCell = e.find('td').last();
				healthCell.removeClass('text-success');
				healthCell.text('Dead').addClass('text-danger');
			}
		});
	}
}
/* WebSocket endpoints */
var wsStats = (secure ? 'wss://' : 'ws://') + location.host + '/stats';
var ws = new Socket(wsStats);
......@@ -244,14 +333,16 @@ $(function() {
if (data.length > 0) {
pushUpstreams(data[data.length - 1].upstreams);
}
/* Buffer datapoints and redraw last */
for (var i in data) {
pushMetrics(data[i].stats, data[i].time, true);
const is_last = (i == data.length - 1);
pushWorkers(data[i].workers, data[i].time, !is_last);
pushMetrics(data[i].stats, data[i].time, !is_last);
}
graph.updateOptions( { 'file': data } );
} else {
pushMetrics(data.stats, data.time);
pushUpstreams(data.upstreams);
pushWorkers(data.workers, data.time);
pushMetrics(data.stats, data.time);
}
};
});
\ No newline at end of file
......@@ -62,11 +62,24 @@
</div>
</div>
</div>
<div class="row">
<h3>Running workers</h3>
<div class="col-md-12">
<table id="workers" class="table table-responsive">
<tr>
<th>PID</th><th>CPU per-worker (user/sys)</th>
<th>RSS</th><th>Page faults</th><th>Status</th>
</tr>
</table>
</div>
</div>
</div>
<a name="worldmap"></a>
<h2 class="sub-header">Where do the queries go?</h2>
<div class="col-md-12">
<div id="map" style="position: relative;"></div>
<div class="row" id="map-container">
<a name="worldmap"></a>
<h2 class="sub-header">Where do the queries go?</h2>
<div class="col-md-12">
<div id="map" style="position: relative;"></div>
</div>
</div>
<div class="col-md-12">
{{ snippets }}
......
......@@ -175,12 +175,8 @@ local function rpz_zonefile(action, path)
end
-- RPZ policy set
function policy.rpz(action, path, format)
if format == 'lmdb' then
error('lmdb zone format is NYI')
else
return rpz_zonefile(action, path)
end
function policy.rpz(action, path)
return rpz_zonefile(action, path)
end
-- Evaluate packet in given rules to determine policy action
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment