Commit ac5d8012 authored by Martin Mareš's avatar Martin Mareš

Asynchronous feeding of protocols.

parent 0850ce22
......@@ -5,10 +5,8 @@ Core
- client: data losses on large dumps?
- table: ocassional core dumps in rt_prune()
- table: do feeding by parts
- bgp: timing of updates?
- bgp: dump and get_route_info
- bgp: get_route_info
Documentation
~~~~~~~~~~~~~
......@@ -33,6 +31,7 @@ Globals
Various ideas
~~~~~~~~~~~~~
- static: allow specifying a per-route filter program for setting route attributes?
- bgp: timing of updates?
- netlink: realms
- netlink: import Linux route attributes to our rta's, so that they can be filtered?
- config: executable config files
......
......@@ -409,6 +409,25 @@ proto_fell_down(struct proto *p)
proto_rethink_goal(p);
}
static void
proto_feed_more(void *P)
{
struct proto *p = P;
DBG("Feeding protocol %s continued\n", p->name);
if (rt_feed_baby(p))
{
p->core_state = FS_HAPPY;
proto_relink(p);
DBG("Protocol %s up and running\n", p->name);
}
else
{
p->attn->hook = proto_feed_more;
ev_schedule(p->attn); /* Will continue later... */
}
}
static void
proto_feed(void *P)
{
......@@ -417,10 +436,7 @@ proto_feed(void *P)
DBG("Feeding protocol %s\n", p->name);
proto_add_announce_hook(p, p->table);
if_feed_baby(p);
rt_feed_baby(p);
p->core_state = FS_HAPPY;
proto_relink(p);
DBG("Protocol %s up and running\n", p->name);
proto_feed_more(P);
}
void
......@@ -444,8 +460,12 @@ proto_notify_state(struct proto *p, unsigned ps)
}
else if (cs == FS_FLUSHING) /* Still flushing... */
;
else /* Need to start flushing */
goto schedule_flush;
else
{
if (cs == FS_FEEDING) /* Need to abort feeding */
rt_feed_baby_abort(p);
goto schedule_flush; /* Need to start flushing */
}
break;
case PS_START:
ASSERT(ops == PS_DOWN);
......
......@@ -151,6 +151,9 @@ struct proto {
struct filter *out_filter; /* Output filter */
struct announce_hook *ahooks; /* Announcement hooks for this protocol */
struct fib_iterator *feed_iterator; /* Routing table iterator used during protocol feeding */
struct announce_hook *feed_ahook; /* Announce hook we currently feed */
/* Hic sunt protocol-specific data */
};
......
......@@ -197,7 +197,8 @@ rte *rte_do_cow(rte *);
static inline rte * rte_cow(rte *r) { return (r->flags & REF_COW) ? rte_do_cow(r) : r; }
void rt_dump(rtable *);
void rt_dump_all(void);
void rt_feed_baby(struct proto *p);
int rt_feed_baby(struct proto *p);
void rt_feed_baby_abort(struct proto *p);
void rt_prune(rtable *tab);
void rt_prune_all(void);
struct rtable_config *rt_new_table(struct symbol *s);
......
......@@ -200,35 +200,6 @@ rte_announce(rtable *tab, net *net, rte *new, rte *old, ea_list *tmpa)
}
}
void
rt_feed_baby(struct proto *p)
{
struct announce_hook *h;
if (!p->ahooks)
return;
DBG("Announcing routes to new protocol %s\n", p->name);
for(h=p->ahooks; h; h=h->next)
{
rtable *t = h->table;
FIB_WALK(&t->fib, fn)
{
net *n = (net *) fn;
rte *e;
for(e=n->routes; e; e=e->next)
{
struct proto *q = e->attrs->proto;
ea_list *tmpa = q->make_tmp_attrs ? q->make_tmp_attrs(e, rte_update_pool) : NULL;
do_rte_announce(h, n, e, NULL, tmpa, ipa_classify(n->n.prefix));
lp_flush(rte_update_pool);
if (p->core_state != FS_FEEDING)
return; /* In the meantime, the protocol fell down. */
}
}
FIB_WALK_END;
}
}
static inline int
rte_validate(rte *e)
{
......@@ -683,6 +654,74 @@ rt_commit(struct config *new, struct config *old)
DBG("\tdone\n");
}
int
rt_feed_baby(struct proto *p)
{
struct announce_hook *h;
struct fib_iterator *fit;
int max_feed = 2; /* FIXME */
if (!p->feed_ahook) /* Need to initialize first */
{
if (!p->ahooks)
return 1;
DBG("Announcing routes to new protocol %s\n", p->name);
p->feed_ahook = p->ahooks;
fit = p->feed_iterator = mb_alloc(p->pool, sizeof(struct fib_iterator));
goto next_hook;
}
fit = p->feed_iterator;
again:
h = p->feed_ahook;
FIB_ITERATE_START(&h->table->fib, fit, fn)
{
net *n = (net *) fn;
rte *e;
for(e=n->routes; e; e=e->next)
{
struct proto *q = e->attrs->proto;
ea_list *tmpa;
if (p->core_state != FS_FEEDING)
return 1; /* In the meantime, the protocol fell down. */
rte_update_lock();
tmpa = q->make_tmp_attrs ? q->make_tmp_attrs(e, rte_update_pool) : NULL;
do_rte_announce(h, n, e, NULL, tmpa, ipa_classify(n->n.prefix));
rte_update_unlock();
if (!--max_feed)
{
FIB_ITERATE_PUT(fit, fn);
return 0;
}
}
}
FIB_ITERATE_END(fn);
p->feed_ahook = h->next;
if (!p->feed_ahook)
{
mb_free(p->feed_iterator);
p->feed_iterator = NULL;
return 1;
}
next_hook:
h = p->feed_ahook;
FIB_ITERATE_INIT(fit, &h->table->fib);
goto again;
}
void
rt_feed_baby_abort(struct proto *p)
{
if (p->feed_ahook)
{
/* Unlink the iterator and exit */
fit_get(&p->feed_ahook->table->fib, p->feed_iterator);
p->feed_ahook = NULL;
}
}
/*
* CLI commands
*/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment