[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] Re: [UNIKRAFT PATCH 2/5] lib/ukmpi: Initial port of FreeBSD's buf_ring.h
Hi Alexander, What we did with other source code imported from FreeBSD was to add a first commit with exactly the same code, copy-pasted from FreeBSD. See for example the bitmap imported from FreeBSD [1]. This should be fairly simple to do. You should also add in the commit message the commit ID from which you copied the initial code (see as example the first commit for bitmap). Next commit would contain the differences between the original code and what we have here. This second commit would actually represent the adaptation of the FreeBSD code to Unikraft. That's why you can also include the changes in 4/5 'lib/ukmpi: Include ring buffer into Unikraft build process' and even 1/5 lib/ukmpi: Introduce simple ring interface KConfig option. [1] https://github.com/unikraft/unikraft/commits/master/include/uk/bitmap.h Cheers, Costin On 7/20/20 7:40 PM, Alexander Jung wrote: > From: Alexander Jung <alexander.jung@xxxxxxxxx> > > This commit introduces a Unikraft-centric implementation of > FreeBSD's buf_ring.h message passing ring buffer header. Most of > the functionality is provided inline with this header. > > Signed-off-by: Alexander Jung <alexander.jung@xxxxxxxxx> > --- > lib/ukmpi/include/uk/ring.h | 474 > ++++++++++++++++++++++++++++++++++++++++++++ > 1 file changed, 474 insertions(+) > create mode 100644 lib/ukmpi/include/uk/ring.h > > diff --git a/lib/ukmpi/include/uk/ring.h b/lib/ukmpi/include/uk/ring.h > new file mode 100644 > index 0000000..6673244 > --- /dev/null > +++ b/lib/ukmpi/include/uk/ring.h > @@ -0,0 +1,474 @@ > +/* SPDX-License-Identifier: BSD-2-Clause-FreeBSD */ > +/* > + * Authors: Kip Macy <kmacy@xxxxxxxxxxx> > + * Alexander Jung <alexander.jung@xxxxxxxxx> > + * > + * Copyright (c) 2007-2009, Kip Macy <kmacy@xxxxxxxxxxx> > + * 2020, NEC Laboratories Europe GmbH, NEC Corporation. > + * All rights reserved. > + * > + * Redistribution and use in source and binary forms, with or without > + * modification, are permitted provided that the following conditions > + * are met: > + * > + * 1. Redistributions of source code must retain the above copyright > + * notice, this list of conditions and the following disclaimer. > + * 2. Redistributions in binary form must reproduce the above copyright > + * notice, this list of conditions and the following disclaimer in the > + * documentation and/or other materials provided with the distribution. > + * > + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE > + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE > + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE > + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS > + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) > + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT > + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY > + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF > + * SUCH DAMAGE. > + * > + * $FreeBSD$ > + */ > +/* > + * Simple ring implementation to handle object references. > + * > + * Inspired by FreeBSD and modified (commit-id: c45cce1). > + */ > +#ifndef __UK_RING_H__ > +#define __UK_RING_H__ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > + > +#include <errno.h> > +#include <uk/mutex.h> > +#include <uk/print.h> > +#include <uk/config.h> > +#include <uk/assert.h> > +#include <uk/plat/lcpu.h> > +#include <uk/arch/atomic.h> > + > + > +struct uk_ring { > + volatile uint32_t prod_head; > + volatile uint32_t prod_tail; > + int prod_size; > + int prod_mask; > + uint64_t drops; > + volatile uint32_t cons_head; > + volatile uint32_t cons_tail; > + int cons_size; > + int cons_mask; > +#ifdef CONFIG_LIBUKMPI_RING_DEBUG > + struct uk_mutex *lock; > +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ > + void *ring[0]; > +}; > + > + > +/** > + * Multi-producer safe lock-free ring buffer enqueue. > + * > + * @param br > + * Reference to the ring structure. > + * @param buf > + * Buffer size for the ring. > + */ > +static __inline int > +uk_ring_enqueue(struct uk_ring *br, void *buf) > +{ > + uint32_t prod_head, prod_next, cons_tail; > + > +#ifdef CONFIG_LIBUKMPI_RING_DEBUG > + int i; > + /* > + * Note: It is possible to encounter an mbuf that was removed via drpeek(), > + * and then re-added via drputback() and trigger a spurious panic. > + */ > + for (i = br->cons_head; i != br->prod_head; > + i = ((i + 1) & br->cons_mask)) > + if(br->ring[i] == buf) > + uk_pr_crit("buf=%p already enqueue at %d prod=%d cons=%d\n", > + buf, i, br->prod_tail, br->cons_tail); > +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ > + > + ukplat_lcpu_disable_irq(); > + > + do { > + prod_head = br->prod_head; > + prod_next = (prod_head + 1) & br->prod_mask; > + cons_tail = br->cons_tail; > + > + if (prod_next == cons_tail) { > + rmb(); > + if (prod_head == br->prod_head && > + cons_tail == br->cons_tail) { > + br->drops++; > + ukplat_lcpu_enable_irq(); > + return -ENOBUFS; > + } > + continue; > + } > + } while (!ukarch_compare_exchange_sync(&br->prod_head, prod_head, > prod_next)); > + > +#ifdef CONFIG_LIBUKMPI_RING_DEBUG > + if (br->ring[prod_head] != NULL) > + uk_pr_crit("dangling value in enqueue\n"); > +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ > + > + br->ring[prod_head] = buf; > + > + /* > + * If there are other enqueues in progress that preceded us, we need to > wait > + * for them to complete. > + */ > + /* TODO: Provide cpu_spinwait() */ > +#if 0 > + while (br->prod_tail != prod_head) > + cpu_spinwait(); > +#endif > + > + ukarch_store_n(&br->prod_tail, prod_next); > + ukplat_lcpu_enable_irq(); > + > + return 0; > +} > + > + > +/** > + * Multi-consumer safe dequeue. > + * > + * @param br > + * Reference to the ring structure. > + */ > +static __inline void * > +uk_ring_dequeue(struct uk_ring *br) > +{ > + void *buf; > + uint32_t cons_head, cons_next; > + > + ukplat_lcpu_disable_irq(); > + > + do { > + cons_head = br->cons_head; > + cons_next = (cons_head + 1) & br->cons_mask; > + > + if (cons_head == br->prod_tail) { > + ukplat_lcpu_enable_irq(); > + return (NULL); > + } > + } while (!ukarch_compare_exchange_sync(&br->cons_head, cons_head, > cons_next)); > + > + buf = br->ring[cons_head]; > + > +#ifdef CONFIG_LIBUKMPI_RING_DEBUG > + br->ring[cons_head] = NULL; > +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ > + > + /* > + * If there are other dequeues in progress that preceded us, we need to > wait > + * for them to complete. > + */ > + /* TODO: Provide cpu_spinwait() */ > +#if 0 > + while (br->cons_tail != cons_head) > + cpu_spinwait(); > +#endif > + > + ukarch_store_n(&br->cons_tail, cons_next); > + > + ukplat_lcpu_enable_irq(); > + > + return buf; > +} > + > + > +/** > + * Single-consumer dequeue use where dequeue is protected by a lock e.g. a > + * network driver's tx queue lock. > + * > + * @param br > + * Reference to the ring structure. > + */ > +static __inline void * > +uk_ring_dequeue_single(struct uk_ring *br) > +{ > + void *buf; > + uint32_t cons_head, cons_next, prod_tail; > + > +#ifdef PREFETCH_DEFINED > + uint32_t cons_next_next; > +#endif > + > + /* TODO: Provide atomic_load_acq_32() */ > +#if 0 > + /* > + * This is a workaround to allow using uk_ring on ARM and ARM64. > + * > + * ARM64TODO: Fix uk_ring in a generic way. > + * > + * REMARKS: It is suspected that cons_head does not require > + * load_acq operation, but this change was extensively tested > + * and confirmed it's working. To be reviewed once again in > + * FreeBSD-12. > + * > + * Preventing following situation: > + * > + * | uk_ring_enqueue() | > uk_ring_dequeue_single() | > + * > |----------------------------------------------+-----------------------------------------------------| > + * | | > | > + * | | cons_head = > br->cons_head; | > + * | atomic_cmpset_acq_32(&br->prod_head, ...)); | > | > + * | | buf = > br->ring[cons_head]; (see <1>) | > + * | br->ring[prod_head] = buf; | > | > + * | atomic_store_rel_32(&br->prod_tail, ...); | > | > + * | | prod_tail = > br->prod_tail; | > + * | | if (cons_head == > prod_tail) | > + * | | return NULL; > | > + * | | <condition is false > and code uses invalid(old) buf> | > + * > + * <1> Load (on core 1) from br->ring[cons_head] can be reordered > (speculative readed) by CPU. > + */ > +#if defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) > + cons_head = atomic_load_acq_32(&br->cons_head); > +#else > + cons_head = br->cons_head; > +#endif /* defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) */ > +#else > + cons_head = br->cons_head; > +#endif /* 0 */ > + > + // prod_tail = atomic_load_acq_32(&br->prod_tail); > + prod_tail = br->prod_tail; > + cons_next = (cons_head + 1) & br->cons_mask; > + > + if (cons_head == prod_tail) > + return NULL; > + > + br->cons_head = cons_next; > + buf = br->ring[cons_head]; > + > +#ifdef CONFIG_LIBUKMPI_RING_DEBUG > + br->ring[cons_head] = NULL; > + > + if (!uk_mutex_is_locked(br->lock)) > + uk_pr_crit("lock not held on single consumer dequeue\n"); > + > + if (br->cons_tail != cons_head) > + uk_pr_crit("inconsistent list cons_tail=%d cons_head=%d\n", > + br->cons_tail, cons_head); > + > +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ > + > + br->cons_tail = cons_next; > + > + return buf; > +} > + > + > +/** > + * Single-consumer advance after a peek use where it is protected by a lock > e.g. > + * a network driver's tx queue lock. > + * > + * @param br > + * Reference to the ring structure. > + */ > +static __inline void > +uk_ring_advance_single(struct uk_ring *br) > +{ > + uint32_t cons_head, cons_next; > + uint32_t prod_tail; > + > + debug_ring(br); > + > + cons_head = br->cons_head; > + prod_tail = br->prod_tail; > + > + cons_next = (cons_head + 1) & br->cons_mask; > + > + if (cons_head == prod_tail) > + return; > + > + br->cons_head = cons_next; > + > +#ifdef CONFIG_LIBUKMPI_RING_DEBUG > + br->ring[cons_head] = NULL; > +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ > + > + br->cons_tail = cons_next; > +} > + > + > +/** > + * Used to return a buffer (most likely already there) to the top of the > ring. > + * The caller should *not* have used any dequeue to pull it out of the ring > but > + * instead should have used the peek() function. This is normally used where > + * the transmit queue of a driver is full, and an mbuf must be returned. > Most > + * likely whats in the ring-buffer is what is being put back (since it was > not > + * removed), but sometimes the lower transmit function may have done a > pullup or > + * other function that will have changed it. As an optimization we always > put it > + * back (since jhb says the store is probably cheaper), if we have to do a > + * multi-queue version we will need the compare and an atomic. > + * > + * @param br > + * Reference to the ring structure. > + * @param new > + * > + */ > +static __inline void > +uk_ring_putback_single(struct uk_ring *br, void *new) > +{ > + /* Buffer ring has none in putback */ > + UK_ASSERT(br->cons_head != br->prod_tail); > + br->ring[br->cons_head] = new; > +} > + > + > +/** > + * Return a pointer to the first entry in the ring without modifying it, or > NULL > + * if the ring is empty race-prone if not protected by a lock. > + * > + * @param br > + * Reference to the ring structure. > + */ > +static __inline void * > +uk_ring_peek(struct uk_ring *br) > +{ > +#ifdef CONFIG_LIBUKMPI_RING_DEBUG > + if ((br->lock != NULL) && !uk_mutex_is_locked(br->lock)) > + uk_pr_crit("lock not held on single consumer dequeue\n"); > +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ > + > + /* > + * It is safe to not have a memory barrier here because we control cons and > + * tail is worst case a lagging indicator so we worst case we might return > + * NULL immediately after a buffer has been enqueued. > + */ > + if (br->cons_head == br->prod_tail) > + return NULL; > + > + return br->ring[br->cons_head]; > +} > + > + > +/** > + * Single-consumer clear the pointer to the first entry of the ring, or NULL > if > + * the ring is empty. > + * > + * @param br > + * Reference to the ring structure. > + */ > +static __inline void * > +uk_ring_peek_clear_single(struct uk_ring *br) > +{ > +#ifdef CONFIG_LIBUKMPI_RING_DEBUG > + void *ret; > + > + if (!mtx_owned(br->lock)) > + panic("lock not held on single consumer dequeue"); > +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ > + > + if (br->cons_head == br->prod_tail) > + return (NULL); > + > + /* TODO: Provide atomic_thread_fence_acq */ > +#if 0 > +#if defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) > + /* > + * The barrier is required there on ARM and ARM64 to ensure, that > + * br->ring[br->cons_head] will not be fetched before the above condition > is > + * checked. Without the barrier, it is possible, that buffer will be > fetched > + * before the enqueue will put mbuf into br, then, in the meantime, the > + * enqueue will update the array and the prod_tail, and the conditional > check > + * will be true, so we will return previously fetched (and invalid) buffer. > + */ > + atomic_thread_fence_acq(); > +#endif /* defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) */ > +#endif /* 0 */ > + > +#ifdef CONFIG_LIBUKMPI_RING_DEBUG > + /* > + * Single consumer, i.e. cons_head will not move while we are > + * running, so atomic_swap_ptr() is not necessary here. > + */ > + ret = br->ring[br->cons_head]; > + br->ring[br->cons_head] = NULL; > + return (ret); > +#else > + return (br->ring[br->cons_head]); > +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ > +} > + > + > +/** > + * Return whether the ring buffer is full. > + * > + * @param br > + * Reference to the ring structure. > + */ > +static __inline int > +uk_ring_full(struct uk_ring *br) > +{ > + return ((br->prod_head + 1) & br->prod_mask) == br->cons_tail; > +} > + > + > +/** > + * Return whether the ring buffer is empty. > + * > + * @param br > + * Reference to the ring structure. > + */ > +static __inline int > +uk_ring_empty(struct uk_ring *br) > +{ > + return br->cons_head == br->prod_tail; > +} > + > + > +/** > + * Return the queue size in the ring buffer. > + * > + * @param br > + * Reference to the ring structure. > + */ > +static __inline int > +uk_ring_count(struct uk_ring *br) > +{ > + return (br->prod_size + br->prod_tail - br->cons_tail) & br->prod_mask; > +} > + > + > +/** > + * Create a new ring buffer. > + * > + * @param count > + * The size of the ring buffer. > + * @param a > + * The memory allocator to use when creating the ring buffer. > + * @param flags > + * Additional flags to specify to the ring. > + * @param lock > + * The mutex to use when debugging the ring buffer. > + */ > +struct uk_ring * > +uk_ring_alloc(int count, struct uk_alloc *a, int flags, struct uk_mutex > *lock); > + > + > +/** > + * Free the ring from use. > + * > + * @param br > + * Reference to the ring structure. > + * @param a > + * The memory allocator to use when freeing the object. > + */ > +void > +uk_ring_free(struct uk_ring *br, struct uk_alloc *a); > + > + > +#endif /* __UK_RING_H__ */ >
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |