Commit c416877a authored by Marek Vavrusa's avatar Marek Vavrusa

daemon: support event.socket(fd, cb) for I/O events

this allows embedding other event loops or just
asynchronous events triggered by socket activity.
this is required for things like cooperative
HTTP server, monitoring endpoint or remote
configuration daemon/controller
parent a41532e5
......@@ -299,8 +299,42 @@ as a parameter, but it's not very useful as you don't have any *non-global* way
-- make recurrent event that will cancel after 5 times
event.recurrent(5 * minute, pruner())
Another type of actionable event is activity on a file descriptor. This allows you to embed other
event loops or monitor open files and then fire a callback when an activity is detected.
This allows you to build persistent services like HTTP servers or monitoring probes that cooperate
well with the daemon internal operations.
For example a simple web server that doesn't block:
.. code-block:: lua
local server, headers = require 'http.server', require 'http.headers'
local cqueues = require 'cqueues'
-- Start socket server
local s = server.listen { host = 'localhost', port = 8080 }
assert(s:listen())
-- Compose per-request coroutine
local cq = cqueues.new()
cq:wrap(function()
s:run(function(stream)
-- Create response headers
local headers = headers.new()
headers:append(':status', '200')
headers:append('connection', 'close')
-- Send response and close connection
assert(stream:write_headers(headers, false))
assert(stream:write_chunk('OK', true))
stream:shutdown()
stream.connection:shutdown()
end)
s:close()
end)
-- Hook to socket watcher
event.socket(cq:pollfd(), function (ev, status, events)
cq:step(0)
end)
* File watchers
* Data I/O
.. note:: Work in progress, come back later!
......@@ -826,6 +860,28 @@ For example, ``5 * hour`` represents five hours, or 5*60*60*100 milliseconds.
e = event.after(1 * minute, function() print('Hi!') end)
event.cancel(e)
Watch for file descriptor activity. This allows embedding other event loops or simply
firing events when a pipe endpoint becomes active. In another words, asynchronous
notifications for daemon.
.. function:: event.socket(fd, cb)
:param number fd: file descriptor to watch
:param cb: closure or callback to execute when fd becomes active
:return: event id
Execute function when there is activity on the file descriptor and calls a closure
with event id as the first parameter, status as second and number of events as third.
Example:
.. code-block:: lua
e = event.socket(0, function(e, status, nevents)
print('activity detected')
end)
e.cancel(e)
Scripting worker
^^^^^^^^^^^^^^^^
......
......@@ -692,7 +692,6 @@ static int execute_callback(lua_State *L, int argc)
}
/* Clear the stack, there may be event a/o enything returned */
lua_settop(L, 0);
lua_gc(L, LUA_GCCOLLECT, 0);
return ret;
}
......@@ -714,6 +713,26 @@ static void event_callback(uv_timer_t *timer)
}
}
static void event_fdcallback(uv_poll_t* handle, int status, int events)
{
struct worker_ctx *worker = handle->loop->data;
lua_State *L = worker->engine->L;
/* Retrieve callback and execute */
lua_rawgeti(L, LUA_REGISTRYINDEX, (intptr_t) handle->data);
lua_rawgeti(L, -1, 1);
lua_pushinteger(L, (intptr_t) handle->data);
lua_pushinteger(L, status);
lua_pushinteger(L, events);
int ret = execute_callback(L, 3);
/* Free callback if not recurrent or an error */
if (ret != 0) {
if (!uv_is_closing((uv_handle_t *)handle)) {
uv_close((uv_handle_t *)handle, (uv_close_cb) event_free);
}
}
}
static int event_sched(lua_State *L, unsigned timeout, unsigned repeat)
{
uv_timer_t *timer = malloc(sizeof(*timer));
......@@ -794,12 +813,69 @@ static int event_cancel(lua_State *L)
return 1;
}
static int event_fdwatch(lua_State *L)
{
/* Check parameters */
int n = lua_gettop(L);
if (n < 2 || !lua_isnumber(L, 1) || !lua_isfunction(L, 2)) {
format_error(L, "expected 'socket(number fd, function)'");
lua_error(L);
}
uv_poll_t *handle = malloc(sizeof(*handle));
if (!handle) {
format_error(L, "out of memory");
lua_error(L);
}
/* Start timer with the reference */
int sock = lua_tonumber(L, 1);
uv_loop_t *loop = uv_default_loop();
#if defined(__APPLE__) || defined(__FreeBSD__)
/* libuv is buggy and fails to create poller for
* kqueue sockets as it can't be fcntl'd to non-blocking mode,
* so we pass it a copy of standard input and then
* switch it with real socket before starting the poller
*/
int decoy_fd = dup(STDIN_FILENO);
int ret = uv_poll_init(loop, handle, decoy_fd);
if (ret == 0) {
handle->io_watcher.fd = sock;
}
close(decoy_fd);
#else
int ret = uv_poll_init(loop, handle, sock);
#endif
if (ret == 0) {
ret = uv_poll_start(handle, UV_READABLE, event_fdcallback);
}
if (ret != 0) {
free(handle);
format_error(L, "couldn't start event poller");
lua_error(L);
}
/* Save callback and timer in registry */
lua_newtable(L);
lua_pushvalue(L, 2);
lua_rawseti(L, -2, 1);
lua_pushlightuserdata(L, handle);
lua_rawseti(L, -2, 2);
int ref = luaL_ref(L, LUA_REGISTRYINDEX);
/* Save reference to the timer */
handle->data = (void *) (intptr_t)ref;
lua_pushinteger(L, ref);
return 1;
}
int lib_event(lua_State *L)
{
static const luaL_Reg lib[] = {
{ "after", event_after },
{ "recurrent", event_recurrent },
{ "cancel", event_cancel },
{ "socket", event_fdwatch },
{ NULL, NULL }
};
......
......@@ -68,6 +68,7 @@ static int l_help(lua_State *L)
"mode(strict|normal|permissive)\n set resolver strictness level\n"
"resolve(name, type[, class, flags, callback])\n resolve query, callback when it's finished\n"
"todname(name)\n convert name to wire format\n"
"tojson(val)\n convert value to JSON\n"
"net\n network configuration\n"
"cache\n network configuration\n"
"modules\n modules configuration\n"
......@@ -319,6 +320,16 @@ static char *l_pack_json(lua_State *L, int top)
return result;
}
static int l_tojson(lua_State *L)
{
auto_free char *json_str = l_pack_json(L, 1);
if (!json_str) {
return 0;
}
lua_pushstring(L, json_str);
return 1;
}
/** Trampoline function for module properties. */
static int l_trampoline(lua_State *L)
{
......@@ -431,6 +442,8 @@ static int init_state(struct engine *engine)
lua_setglobal(engine->L, "trustanchor");
lua_pushcfunction(engine->L, l_libpath);
lua_setglobal(engine->L, "libpath");
lua_pushcfunction(engine->L, l_tojson);
lua_setglobal(engine->L, "tojson");
lua_pushliteral(engine->L, MODULEDIR);
lua_setglobal(engine->L, "moduledir");
lua_pushliteral(engine->L, ETCDIR);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment