[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Xen-devel] [PATCH v5 09/15] argo: implement the sendv op; evtchn: expose send_guest_global_virq



On Mon, Jan 21, 2019 at 01:59:49AM -0800, Christopher Clark wrote:
> sendv operation is invoked to perform a synchronous send of buffers
> contained in iovs to a remote domain's registered ring.
> 
> It takes:
>  * A destination address (domid, port) for the ring to send to.
>    It performs a most-specific match lookup, to allow for wildcard.
>  * A source address, used to inform the destination of where to reply.
>  * The address of an array of iovs containing the data to send
>  * .. and the length of that array of iovs
>  * and a 32-bit message type, available to communicate message context
>    data (eg. kernel-to-kernel, separate from the application data).
> 
> If insufficient space exists in the destination ring, it will return
> -EAGAIN and Xen will notify the caller when sufficient space becomes
> available.
> 
> Accesses to the ring indices are appropriately atomic. The rings are
> mapped into Xen's private address space to write as needed and the
> mappings are retained for later use.
> 
> Notifications are sent to guests via VIRQ and send_guest_global_virq is
> exposed in the change to enable argo to call it. VIRQ_ARGO_MESSAGE is
                                                   ^ VIRQ_ARGO
> claimed from the VIRQ previously reserved for this purpose (#11).
> 
> The VIRQ notification method is used rather than sending events using
> evtchn functions directly because:
> 
> * no current event channel type is an exact fit for the intended
>   behaviour. ECS_IPI is closest, but it disallows migration to
>   other VCPUs which is not necessarily a requirement for Argo.
> 
> * at the point of argo_init, allocation of an event channel is
>   complicated by none of the guest VCPUs being initialized yet
>   and the event channel logic expects that a valid event channel
>   has a present VCPU.

IMO iff you wanted to use event channels those _must_ be setup by the
guest, ie: the guest argo driver would load, allocate an event channel
and then tell the hypervisor about the event channel that should be
used for argo notifications.

> +static int
> +memcpy_to_guest_ring(const struct domain *d, struct argo_ring_info 
> *ring_info,
> +                     unsigned int offset,
> +                     const void *src, XEN_GUEST_HANDLE(uint8_t) src_hnd,
> +                     unsigned int len)
> +{
> +    unsigned int mfns_index = offset >> PAGE_SHIFT;
> +    void *dst;
> +    int ret;
> +    unsigned int src_offset = 0;
> +
> +    ASSERT(LOCKING_L3(d, ring_info));
> +
> +    offset &= ~PAGE_MASK;
> +
> +    if ( len + offset > XEN_ARGO_MAX_RING_SIZE )
> +        return -EFAULT;
> +
> +    while ( len )
> +    {
> +        unsigned int head_len = (offset + len) > PAGE_SIZE ? PAGE_SIZE - 
> offset
> +                                                           : len;

IMO that would be clearer as:

head_len = min(PAGE_SIZE - offset, len);

But anyway, this should go away when you move to using vmap.

[...]
> +static int
> +ringbuf_insert(const struct domain *d, struct argo_ring_info *ring_info,
> +               const struct argo_ring_id *src_id,
> +               XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd,
> +               unsigned long niov, uint32_t message_type,
> +               unsigned long *out_len)
> +{
> +    xen_argo_ring_t ring;
> +    struct xen_argo_ring_message_header mh = { };
> +    int sp, ret;
> +    unsigned int len = 0;
> +    xen_argo_iov_t iovs[XEN_ARGO_MAXIOV];
> +    xen_argo_iov_t *piov;
> +    XEN_GUEST_HANDLE(uint8_t) NULL_hnd =
> +       guest_handle_from_param(guest_handle_from_ptr(NULL, uint8_t), 
> uint8_t);
> +
> +    ASSERT(LOCKING_L3(d, ring_info));
> +
> +    ret = __copy_from_guest(iovs, iovs_hnd, niov) ? -EFAULT : 0;
> +    if ( ret )
> +        return ret;
> +
> +    /*
> +     * Obtain the total size of data to transmit -- sets the 'len' variable
> +     * -- and sanity check that the iovs conform to size and number limits.
> +     * Enforced below: no more than 'len' bytes of guest data
> +     * (plus the message header) will be sent in this operation.
> +     */
> +    ret = iov_count(iovs, niov, &len);
> +    if ( ret )
> +        return ret;
> +
> +    /*
> +     * Size bounds check against ring size and static maximum message limit.
> +     * The message must not fill the ring; there must be at least one slot
> +     * remaining so we can distinguish a full ring from an empty one.
> +     */
> +    if ( ((ROUNDUP_MESSAGE(len) +
> +            sizeof(struct xen_argo_ring_message_header)) >= ring_info->len) 
> ||
> +         (len > MAX_ARGO_MESSAGE_SIZE) )

len is already checked to be <= MAX_ARGO_MESSAGE_SIZE in iov_count
where it gets set, this is redundant.

> +        return -EMSGSIZE;
> +
> +    ret = get_sanitized_ring(d, &ring, ring_info);
> +    if ( ret )
> +        return ret;
> +
> +    argo_dprintk("ring.tx_ptr=%u ring.rx_ptr=%u ring len=%u"
> +                 " ring_info->tx_ptr=%u\n",
> +                 ring.tx_ptr, ring.rx_ptr, ring_info->len, 
> ring_info->tx_ptr);
> +
> +    if ( ring.rx_ptr == ring.tx_ptr )
> +        sp = ring_info->len;
> +    else
> +    {
> +        sp = ring.rx_ptr - ring.tx_ptr;
> +        if ( sp < 0 )
> +            sp += ring_info->len;
> +    }
> +
> +    /*
> +     * Size bounds check against currently available space in the ring.
> +     * Again: the message must not fill the ring leaving no space remaining.
> +     */
> +    if ( (ROUNDUP_MESSAGE(len) +
> +            sizeof(struct xen_argo_ring_message_header)) >= sp )
> +    {
> +        argo_dprintk("EAGAIN\n");
> +        return -EAGAIN;
> +    }
> +
> +    mh.len = len + sizeof(struct xen_argo_ring_message_header);
> +    mh.source.aport = src_id->aport;
> +    mh.source.domain_id = src_id->domain_id;
> +    mh.message_type = message_type;
> +
> +    /*
> +     * For this copy to the guest ring, tx_ptr is always 16-byte aligned
> +     * and the message header is 16 bytes long.
> +     */
> +    BUILD_BUG_ON(
> +        sizeof(struct xen_argo_ring_message_header) != ROUNDUP_MESSAGE(1));
> +
> +    /*
> +     * First data write into the destination ring: fixed size, message 
> header.
> +     * This cannot overrun because the available free space (value in 'sp')
> +     * is checked above and must be at least this size.
> +     */
> +    ret = memcpy_to_guest_ring(d, ring_info,
> +                               ring.tx_ptr + sizeof(xen_argo_ring_t),
> +                               &mh, NULL_hnd, sizeof(mh));
> +    if ( ret )
> +    {
> +        gprintk(XENLOG_ERR,
> +                "argo: failed to write message header to ring (vm%u:%x 
> vm%u)\n",
> +                ring_info->id.domain_id, ring_info->id.aport,
> +                ring_info->id.partner_id);
> +
> +        return ret;
> +    }
> +
> +    ring.tx_ptr += sizeof(mh);
> +    if ( ring.tx_ptr == ring_info->len )
> +        ring.tx_ptr = 0;
> +
> +    for ( piov = iovs; niov--; piov++ )
> +    {
> +        XEN_GUEST_HANDLE_64(uint8_t) buf_hnd = piov->iov_hnd;
> +        unsigned int iov_len = piov->iov_len;
> +
> +        /* If no data is provided in this iov, moan and skip on to the next 
> */
> +        if ( !iov_len )
> +        {
> +            gprintk(XENLOG_ERR,

This should likely be WARN or INFO, since it's not an error?

> +                    "argo: no data iov_len=0 iov_hnd=%p ring (vm%u:%x 
> vm%u)\n",
> +                    buf_hnd.p, ring_info->id.domain_id, ring_info->id.aport,
> +                    ring_info->id.partner_id);
> +
> +            continue;
> +        }
> +
> +        if ( unlikely(!guest_handle_okay(buf_hnd, iov_len)) )
> +        {
> +            gprintk(XENLOG_ERR,
> +                    "argo: bad iov handle [%p, %u] (vm%u:%x vm%u)\n",
> +                    buf_hnd.p, iov_len,
> +                    ring_info->id.domain_id, ring_info->id.aport,
> +                    ring_info->id.partner_id);
> +
> +            return -EFAULT;
> +        }
> +
> +        sp = ring_info->len - ring.tx_ptr;
> +
> +        /* Check: iov data size versus free space at the tail of the ring */
> +        if ( iov_len > sp )
> +        {
> +            /*
> +             * Second possible data write: ring-tail-wrap-write.
> +             * Populate the ring tail and update the internal tx_ptr to 
> handle
> +             * wrapping at the end of ring.
> +             * Size of data written here: sp
> +             * which is the exact full amount of free space available at the
> +             * tail of the ring, so this cannot overrun.
> +             */
> +            ret = memcpy_to_guest_ring(d, ring_info,
> +                                       ring.tx_ptr + sizeof(xen_argo_ring_t),
> +                                       NULL, buf_hnd, sp);
> +            if ( ret )
> +            {
> +                gprintk(XENLOG_ERR,
> +                        "argo: failed to copy {%p, %d} (vm%u:%x vm%u)\n",
> +                        buf_hnd.p, sp,
> +                        ring_info->id.domain_id, ring_info->id.aport,
> +                        ring_info->id.partner_id);
> +
> +                return ret;
> +            }
> +
> +            ring.tx_ptr = 0;
> +            iov_len -= sp;
> +            guest_handle_add_offset(buf_hnd, sp);
> +
> +            ASSERT(iov_len <= ring_info->len);
> +        }
> +
> +        /*
> +         * Third possible data write: all data remaining for this iov.
> +         * Size of data written here: iov_len
> +         *
> +         * Case 1: if the ring-tail-wrap-write above was performed, then
> +         *         iov_len has been decreased by 'sp' and ring.tx_ptr is 
> zero.
> +         *
> +         *    We know from checking the result of iov_count:
> +         *      len + sizeof(message_header) <= ring_info->len
> +         *    We also know that len is the total of summing all iov_lens, so:
> +         *       iov_len <= len
> +         *    so by transitivity:
> +         *       iov_len <= len <= (ring_info->len - sizeof(msgheader))
> +         *    and therefore:
> +         *       (iov_len + sizeof(msgheader) <= ring_info->len) &&
> +         *       (ring.tx_ptr == 0)
> +         *    so this write cannot overrun here.
> +         *
> +         * Case 2: ring-tail-wrap-write above was not performed
> +         *    -> so iov_len is the guest-supplied value and: (iov_len <= sp)
> +         *    ie. less than available space at the tail of the ring:
> +         *        so this write cannot overrun.
> +         */
> +        ret = memcpy_to_guest_ring(d, ring_info,
> +                                   ring.tx_ptr + sizeof(xen_argo_ring_t),
> +                                   NULL, buf_hnd, iov_len);
> +        if ( ret )
> +        {
> +            gprintk(XENLOG_ERR,
> +                    "argo: failed to copy [%p, %u] (vm%u:%x vm%u)\n",
> +                    buf_hnd.p, iov_len, ring_info->id.domain_id,
> +                    ring_info->id.aport, ring_info->id.partner_id);
> +
> +            return ret;
> +        }
> +
> +        ring.tx_ptr += iov_len;
> +
> +        if ( ring.tx_ptr == ring_info->len )
> +            ring.tx_ptr = 0;
> +    }
> +
> +    ring.tx_ptr = ROUNDUP_MESSAGE(ring.tx_ptr);
> +
> +    if ( ring.tx_ptr >= ring_info->len )
> +        ring.tx_ptr -= ring_info->len;

You seem to handle the wrapping after each possible write, so I think
the above is not needed? Maybe it should be an assert instead?

> +
> +    update_tx_ptr(d, ring_info, ring.tx_ptr);
> +
> +    /*
> +     * At this point (and also on an error exit paths from this function) it 
> is
> +     * possible to unmap the ring_info, ie:
> +     *   ring_unmap(d, ring_info);
> +     * but performance should be improved by not doing so, and retaining
> +     * the mapping.
> +     * An XSM policy control over level of confidentiality required
> +     * versus performance cost could be added to decide that here.
> +     */
> +
> +    *out_len = len;
> +
> +    return ret;
> +}
> +
>  static void
>  wildcard_pending_list_remove(domid_t domain_id, struct pending_ent *ent)
>  {
> @@ -497,6 +918,25 @@ wildcard_pending_list_remove(domid_t domain_id, struct 
> pending_ent *ent)
>  }
>  
>  static void
> +wildcard_pending_list_insert(domid_t domain_id, struct pending_ent *ent)
> +{
> +    struct domain *d = get_domain_by_id(domain_id);
> +
> +    if ( !d )
> +        return;
> +
> +    ASSERT(LOCKING_Read_L1);
> +
> +    if ( d->argo )
> +    {
> +        spin_lock(&d->argo->wildcard_L2_lock);
> +        list_add(&ent->wildcard_node, &d->argo->wildcard_pend_list);
> +        spin_unlock(&d->argo->wildcard_L2_lock);
> +    }
> +    put_domain(d);
> +}
> +
> +static void
>  pending_remove_all(const struct domain *d, struct argo_ring_info *ring_info)
>  {
>      struct list_head *ring_pending = &ring_info->pending;
> @@ -518,6 +958,70 @@ pending_remove_all(const struct domain *d, struct 
> argo_ring_info *ring_info)
>      ring_info->npending = 0;
>  }
>  
> +static int
> +pending_queue(const struct domain *d, struct argo_ring_info *ring_info,
> +              domid_t src_id, unsigned int len)
> +{
> +    struct pending_ent *ent;
> +
> +    ASSERT(LOCKING_L3(d, ring_info));
> +
> +    if ( ring_info->npending >= MAX_PENDING_PER_RING )
> +        return -ENOSPC;
> +
> +    ent = xmalloc(struct pending_ent);
> +    if ( !ent )
> +        return -ENOMEM;
> +
> +    ent->len = len;
> +    ent->domain_id = src_id;
> +    ent->ring_info = ring_info;
> +
> +    if ( ring_info->id.partner_id == XEN_ARGO_DOMID_ANY )
> +        wildcard_pending_list_insert(src_id, ent);
> +    list_add(&ent->node, &ring_info->pending);
> +    ring_info->npending++;
> +
> +    return 0;
> +}
> +
> +static int
> +pending_requeue(const struct domain *d, struct argo_ring_info *ring_info,
> +                domid_t src_id, unsigned int len)
> +{
> +    struct list_head *cursor, *head;
> +
> +    ASSERT(LOCKING_L3(d, ring_info));
> +
> +    /* List structure is not modified here. Update len in a match if found. 
> */
> +    head = &ring_info->pending;
> +
> +    for ( cursor = head->next; cursor != head; cursor = cursor->next )

list_for_each_entry

>  long
>  do_argo_op(unsigned int cmd, XEN_GUEST_HANDLE_PARAM(void) arg1,
>             XEN_GUEST_HANDLE_PARAM(void) arg2, unsigned long arg3,
> @@ -1145,6 +1734,53 @@ do_argo_op(unsigned int cmd, 
> XEN_GUEST_HANDLE_PARAM(void) arg1,
>          break;
>      }
>  
> +    case XEN_ARGO_OP_sendv:
> +    {
> +        xen_argo_send_addr_t send_addr;
> +
> +        XEN_GUEST_HANDLE_PARAM(xen_argo_send_addr_t) send_addr_hnd =
> +            guest_handle_cast(arg1, xen_argo_send_addr_t);
> +        XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd =
> +            guest_handle_cast(arg2, xen_argo_iov_t);
> +        /* arg3 is niov */
> +        /* arg4 is message_type. Must be a 32-bit value. */
> +
> +        rc = copy_from_guest(&send_addr, send_addr_hnd, 1) ? -EFAULT : 0;
> +        if ( rc )
> +            break;
> +
> +        /*
> +         * Check padding is zeroed. Reject niov above limit or message_types
> +         * that are outside 32 bit range.
> +         */
> +        if ( unlikely(send_addr.src.pad || send_addr.dst.pad ||
> +                      (arg3 > XEN_ARGO_MAXIOV) || (arg4 & ~0xffffffffUL)) )

arg4 & (GB(4) - 1)

Is clearer IMO, or:

arg4 > UINT32_MAX

> +        {
> +            rc = -EINVAL;
> +            break;
> +        }
> +
> +        if ( send_addr.src.domain_id == XEN_ARGO_DOMID_ANY )
> +            send_addr.src.domain_id = currd->domain_id;
> +
> +        /* No domain is currently authorized to send on behalf of another */
> +        if ( unlikely(send_addr.src.domain_id != currd->domain_id) )
> +        {
> +            rc = -EPERM;
> +            break;
> +        }
> +
> +        /*
> +         * Check access to the whole array here so we can use the faster 
> __copy
> +         * operations to read each element later.
> +         */
> +        if ( unlikely(!guest_handle_okay(iovs_hnd, arg3)) )

You need to set rc to EFAULT here, because the call to copy_from_guest
has set it to 0.

Alternatively you can change the call above to be:

if ( copy_from_guest(&send_addr, send_addr_hnd, 1) )
    return -EFAULT;

So rc doesn't get set to 0 on success.

With those taken care of:

Reviewed-by: Roger Pau Monné <roger.pau@xxxxxxxxxx>

_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxxx
https://lists.xenproject.org/mailman/listinfo/xen-devel

 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.