[UNIKRAFT PATCH 2/5] lib/ukmpi: Initial port of FreeBSD's buf_ring.h
From: Alexander Jung <alexander.jung@xxxxxxxxx>
This commit introduces a Unikraft-centric implementation of
FreeBSD's buf_ring.h message-passing ring buffer header. Most of
the functionality is provided inline within this header.
Signed-off-by: Alexander Jung <alexander.jung@xxxxxxxxx>
---
lib/ukmpi/include/uk/ring.h | 474 ++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 474 insertions(+)
create mode 100644 lib/ukmpi/include/uk/ring.h
diff --git a/lib/ukmpi/include/uk/ring.h b/lib/ukmpi/include/uk/ring.h
new file mode 100644
index 0000000..6673244
--- /dev/null
+++ b/lib/ukmpi/include/uk/ring.h
@@ -0,0 +1,474 @@
+/* SPDX-License-Identifier: BSD-2-Clause-FreeBSD */
+/*
+ * Authors: Kip Macy <kmacy@xxxxxxxxxxx>
+ * Alexander Jung <alexander.jung@xxxxxxxxx>
+ *
+ * Copyright (c) 2007-2009, Kip Macy <kmacy@xxxxxxxxxxx>
+ * 2020, NEC Laboratories Europe GmbH, NEC Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * $FreeBSD$
+ */
+/*
+ * Simple ring implementation to handle object references.
+ *
+ * Inspired by FreeBSD and modified (commit-id: c45cce1).
+ */
+#ifndef __UK_RING_H__
+#define __UK_RING_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+#include <errno.h>
+#include <uk/mutex.h>
+#include <uk/print.h>
+#include <uk/config.h>
+#include <uk/assert.h>
+#include <uk/plat/lcpu.h>
+#include <uk/arch/atomic.h>
+
+
+struct uk_ring {
+ volatile uint32_t prod_head;
+ volatile uint32_t prod_tail;
+ int prod_size;
+ int prod_mask;
+ uint64_t drops;
+ volatile uint32_t cons_head;
+ volatile uint32_t cons_tail;
+ int cons_size;
+ int cons_mask;
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+ struct uk_mutex *lock;
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+ void *ring[0];
+};
+
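+/*
+ * Example usage (a sketch with hypothetical values; `count` must be a
+ * power of two, and `a` can be any allocator, e.g. uk_alloc_get_default()):
+ *
+ *   struct uk_ring *r = uk_ring_alloc(64, a, 0, NULL);
+ *
+ *   if (uk_ring_enqueue(r, obj) < 0)
+ *     uk_pr_warn("ring full, dropping object\n");
+ *
+ *   void *got = uk_ring_dequeue(r);  (NULL when the ring is empty)
+ *
+ *   uk_ring_free(r, a);
+ */
+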
+
+/**
+ * Multi-producer safe lock-free ring buffer enqueue.
+ *
+ * @param br
+ * Reference to the ring structure.
+ * @param buf
+ * The buffer to enqueue.
+ */
+static __inline int
+uk_ring_enqueue(struct uk_ring *br, void *buf)
+{
+ uint32_t prod_head, prod_next, cons_tail;
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+ int i;
+ /*
+  * Note: It is possible to encounter a buffer that was removed via
+  * uk_ring_peek() and then re-added via uk_ring_putback_single(),
+  * triggering a spurious panic here.
+  */
+ for (i = br->cons_head; i != br->prod_head;
+      i = ((i + 1) & br->cons_mask))
+   if (br->ring[i] == buf)
+     uk_pr_crit("buf=%p already enqueued at %d prod=%d cons=%d\n",
+         buf, i, br->prod_tail, br->cons_tail);
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+ ukplat_lcpu_disable_irq();
+
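+ /*
+  * Reserve a slot by advancing prod_head with a compare-and-swap. If the
+  * ring appears full, re-read both indices after a read barrier: if they
+  * are unchanged the ring really is full and the enqueue is dropped;
+  * otherwise another CPU raced us and we retry.
+  */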
+ do {
+ prod_head = br->prod_head;
+ prod_next = (prod_head + 1) & br->prod_mask;
+ cons_tail = br->cons_tail;
+
+ if (prod_next == cons_tail) {
+ rmb();
+ if (prod_head == br->prod_head &&
+ cons_tail == br->cons_tail) {
+ br->drops++;
+ ukplat_lcpu_enable_irq();
+ return -ENOBUFS;
+ }
+ continue;
+ }
+ } while (!ukarch_compare_exchange_sync(&br->prod_head, prod_head, prod_next));
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+ if (br->ring[prod_head] != NULL)
+ uk_pr_crit("dangling value in enqueue\n");
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+ br->ring[prod_head] = buf;
+
+ /*
+ * If there are other enqueues in progress that preceded us, we need to wait
+ * for them to complete.
+ */
+ /* TODO: Provide cpu_spinwait() */
+#if 0
+ while (br->prod_tail != prod_head)
+ cpu_spinwait();
+#endif
+
+ ukarch_store_n(&br->prod_tail, prod_next);
+ ukplat_lcpu_enable_irq();
+
+ return 0;
+}
+
+
+/**
+ * Multi-consumer safe dequeue.
+ *
+ * @param br
+ * Reference to the ring structure.
+ */
+static __inline void *
+uk_ring_dequeue(struct uk_ring *br)
+{
+ void *buf;
+ uint32_t cons_head, cons_next;
+
+ ukplat_lcpu_disable_irq();
+
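+ /*
+  * Claim the head entry by advancing cons_head with a compare-and-swap;
+  * return NULL if the ring is empty (cons_head has caught up with
+  * prod_tail).
+  */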
+ do {
+ cons_head = br->cons_head;
+ cons_next = (cons_head + 1) & br->cons_mask;
+
+ if (cons_head == br->prod_tail) {
+ ukplat_lcpu_enable_irq();
+ return NULL;
+ }
+ } while (!ukarch_compare_exchange_sync(&br->cons_head, cons_head, cons_next));
+
+ buf = br->ring[cons_head];
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+ br->ring[cons_head] = NULL;
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+ /*
+ * If there are other dequeues in progress that preceded us, we need to wait
+ * for them to complete.
+ */
+ /* TODO: Provide cpu_spinwait() */
+#if 0
+ while (br->cons_tail != cons_head)
+ cpu_spinwait();
+#endif
+
+ ukarch_store_n(&br->cons_tail, cons_next);
+
+ ukplat_lcpu_enable_irq();
+
+ return buf;
+}
+
+
+/**
+ * Single-consumer dequeue, for use where dequeuing is protected by a lock,
+ * e.g. a network driver's tx queue lock.
+ *
+ * @param br
+ * Reference to the ring structure.
+ */
+static __inline void *
+uk_ring_dequeue_single(struct uk_ring *br)
+{
+ void *buf;
+ uint32_t cons_head, cons_next, prod_tail;
+
+#ifdef PREFETCH_DEFINED
+ uint32_t cons_next_next;
+#endif
+
+ /* TODO: Provide atomic_load_acq_32() */
+#if 0
+ /*
+ * This is a workaround to allow using uk_ring on ARM and ARM64.
+ *
+ * ARM64TODO: Fix uk_ring in a generic way.
+ *
+ * REMARKS: It is suspected that cons_head does not require
+ * load_acq operation, but this change was extensively tested
+ * and confirmed it's working. To be reviewed once again in
+ * FreeBSD-12.
+ *
+ * Preventing the following situation:
+ *
+ * | uk_ring_enqueue()                           | uk_ring_dequeue_single()                             |
+ * |---------------------------------------------+------------------------------------------------------|
+ * |                                             | cons_head = br->cons_head;                           |
+ * | atomic_cmpset_acq_32(&br->prod_head, ...)); |                                                      |
+ * |                                             | buf = br->ring[cons_head]; (see <1>)                 |
+ * | br->ring[prod_head] = buf;                  |                                                      |
+ * | atomic_store_rel_32(&br->prod_tail, ...);   |                                                      |
+ * |                                             | prod_tail = br->prod_tail;                           |
+ * |                                             | if (cons_head == prod_tail)                          |
+ * |                                             |   return NULL;                                       |
+ * |                                             | <condition is false and code uses invalid (old) buf> |
+ *
+ * <1> The load (on core 1) from br->ring[cons_head] can be reordered
+ * (speculatively read) by the CPU.
+ */
+#if defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64)
+ cons_head = atomic_load_acq_32(&br->cons_head);
+#else
+ cons_head = br->cons_head;
+#endif /* defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) */
+#else
+ cons_head = br->cons_head;
+#endif /* 0 */
+
+ /* prod_tail = atomic_load_acq_32(&br->prod_tail); */
+ prod_tail = br->prod_tail;
+ cons_next = (cons_head + 1) & br->cons_mask;
+
+ if (cons_head == prod_tail)
+ return NULL;
+
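+ /*
+  * Single consumer: the caller's lock guarantees no concurrent dequeuer,
+  * so cons_head can be advanced with a plain store instead of a CAS.
+  */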
+ br->cons_head = cons_next;
+ buf = br->ring[cons_head];
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+ br->ring[cons_head] = NULL;
+
+ if (!uk_mutex_is_locked(br->lock))
+ uk_pr_crit("lock not held on single consumer dequeue\n");
+
+ if (br->cons_tail != cons_head)
+ uk_pr_crit("inconsistent list cons_tail=%d cons_head=%d\n",
+ br->cons_tail, cons_head);
+
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+ br->cons_tail = cons_next;
+
+ return buf;
+}
+
+
+/**
+ * Single-consumer advance after a peek, for use where the ring is protected
+ * by a lock, e.g. a network driver's tx queue lock.
+ *
+ * @param br
+ * Reference to the ring structure.
+ */
+static __inline void
+uk_ring_advance_single(struct uk_ring *br)
+{
+ uint32_t cons_head, cons_next;
+ uint32_t prod_tail;
+
+ /* TODO: Provide debug_ring() */
+#if 0
+ debug_ring(br);
+#endif
+
+ cons_head = br->cons_head;
+ prod_tail = br->prod_tail;
+
+ cons_next = (cons_head + 1) & br->cons_mask;
+
+ if (cons_head == prod_tail)
+ return;
+
+ br->cons_head = cons_next;
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+ br->ring[cons_head] = NULL;
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+ br->cons_tail = cons_next;
+}
+
+
+/**
+ * Used to return a buffer (most likely already there) to the top of the
+ * ring. The caller should *not* have used any dequeue to pull it out of the
+ * ring, but instead should have used the peek() function. This is normally
+ * used where the transmit queue of a driver is full, and an mbuf must be
+ * returned. Most likely what is in the ring buffer is what is being put back
+ * (since it was not removed), but sometimes the lower transmit function may
+ * have done a pullup or other operation that changed it. As an optimization
+ * we always put it back (since jhb says the store is probably cheaper); if
+ * we have to do a multi-queue version we will need the compare and an
+ * atomic.
+ *
+ * @param br
+ * Reference to the ring structure.
+ * @param new
+ * The buffer to put back at the head of the ring.
+ */
+static __inline void
+uk_ring_putback_single(struct uk_ring *br, void *new)
+{
+ /* The ring must not be empty when a buffer is put back */
+ UK_ASSERT(br->cons_head != br->prod_tail);
+ br->ring[br->cons_head] = new;
+}
+
+
+/**
+ * Return a pointer to the first entry in the ring without modifying it, or
+ * NULL if the ring is empty. This is race-prone if not protected by a lock.
+ *
+ * @param br
+ * Reference to the ring structure.
+ */
+static __inline void *
+uk_ring_peek(struct uk_ring *br)
+{
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+ if ((br->lock != NULL) && !uk_mutex_is_locked(br->lock))
+ uk_pr_crit("lock not held on single consumer dequeue\n");
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+ /*
+ * It is safe to not have a memory barrier here because we control cons, and
+ * tail is at worst a lagging indicator, so in the worst case we might
+ * return NULL immediately after a buffer has been enqueued.
+ */
+ if (br->cons_head == br->prod_tail)
+ return NULL;
+
+ return br->ring[br->cons_head];
+}
+
+
+/**
+ * Single-consumer: return the first entry of the ring without advancing it
+ * and, in debug mode, clear its slot; returns NULL if the ring is empty.
+ *
+ * @param br
+ * Reference to the ring structure.
+ */
+static __inline void *
+uk_ring_peek_clear_single(struct uk_ring *br)
+{
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+ void *ret;
+
+ if (!uk_mutex_is_locked(br->lock))
+   uk_pr_crit("lock not held on single consumer dequeue\n");
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+ if (br->cons_head == br->prod_tail)
+ return NULL;
+
+ /* TODO: Provide atomic_thread_fence_acq */
+#if 0
+#if defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64)
+ /*
+ * The barrier is required here on ARM and ARM64 to ensure that
+ * br->ring[br->cons_head] is not fetched before the above condition is
+ * checked. Without the barrier, the buffer could be fetched before the
+ * enqueue puts it into the ring; then, once the enqueue has updated the
+ * array and prod_tail, the conditional check would pass and we would
+ * return the previously fetched (and invalid) buffer.
+ */
+ atomic_thread_fence_acq();
+#endif /* defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) */
+#endif /* 0 */
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+ /*
+ * Single consumer, i.e. cons_head will not move while we are
+ * running, so atomic_swap_ptr() is not necessary here.
+ */
+ ret = br->ring[br->cons_head];
+ br->ring[br->cons_head] = NULL;
+ return ret;
+#else
+ return br->ring[br->cons_head];
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+}
+
+
+/**
+ * Return whether the ring buffer is full.
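+ *
+ * One slot is intentionally left unused: the ring reports full as soon as
+ * advancing prod_head would make it equal to cons_tail.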
+ *
+ * @param br
+ * Reference to the ring structure.
+ */
+static __inline int
+uk_ring_full(struct uk_ring *br)
+{
+ return ((br->prod_head + 1) & br->prod_mask) == br->cons_tail;
+}
+
+
+/**
+ * Return whether the ring buffer is empty.
+ *
+ * @param br
+ * Reference to the ring structure.
+ */
+static __inline int
+uk_ring_empty(struct uk_ring *br)
+{
+ return br->cons_head == br->prod_tail;
+}
+
+
+/**
+ * Return the number of entries currently queued in the ring buffer.
+ *
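+ * For example (hypothetical values): with prod_size = 8 (mask 7),
+ * prod_tail = 2 and cons_tail = 6, the count is (8 + 2 - 6) & 7 = 4.
+ *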
+ * @param br
+ * Reference to the ring structure.
+ */
+static __inline int
+uk_ring_count(struct uk_ring *br)
+{
+ return (br->prod_size + br->prod_tail - br->cons_tail) & br->prod_mask;
+}
+
+
+/**
+ * Create a new ring buffer.
+ *
+ * @param count
+ * The size of the ring buffer; must be a power of two.
+ * @param a
+ * The memory allocator to use when creating the ring buffer.
+ * @param flags
+ * Additional flags to specify to the ring.
+ * @param lock
+ * The mutex to use when debugging the ring buffer.
+ */
+struct uk_ring *
+uk_ring_alloc(int count, struct uk_alloc *a, int flags, struct uk_mutex *lock);
+
+
+/**
+ * Free the ring from use.
+ *
+ * @param br
+ * Reference to the ring structure.
+ * @param a
+ * The memory allocator to use when freeing the object.
+ */
+void
+uk_ring_free(struct uk_ring *br, struct uk_alloc *a);
+
+
+#endif /* __UK_RING_H__ */
--
2.11.0