[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH 07/11] mini-os/xenbus: Unify watch and reply queues



We are going to want to provide an interface to xenbus which does not
reorder messages for a particular user.  In particular, the reply to a
watch or unwatch should not be reordered with respect to watch events.

To this end we arrange that both replies and watches use the same kind
of queue inside the xenbus driver.  Currently this queue type is only
exposed outside the xenbus driver for use with watches, as before.

Important functional changes in this patch include:

* There is a separate scheduler wait queue for each event queue,
  rather than one for all watches and one for each outstanding reply.
  This wait queue lives in struct xenbus_event_queue.

* New internal functions remove_event, queue_event and await_event
  abstract away removing, queueing and awaiting events.
  xenbus_wait_for_watch_return becomes a trivial wrapper around
  await_event.

* Handling of the replies to requests is formalised, using the new
  queues.  Now a single reply queue might be used for multiple
  requests (although there are no callers that do this).

Other changes are:

* The latent bug in xenbus_msg_reply, which assumed no spurious
  wakeups, is gone.

* The "in_use" flag in the request array can be done away with, since
  we can use the reply_queue pointer value instead.

* The callers of allocate_xenbus_id (currently, only
  xenbus_msg_reply) have to initialise a xenbus_event_queue and
  provide it, together with the xenbus_event to be queued on it when
  the reply arrives, to allocate_xenbus_id.

* Abolished the xenbus_watch_queue waitq in favour of the waitq inside
  the xenbus_default_watch_queue event queue.

* Abolished a duplicate assignment to in_use in release_xenbus_id.

Signed-off-by: Ian Jackson <Ian.Jackson@xxxxxxxxxxxxx>
---
 include/mini-os/xenbus.h |    3 ++
 xen/xenbus/xenbus.c      |   82 +++++++++++++++++++++++++++++-----------------
 2 files changed, 55 insertions(+), 30 deletions(-)

diff --git a/include/mini-os/xenbus.h b/include/mini-os/xenbus.h
index abf8765..7e70de0 100644
--- a/include/mini-os/xenbus.h
+++ b/include/mini-os/xenbus.h
@@ -2,6 +2,8 @@
 #define MINIOS_XENBUS_H__
 
 #include <xen/io/xenbus.h>
+#include <mini-os/sched.h>
+#include <mini-os/waittypes.h>
 #include <mini-os/queue.h>
 
 typedef unsigned long xenbus_transaction_t;
@@ -30,6 +32,7 @@ struct xenbus_event {
 };
 struct xenbus_event_queue {
     MINIOS_STAILQ_HEAD(, xenbus_event) events;
+    struct wait_queue_head waitq;
 };
 
 void xenbus_event_queue_init(struct xenbus_event_queue *queue);
diff --git a/xen/xenbus/xenbus.c b/xen/xenbus/xenbus.c
index 947b5c8..d2e59b3 100644
--- a/xen/xenbus/xenbus.c
+++ b/xen/xenbus/xenbus.c
@@ -46,7 +46,6 @@
 static struct xenstore_domain_interface *xenstore_buf;
 static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
 static spinlock_t xb_lock = SPIN_LOCK_UNLOCKED; /* protects xenbus req ring */
-DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
 
 struct xenbus_event_queue xenbus_default_watch_queue;
 struct watch {
@@ -57,8 +56,8 @@ struct watch {
 static MINIOS_LIST_HEAD(, watch) watches;
 struct xenbus_req_info 
 {
-    int in_use:1;
-    struct wait_queue_head waitq;
+    struct xenbus_event_queue *reply_queue; /* non-0 iff in use */
+    struct xenbus_event *for_queue;
     void *reply;
 };
 
@@ -66,6 +65,39 @@ struct xenbus_req_info
 void xenbus_event_queue_init(struct xenbus_event_queue *queue)
 {
     MINIOS_STAILQ_INIT(&queue->events);
+    init_waitqueue_head(&queue->waitq);
+}
+
+static struct xenbus_event *remove_event(struct xenbus_event_queue *queue)
+{
+    struct xenbus_event *event;
+
+    event = MINIOS_STAILQ_FIRST(&queue->events);
+    if (!event)
+        goto out;
+    MINIOS_STAILQ_REMOVE_HEAD(&queue->events, entry);
+
+ out:
+    return event;
+}
+
+static void queue_event(struct xenbus_event_queue *queue,
+                        struct xenbus_event *event)
+{
+    MINIOS_STAILQ_INSERT_TAIL(&queue->events, event, entry);
+    wake_up(&queue->waitq);
+}
+
+static struct xenbus_event *await_event(struct xenbus_event_queue *queue)
+{
+    struct xenbus_event *event;
+    DEFINE_WAIT(w);
+    while (!(event = remove_event(queue))) {
+        add_waiter(w, queue->waitq);
+        schedule();
+    }
+    remove_waiter(w, queue->waitq);
+    return event;
 }
 
 
@@ -89,15 +121,9 @@ static void memcpy_from_ring(const void *Ring,
 char **xenbus_wait_for_watch_return(struct xenbus_event_queue *queue)
 {
     struct xenbus_event *event;
-    DEFINE_WAIT(w);
     if (!queue)
         queue = &xenbus_default_watch_queue;
-    while (!(event = MINIOS_STAILQ_FIRST(&queue->events))) {
-        add_waiter(w, xenbus_watch_queue);
-        schedule();
-    }
-    remove_waiter(w, xenbus_watch_queue);
-    MINIOS_STAILQ_REMOVE_HEAD(&queue->events, entry);
+    event = await_event(queue);
     return &event->path;
 }
 
@@ -256,8 +282,7 @@ static void xenbus_thread_func(void *ign)
                     }
 
                 if (events) {
-                    MINIOS_STAILQ_INSERT_TAIL(&events->events, event, entry);
-                    wake_up(&xenbus_watch_queue);
+                    queue_event(events, event);
                 } else {
                     printk("unexpected watch token %s\n", event->token);
                     free(event);
@@ -272,7 +297,8 @@ static void xenbus_thread_func(void *ign)
                     MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
                     msg.len + sizeof(msg));
                 xenstore_buf->rsp_cons += msg.len + sizeof(msg);
-                wake_up(&req_info[msg.req_id].waitq);
+                queue_event(req_info[msg.req_id].reply_queue,
+                            req_info[msg.req_id].for_queue);
             }
         }
     }
@@ -291,11 +317,10 @@ static DECLARE_WAIT_QUEUE_HEAD(req_wq);
 /* Release a xenbus identifier */
 static void release_xenbus_id(int id)
 {
-    BUG_ON(!req_info[id].in_use);
+    BUG_ON(!req_info[id].reply_queue);
     spin_lock(&req_lock);
-    req_info[id].in_use = 0;
+    req_info[id].reply_queue = 0;
     nr_live_reqs--;
-    req_info[id].in_use = 0;
     if (nr_live_reqs == NR_REQS - 1)
         wake_up(&req_wq);
     spin_unlock(&req_lock);
@@ -303,7 +328,8 @@ static void release_xenbus_id(int id)
 
 /* Allocate an identifier for a xenbus request.  Blocks if none are
    available. */
-static int allocate_xenbus_id(void)
+static int allocate_xenbus_id(struct xenbus_event_queue *reply_queue,
+                              struct xenbus_event *for_queue)
 {
     static int probe;
     int o_probe;
@@ -320,16 +346,16 @@ static int allocate_xenbus_id(void)
     o_probe = probe;
     for (;;) 
     {
-        if (!req_info[o_probe].in_use)
+        if (!req_info[o_probe].reply_queue)
             break;
         o_probe = (o_probe + 1) % NR_REQS;
         BUG_ON(o_probe == probe);
     }
     nr_live_reqs++;
-    req_info[o_probe].in_use = 1;
+    req_info[o_probe].reply_queue = reply_queue;
+    req_info[o_probe].for_queue = for_queue;
     probe = (o_probe + 1) % NR_REQS;
     spin_unlock(&req_lock);
-    init_waitqueue_head(&req_info[o_probe].waitq);
 
     return o_probe;
 }
@@ -448,22 +474,18 @@ xenbus_msg_reply(int type,
                 int nr_reqs)
 {
     int id;
-    DEFINE_WAIT(w);
     struct xsd_sockmsg *rep;
+    struct xenbus_event_queue queue;
+    struct xenbus_event event_buf;
 
-    /*
-     * XXX: should use a predicate loop instead of blindly trusting
-     * that $someone didn't wake us up
-     */
+    xenbus_event_queue_init(&queue);
 
-    id = allocate_xenbus_id();
-    add_waiter(w, req_info[id].waitq);
+    id = allocate_xenbus_id(&queue,&event_buf);
 
     xb_write(type, id, trans, io, nr_reqs);
 
-    schedule();
-    remove_waiter(w, req_info[id].waitq);
-    wake(current);
+    struct xenbus_event *event = await_event(&queue);
+    BUG_ON(event != &event_buf);
 
     rep = req_info[id].reply;
     BUG_ON(rep->req_id != id);
-- 
1.7.10.4


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.