Re: [Xen-devel] [PATCH 15/25] argo: implement the sendv op
> -----Original Message-----
> From: Christopher Clark [mailto:christopher.w.clark@xxxxxxxxx]
> Sent: 01 December 2018 01:33
> To: xen-devel@xxxxxxxxxxxxxxxxxxxx
> Cc: Andrew Cooper <Andrew.Cooper3@xxxxxxxxxx>; George Dunlap
> <George.Dunlap@xxxxxxxxxx>; Ian Jackson <Ian.Jackson@xxxxxxxxxx>; Jan
> Beulich <jbeulich@xxxxxxxx>; Julien Grall <julien.grall@xxxxxxx>; Konrad
> Rzeszutek Wilk <konrad.wilk@xxxxxxxxxx>; Stefano Stabellini
> <sstabellini@xxxxxxxxxx>; Tim (Xen.org) <tim@xxxxxxx>; Wei Liu
> <wei.liu2@xxxxxxxxxx>; Paul Durrant <Paul.Durrant@xxxxxxxxxx>; Rich
> Persaud <persaur@xxxxxxxxx>; Ross Philipson <ross.philipson@xxxxxxxxx>;
> Eric Chanudet <eric.chanudet@xxxxxxxxx>; James McKenzie
> <voreekf@xxxxxxxxxxxxx>; Jason Andryuk <jandryuk@xxxxxxxxx>; Daniel Smith
> <dpsmith@xxxxxxxxxxxxxxxxxxxx>
> Subject: [PATCH 15/25] argo: implement the sendv op
>
> The sendv operation is invoked to perform a synchronous send of the
> buffers contained in iovs to a remote domain's registered ring.
>
> It takes:
>  * A destination address (domid, port) for the ring to send to.
>    It performs a most-specific match lookup, to allow for wildcard.
>  * A source address, used to inform the destination of where to reply.
>  * The address of an array of iovs containing the data to send
>  * ... and the length of that array of iovs
>  * and a 32-bit message type, available to communicate message context
>    data (eg. kernel-to-kernel, separate from the application data).
>
> If insufficient space exists in the destination ring, it will return
> -EAGAIN and Xen will notify the caller when sufficient space becomes
> available.
>
> Accesses to the ring indices are appropriately atomic. The rings are
> mapped into Xen's private address space to write as needed and the
> mappings are retained for later use.
>
> When locating the destination ring, a check is performed via a cookie
> installed at ring registration time, to ensure that the source domain
> is the same as it was when the ring was registered.
>
> Fixed-size types are used in some areas within this code where caution
> around avoiding integer overflow is important.
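For readers following the interface description above, here is a minimal
sketch of what a guest-side invocation of this op might look like. The
HYPERVISOR_argo_message_op() wrapper is hypothetical -- the real binding is
provided by the guest OS (e.g. the Linux argo driver), not by this patch --
and only the types and op/argument layout come from the public header below:

    #include <stdint.h>
    #include "argo.h"  /* public header from this patch */

    /* Hypothetical guest-side hypercall wrapper, for illustration only. */
    extern long HYPERVISOR_argo_message_op(int cmd, void *arg1, void *arg2,
                                           uint32_t arg3, uint32_t arg4);

    static long send_hello(uint16_t dst_domid, uint32_t dst_port)
    {
        static char payload[] = "hello";
        argo_send_addr_t addr = {
            .src = { .port = 1 },  /* Xen overwrites src.domain_id itself */
            .dst = { .port = dst_port, .domain_id = dst_domid },
        };
        argo_iov_t iov = {
            .iov_base = (uint64_t)(uintptr_t)payload,
            .iov_len  = sizeof(payload),
        };

        /* Returns bytes delivered on success; -EAGAIN means the ring is
         * full and Xen will signal the event channel when space frees up,
         * at which point the send can be retried. */
        return HYPERVISOR_argo_message_op(ARGO_MESSAGE_OP_sendv,
                                          &addr, &iov, 1 /* niov */,
                                          0 /* message type */);
    }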
>
> Signed-off-by: Christopher Clark <christopher.clark6@xxxxxxxxxxxxxx>
> ---
>  xen/common/argo.c         | 528 ++++++++++++++++++++++++++++++++++++++++++++++
>  xen/include/public/argo.h |  59 ++++++
>  2 files changed, 587 insertions(+)
>
> diff --git a/xen/common/argo.c b/xen/common/argo.c
> index 387e650..0c3972c 100644
> --- a/xen/common/argo.c
> +++ b/xen/common/argo.c
> @@ -24,10 +24,13 @@
>  #include <xen/domain_page.h>
>  #include <xen/guest_access.h>
>  #include <xen/time.h>
> +#include <xsm/xsm.h>
>
>  DEFINE_XEN_GUEST_HANDLE(argo_pfn_t);
>  DEFINE_XEN_GUEST_HANDLE(argo_addr_t);
> +DEFINE_XEN_GUEST_HANDLE(argo_send_addr_t);
>  DEFINE_XEN_GUEST_HANDLE(argo_ring_t);
> +DEFINE_XEN_GUEST_HANDLE(uint8_t);
>
>  /* Xen command line option to enable argo */
>  static bool __read_mostly opt_argo_enabled = 0;
> @@ -166,6 +169,21 @@ static DEFINE_RWLOCK(argo_lock); /* L1 */
>  #endif
>
>  /*
> + * Event channel
> + */
> +
> +static void
> +argo_signal_domain(struct domain *d)
> +{
> +    argo_dprintk("signalling domid:%d\n", d->domain_id);
> +
> +    if ( !d->argo ) /* This can happen if the domain is being destroyed */
> +        return;
> +
> +    evtchn_send(d, d->argo->evtchn_port);
> +}
> +
> +/*
>   * ring buffer
>   */
>
> @@ -259,6 +277,333 @@ argo_update_tx_ptr(struct argo_ring_info *ring_info, uint32_t tx_ptr)
>      return 0;
>  }
>
> +static int
> +argo_memcpy_to_guest_ring(struct argo_ring_info *ring_info,
> +                          uint32_t offset,
> +                          const void *src,
> +                          XEN_GUEST_HANDLE(uint8_t) src_hnd,
> +                          uint32_t len)
> +{
> +    int page = offset >> PAGE_SHIFT;

unsigned?

> +    uint8_t *dst;
> +    int ret;
> +    unsigned int src_offset = 0;
> +
> +    ASSERT(spin_is_locked(&ring_info->lock));
> +
> +    offset &= ~PAGE_MASK;
> +
> +    if ( (len > ARGO_MAX_RING_SIZE) || (offset > ARGO_MAX_RING_SIZE) )
> +        return -EFAULT;
> +
> +    while ( (offset + len) > PAGE_SIZE )
> +    {
> +        ret = argo_ring_map_page(ring_info, page, &dst);
> +        if ( ret )
> +            return ret;
> +
> +        if ( src )
> +        {
> +            memcpy(dst + offset, src + src_offset, PAGE_SIZE - offset);
> +            src_offset += (PAGE_SIZE - offset);
> +        }
> +        else
> +        {
> +            ret = copy_from_guest_errno(dst + offset, src_hnd,
> +                                        PAGE_SIZE - offset);
> +            if ( ret )
> +                return ret;
> +
> +            guest_handle_add_offset(src_hnd, PAGE_SIZE - offset);
> +        }
> +
> +        page++;
> +        len -= PAGE_SIZE - offset;

A lot of "PAGE_SIZE - offset" here... maybe worth a local variable?

> +        offset = 0;
> +    }
> +
> +    ret = argo_ring_map_page(ring_info, page, &dst);
> +    if ( ret )
> +    {
> +        argo_dprintk("argo: ring (vm%u:%x vm%d) %p attempted to map page"
> +                     " %d of %d\n", ring_info->id.addr.domain_id,
> +                     ring_info->id.addr.port, ring_info->id.partner, ring_info,
> +                     page, ring_info->nmfns);
> +        return ret;
> +    }
> +
> +    if ( src )
> +        memcpy(dst + offset, src + src_offset, len);
> +    else
> +        ret = copy_from_guest_errno(dst + offset, src_hnd, len);
> +
> +    return ret;
> +}
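For what it's worth, a sketch of that inner loop with the repeated
"PAGE_SIZE - offset" hoisted into a local, along the lines suggested above
(untested, just to show the shape):

    while ( (offset + len) > PAGE_SIZE )
    {
        unsigned int chunk = PAGE_SIZE - offset; /* bytes left in this page */

        ret = argo_ring_map_page(ring_info, page, &dst);
        if ( ret )
            return ret;

        if ( src )
        {
            memcpy(dst + offset, src + src_offset, chunk);
            src_offset += chunk;
        }
        else
        {
            ret = copy_from_guest_errno(dst + offset, src_hnd, chunk);
            if ( ret )
                return ret;

            guest_handle_add_offset(src_hnd, chunk);
        }

        page++;
        len -= chunk;
        offset = 0;
    }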
> +
> +static int
> +argo_ringbuf_get_rx_ptr(struct argo_ring_info *ring_info, uint32_t *rx_ptr)
> +{
> +    uint8_t *src;
> +    argo_ring_t *ringp;
> +    int ret;
> +
> +    ASSERT(spin_is_locked(&ring_info->lock));
> +
> +    if ( !ring_info->nmfns || ring_info->nmfns < ring_info->npage )
> +        return -EINVAL;
> +
> +    ret = argo_ring_map_page(ring_info, 0, &src);
> +    if ( ret )
> +        return ret;
> +
> +    ringp = (argo_ring_t *)src;
> +
> +    *rx_ptr = read_atomic(&ringp->rx_ptr);
> +
> +    return 0;
> +}
> +
> +/*
> + * argo_sanitize_ring creates a modified copy of the ring pointers
> + * where the rx_ptr is rounded up to ensure it is aligned, and then
> + * ring wrap is handled. Simplifies safe use of the rx_ptr for
> + * available space calculation.
> + */
> +static void
> +argo_sanitize_ring(argo_ring_t *ring, const struct argo_ring_info *ring_info)
> +{
> +    uint32_t rx_ptr = ring->rx_ptr;
> +
> +    ring->tx_ptr = ring_info->tx_ptr;
> +    ring->len = ring_info->len;
> +
> +    rx_ptr = ARGO_ROUNDUP(rx_ptr);
> +    if ( rx_ptr >= ring_info->len )
> +        rx_ptr = 0;
> +
> +    ring->rx_ptr = rx_ptr;
> +}
> +
> +/*
> + * argo_iov_count returns its count on success via an out variable
> + * to avoid potential for a negative return value to be used incorrectly
> + * (eg. coerced into an unsigned variable resulting in a large incorrect value)
> + */
> +static int
> +argo_iov_count(XEN_GUEST_HANDLE_PARAM(argo_iov_t) iovs, uint8_t niov,
> +               uint32_t *count)
> +{
> +    argo_iov_t iov;
> +    uint32_t sum_iov_lens = 0;
> +    int ret;
> +
> +    if ( niov > ARGO_MAXIOV )
> +        return -EINVAL;
> +
> +    while ( niov-- )
> +    {
> +        ret = copy_from_guest_errno(&iov, iovs, 1);
> +        if ( ret )
> +            return ret;
> +
> +        /* check each to protect sum against integer overflow */
> +        if ( iov.iov_len > ARGO_MAX_RING_SIZE )
> +            return -EINVAL;
> +
> +        sum_iov_lens += iov.iov_len;
> +
> +        /*
> +         * Again protect sum from integer overflow
> +         * and ensure total msg size will be within bounds.
> +         */
> +        if ( sum_iov_lens > ARGO_MAX_MSG_SIZE )
> +            return -EINVAL;
> +
> +        guest_handle_add_offset(iovs, 1);
> +    }
> +
> +    *count = sum_iov_lens;
> +    return 0;
> +}
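A quick sanity check on why those two overflow guards are sufficient, using
the limits defined in the public header later in this patch: every accepted
running total satisfies sum_iov_lens <= ARGO_MAX_MSG_SIZE < 2^24, and each
individual iov_len is capped at ARGO_MAX_RING_SIZE = 2^24, so no
intermediate sum can exceed 2^25 -- far below the 2^32 wrap point of the
uint32_t accumulator. Even ignoring the per-iteration message-size check,
ARGO_MAXIOV (127) iovs of 2^24 bytes each total 127 * 16777216 =
2130706432 < 2^31, which matches the 2^31 bound described in the header
comment.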
> +
> +static int
> +argo_ringbuf_insert(struct domain *d,
> +                    struct argo_ring_info *ring_info,
> +                    const struct argo_ring_id *src_id,
> +                    XEN_GUEST_HANDLE_PARAM(argo_iov_t) iovs, uint8_t niov,
> +                    uint32_t message_type, unsigned long *out_len)
> +{
> +    argo_ring_t ring;
> +    struct argo_ring_message_header mh = { 0 };
> +    int32_t sp;
> +    int32_t ret = 0;
> +    uint32_t len;
> +    uint32_t iov_len;
> +    uint32_t sum_iov_len = 0;
> +
> +    ASSERT(spin_is_locked(&ring_info->lock));
> +
> +    if ( (ret = argo_iov_count(iovs, niov, &len)) )
> +        return ret;
> +
> +    if ( ((ARGO_ROUNDUP(len) + sizeof (struct argo_ring_message_header)) >=
> +          ring_info->len)
> +         || (len > ARGO_MAX_MSG_SIZE) )
> +        return -EMSGSIZE;
> +
> +    do {
> +        ret = argo_ringbuf_get_rx_ptr(ring_info, &ring.rx_ptr);
> +        if ( ret )
> +            break;
> +
> +        argo_sanitize_ring(&ring, ring_info);
> +
> +        argo_dprintk("ring.tx_ptr=%d ring.rx_ptr=%d ring.len=%d"
> +                     " ring_info->tx_ptr=%d\n",
> +                     ring.tx_ptr, ring.rx_ptr, ring.len, ring_info->tx_ptr);
> +
> +        if ( ring.rx_ptr == ring.tx_ptr )
> +            sp = ring_info->len;
> +        else
> +        {
> +            sp = ring.rx_ptr - ring.tx_ptr;
> +            if ( sp < 0 )
> +                sp += ring.len;
> +        }
> +
> +        if ( (ARGO_ROUNDUP(len) + sizeof(struct argo_ring_message_header)) >= sp )
> +        {
> +            argo_dprintk("EAGAIN\n");
> +            ret = -EAGAIN;
> +            break;
> +        }
> +
> +        mh.len = len + sizeof(struct argo_ring_message_header);
> +        mh.source.port = src_id->addr.port;
> +        mh.source.domain_id = src_id->addr.domain_id;
> +        mh.message_type = message_type;
> +
> +        /*
> +         * For this copy to the guest ring, tx_ptr is always 16-byte aligned
> +         * and the message header is 16 bytes long.
> +         */
> +        BUILD_BUG_ON(sizeof(struct argo_ring_message_header) != ARGO_ROUNDUP(1));
> +
> +        if ( (ret = argo_memcpy_to_guest_ring(ring_info,
> +                                              ring.tx_ptr + sizeof(argo_ring_t),
> +                                              &mh,
> +                                              XEN_GUEST_HANDLE_NULL(uint8_t),
> +                                              sizeof(mh))) )
> +            break;
> +
> +        ring.tx_ptr += sizeof(mh);
> +        if ( ring.tx_ptr == ring_info->len )
> +            ring.tx_ptr = 0;
> +
> +        while ( niov-- )
> +        {
> +            XEN_GUEST_HANDLE_PARAM(uint8_t) bufp_hnd;
> +            XEN_GUEST_HANDLE(uint8_t) buf_hnd;
> +            argo_iov_t iov;
> +
> +            ret = copy_from_guest_errno(&iov, iovs, 1);
> +            if ( ret )
> +                break;
> +
> +            bufp_hnd = guest_handle_from_ptr((uintptr_t)iov.iov_base, uint8_t);
> +            buf_hnd = guest_handle_from_param(bufp_hnd, uint8_t);
> +            iov_len = iov.iov_len;
> +
> +            if ( !iov_len )
> +            {
> +                printk(XENLOG_ERR "argo: iov.iov_len=0 iov.iov_base=%"
> +                       PRIx64" ring (vm%u:%x vm%d)\n",
> +                       iov.iov_base, ring_info->id.addr.domain_id,
> +                       ring_info->id.addr.port, ring_info->id.partner);
> +
> +                guest_handle_add_offset(iovs, 1);
> +                continue;
> +            }
> +
> +            if ( iov_len > ARGO_MAX_MSG_SIZE )
> +            {
> +                ret = -EINVAL;
> +                break;
> +            }
> +
> +            sum_iov_len += iov_len;
> +            if ( sum_iov_len > len )
> +            {
> +                ret = -EINVAL;
> +                break;
> +            }
> +
> +            if ( unlikely(!guest_handle_okay(buf_hnd, iov_len)) )
> +            {
> +                ret = -EFAULT;
> +                break;
> +            }
> +
> +            sp = ring.len - ring.tx_ptr;
> +
> +            if ( iov_len > sp )
> +            {
> +                ret = argo_memcpy_to_guest_ring(ring_info,
> +                                                ring.tx_ptr + sizeof(argo_ring_t),
> +                                                NULL, buf_hnd, sp);
> +                if ( ret )
> +                    break;
> +
> +                ring.tx_ptr = 0;
> +                iov_len -= sp;
> +                guest_handle_add_offset(buf_hnd, sp);
> +            }
> +
> +            ret = argo_memcpy_to_guest_ring(ring_info,
> +                                            ring.tx_ptr + sizeof(argo_ring_t),
> +                                            NULL, buf_hnd, iov_len);
> +            if ( ret )
> +                break;
> +
> +            ring.tx_ptr += iov_len;
> +
> +            if ( ring.tx_ptr == ring_info->len )
> +                ring.tx_ptr = 0;
> +
> +            guest_handle_add_offset(iovs, 1);
> +        }
> +
> +        if ( ret )
> +            break;
> +
> +        ring.tx_ptr = ARGO_ROUNDUP(ring.tx_ptr);
> +
> +        if ( ring.tx_ptr >= ring_info->len )
> +            ring.tx_ptr -= ring_info->len;
> +
> +        mb();
> +        ring_info->tx_ptr = ring.tx_ptr;
> +        if ( (ret = argo_update_tx_ptr(ring_info, ring.tx_ptr)) )
> +            break;
> +
> +    } while ( 0 );

Again. Odd do-while-zero style... I won't mention this again.

  Paul

> +
> +    /*
> +     * At this point it is possible to unmap the ring_info, ie:
> +     *   argo_ring_unmap(ring_info);
> +     * but performance should be improved by not doing so, and retaining
> +     * the mapping.
> +     * An XSM policy control over level of confidentiality required
> +     * versus performance cost could be added to decide that here.
> +     * See the similar comment in argo_ring_map_page re: write-only mappings.
> +     */
> +
> +    if ( !ret )
> +        *out_len = len;
> +
> +    return ret;
> +}
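To make the wrap handling in the iov_len > sp branch concrete, a worked
example with illustrative values (not taken from the patch): suppose
ring.len = 4096 and ring.tx_ptr = 3584, so sp = 4096 - 3584 = 512 bytes
remain before the end of the ring. A 1024-byte iov then takes that branch:
the first 512 bytes are copied at offset 3584 (relative to the data area
following the sizeof(argo_ring_t) header), tx_ptr wraps to 0, and the
remaining 1024 - 512 = 512 bytes are copied at the start of the ring by the
second argo_memcpy_to_guest_ring() call.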
> + */ > + > + if ( !ret ) > + *out_len = len; > + > + return ret; > +} > + > /* > * pending > */ > @@ -282,6 +627,47 @@ argo_pending_remove_all(struct argo_ring_info > *ring_info) > } > } > > +static int > +argo_pending_queue(struct argo_ring_info *ring_info, domid_t src_id, int > len) > +{ > + struct argo_pending_ent *ent; > + > + ASSERT(spin_is_locked(&ring_info->lock)); > + > + ent = xmalloc(struct argo_pending_ent); > + > + if ( !ent ) > + return -ENOMEM; > + > + ent->len = len; > + ent->id = src_id; > + > + hlist_add_head(&ent->node, &ring_info->pending); > + > + return 0; > +} > + > +static int > +argo_pending_requeue(struct argo_ring_info *ring_info, domid_t src_id, > int len) > +{ > + struct hlist_node *node; > + struct argo_pending_ent *ent; > + > + ASSERT(spin_is_locked(&ring_info->lock)); > + > + hlist_for_each_entry(ent, node, &ring_info->pending, node) > + { > + if ( ent->id == src_id ) > + { > + if ( ent->len < len ) > + ent->len = len; > + return 0; > + } > + } > + > + return argo_pending_queue(ring_info, src_id, len); > +} > + > static void argo_ring_remove_mfns(const struct domain *d, > struct argo_ring_info *ring_info) > { > @@ -509,6 +895,28 @@ argo_ring_find_info(const struct domain *d, const > struct argo_ring_id *id) > return NULL; > } > > +static struct argo_ring_info * > +argo_ring_find_info_by_match(const struct domain *d, uint32_t port, > + domid_t partner_id, uint64_t partner_cookie) > +{ > + argo_ring_id_t id; > + struct argo_ring_info *ring_info; > + > + ASSERT(rw_is_locked(&d->argo->lock)); > + > + id.addr.port = port; > + id.addr.domain_id = d->domain_id; > + id.partner = partner_id; > + > + ring_info = argo_ring_find_info(d, &id); > + if ( ring_info && (partner_cookie == ring_info->partner_cookie) ) > + return ring_info; > + > + id.partner = ARGO_DOMID_ANY; > + > + return argo_ring_find_info(d, &id); > +} > + > static long > argo_unregister_ring(struct domain *d, > XEN_GUEST_HANDLE_PARAM(argo_ring_t) ring_hnd) > @@ -754,6 +1162,103 @@ argo_register_ring(struct domain *d, > return ret; > } > > +/* > + * io > + */ > + > +static long > +argo_sendv(struct domain *src_d, const argo_addr_t *src_addr, > + const argo_addr_t *dst_addr, > + XEN_GUEST_HANDLE_PARAM(argo_iov_t) iovs, uint32_t niov, > + uint32_t message_type) > +{ > + struct domain *dst_d = NULL; > + struct argo_ring_id src_id; > + struct argo_ring_info *ring_info; > + int ret = 0; > + unsigned long len = 0; > + > + ASSERT(src_d->domain_id == src_addr->domain_id); > + > + read_lock(&argo_lock); > + > + do { > + if ( !src_d->argo ) > + { > + ret = -ENODEV; > + break; > + } > + > + src_id.addr.pad = 0; > + src_id.addr.port = src_addr->port; > + src_id.addr.domain_id = src_d->domain_id; > + src_id.partner = dst_addr->domain_id; > + > + dst_d = get_domain_by_id(dst_addr->domain_id); > + if ( !dst_d || !dst_d->argo ) > + { > + argo_dprintk("!dst_d, ECONNREFUSED\n"); > + ret = -ECONNREFUSED; > + break; > + } > + > + ret = xsm_argo_send(src_d, dst_d); > + if ( ret ) > + { > + printk(XENLOG_ERR "argo: XSM REJECTED %i -> %i\n", > + src_addr->domain_id, dst_addr->domain_id); > + break; > + } > + > + read_lock(&dst_d->argo->lock); > + > + do { > + ring_info = argo_ring_find_info_by_match(dst_d, dst_addr- > >port, > + src_addr->domain_id, > + src_d->argo- > >domain_cookie); > + if ( !ring_info ) > + { > + printk(XENLOG_ERR "argo: vm%u connection refused, " > + "src (vm%u:%x) dst (vm%u:%x)\n", > + current->domain->domain_id, > + src_id.addr.domain_id, src_id.addr.port, > + dst_addr->domain_id, dst_addr->port); > + > 
> +
>  static long
>  argo_unregister_ring(struct domain *d,
>                       XEN_GUEST_HANDLE_PARAM(argo_ring_t) ring_hnd)
> @@ -754,6 +1162,103 @@ argo_register_ring(struct domain *d,
>      return ret;
>  }
>
> +/*
> + * io
> + */
> +
> +static long
> +argo_sendv(struct domain *src_d, const argo_addr_t *src_addr,
> +           const argo_addr_t *dst_addr,
> +           XEN_GUEST_HANDLE_PARAM(argo_iov_t) iovs, uint32_t niov,
> +           uint32_t message_type)
> +{
> +    struct domain *dst_d = NULL;
> +    struct argo_ring_id src_id;
> +    struct argo_ring_info *ring_info;
> +    int ret = 0;
> +    unsigned long len = 0;
> +
> +    ASSERT(src_d->domain_id == src_addr->domain_id);
> +
> +    read_lock(&argo_lock);
> +
> +    do {
> +        if ( !src_d->argo )
> +        {
> +            ret = -ENODEV;
> +            break;
> +        }
> +
> +        src_id.addr.pad = 0;
> +        src_id.addr.port = src_addr->port;
> +        src_id.addr.domain_id = src_d->domain_id;
> +        src_id.partner = dst_addr->domain_id;
> +
> +        dst_d = get_domain_by_id(dst_addr->domain_id);
> +        if ( !dst_d || !dst_d->argo )
> +        {
> +            argo_dprintk("!dst_d, ECONNREFUSED\n");
> +            ret = -ECONNREFUSED;
> +            break;
> +        }
> +
> +        ret = xsm_argo_send(src_d, dst_d);
> +        if ( ret )
> +        {
> +            printk(XENLOG_ERR "argo: XSM REJECTED %i -> %i\n",
> +                   src_addr->domain_id, dst_addr->domain_id);
> +            break;
> +        }
> +
> +        read_lock(&dst_d->argo->lock);
> +
> +        do {
> +            ring_info = argo_ring_find_info_by_match(dst_d, dst_addr->port,
> +                                                     src_addr->domain_id,
> +                                                     src_d->argo->domain_cookie);
> +            if ( !ring_info )
> +            {
> +                printk(XENLOG_ERR "argo: vm%u connection refused, "
> +                       "src (vm%u:%x) dst (vm%u:%x)\n",
> +                       current->domain->domain_id,
> +                       src_id.addr.domain_id, src_id.addr.port,
> +                       dst_addr->domain_id, dst_addr->port);
> +
> +                ret = -ECONNREFUSED;
> +                break;
> +            }
> +
> +            spin_lock(&ring_info->lock);
> +
> +            ret = argo_ringbuf_insert(dst_d, ring_info, &src_id,
> +                                      iovs, niov, message_type, &len);
> +            if ( ret == -EAGAIN )
> +            {
> +                argo_dprintk("argo_ringbuf_sendv failed, EAGAIN\n");
> +                /* requeue to issue a notification when space is there */
> +                if ( argo_pending_requeue(ring_info, src_addr->domain_id, len) )
> +                    ret = -ENOMEM;
> +            }
> +
> +            spin_unlock(&ring_info->lock);
> +
> +            if ( ret >= 0 )
> +                argo_signal_domain(dst_d);
> +
> +        } while ( 0 );
> +
> +        read_unlock(&dst_d->argo->lock);
> +
> +    } while ( 0 );
> +
> +    if ( dst_d )
> +        put_domain(dst_d);
> +
> +    read_unlock(&argo_lock);
> +
> +    return ( ret < 0 ) ? ret : len;
> +}
> +
>  long
>  do_argo_message_op(int cmd, XEN_GUEST_HANDLE_PARAM(void) arg1,
>                     XEN_GUEST_HANDLE_PARAM(void) arg2,
> @@ -813,6 +1318,29 @@ do_argo_message_op(int cmd, XEN_GUEST_HANDLE_PARAM(void) arg1,
>          rc = argo_unregister_ring(d, ring_hnd);
>          break;
>      }
> +    case ARGO_MESSAGE_OP_sendv:
> +    {
> +        argo_send_addr_t send_addr;
> +        uint32_t niov = arg3;
> +        uint32_t message_type = arg4;
> +
> +        XEN_GUEST_HANDLE_PARAM(argo_send_addr_t) send_addr_hnd =
> +            guest_handle_cast(arg1, argo_send_addr_t);
> +        XEN_GUEST_HANDLE_PARAM(argo_iov_t) iovs =
> +            guest_handle_cast(arg2, argo_iov_t);
> +
> +        if ( unlikely(!guest_handle_okay(send_addr_hnd, 1)) )
> +            break;
> +        rc = copy_from_guest_errno(&send_addr, send_addr_hnd, 1);
> +        if ( rc )
> +            break;
> +
> +        send_addr.src.domain_id = d->domain_id;
> +
> +        rc = argo_sendv(d, &send_addr.src, &send_addr.dst,
> +                        iovs, niov, message_type);
> +        break;
> +    }
>      default:
>          rc = -ENOSYS;
>          break;
> diff --git a/xen/include/public/argo.h b/xen/include/public/argo.h
> index 6cf10a8..123efc5 100644
> --- a/xen/include/public/argo.h
> +++ b/xen/include/public/argo.h
> @@ -32,6 +32,28 @@
>   */
>  #define ARGO_MAX_RING_SIZE (16777216ULL)
>
> +/*
> + * ARGO_MAXIOV : maximum number of iovs accepted in a single sendv.
> + * Rationale for the value:
> + * The Linux argo driver never passes more than two iovs.
> + * Linux defines UIO_MAXIOV as 1024.
> + * POSIX mandates at least 16 -- not that this is a POSIX API of course.
> + *
> + * Limit the total amount of data posted in a single argo operation to
> + * no more than 2^31 bytes to reduce risk of integer overflow defects.
> + * Each argo iov can hold ~ 2^24 bytes, so set ARGO_MAXIOV to 2^(31-24),
> + * minus one to enable simple efficient bounds checking via masking: 127.
> + */
> +#define ARGO_MAXIOV          127U
> +
> +typedef struct argo_iov
> +{
> +    uint64_t iov_base;
> +    uint32_t iov_len;
> +    uint32_t pad;
> +} argo_iov_t;
> +DEFINE_XEN_GUEST_HANDLE(argo_iov_t);
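A note on the "bounds checking via masking" remark above: with the limit at
2^7 - 1, the check the comment alludes to could look like the sketch below.
This is illustrative only -- argo_iov_count() in this patch uses a plain
"niov > ARGO_MAXIOV" comparison:

    /*
     * Hypothetical mask-based form of the iov count bounds check:
     * ARGO_MAXIOV is 2^7 - 1, so any set bit above the low seven
     * bits indicates an out-of-range count.
     */
    if ( niov & ~ARGO_MAXIOV )
        return -EINVAL;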
> +
>  /* pfn type: 64-bit on all architectures to aid avoiding a compat ABI */
>  typedef uint64_t argo_pfn_t;
>
> @@ -42,6 +64,12 @@ typedef struct argo_addr
>      uint16_t pad;
>  } argo_addr_t;
>
> +typedef struct argo_send_addr
> +{
> +    argo_addr_t src;
> +    argo_addr_t dst;
> +} argo_send_addr_t;
> +
>  typedef struct argo_ring_id
>  {
>      struct argo_addr addr;
> @@ -125,4 +153,35 @@ struct argo_ring_message_header
>   */
>  #define ARGO_MESSAGE_OP_unregister_ring 2
>
> +/*
> + * ARGO_MESSAGE_OP_sendv
> + *
> + * Send a list of buffers contained in iovs.
> + *
> + * The send address struct specifies the source and destination addresses
> + * for the message being sent, which are used to find the destination ring:
> + * Xen first looks for a most-specific match with a registered ring with
> + * (id.addr == dst) and (id.partner == sending_domain) ;
> + * if that fails, it then looks for a wildcard match (aka multicast receiver)
> + * where (id.addr == dst) and (id.partner == DOMID_ANY).
> + *
> + * For each iov entry, send iov_len bytes from iov_base to the destination ring.
> + * If insufficient space exists in the destination ring, it will return -EAGAIN
> + * and Xen will notify the caller when sufficient space becomes available.
> + *
> + * The message type is a 32-bit data field available to communicate message
> + * context data (eg. kernel-to-kernel, rather than application layer).
> + *
> + * arg1: XEN_GUEST_HANDLE(argo_send_addr_t) source and dest addresses
> + * arg2: XEN_GUEST_HANDLE(argo_iov_t) iovs
> + * arg3: uint32_t niov
> + * arg4: uint32_t message type
> + */
> +#define ARGO_MESSAGE_OP_sendv               5
> +
> +/* The maximum size of a guest message that may be sent on an Argo ring. */
> +#define ARGO_MAX_MSG_SIZE ((ARGO_MAX_RING_SIZE) - \
> +        (sizeof(struct argo_ring_message_header)) - \
> +        ARGO_ROUNDUP(1))
> +
>  #endif
> --
> 2.1.4
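For reference, working the ARGO_MAX_MSG_SIZE arithmetic through:
16777216 (ARGO_MAX_RING_SIZE) minus the 16-byte message header minus
ARGO_ROUNDUP(1) = 16 gives a maximum payload of 16777184 bytes. The extra
16-byte reservation presumably ensures that even a maximum-sized message
cannot completely fill the ring, so tx_ptr == rx_ptr remains unambiguously
"empty" -- consistent with the strict ">= sp" space check in
argo_ringbuf_insert().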