|
Re: [PATCH v4] xenbus: support large messages
Hello,
Sorry, it seems I missed that mail :/
Added my tag below.
BTW, I didn't see the mb() fix between rsp_cons+= and reading rsp_prod
on the Linux side?
Samuel
Juergen Gross, le mer. 24 nov. 2021 08:00:55 +0100, a écrit:
> Ping?
>
> On 04.10.21 11:40, Juergen Gross wrote:
> > Today the implementation of the xenbus protocol in Mini-OS will only
> > allow to transfer the complete message to or from the ring page buffer.
> > This is limiting the maximum message size to lower values as the xenbus
> > protocol normally would allow.
> >
> > Change that by allowing to transfer the xenbus message in chunks as
> > soon as they are available.
> >
> > Avoid crashing Mini-OS in case of illegal data read from the ring
> > buffer.
> >
> > Signed-off-by: Juergen Gross <jgross@xxxxxxxx>
Reviewed-by: Samuel Thibault <samuel.thibault@xxxxxxxxxxxx>
> > ---
> > V2:
> > - drop redundant if (Samuel Thibault)
> > - move rmb() (Samuel Thibault)
> > V3:
> > - correct notification test (Samuel Thibault)
> > V4:
> > - more memory barriers (Samuel Thibault)
> > ---
> > xenbus/xenbus.c | 210 ++++++++++++++++++++++++++++--------------------
> > 1 file changed, 122 insertions(+), 88 deletions(-)
> >
> > diff --git a/xenbus/xenbus.c b/xenbus/xenbus.c
> > index 23de61e..b687678 100644
> > --- a/xenbus/xenbus.c
> > +++ b/xenbus/xenbus.c
> > @@ -29,6 +29,7 @@
> > #include <xen/hvm/params.h>
> > #include <mini-os/spinlock.h>
> > #include <mini-os/xmalloc.h>
> > +#include <mini-os/semaphore.h>
> > #define min(x,y) ({ \
> > typeof(x) tmpx = (x); \
> > @@ -46,6 +47,7 @@
> > static struct xenstore_domain_interface *xenstore_buf;
> > static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
> > DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
> > +static __DECLARE_SEMAPHORE_GENERIC(xb_write_sem, 1);
> > xenbus_event_queue xenbus_events;
> > static struct watch {
> > @@ -231,75 +233,103 @@ char *xenbus_wait_for_state_change(const char* path,
> > XenbusState *state, xenbus_
> > }
> > +static void xenbus_read_data(char *buf, unsigned int len)
> > +{
> > + unsigned int off = 0;
> > + unsigned int prod, cons;
> > + unsigned int size;
> > +
> > + while (off != len)
> > + {
> > + wait_event(xb_waitq, xenstore_buf->rsp_prod !=
> > xenstore_buf->rsp_cons);
> > +
> > + prod = xenstore_buf->rsp_prod;
> > + cons = xenstore_buf->rsp_cons;
> > + DEBUG("Rsp_cons %d, rsp_prod %d.\n", cons, prod);
> > + size = min(len - off, prod - cons);
> > +
> > + rmb(); /* Make sure data read from ring is ordered with
> > rsp_prod. */
> > + memcpy_from_ring(xenstore_buf->rsp, buf + off,
> > + MASK_XENSTORE_IDX(cons), size);
> > + off += size;
> > + mb(); /* memcpy() and rsp_cons update must not be reordered. */
> > + xenstore_buf->rsp_cons += size;
> > + mb(); /* rsp_cons must be visible before we look at rsp_prod. */
> > + if (xenstore_buf->rsp_prod - cons >= XENSTORE_RING_SIZE)
> > + notify_remote_via_evtchn(xenbus_evtchn);
> > + }
> > +}
> > +
> > static void xenbus_thread_func(void *ign)
> > {
> > struct xsd_sockmsg msg;
> > - unsigned prod = xenstore_buf->rsp_prod;
> > + char *data;
> > for (;;) {
> > - wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);
> > - while (1) {
> > - prod = xenstore_buf->rsp_prod;
> > - DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,
> > - xenstore_buf->rsp_prod);
> > - if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
> > sizeof(msg))
> > - break;
> > - rmb();
> > - memcpy_from_ring(xenstore_buf->rsp, &msg,
> > - MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> > - sizeof(msg));
> > - DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> > - xenstore_buf->rsp_prod - xenstore_buf->rsp_cons,
> > msg.req_id);
> > -
> > - if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
> > - sizeof(msg) + msg.len)
> > - break;
> > -
> > - DEBUG("Message is good.\n");
> > -
> > - if (msg.type == XS_WATCH_EVENT) {
> > - struct xenbus_event *event = malloc(sizeof(*event) +
> > msg.len);
> > - xenbus_event_queue *events = NULL;
> > - char *data = (char*)event + sizeof(*event);
> > - struct watch *watch;
> > -
> > - memcpy_from_ring(xenstore_buf->rsp, data,
> > - MASK_XENSTORE_IDX(xenstore_buf->rsp_cons +
> > sizeof(msg)),
> > - msg.len);
> > -
> > - event->path = data;
> > - event->token = event->path + strlen(event->path) + 1;
> > -
> > - mb();
> > - xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> > -
> > - for (watch = watches; watch; watch = watch->next)
> > - if (!strcmp(watch->token, event->token)) {
> > - events = watch->events;
> > - break;
> > - }
> > -
> > - if (events) {
> > - event->next = *events;
> > - *events = event;
> > - wake_up(&xenbus_watch_queue);
> > - } else {
> > - printk("unexpected watch token %s\n", event->token);
> > - free(event);
> > + xenbus_read_data((char *)&msg, sizeof(msg));
> > + DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> > + xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
> > +
> > + if (msg.len > XENSTORE_PAYLOAD_MAX) {
> > + printk("Xenstore violates protocol, message longer than
> > allowed.\n");
> > + return;
> > + }
> > +
> > + if (msg.type == XS_WATCH_EVENT) {
> > + struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
> > + xenbus_event_queue *events = NULL;
> > + struct watch *watch;
> > + char *c;
> > + int zeroes = 0;
> > +
> > + data = (char*)event + sizeof(*event);
> > + xenbus_read_data(data, msg.len);
> > +
> > + for (c = data; c < data + msg.len; c++)
> > + if (!*c)
> > + zeroes++;
> > + if (zeroes != 2) {
> > + printk("Xenstore: illegal watch event data\n");
> > + free(event);
> > + continue;
> > + }
> > +
> > + event->path = data;
> > + event->token = event->path + strlen(event->path) + 1;
> > +
> > + for (watch = watches; watch; watch = watch->next)
> > + if (!strcmp(watch->token, event->token)) {
> > + events = watch->events;
> > + break;
> > }
> > +
> > + if (events) {
> > + event->next = *events;
> > + *events = event;
> > + wake_up(&xenbus_watch_queue);
> > } else {
> > - req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);
> > - memcpy_from_ring(xenstore_buf->rsp,
> > req_info[msg.req_id].reply,
> > - MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> > - msg.len + sizeof(msg));
> > - mb();
> > - xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> > - wake_up(&req_info[msg.req_id].waitq);
> > + printk("Xenstore: unexpected watch token %s\n",
> > event->token);
> > + free(event);
> > }
> > - wmb();
> > - notify_remote_via_evtchn(xenbus_evtchn);
> > + continue;
> > }
> > +
> > + data = malloc(sizeof(msg) + msg.len);
> > + memcpy(data, &msg, sizeof(msg));
> > + xenbus_read_data(data + sizeof(msg), msg.len);
> > +
> > + if (msg.req_id >= NR_REQS || !req_info[msg.req_id].in_use) {
> > + printk("Xenstore: illegal request id %d\n", msg.req_id);
> > + free(data);
> > + continue;
> > + }
> > +
> > + DEBUG("Message is good.\n");
> > +
> > + req_info[msg.req_id].reply = data;
> > +
> > + wake_up(&req_info[msg.req_id].waitq);
> > }
> > }
> > @@ -451,36 +481,40 @@ static void xb_write(int type, int req_id,
> > xenbus_transaction_t trans_id,
> > cur_req = &header_req;
> > - BUG_ON(len > XENSTORE_RING_SIZE);
> > - /* Wait for the ring to drain to the point where we can send the
> > - message. */
> > - prod = xenstore_buf->req_prod;
> > - if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE)
> > - {
> > - /* Wait for there to be space on the ring */
> > - DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",
> > - prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
> > - wait_event(xb_waitq,
> > - xenstore_buf->req_prod + len - xenstore_buf->req_cons <=
> > - XENSTORE_RING_SIZE);
> > - DEBUG("Back from wait.\n");
> > - prod = xenstore_buf->req_prod;
> > - }
> > + BUG_ON(len > XENSTORE_PAYLOAD_MAX);
> > +
> > + /* Make sure we are the only thread trying to write. */
> > + down(&xb_write_sem);
> > - /* We're now guaranteed to be able to send the message without
> > - overflowing the ring. Do so. */
> > + /* Send the message in chunks using free ring space when available. */
> > total_off = 0;
> > req_off = 0;
> > - while (total_off < len)
> > + while (total_off < len)
> > {
> > + prod = xenstore_buf->req_prod;
> > + if (prod - xenstore_buf->req_cons >= XENSTORE_RING_SIZE)
> > + {
> > + /* Send evtchn to notify remote */
> > + notify_remote_via_evtchn(xenbus_evtchn);
> > +
> > + /* Wait for there to be space on the ring */
> > + DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n", prod,
> > + len - total_off, xenstore_buf->req_cons,
> > XENSTORE_RING_SIZE);
> > + wait_event(xb_waitq,
> > + prod - xenstore_buf->req_cons < XENSTORE_RING_SIZE);
> > + DEBUG("Back from wait.\n");
> > + }
> > +
> > this_chunk = min(cur_req->len - req_off,
> > - XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> > + XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> > + this_chunk = min(this_chunk,
> > + xenstore_buf->req_cons + XENSTORE_RING_SIZE -
> > prod);
> > memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),
> > - (char *)cur_req->data + req_off, this_chunk);
> > + (char *)cur_req->data + req_off, this_chunk);
> > prod += this_chunk;
> > req_off += this_chunk;
> > total_off += this_chunk;
> > - if (req_off == cur_req->len)
> > + if (req_off == cur_req->len)
> > {
> > req_off = 0;
> > if (cur_req == &header_req)
> > @@ -488,20 +522,20 @@ static void xb_write(int type, int req_id,
> > xenbus_transaction_t trans_id,
> > else
> > cur_req++;
> > }
> > +
> > + /* Remote must see entire message before updating indexes */
> > + wmb();
> > + xenstore_buf->req_prod = prod;
> > }
> > + /* Send evtchn to notify remote */
> > + notify_remote_via_evtchn(xenbus_evtchn);
> > +
> > DEBUG("Complete main loop of xb_write.\n");
> > BUG_ON(req_off != 0);
> > BUG_ON(total_off != len);
> > - BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);
> > - /* Remote must see entire message before updating indexes */
> > - wmb();
> > -
> > - xenstore_buf->req_prod += len;
> > -
> > - /* Send evtchn to notify remote */
> > - notify_remote_via_evtchn(xenbus_evtchn);
> > + up(&xb_write_sem);
> > }
> > /* Send a mesasge to xenbus, in the same fashion as xb_write, and
> >
>
|
Lists.xenproject.org is hosted with RackSpace, monitoring our servers 24x7x365.