Commit 8fa95978 authored by Marek Vavruša, committed by Marek Vavrusa

Implement worker coroutines for asynchronous background processing

This implements worker coroutines in Lua to perform non-blocking I/O and run many tasks concurrently.
For example, a file watcher can now be implemented as:

```
  local watcher = notify.opendir('/etc')
  watcher:add('hosts')

  -- Watch changes to /etc/hosts
  worker.coroutine(function ()
    for flags, name in watcher:changes() do
      for flag in notify.flags(flags) do
        print(name, notify[flag])
      end
    end
  end)
```

In order to make this work, the runtime uses the cqueues library which
can run coroutines concurrently, and return a file descriptor to poll on
if it's blocked. The worker takes that file descriptor and calls
`event.socket(pollfd, resume_callback)` so that libuv can wake up
the worker when it's ready again.
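
In a nutshell, the integration pattern looks roughly like this (a simplified
sketch; the real worker below also reschedules cqueues timeouts via `event.after()`):

```
  local cqueues = require('cqueues')
  local cq = cqueues.new()

  -- Let libuv poll the cqueues descriptor and step the queue when it becomes ready
  event.socket(cq:pollfd(), function ()
    cq:step(0)
  end)
```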

The cqueues library is still optional; if it's not present, the following won't work
(both are illustrated in the sketch below):

* worker.coroutine()
* worker.sleep()
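
A minimal usage sketch combining the two:

```
  -- Runs in the background without blocking the daemon
  worker.coroutine(function ()
    for i = 1, 3 do
      print('tick', i)
      worker.sleep(1) -- suspends only this coroutine
    end
  end)
```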
parent a3d38b58
......@@ -312,41 +312,26 @@ as a parameter, but it's not very useful as you don't have any *non-global* way
Another type of actionable event is activity on a file descriptor. This allows you to embed other
event loops or monitor open files and then fire a callback when an activity is detected.
This allows you to build persistent services like HTTP servers or monitoring probes that cooperate
well with the daemon internal operations. See :func:`event.socket()`.
For example a simple web server that doesn't block:

.. code-block:: lua

   local server, headers = require 'http.server', require 'http.headers'
   local cqueues = require 'cqueues'

   -- Start socket server
   local s = server.listen { host = 'localhost', port = 8080 }
   assert(s:listen())

   -- Compose per-request coroutine
   local cq = cqueues.new()
   cq:wrap(function()
     s:run(function(stream)
       -- Create response headers
       local headers = headers.new()
       headers:append(':status', '200')
       headers:append('connection', 'close')
       -- Send response and close connection
       assert(stream:write_headers(headers, false))
       assert(stream:write_chunk('OK', true))
       stream:shutdown()
       stream.connection:shutdown()
     end)
     s:close()
   end)

   -- Hook to socket watcher
   event.socket(cq:pollfd(), function (ev, status, events)
     cq:step(0)
   end)

* File watchers

  This is possible with :func:`worker.coroutine()` and cqueues_, see the cqueues documentation for more information.

  .. code-block:: lua

     local notify = require('cqueues.notify')
     local watcher = notify.opendir('/etc')
     watcher:add('hosts')

     -- Watch changes to /etc/hosts
     worker.coroutine(function ()
       for flags, name in watcher:changes() do
         for flag in notify.flags(flags) do
           print(name, notify[flag])
         end
       end
     end)
.. _closures: https://www.lua.org/pil/6.1.html
......@@ -1044,12 +1029,39 @@ notifications for daemon.
end)
e.cancel(e)
Asynchronous function execution
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The `event` package provides a very basic means of non-blocking execution - it allows running code when activity on a file descriptor is detected, and when a certain amount of time passes. It doesn't, however, provide an easy-to-use abstraction for non-blocking I/O. This is instead exposed through the `worker` package (available if the `cqueues` Lua package is installed on the system).

.. function:: worker.coroutine(function)

   Start a new coroutine with the given function (closure). The function can do I/O or run timers without blocking the main thread. See cqueues_ for documentation of possible operations and synchronisation primitives. The main limitation is that you can't wait for a coroutine to finish from the processing layers, because it's not currently possible to suspend and resume execution of processing layers.

   Example:

   .. code-block:: lua

      worker.coroutine(function ()
        for i = 0, 10 do
          print('executing', i)
          worker.sleep(1)
        end
      end)

.. function:: worker.sleep(seconds)

   Pause execution of the current function (asynchronously if running inside a worker coroutine).

   Example:

   .. code-block:: lua

      worker.sleep(1)

Map over multiple forks
^^^^^^^^^^^^^^^^^^^^^^^

When the daemon is running in forked mode, each process acts independently. This is good because it reduces software complexity and allows for runtime scaling, but it is not ideal because of the additional operational burden.
For example, when you want to add a new policy, you'd need to either put it in the configuration, or execute the command on each process independently. The daemon simplifies this by promoting a process group leader which is able to execute commands synchronously over all forks.

.. function:: map(expr)

   Run an expression synchronously over all forks; results are returned as a table ordered as forks. The expression can be any valid expression in Lua.
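   Example (a minimal sketch; the expression below just reads each fork's `worker.id`):

   .. code-block:: lua

      -- Results come back as a table with one entry per fork, in fork order
      local ids = map('worker.id')
      for fork, id in ipairs(ids) do
        print('fork', fork, 'has worker.id', id)
      end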
......@@ -1168,5 +1180,6 @@ Example:
.. _LuaJIT: http://luajit.org/luajit.html
.. _luasec: https://luarocks.org/modules/brunoos/luasec
.. _luasocket: https://luarocks.org/modules/luarocks/luasocket
.. _cqueues: https://25thandclement.com/~william/projects/cqueues.html
.. _`real process managers`: http://blog.crocodoc.com/post/48703468992/process-managers-the-good-the-bad-and-the-ugly
.. _`systemd socket activation`: http://0pointer.de/blog/projects/socket-activation.html
......@@ -279,3 +279,77 @@ function table_print (tt, indent, done)
end
return result
end
--
-- This extends the worker module to allow asynchronous execution of functions and non-blocking I/O.
-- The current implementation combines cqueues for the Lua interface with event.socket(), so the
-- resolver engine is not blocked while waiting for I/O or timers.
--
local has_cqueues, cqueues = pcall(require, 'cqueues')
if has_cqueues then
  -- Export the asynchronous sleep function
  worker.sleep = cqueues.sleep

  -- Create metatable for workers to define the API
  -- It can schedule multiple cqueues and yield execution when there's a wait for blocking I/O or timer
  local asynchronous_worker_mt = {
    work = function (self)
      local ok, err, _, co = self.cq:step(0)
      if not ok then
        warn('[%s] error: %s %s', self.name or 'worker', err, debug.traceback(co))
      end
      -- Reschedule timeout or create new one
      local timeout = self.cq:timeout()
      if timeout then
        -- Throttle timeouts to avoid too frequent wakeups
        if timeout == 0 then timeout = 0.00001 end
        -- Convert from seconds to duration
        timeout = timeout * sec
        if not self.next_timeout then
          self.next_timeout = event.after(timeout, self.on_step)
        else
          event.reschedule(self.next_timeout, timeout)
        end
      else -- Cancel running timeout when there is no next deadline
        if self.next_timeout then
          event.cancel(self.next_timeout)
          self.next_timeout = nil
        end
      end
    end,
    wrap = function (self, f)
      self.cq:wrap(f)
    end,
    loop = function (self)
      self.on_step = function () self:work() end
      self.event_fd = event.socket(self.cq:pollfd(), self.on_step)
    end,
    close = function (self)
      if self.event_fd then
        event.cancel(self.event_fd)
        self.event_fd = nil
      end
    end,
  }

  -- Implement the coroutine worker with cqueues
  local function worker_new (name)
    return setmetatable({name = name, cq = cqueues.new()}, { __index = asynchronous_worker_mt })
  end

  -- Create a default background worker
  worker.bg_worker = worker_new('worker.background')
  worker.bg_worker:loop()

  -- Wrap a function for asynchronous execution
  function worker.coroutine (f)
    worker.bg_worker:wrap(f)
  end
else
  -- Disable asynchronous execution
  local function disabled () error('cqueues are required for asynchronous execution') end
  worker.sleep = disabled
  worker.map = disabled
  worker.coroutine = disabled
end
\ No newline at end of file
......@@ -296,7 +296,6 @@ end
-- @function Publish DAF statistics
local function publish(_, ws)
local cqueues = require('cqueues')
local ok, last = true, nil
while ok do
-- Check if we have new rule matches
......@@ -318,7 +317,7 @@ local function publish(_, ws)
else
ok = ws:send_ping()
end
cqueues.sleep(1)
worker.sleep(1)
end
end
......
......@@ -9,7 +9,6 @@ if worker.id > 0 then return {} end
-- in order to enable them to export restful APIs and websocket streams.
-- One example is statistics module that can stream live metrics on the website,
-- or publish metrics on request for Prometheus scraper.
local cqueues = require('cqueues')
local http_server = require('http.server')
local http_headers = require('http.headers')
local http_websocket = require('http.websocket')
......@@ -18,7 +17,6 @@ local x509, pkey = require('openssl.x509'), require('openssl.pkey')
local has_mmdb, mmdb = pcall(require, 'mmdb')
-- Module declaration
local cq = cqueues.new()
local M = {
servers = {},
}
......@@ -184,7 +182,7 @@ local function route(endpoints)
else
local ok, err, reason = http_util.yieldable_pcall(serve, h, stream)
if not ok or err then
if err ~= '404' then
if err ~= '404' and verbose() then
log('[http] %s %s: %s (%s)', m, path, err or '500', reason)
end
-- Method is not supported
......@@ -304,7 +302,7 @@ function M.interface(host, port, endpoints, crtfile, keyfile)
local routes = route(endpoints)
-- Create TLS context and start listening
local s, err = http_server.listen {
cq = cq,
cq = worker.bg_worker.cq,
host = host,
port = port,
client_timeout = 5,
......@@ -333,12 +331,11 @@ end
-- @function Init module
function M.init()
cq:wrap(prometheus.init)
worker.coroutine(prometheus.init)
end
-- @function Cleanup module
function M.deinit()
if M.ev then event.cancel(M.ev) end
for i, server in ipairs(M.servers) do
server:close()
M.servers[i] = nil
......@@ -346,17 +343,6 @@ function M.deinit()
prometheus.deinit()
end
-- @function Module runnable
function M.step(timeout)
local ok, err = cq:step(timeout)
return ok, err, cq:timeout()
end
-- @function Module pollable fd
function M.pollfd()
return cq:pollfd()
end
-- @function Configure module
function M.config(conf)
if conf == true then conf = {} end
......@@ -372,33 +358,6 @@ function M.config(conf)
end
end
M.interface(conf.host, conf.port, M.endpoints, conf.cert, conf.key)
-- TODO: configure DNS/HTTP(s) interface
if M.ev then return end
-- Schedule both I/O activity notification and timeouts
local poll_step
poll_step = function ()
local ok, err, _, co = cq:step(0)
if not ok then warn('[http] error: %s %s', err, debug.traceback(co)) end
-- Reschedule timeout or create new one
local timeout = cq:timeout()
if timeout then
-- Throttle web requests (at most 100000 req/s)
if timeout == 0 then timeout = 0.00001 end
-- Convert from seconds to duration
timeout = timeout * sec
if not M.timeout then
M.timeout = event.after(timeout, poll_step)
else
event.reschedule(M.timeout, timeout)
end
else -- Cancel running timeout when there is no next deadline
if M.timeout then
event.cancel(M.timeout)
M.timeout = nil
end
end
end
M.ev = event.socket(cq:pollfd(), poll_step)
end
return M
......@@ -3,13 +3,8 @@ local supports_http = pcall(require, 'http') and pcall(require, 'http.request')
if not supports_http then
pass('skipping http module test because its not installed')
done()
end
-- load dependencies
if supports_http then
local test_utils = require('test_utils')
else
local request = require('http.request')
local cqueues = require('cqueues')
-- setup resolver
modules = {
......@@ -24,19 +19,6 @@ if supports_http then
local _, host, port = server:localname()
ok(host and port, 'binds to an interface')
-- constructor for asynchronously executed functions
local function asynchronous(cb)
local cq = cqueues.new()
cq:wrap(cb)
event.socket(cq:pollfd(), function (ev)
cq:step(0)
if cq:empty() then
event.cancel(ev)
end
end)
return cq
end
-- helper for returning useful values to test on
local function http_get(uri)
local headers, stream = assert(request.new_from_uri(uri .. '/'):go())
......@@ -84,7 +66,8 @@ if supports_http then
}
-- run tests asynchronously
asynchronous(function ()
local test_utils = require('test_utils')
worker.coroutine(function ()
for _, t in ipairs(tests) do
test_utils.test(t)
end
......
local cqueues = require('cqueues')
local snapshots, snapshots_count = {}, 120
-- Gauge metrics
......@@ -83,7 +82,7 @@ local function snapshot_start()
table.remove(snapshots, 1)
end
end
cqueues.sleep(1)
worker.sleep(1)
end
end
......@@ -108,7 +107,7 @@ local function stream_stats(_, ws)
last = snapshots[id].time
ok = ws:send(push)
end
cqueues.sleep(1)
worker.sleep(1)
end
end
......
-- check prerequisites
if not worker.bg_worker then
  pass('skipping worker test because it doesnt support background worker')
  done()
else
  -- import primitives for synchronisation
  local monotime = require('cqueues').monotime

  -- test whether sleeping works
  local function test_worker_sleep()
    local now = monotime()
    ok(pcall(worker.sleep, 0.1), 'sleep works')
    local elapsed = monotime() - now
    ok(elapsed > 0, 'sleep takes non-zero time')
  end

  -- helper to track number of executions
  local cv = require('cqueues.condition').new()
  local tasks = 0
  local function work ()
    worker.sleep(0.1)
    tasks = tasks - 1
    if tasks == 0 then
      cv:signal()
    elseif tasks < 0 then
      error('too many executions')
    end
  end

  -- test whether coroutines work
  local function test_worker_coroutine()
    tasks = 2
    worker.coroutine(work)
    worker.coroutine(work)
    -- Check if coroutines finish
    local status = cv:wait(1)
    same(tasks, 0, 'all coroutines finish')
    ok(status, 'coroutines finish successfully')
    -- Check if nesting coroutines works
    local now = monotime()
    tasks = 100
    worker.coroutine(function ()
      for _ = 1, 100 do
        worker.coroutine(work)
      end
    end)
    status = cv:wait(1)
    local elapsed = monotime() - now
    same(tasks, 0, 'all nested coroutines finish')
    ok(status, 'nested coroutines finish successfully')
    -- Test if 100 coroutines didnt execute synchronously
    -- (the wait time would be 100 * 0.1 = 10s sleep time)
    -- Concurrent sleep time should still be ~0.1s (added some safe margin)
    ok(elapsed < 0.5, 'coroutines didnt block while sleeping')
  end

  -- plan tests
  local tests = {
    test_worker_sleep,
    test_worker_coroutine
  }

  -- run tests asynchronously
  local test_utils = require('test_utils')
  worker.coroutine(function ()
    for _, t in ipairs(tests) do
      test_utils.test(t)
    end
    done()
  end)
end
\ No newline at end of file