Commit cf2a18b0 authored by Marek Vavrusa's avatar Marek Vavrusa

modules/http: graphs, prometheus metrics, websocks

* http embeds modified lua-http server code that
  reuses single cqueue for all h2 client sockets,
  this is also because the API in upstream is unstable
* http embeds rickshaw for real-time graphs over
  websockets, it displays latency heatmap by default
  and can show several other metrics
* http shows a world map with pinned recently contacted
  authoritatives, where diameter represents number
  of queries sent and colour its average RTT, so
  you can see where the queries are going
* http now exports several endpoints and websockets:
  /stats for statistics in JSON, and /metrics for
  metrics in Prometheus text format
parent 7ed94014
......@@ -10,6 +10,17 @@ min = minute
hour = 60 * minute
day = 24 * hour
-- Logging
function panic(fmt, ...)
error(string.format('error: '..fmt, ...))
end
function warn(fmt, ...)
io.stderr:write(string.format(fmt..'\n', ...))
end
function log(fmt, ...)
print(string.format(fmt, ...))
end
-- Resolver bindings
kres = require('kres')
trust_anchors = require('trust_anchors')
......
......@@ -28,6 +28,7 @@ for starters?
http = {
host = 'localhost',
port = 8053,
geoip = 'GeoLite2-City.mmdb' -- Optional
}
}
......@@ -71,6 +72,40 @@ the outputs of following:
openssl req -new -key mykey.key -out csr.pem
openssl req -x509 -days 90 -key mykey.key -in csr.pem -out mycert.crt
Built-in services
^^^^^^^^^^^^^^^^^
The HTTP module has several built-in services to use.
.. csv-table::
:header: "Endpoint", "Service", "Description"
"``/stats``", "Statistics/metrics", "Exported metrics in JSON."
"``/metrics``", "Prometheus metrics", "Exported metrics for Prometheus_"
"``/feed``", "Most frequent queries", "List of most frequent queries in JSON."
Enabling Prometheus metrics endpoint
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The module exposes ``/metrics`` endpoint that serves internal metrics in Prometheus_ text format.
You can use it out of the box:
.. code-block:: bash
$ curl -k https://localhost:8053/metrics | tail
# TYPE latency histogram
latency_bucket{le=10} 2.000000
latency_bucket{le=50} 2.000000
latency_bucket{le=100} 2.000000
latency_bucket{le=250} 2.000000
latency_bucket{le=500} 2.000000
latency_bucket{le=1000} 2.000000
latency_bucket{le=1500} 2.000000
latency_bucket{le=+Inf} 2.000000
latency_count 2.000000
latency_sum 11.000000
How to expose services over HTTP
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
......@@ -142,3 +177,9 @@ Dependencies
* `lua-http <https://github.com/daurnimator/lua-http>`_ available in LuaRocks
``$ luarocks install --server=http://luarocks.org/dev http``
* `mmdblua <https://github.com/daurnimator/mmdblua>`_ available in LuaRocks
``$ luarocks install --server=http://luarocks.org/dev mmdblua``
.. _Prometheus: https://prometheus.io
\ No newline at end of file
......@@ -8,6 +8,7 @@ local server = require('http.server')
local headers = require('http.headers')
local websocket = require('http.websocket')
local x509, pkey = require('openssl.x509'), require('openssl.pkey')
local has_mmdb, mmdb = pcall(require, 'mmdb')
-- Module declaration
local cq = cqueues.new()
......@@ -31,89 +32,54 @@ local function pgload(relpath)
fp:close()
-- Guess content type
local ext = relpath:match('[^\\.]+$')
return {'/'..relpath, mime_types[ext] or 'text', data}
return {'/'..relpath, mime_types[ext] or 'text', data, 86400}
end
-- Preloaded static assets
local pages = {
pgload('favicon.ico'),
pgload('rickshaw.min.css'),
pgload('kresd.js'),
pgload('datamaps.world.min.js'),
pgload('topojson.js'),
pgload('jquery.js'),
pgload('epoch.css'),
pgload('epoch.js'),
pgload('favicon.ico'),
pgload('rickshaw.min.js'),
pgload('d3.js'),
}
-- Serve preloaded root page
local function serve_root()
local _, mime_root, mime_data = unpack(pgload('main.tpl'))
mime_data = mime_data
local data = pgload('main.tpl')[3]
data = data
:gsub('{{ title }}', 'kresd @ '..hostname())
:gsub('{{ host }}', hostname())
return function (h, stream)
-- Return index page
-- Render snippets
local rsnippets = {}
for _,v in pairs(M.snippets) do
table.insert(rsnippets, string.format('<h2>%s</h2>\n%s', v[1], v[2]))
end
local data = mime_data
-- Return index page
return data
:gsub('{{ secure }}', stream:checktls() and 'true' or 'false')
:gsub('{{ snippets }}', table.concat(rsnippets, '\n'))
local hsend = headers.new()
hsend:append(':status', '200')
hsend:append('content/type', mime_root)
assert(stream:write_headers(hsend, false))
assert(stream:write_chunk(data, true))
-- Push assets
-- local path, mime, data = unpack(pages[1])
-- local hpush = headers.new()
-- hpush:append(':scheme', h:get(':scheme'))
-- hpush:append(':method', 'GET')
-- hpush:append(':authority', h:get(':authority'))
-- hpush:append(':path', path)
-- local nstream = stream:push_promise(hpush)
-- hpush = headers.new()
-- hpush:append(':status', '200')
-- hpush:append('content/type', mime)
-- print('pushing', path)
-- assert(nstream:write_headers(hpush, false))
-- assert(nstream:write_chunk(data, true))
-- Do not send anything else
return false
end
end
-- Load dependent modules
if not stats then modules.load('stats') end
-- Function to sort frequency list
local function stream_stats(h, ws)
local ok, prev = true, stats.list()
while ok do
-- Get current snapshot
local cur, stats_dt = stats.list(), {}
for k,v in pairs(cur) do
stats_dt[k] = v - (prev[k] or 0)
end
prev = cur
-- Publish stats updates periodically
local push = tojson({stats=stats_dt})
ok = ws:send(push)
cqueues.sleep(0.5)
end
ws:close()
end
-- Export HTTP service endpoints
M.endpoints = {
['/'] = {'text/html', serve_root()},
['/stats'] = {'application/json', stats.list, stream_stats},
['/feed'] = {'application/json', stats.frequent},
}
-- Export static pages
for _, pg in ipairs(pages) do
local path, mime, data = unpack(pg)
M.endpoints[path] = {mime, data}
local path, mime, data, ttl = unpack(pg)
M.endpoints[path] = {mime, data, nil, ttl}
end
-- Export built-in prometheus interface
for k, v in pairs(require('prometheus')) do
M.endpoints[k] = v
end
-- Export HTTP service page snippets
......@@ -140,11 +106,15 @@ local function serve_get(h, stream)
if type(data) == 'table' then data = tojson(data) end
if not mime or type(data) ~= 'string' then
hsend:append(':status', '404')
assert(stream:write_headers(hsend, false))
assert(stream:write_headers(hsend, true))
else
-- Serve content type appropriately
hsend:append(':status', '200')
hsend:append('content/type', mime)
hsend:append('content-type', mime)
local ttl = entry and entry[4]
if ttl then
hsend:append('cache-control', string.format('max-age=%d', ttl))
end
assert(stream:write_headers(hsend, false))
assert(stream:write_chunk(data, true))
end
......@@ -174,6 +144,7 @@ local function route(endpoints)
if cb then
cb(h, ws)
end
ws:close()
return
-- Handle HTTP method appropriately
elseif m == 'GET' then
......@@ -182,13 +153,9 @@ local function route(endpoints)
-- Method is not supported
local hsend = headers.new()
hsend:append(':status', '500')
assert(stream:write_headers(hsend, false))
assert(stream:write_headers(hsend, true))
end
stream:shutdown()
-- Close multiplexed HTTP/2 connection only when empty
if connection.version < 2 or connection.new_streams:length() == 0 then
connection:shutdown()
end
end
end
......@@ -282,18 +249,18 @@ function M.interface(host, port, endpoints, crtfile, keyfile)
end
-- Check loaded certificate
if not crt or not key then
error(string.format('failed to load certificate "%s" - %s', crtfile, err or 'error'))
panic('failed to load certificate "%s" - %s', crtfile, err or 'error')
end
end
-- Create TLS context and start listening
local s, err = server.listen {
host = host,
port = port,
tls = crt ~= nil,
client_timeout = 5,
ctx = crt and tlscontext(crt, key),
}
if not s then
error(string.format('failed to listen on %s#%d: %s', host, port, err))
panic('failed to listen on %s#%d: %s', host, port, err)
end
-- Compose server handler
local routes = route(endpoints)
......@@ -307,7 +274,7 @@ function M.interface(host, port, endpoints, crtfile, keyfile)
local _, expiry = crt:getLifetime()
expiry = math.max(0, expiry - (os.time() - 3 * 24 * 3600))
event.after(expiry, function (ev)
print('[http] refreshed ephemeral certificate')
log('[http] refreshed ephemeral certificate')
crt, key = updatecert(crtfile, keyfile)
s.ctx = tlscontext(crt, key)
end)
......@@ -321,21 +288,21 @@ function M.deinit()
end
-- @function Configure module
local ffi = require('ffi')
function M.config(conf)
conf = conf or {}
assert(type(conf) == 'table', 'config { host = "...", port = 443, cert = "...", key = "..." }')
-- Configure web interface for resolver
if not conf.port then conf.port = 8053 end
if not conf.host then conf.host = 'localhost' end
if conf.geoip and has_mmdb then M.geoip = mmdb.open(conf.geoip) end
M.interface(conf.host, conf.port, M.endpoints, conf.cert, conf.key)
-- TODO: configure DNS/HTTP(s) interface
if M.ev then return end
-- Schedule both I/O activity notification and timeouts
local poll_step
poll_step = function (ev, status, events)
poll_step = function ()
local ok, err, _, co = cq:step(0)
if not ok then print('[http]', err, debug.traceback(co)) end
if not ok then warn('[http] error: %s %s', err, debug.traceback(co)) end
-- Reschedule timeout or create new one
local timeout = cq:timeout()
if timeout then
......
http_SOURCES := http.lua
http_INSTALL := $(wildcard modules/http/static/*)
http_SOURCES := http.lua prometheus.lua
http_INSTALL := $(wildcard modules/http/static/*) \
modules/http/http/h2_connection.lua \
modules/http/http/server.lua
$(call make_lua_module,http)
The MIT License (MIT)
Copyright (c) 2015-2016 Daurnimator
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
\ No newline at end of file
Embedded unstable APIs from https://github.com/daurnimator/lua-http under MIT license, see LICENSE.
ChangeLog:
* Marek Vavrusa <marek@vavrusa.com>:
- Modified h2_connection to reuse current cqueue context
\ No newline at end of file
local cqueues = require "cqueues"
local monotime = cqueues.monotime
local cc = require "cqueues.condition"
local ce = require "cqueues.errno"
local rand = require "openssl.rand"
local new_fifo = require "fifo"
local band = require "http.bit".band
local h2_error = require "http.h2_error"
local h2_stream = require "http.h2_stream"
local hpack = require "http.hpack"
local h2_banned_ciphers = require "http.tls".banned_ciphers
local spack = string.pack or require "compat53.string".pack
local sunpack = string.unpack or require "compat53.string".unpack
local assert = assert
if _VERSION:match("%d+%.?%d*") < "5.3" then
assert = require "compat53.module".assert
end
local function xor(a, b)
return (a and b) or not (a or b)
end
local preface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
local default_settings = {
[0x1] = 4096; -- HEADER_TABLE_SIZE
[0x2] = true; -- ENABLE_PUSH
[0x3] = math.huge; -- MAX_CONCURRENT_STREAMS
[0x4] = 65535; -- INITIAL_WINDOW_SIZE
[0x5] = 16384; -- MAX_FRAME_SIZE
[0x6] = math.huge; -- MAX_HEADER_LIST_SIZE
}
local function merge_settings(new, old)
return {
[0x1] = new[0x1] or old[0x1];
[0x2] = new[0x2] or old[0x2];
[0x3] = new[0x3] or old[0x3];
[0x4] = new[0x4] or old[0x4];
[0x5] = new[0x5] or old[0x5];
[0x6] = new[0x6] or old[0x6];
}
end
local connection_methods = {}
local connection_mt = {
__name = "http.h2_connection";
__index = connection_methods;
}
function connection_mt:__tostring()
return string.format("http.h2_connection{type=%q}",
self.type)
end
local connection_main_loop
-- An 'onerror' that doesn't throw
local function onerror(socket, op, why, lvl) -- luacheck: ignore 212
if why == ce.EPIPE or why == ce.ETIMEDOUT then
return why
end
return string.format("%s: %s", op, ce.strerror(why)), why
end
-- Read bytes from the given socket looking for the http2 connection preface
-- optionally ungets the bytes in case of failure
local function socket_has_preface(socket, unget, timeout)
local deadline = timeout and (monotime()+timeout)
local bytes = ""
local is_h2 = true
while #bytes < #preface do
-- read *up to* number of bytes left in preface
local ok, err, errno = socket:xread(#bytes-#preface, deadline and (deadline-monotime()))
if ok == nil then
return nil, err or ce.EPIPE, errno
end
bytes = bytes .. ok
if bytes ~= preface:sub(1, #bytes) then
is_h2 = false
break
end
end
if unget then
local ok, errno = socket:unget(bytes)
if not ok then
return nil, onerror(socket, "unget", errno, 2)
end
end
return is_h2
end
local function new_connection(socket, conn_type, settings, timeout, cq)
local deadline = timeout and (monotime()+timeout)
cq = assert(cq or cqueues.running())
socket:setmode("b", "bf")
socket:setvbuf("full", math.huge) -- 'infinite' buffering; no write locks needed
socket:onerror(onerror)
local ssl = socket:checktls()
if ssl then
local cipher = ssl:getCipherInfo()
if h2_banned_ciphers[cipher.name] then
h2_error.errors.INADEQUATE_SECURITY("bad cipher: " .. cipher.name)
end
end
if conn_type == "client" then
local ok, err = socket:xwrite(preface, "f", timeout)
if ok == nil then return nil, err end
elseif conn_type == "server" then
local ok, err = socket_has_preface(socket, false, timeout)
if ok == nil then
return nil, err
end
if not ok then
h2_error.errors.PROTOCOL_ERROR("invalid connection preface. not an http2 client?")
end
else
error('invalid connection type. must be "client" or "server"')
end
settings = settings or {}
local self = setmetatable({
socket = socket;
type = conn_type;
version = 2; -- for compat with h1_connection
streams = setmetatable({}, {__mode="kv"});
stream0 = nil; -- store separately with a strong reference
need_continuation = nil; -- stream
highest_odd_stream = -1;
highest_even_stream = -2;
recv_goaway_lowest = nil;
recv_goaway = cc.new();
new_streams = new_fifo();
new_streams_cond = cc.new();
peer_settings = default_settings;
peer_settings_cond = cc.new(); -- signaled when the peer has changed their settings
acked_settings = default_settings;
send_settings = {n = 0};
send_settings_ack_cond = cc.new(); -- for when server ACKs our settings
send_settings_acked = 0;
peer_flow_credits = 65535; -- 5.2.1
peer_flow_credits_increase = cc.new();
encoding_context = nil;
decoding_context = nil;
pongs = {}; -- pending pings we've sent. keyed by opaque 8 byte payload
}, connection_mt)
self:new_stream(0)
self.encoding_context = hpack.new(default_settings[0x1])
self.decoding_context = hpack.new(default_settings[0x1])
self.cq = cq:wrap(connection_main_loop, self)
do -- send settings frame + wait for reply to complete connection
local ok, err = self:settings(settings, deadline and (deadline-monotime()))
if not ok then
return nil, err
end
end
return self
end
function connection_methods:pollfd()
return self.socket:pollfd()
end
function connection_methods:events()
return self.socket:events()
end
function connection_methods:timeout()
if not self:empty() then
return 0
end
end
function connection_main_loop(self)
while not self.socket:eof("r") do
local typ, flag, streamid, payload = self:read_http2_frame()
if typ == nil then
if flag == nil then -- EOF
self.socket:close()
break
else
error(flag)
end
end
local handler = h2_stream.frame_handlers[typ]
-- http2 spec section 4.1:
-- Implementations MUST ignore and discard any frame that has a type that is unknown.
if handler then
local stream = self.streams[streamid]
if stream == nil then
if xor(streamid % 2 == 1, self.type == "client") then
h2_error.errors.PROTOCOL_ERROR("Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers")
end
-- TODO: check MAX_CONCURRENT_STREAMS
stream = self:new_stream(streamid)
self.new_streams:push(stream)
self.new_streams_cond:signal(1)
end
local ok, err = handler(stream, flag, payload)
if not ok then
if h2_error.is(err) and err.stream_error then
if not stream:write_rst_stream(err.code) then
error(err)
end
else -- connection error or unknown error
error(err)
end
end
end
end
return true
end
local function handle_step_return(self, step_ok, last_err, errno)
if step_ok then
return true
else
if not self.socket:eof("w") then
local code, message
if step_ok then
code = h2_error.errors.NO_ERROR.code
elseif h2_error.is(last_err) then
code = last_err.code
message = last_err.message
else
code = h2_error.errors.INTERNAL_ERROR.code
end
-- ignore write failure here; there's nothing that can be done
self:write_goaway_frame(nil, code, message)
end
self:shutdown()
return nil, last_err, errno
end
end
function connection_methods:checktls()
return self.socket:checktls()
end
function connection_methods:localname()
return self.socket:localname()
end
function connection_methods:peername()
return self.socket:peername()
end
function connection_methods:shutdown()
local ok, err = self:write_goaway_frame(nil, h2_error.errors.NO_ERROR.code, "connection closed")
if not ok and err == ce.EPIPE then
-- other end already closed
ok, err = true, nil
end
for _, stream in pairs(self.streams) do
stream:shutdown()
end
self.socket:shutdown("r")
return ok, err
end
function connection_methods:close()
local ok, err = self:shutdown()
cqueues.poll()
cqueues.poll()
self.socket:close()
return ok, err
end
function connection_methods:new_stream(id)
if id then
assert(id % 1 == 0)
else
if self.recv_goaway_lowest then
h2_error.errors.PROTOCOL_ERROR("Receivers of a GOAWAY frame MUST NOT open additional streams on the connection")
end
if self.type == "client" then
-- Pick next free odd number
id = self.highest_odd_stream + 2
else
-- Pick next free odd number
id = self.highest_even_stream + 2
end
-- TODO: check MAX_CONCURRENT_STREAMS
end
assert(self.streams[id] == nil, "stream id already in use")
assert(id < 2^32, "stream id too large")
if id % 2 == 0 then
assert(id > self.highest_even_stream, "stream id too small")
self.highest_even_stream = id
else
assert(id > self.highest_odd_stream, "stream id too small")
self.highest_odd_stream = id
end
local stream = h2_stream.new(self, id)
if id == 0 then
self.stream0 = stream
else
-- Add dependency on stream 0. http2 spec, 5.3.1
self.stream0:reprioritise(stream)
end
self.streams[id] = stream
return stream
end
-- this function *should never throw*
function connection_methods:get_next_incoming_stream(timeout)
local deadline = timeout and (monotime()+timeout)
while self.new_streams:length() == 0 do
if self.socket:eof('r') or self.recv_goaway_lowest then
-- TODO? clarification required: can the sender of a GOAWAY subsequently start streams?
-- (with a lower stream id than they sent in the GOAWAY)
-- For now, assume not.
return nil, ce.EPIPE
end
local which = cqueues.poll(self.socket, self.new_streams_cond, self.recv_goaway, timeout)
if which == timeout then
return nil, ce.ETIMEDOUT
end
timeout = deadline and (deadline-monotime())
end
local stream = self.new_streams:pop()
return stream
end
-- On success, returns type, flags, stream id and payload
-- On timeout, returns nil, ETIMEDOUT -- safe to retry
-- If the socket has been shutdown for reading, and there is no data left unread, returns EPIPE
-- Will raise an error on other errors, or if the frame is invalid
function connection_methods:read_http2_frame(timeout)
local deadline = timeout and (monotime()+timeout)
local frame_header, err, errno = self.socket:xread(9, timeout)
if frame_header == nil then
if err == ce.ETIMEDOUT then
return nil, err
elseif err == nil --[[EPIPE]] and self.socket:eof("r") then
return nil
else
return nil, err, errno
end
end
local size, typ, flags, streamid = sunpack(">I3 B B I4", frame_header)
if size > self.acked_settings[0x5] then
return nil, h2_error.errors.FRAME_SIZE_ERROR:new_traceback("frame too large")
end
-- reserved bit MUST be ignored by receivers
streamid = band(streamid, 0x7fffffff)
local payload, err2, errno2 = self.socket:xread(size, deadline and (deadline-monotime()))
if payload == nil then
if err2 == ce.ETIMEDOUT then
-- put frame header back into socket so a retry will work
local ok, errno3 = self.socket:unget(frame_header)
if not ok then
return nil, onerror(self.socket, "unget", errno3, 2)
end
end
return nil, err2, errno2
end
return typ, flags, streamid, payload
end
-- If this times out, it was the flushing; not the write itself
-- hence it's not always total failure.
-- It's up to the caller to take some action (e.g. closing) rather than doing it here
-- TODO: distinguish between nothing sent and partially sent?
function connection_methods:write_http2_frame(typ, flags, streamid, payload, timeout)
local deadline = timeout and (monotime()+timeout)
if #payload > self.peer_settings[0x5] then
return nil, h2_error.errors.FRAME_SIZE_ERROR:new_traceback("frame too large")
end
local header = spack(">I3 B B I4", #payload, typ, flags, streamid)
local ok, err, errno = self.socket:xwrite(header, "f", timeout)
if not ok then
return nil, err, errno
end
return self.socket:xwrite(payload, "n", deadline and (deadline-monotime()))
end
function connection_methods:ping(timeout)
local deadline = timeout and (monotime()+timeout)
local payload
-- generate a random, unique payload
repeat -- keep generating until we don't have a collision
payload = rand.bytes(8)
until self.pongs[payload] == nil
local cond = cc.new()
self.pongs[payload] = cond
assert(self.stream0:write_ping_frame(false, payload, timeout))
while self.pongs[payload] do
timeout = deadline and (deadline-monotime())
local which = cqueues.poll(self, cond, timeout)
if which == self then
local ok, err, errno = self:step(