[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [UNIKRAFT PATCH 2/5] lib/ukmpi: Initial port of FreeBSD's buf_ring.h
From: Alexander Jung <alexander.jung@xxxxxxxxx> This commit introduces a Unikraft-centric implementation of FreeBSD's buf_ring.h message passing ring buffer header. Most of the functionality is provided inline with this header. Signed-off-by: Alexander Jung <alexander.jung@xxxxxxxxx> --- lib/ukmpi/include/uk/ring.h | 474 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 474 insertions(+) create mode 100644 lib/ukmpi/include/uk/ring.h diff --git a/lib/ukmpi/include/uk/ring.h b/lib/ukmpi/include/uk/ring.h new file mode 100644 index 0000000..6673244 --- /dev/null +++ b/lib/ukmpi/include/uk/ring.h @@ -0,0 +1,474 @@ +/* SPDX-License-Identifier: BSD-2-Clause-FreeBSD */ +/* + * Authors: Kip Macy <kmacy@xxxxxxxxxxx> + * Alexander Jung <alexander.jung@xxxxxxxxx> + * + * Copyright (c) 2007-2009, Kip Macy <kmacy@xxxxxxxxxxx> + * 2020, NEC Laboratories Europe GmbH, NEC Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * $FreeBSD$ + */ +/* + * Simple ring implementation to handle object references. + * + * Inspired by FreeBSD and modified (commit-id: c45cce1). + */ +#ifndef __UK_RING_H__ +#define __UK_RING_H__ + +#ifdef __cplusplus +extern "C" { +#endif + + +#include <errno.h> +#include <uk/mutex.h> +#include <uk/print.h> +#include <uk/config.h> +#include <uk/assert.h> +#include <uk/plat/lcpu.h> +#include <uk/arch/atomic.h> + + +struct uk_ring { + volatile uint32_t prod_head; + volatile uint32_t prod_tail; + int prod_size; + int prod_mask; + uint64_t drops; + volatile uint32_t cons_head; + volatile uint32_t cons_tail; + int cons_size; + int cons_mask; +#ifdef CONFIG_LIBUKMPI_RING_DEBUG + struct uk_mutex *lock; +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ + void *ring[0]; +}; + + +/** + * Multi-producer safe lock-free ring buffer enqueue. + * + * @param br + * Reference to the ring structure. + * @param buf + * Buffer size for the ring. + */ +static __inline int +uk_ring_enqueue(struct uk_ring *br, void *buf) +{ + uint32_t prod_head, prod_next, cons_tail; + +#ifdef CONFIG_LIBUKMPI_RING_DEBUG + int i; + /* + * Note: It is possible to encounter an mbuf that was removed via drpeek(), + * and then re-added via drputback() and trigger a spurious panic. + */ + for (i = br->cons_head; i != br->prod_head; + i = ((i + 1) & br->cons_mask)) + if(br->ring[i] == buf) + uk_pr_crit("buf=%p already enqueue at %d prod=%d cons=%d\n", + buf, i, br->prod_tail, br->cons_tail); +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ + + ukplat_lcpu_disable_irq(); + + do { + prod_head = br->prod_head; + prod_next = (prod_head + 1) & br->prod_mask; + cons_tail = br->cons_tail; + + if (prod_next == cons_tail) { + rmb(); + if (prod_head == br->prod_head && + cons_tail == br->cons_tail) { + br->drops++; + ukplat_lcpu_enable_irq(); + return -ENOBUFS; + } + continue; + } + } while (!ukarch_compare_exchange_sync(&br->prod_head, prod_head, prod_next)); + +#ifdef CONFIG_LIBUKMPI_RING_DEBUG + if (br->ring[prod_head] != NULL) + uk_pr_crit("dangling value in enqueue\n"); +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ + + br->ring[prod_head] = buf; + + /* + * If there are other enqueues in progress that preceded us, we need to wait + * for them to complete. + */ + /* TODO: Provide cpu_spinwait() */ +#if 0 + while (br->prod_tail != prod_head) + cpu_spinwait(); +#endif + + ukarch_store_n(&br->prod_tail, prod_next); + ukplat_lcpu_enable_irq(); + + return 0; +} + + +/** + * Multi-consumer safe dequeue. + * + * @param br + * Reference to the ring structure. + */ +static __inline void * +uk_ring_dequeue(struct uk_ring *br) +{ + void *buf; + uint32_t cons_head, cons_next; + + ukplat_lcpu_disable_irq(); + + do { + cons_head = br->cons_head; + cons_next = (cons_head + 1) & br->cons_mask; + + if (cons_head == br->prod_tail) { + ukplat_lcpu_enable_irq(); + return (NULL); + } + } while (!ukarch_compare_exchange_sync(&br->cons_head, cons_head, cons_next)); + + buf = br->ring[cons_head]; + +#ifdef CONFIG_LIBUKMPI_RING_DEBUG + br->ring[cons_head] = NULL; +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ + + /* + * If there are other dequeues in progress that preceded us, we need to wait + * for them to complete. + */ + /* TODO: Provide cpu_spinwait() */ +#if 0 + while (br->cons_tail != cons_head) + cpu_spinwait(); +#endif + + ukarch_store_n(&br->cons_tail, cons_next); + + ukplat_lcpu_enable_irq(); + + return buf; +} + + +/** + * Single-consumer dequeue use where dequeue is protected by a lock e.g. a + * network driver's tx queue lock. + * + * @param br + * Reference to the ring structure. + */ +static __inline void * +uk_ring_dequeue_single(struct uk_ring *br) +{ + void *buf; + uint32_t cons_head, cons_next, prod_tail; + +#ifdef PREFETCH_DEFINED + uint32_t cons_next_next; +#endif + + /* TODO: Provide atomic_load_acq_32() */ +#if 0 + /* + * This is a workaround to allow using uk_ring on ARM and ARM64. + * + * ARM64TODO: Fix uk_ring in a generic way. + * + * REMARKS: It is suspected that cons_head does not require + * load_acq operation, but this change was extensively tested + * and confirmed it's working. To be reviewed once again in + * FreeBSD-12. + * + * Preventing following situation: + * + * | uk_ring_enqueue() | uk_ring_dequeue_single() | + * |----------------------------------------------+-----------------------------------------------------| + * | | | + * | | cons_head = br->cons_head; | + * | atomic_cmpset_acq_32(&br->prod_head, ...)); | | + * | | buf = br->ring[cons_head]; (see <1>) | + * | br->ring[prod_head] = buf; | | + * | atomic_store_rel_32(&br->prod_tail, ...); | | + * | | prod_tail = br->prod_tail; | + * | | if (cons_head == prod_tail) | + * | | return NULL; | + * | | <condition is false and code uses invalid(old) buf> | + * + * <1> Load (on core 1) from br->ring[cons_head] can be reordered (speculative readed) by CPU. + */ +#if defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) + cons_head = atomic_load_acq_32(&br->cons_head); +#else + cons_head = br->cons_head; +#endif /* defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) */ +#else + cons_head = br->cons_head; +#endif /* 0 */ + + // prod_tail = atomic_load_acq_32(&br->prod_tail); + prod_tail = br->prod_tail; + cons_next = (cons_head + 1) & br->cons_mask; + + if (cons_head == prod_tail) + return NULL; + + br->cons_head = cons_next; + buf = br->ring[cons_head]; + +#ifdef CONFIG_LIBUKMPI_RING_DEBUG + br->ring[cons_head] = NULL; + + if (!uk_mutex_is_locked(br->lock)) + uk_pr_crit("lock not held on single consumer dequeue\n"); + + if (br->cons_tail != cons_head) + uk_pr_crit("inconsistent list cons_tail=%d cons_head=%d\n", + br->cons_tail, cons_head); + +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ + + br->cons_tail = cons_next; + + return buf; +} + + +/** + * Single-consumer advance after a peek use where it is protected by a lock e.g. + * a network driver's tx queue lock. + * + * @param br + * Reference to the ring structure. + */ +static __inline void +uk_ring_advance_single(struct uk_ring *br) +{ + uint32_t cons_head, cons_next; + uint32_t prod_tail; + + debug_ring(br); + + cons_head = br->cons_head; + prod_tail = br->prod_tail; + + cons_next = (cons_head + 1) & br->cons_mask; + + if (cons_head == prod_tail) + return; + + br->cons_head = cons_next; + +#ifdef CONFIG_LIBUKMPI_RING_DEBUG + br->ring[cons_head] = NULL; +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ + + br->cons_tail = cons_next; +} + + +/** + * Used to return a buffer (most likely already there) to the top of the ring. + * The caller should *not* have used any dequeue to pull it out of the ring but + * instead should have used the peek() function. This is normally used where + * the transmit queue of a driver is full, and an mbuf must be returned. Most + * likely whats in the ring-buffer is what is being put back (since it was not + * removed), but sometimes the lower transmit function may have done a pullup or + * other function that will have changed it. As an optimization we always put it + * back (since jhb says the store is probably cheaper), if we have to do a + * multi-queue version we will need the compare and an atomic. + * + * @param br + * Reference to the ring structure. + * @param new + * + */ +static __inline void +uk_ring_putback_single(struct uk_ring *br, void *new) +{ + /* Buffer ring has none in putback */ + UK_ASSERT(br->cons_head != br->prod_tail); + br->ring[br->cons_head] = new; +} + + +/** + * Return a pointer to the first entry in the ring without modifying it, or NULL + * if the ring is empty race-prone if not protected by a lock. + * + * @param br + * Reference to the ring structure. + */ +static __inline void * +uk_ring_peek(struct uk_ring *br) +{ +#ifdef CONFIG_LIBUKMPI_RING_DEBUG + if ((br->lock != NULL) && !uk_mutex_is_locked(br->lock)) + uk_pr_crit("lock not held on single consumer dequeue\n"); +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ + + /* + * It is safe to not have a memory barrier here because we control cons and + * tail is worst case a lagging indicator so we worst case we might return + * NULL immediately after a buffer has been enqueued. + */ + if (br->cons_head == br->prod_tail) + return NULL; + + return br->ring[br->cons_head]; +} + + +/** + * Single-consumer clear the pointer to the first entry of the ring, or NULL if + * the ring is empty. + * + * @param br + * Reference to the ring structure. + */ +static __inline void * +uk_ring_peek_clear_single(struct uk_ring *br) +{ +#ifdef CONFIG_LIBUKMPI_RING_DEBUG + void *ret; + + if (!mtx_owned(br->lock)) + panic("lock not held on single consumer dequeue"); +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ + + if (br->cons_head == br->prod_tail) + return (NULL); + + /* TODO: Provide atomic_thread_fence_acq */ +#if 0 +#if defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) + /* + * The barrier is required there on ARM and ARM64 to ensure, that + * br->ring[br->cons_head] will not be fetched before the above condition is + * checked. Without the barrier, it is possible, that buffer will be fetched + * before the enqueue will put mbuf into br, then, in the meantime, the + * enqueue will update the array and the prod_tail, and the conditional check + * will be true, so we will return previously fetched (and invalid) buffer. + */ + atomic_thread_fence_acq(); +#endif /* defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) */ +#endif /* 0 */ + +#ifdef CONFIG_LIBUKMPI_RING_DEBUG + /* + * Single consumer, i.e. cons_head will not move while we are + * running, so atomic_swap_ptr() is not necessary here. + */ + ret = br->ring[br->cons_head]; + br->ring[br->cons_head] = NULL; + return (ret); +#else + return (br->ring[br->cons_head]); +#endif /* CONFIG_LIBUKMPI_RING_DEBUG */ +} + + +/** + * Return whether the ring buffer is full. + * + * @param br + * Reference to the ring structure. + */ +static __inline int +uk_ring_full(struct uk_ring *br) +{ + return ((br->prod_head + 1) & br->prod_mask) == br->cons_tail; +} + + +/** + * Return whether the ring buffer is empty. + * + * @param br + * Reference to the ring structure. + */ +static __inline int +uk_ring_empty(struct uk_ring *br) +{ + return br->cons_head == br->prod_tail; +} + + +/** + * Return the queue size in the ring buffer. + * + * @param br + * Reference to the ring structure. + */ +static __inline int +uk_ring_count(struct uk_ring *br) +{ + return (br->prod_size + br->prod_tail - br->cons_tail) & br->prod_mask; +} + + +/** + * Create a new ring buffer. + * + * @param count + * The size of the ring buffer. + * @param a + * The memory allocator to use when creating the ring buffer. + * @param flags + * Additional flags to specify to the ring. + * @param lock + * The mutex to use when debugging the ring buffer. + */ +struct uk_ring * +uk_ring_alloc(int count, struct uk_alloc *a, int flags, struct uk_mutex *lock); + + +/** + * Free the ring from use. + * + * @param br + * Reference to the ring structure. + * @param a + * The memory allocator to use when freeing the object. + */ +void +uk_ring_free(struct uk_ring *br, struct uk_alloc *a); + + +#endif /* __UK_RING_H__ */ -- 2.11.0
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |