[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH 08/17] Add a very basic netchannel2 implementation.
This is functional, in the sense that packets can be sent and received, but lacks any advanced features. Signed-off-by: Steven Smith <steven.smith@xxxxxxxxxx> --- drivers/net/Kconfig | 25 + drivers/net/Makefile | 1 + drivers/net/xen-netchannel2/Makefile | 12 + drivers/net/xen-netchannel2/chan.c | 663 ++++++++++++++++++++ drivers/net/xen-netchannel2/netback2.c | 354 +++++++++++ drivers/net/xen-netchannel2/netchan2.c | 32 + drivers/net/xen-netchannel2/netchannel2_core.h | 352 +++++++++++ drivers/net/xen-netchannel2/netchannel2_endpoint.h | 63 ++ drivers/net/xen-netchannel2/netfront2.c | 489 +++++++++++++++ drivers/net/xen-netchannel2/recv_packet.c | 216 +++++++ drivers/net/xen-netchannel2/rscb.c | 386 ++++++++++++ drivers/net/xen-netchannel2/util.c | 227 +++++++ drivers/net/xen-netchannel2/xmit_packet.c | 318 ++++++++++ drivers/xen/events.c | 17 +- drivers/xen/xenbus/xenbus_client.c | 13 +- include/xen/events.h | 1 + include/xen/interface/io/netchannel2.h | 106 ++++ include/xen/interface/io/uring.h | 426 +++++++++++++ 18 files changed, 3689 insertions(+), 12 deletions(-) create mode 100644 drivers/net/xen-netchannel2/Makefile create mode 100644 drivers/net/xen-netchannel2/chan.c create mode 100644 drivers/net/xen-netchannel2/netback2.c create mode 100644 drivers/net/xen-netchannel2/netchan2.c create mode 100644 drivers/net/xen-netchannel2/netchannel2_core.h create mode 100644 drivers/net/xen-netchannel2/netchannel2_endpoint.h create mode 100644 drivers/net/xen-netchannel2/netfront2.c create mode 100644 drivers/net/xen-netchannel2/recv_packet.c create mode 100644 drivers/net/xen-netchannel2/rscb.c create mode 100644 drivers/net/xen-netchannel2/util.c create mode 100644 drivers/net/xen-netchannel2/xmit_packet.c create mode 100644 include/xen/interface/io/netchannel2.h create mode 100644 include/xen/interface/io/uring.h diff --git a/drivers/net/Kconfig b/drivers/net/Kconfig index 5a0a4ee..3f599eb 100644 --- a/drivers/net/Kconfig +++ b/drivers/net/Kconfig @@ -2775,6 +2775,31 @@ config XEN_NETDEV_FRONTEND if you are compiling a kernel for a Xen guest, you almost certainly want to enable this. +config XEN_NETCHANNEL2 + tristate "Net channel 2 support" + depends on XEN && NET + default y + help + Xen netchannel2 driver support. This allows a domain to act as + either the backend or frontend part of a netchannel2 connection. + Unless you are building a dedicated device-driver domain, you + almost certainly want to say Y here. + + If you say Y or M here, you should also say Y to one or both of + ``Net channel2 backend support'' and ``Net channel2 frontend + support'', below. + +config XEN_NETDEV2_BACKEND + bool "Net channel 2 backend support" + depends on XEN_BACKEND && XEN_NETCHANNEL2 + default XEN_BACKEND + +config XEN_NETDEV2_FRONTEND + bool "Net channel 2 frontend support" + depends on XEN_NETCHANNEL2 + default y + + config ISERIES_VETH tristate "iSeries Virtual Ethernet driver support" depends on PPC_ISERIES diff --git a/drivers/net/Makefile b/drivers/net/Makefile index ead8cab..5c92566 100644 --- a/drivers/net/Makefile +++ b/drivers/net/Makefile @@ -158,6 +158,7 @@ obj-$(CONFIG_SLIP) += slip.o obj-$(CONFIG_SLHC) += slhc.o obj-$(CONFIG_XEN_NETDEV_FRONTEND) += xen-netfront.o +obj-$(CONFIG_XEN_NETCHANNEL2) += xen-netchannel2/ obj-$(CONFIG_DUMMY) += dummy.o obj-$(CONFIG_IFB) += ifb.o diff --git a/drivers/net/xen-netchannel2/Makefile b/drivers/net/xen-netchannel2/Makefile new file mode 100644 index 0000000..bdad6da --- /dev/null +++ b/drivers/net/xen-netchannel2/Makefile @@ -0,0 +1,12 @@ +obj-$(CONFIG_XEN_NETCHANNEL2) += netchannel2.o + +netchannel2-objs := chan.o netchan2.o rscb.o util.o \ + xmit_packet.o recv_packet.o + +ifeq ($(CONFIG_XEN_NETDEV2_BACKEND),y) +netchannel2-objs += netback2.o +endif + +ifeq ($(CONFIG_XEN_NETDEV2_FRONTEND),y) +netchannel2-objs += netfront2.o +endif diff --git a/drivers/net/xen-netchannel2/chan.c b/drivers/net/xen-netchannel2/chan.c new file mode 100644 index 0000000..328b2f1 --- /dev/null +++ b/drivers/net/xen-netchannel2/chan.c @@ -0,0 +1,663 @@ +#include <linux/kernel.h> +#include <linux/kthread.h> +#include <linux/gfp.h> +#include <linux/etherdevice.h> +#include <linux/interrupt.h> +#include <linux/netdevice.h> +#include <linux/slab.h> +#include <linux/spinlock.h> +#include <linux/delay.h> +#include <linux/version.h> +#include <xen/evtchn.h> +#include <xen/xenbus.h> +#include <xen/events.h> + +#include "netchannel2_endpoint.h" +#include "netchannel2_core.h" + +static int process_ring(struct napi_struct *napi, + int work_avail); + +static irqreturn_t nc2_int(int irq, void *dev_id) +{ + struct netchannel2_ring_pair *ncr = dev_id; + + if (ncr->irq == -1) + return IRQ_HANDLED; + if (ncr->cons_ring.sring->prod != ncr->cons_ring.cons_pvt || + ncr->interface->is_stopped) + nc2_kick(ncr); + return IRQ_HANDLED; +} + +/* Process all incoming messages. The function is given an + IRQ-disabled reference for the interface, and must dispose of it + (either by enabling the IRQ or re-introducing it to the pending + list). Alternatively, the function can stop the ring being + processed again by leaking the reference (e.g. when the remote + endpoint is misbehaving). */ +/* Returns -1 if we used all the available work without finishing, or + the amount of work used otherwise. */ +static int process_messages(struct netchannel2_ring_pair *ncrp, + int work_avail, + struct sk_buff_head *pending_rx_queue) +{ + struct netchannel2_msg_hdr hdr; + RING_IDX prod; + struct netchannel2 *nc = ncrp->interface; + int work_done; + + work_done = 1; + +retry: + prod = ncrp->cons_ring.sring->prod; + rmb(); + while (work_done < work_avail && + prod != ncrp->cons_ring.cons_pvt) { + nc2_copy_from_ring(&ncrp->cons_ring, &hdr, sizeof(hdr)); + if (hdr.size < sizeof(hdr)) { + printk(KERN_WARNING "Other end sent too-small message (%d)\n", + hdr.size); + goto done; + } + if (hdr.size > ncrp->cons_ring.payload_bytes) { + /* This one message is bigger than the whole + ring -> other end is clearly misbehaving. + We won't take any more messages from this + ring. */ + printk(KERN_WARNING "Other end sent enormous message (%d > %zd)\n", + hdr.size, + ncrp->cons_ring.payload_bytes); + goto done; + } + + switch (hdr.type) { + case NETCHANNEL2_MSG_SET_MAX_PACKETS: + nc2_handle_set_max_packets_msg(ncrp, &hdr); + break; + case NETCHANNEL2_MSG_PACKET: + nc2_handle_packet_msg(nc, ncrp, &hdr, + pending_rx_queue); + break; + case NETCHANNEL2_MSG_FINISH_PACKET: + nc2_handle_finish_packet_msg(nc, ncrp, &hdr); + break; + case NETCHANNEL2_MSG_PAD: + break; + default: + /* Drop bad messages. We should arguably stop + processing the ring at this point, because + the ring is probably corrupt. However, if + it is corrupt then one of the other checks + will hit soon enough, and doing it this way + should make it a bit easier to add new + message types in future. */ + pr_debug("Bad message type %d from peer!\n", + hdr.type); + break; + } + hdr.size = (hdr.size + 7) & ~7; + ncrp->cons_ring.cons_pvt += hdr.size; + + work_done++; + if (work_done == work_avail) + return -1; + } + + if (unlikely(prod != ncrp->cons_ring.sring->prod)) + goto retry; + + /* Dispose of our IRQ-disable reference. */ +done: + napi_complete(&ncrp->napi); + enable_irq(ncrp->irq); + + if (nc2_final_check_for_messages(&ncrp->cons_ring, + prod)) { + /* More work to do still. */ + nc2_kick(ncrp); + } + + return work_done; +} + +/* Flush out all pending metadata messages on ring @ncrp, and then + update the ring pointers to indicate that we've done so. Fire the + event channel if necessary. */ +static void flush_rings(struct netchannel2_ring_pair *ncrp) +{ + int need_kick; + + flush_hypercall_batcher(&ncrp->pending_rx_hypercalls, + nc2_rscb_on_gntcopy_fail); + send_finish_packet_messages(ncrp); + if (ncrp->need_advertise_max_packets) + advertise_max_packets(ncrp); + + need_kick = 0; + if (nc2_finish_messages(&ncrp->cons_ring)) { + need_kick = 1; + /* If we need an event on the consumer ring, we always + need to notify the other end, even if we don't have + any messages which would normally be considered + urgent. */ + ncrp->pending_time_sensitive_messages = 1; + } + if (nc2_flush_ring(&ncrp->prod_ring)) + need_kick = 1; + if (need_kick || + (ncrp->delayed_kick && ncrp->pending_time_sensitive_messages)) { + if (ncrp->pending_time_sensitive_messages) { + notify_remote_via_irq(ncrp->irq); + ncrp->delayed_kick = 0; + } else { + ncrp->delayed_kick = 1; + } + ncrp->pending_time_sensitive_messages = 0; + } +} + +/* Process incoming messages, and then flush outgoing metadata + * messages. We also try to unjam the xmit queue if any of the + * incoming messages would give us permission to send more stuff. */ +/* This is given an IRQ-disable reference, and must dispose of it. */ +static int nc2_poll(struct netchannel2_ring_pair *ncrp, int work_avail, + struct sk_buff_head *rx_queue) +{ + int work_done; + + if (!ncrp->is_attached) { + napi_complete(&ncrp->napi); + enable_irq(ncrp->irq); + return 0; + } + + work_done = process_messages(ncrp, work_avail, rx_queue); + + flush_rings(ncrp); + + if (work_done < 0) + return work_avail; + else + return work_done; +} + +/* Like skb_queue_purge(), but use release_tx_packet() rather than + kfree_skb() */ +void nc2_queue_purge(struct netchannel2_ring_pair *ncrp, + struct sk_buff_head *queue) +{ + struct sk_buff *skb; + + while (!skb_queue_empty(queue)) { + skb = skb_dequeue(queue); + release_tx_packet(ncrp, skb); + } +} + +/* struct net_device stop() method. */ +static int nc2_stop(struct net_device *nd) +{ + struct netchannel2 *nc = netdev_priv(nd); + + spin_lock_bh(&nc->rings.lock); + nc->stats.tx_dropped += skb_queue_len(&nc->pending_skbs); + nc2_queue_purge(&nc->rings, &nc->pending_skbs); + spin_unlock_bh(&nc->rings.lock); + + return 0; +} + +/* Kick a netchannel2 interface so that the poll() method runs + * soon. */ +/* This has semi release-like semantics, so you can set flags + lock-free and be guaranteed that the poll() method will eventually + run and see the flag set, without doing any explicit locking. */ +void nc2_kick(struct netchannel2_ring_pair *ncrp) +{ + if (napi_schedule_prep(&ncrp->napi)) { + disable_irq_nosync(ncrp->irq); + __napi_schedule(&ncrp->napi); + } +} + +static int nc2_open(struct net_device *nd) +{ + struct netchannel2 *nc = netdev_priv(nd); + + nc2_kick(&nc->rings); + return 0; +} + +/* Rad a mac address from an address in xenstore at @prefix/@node. + * Call not holding locks. Returns 0 on success or <0 on error. */ +static int read_mac_address(const char *prefix, const char *node, + unsigned char *addr) +{ + int err; + unsigned mac[6]; + int i; + + err = xenbus_scanf(XBT_NIL, prefix, node, + "%x:%x:%x:%x:%x:%x", + &mac[0], + &mac[1], + &mac[2], + &mac[3], + &mac[4], + &mac[5]); + if (err < 0) + return err; + if (err != 6) + return -EINVAL; + for (i = 0; i < 6; i++) { + if (mac[i] >= 0x100) + return -EINVAL; + addr[i] = mac[i]; + } + return 0; +} + +/* Release resources associated with a ring pair. It is assumed that + the ring pair has already been detached (which stops the IRQ and + un-pends the ring). */ +void cleanup_ring_pair(struct netchannel2_ring_pair *ncrp) +{ + BUG_ON(ncrp->prod_ring.sring); + BUG_ON(ncrp->cons_ring.sring); + + drop_pending_tx_packets(ncrp); + nc2_queue_purge(ncrp, &ncrp->release_on_flush_batcher); + if (ncrp->gref_pool != 0) + gnttab_free_grant_references(ncrp->gref_pool); + netif_napi_del(&ncrp->napi); +} + +int init_ring_pair(struct netchannel2_ring_pair *ncrp, + struct netchannel2 *nc) +{ + unsigned x; + + ncrp->interface = nc; + spin_lock_init(&ncrp->lock); + ncrp->irq = -1; + + for (x = 0; x < NR_TX_PACKETS - 1; x++) + txp_set_next_free(ncrp->tx_packets + x, x + 1); + txp_set_next_free(ncrp->tx_packets + x, INVALID_TXP_INDEX); + ncrp->head_free_tx_packet = 0; + + skb_queue_head_init(&ncrp->pending_tx_queue); + skb_queue_head_init(&ncrp->release_on_flush_batcher); + + if (gnttab_alloc_grant_references(NR_TX_PACKETS, + &ncrp->gref_pool) < 0) + return -1; + + netif_napi_add(ncrp->interface->net_device, &ncrp->napi, + process_ring, 64); + napi_enable(&ncrp->napi); + + return 0; +} + +static struct net_device_stats *nc2_get_stats(struct net_device *nd) +{ + struct netchannel2 *nc = netdev_priv(nd); + + return &nc->stats; +} + +static const struct net_device_ops nc2_net_device_ops = { + .ndo_open = nc2_open, + .ndo_stop = nc2_stop, + .ndo_start_xmit = nc2_start_xmit, + .ndo_get_stats = nc2_get_stats +}; + +/* Create a new netchannel2 structure. Call with no locks held. + Returns NULL on error. The xenbus device must remain valid for as + long as the netchannel2 structure does. The core does not take out + any kind of reference count on it, but will refer to it throughout + the returned netchannel2's life. */ +struct netchannel2 *nc2_new(struct xenbus_device *xd) +{ + struct net_device *netdev; + struct netchannel2 *nc; + int err; + int local_trusted; + int remote_trusted; + int filter_mac; + + if (!gnttab_subpage_grants_available()) { + printk(KERN_ERR "netchannel2 needs version 2 grant tables\n"); + return NULL; + } + + if (xenbus_scanf(XBT_NIL, xd->nodename, "local-trusted", + "%d", &local_trusted) != 1) { + printk(KERN_WARNING "Can't tell whether local endpoint is trusted; assuming it is.\n"); + local_trusted = 1; + } + + if (xenbus_scanf(XBT_NIL, xd->nodename, "remote-trusted", + "%d", &remote_trusted) != 1) { + printk(KERN_WARNING "Can't tell whether local endpoint is trusted; assuming it isn't.\n"); + remote_trusted = 0; + } + + if (xenbus_scanf(XBT_NIL, xd->nodename, "filter-mac", + "%d", &filter_mac) != 1) { + if (remote_trusted) { + printk(KERN_WARNING "Can't tell whether to filter MAC addresses from remote domain; filtering off.\n"); + filter_mac = 0; + } else { + printk(KERN_WARNING "Can't tell whether to filter MAC addresses from remote domain; filtering on.\n"); + filter_mac = 1; + } + } + + netdev = alloc_etherdev(sizeof(*nc)); + if (netdev == NULL) + return NULL; + + nc = netdev_priv(netdev); + memset(nc, 0, sizeof(*nc)); + nc->magic = NETCHANNEL2_MAGIC; + nc->net_device = netdev; + nc->xenbus_device = xd; + + nc->remote_trusted = remote_trusted; + nc->local_trusted = local_trusted; + nc->rings.filter_mac = filter_mac; + + skb_queue_head_init(&nc->pending_skbs); + if (init_ring_pair(&nc->rings, nc) < 0) { + nc2_release(nc); + return NULL; + } + + netdev->netdev_ops = &nc2_net_device_ops; + + /* We need to hold the ring lock in order to send messages + anyway, so there's no point in Linux doing additional + synchronisation. */ + netdev->features = NETIF_F_LLTX; + + SET_NETDEV_DEV(netdev, &xd->dev); + + err = read_mac_address(xd->nodename, "remote-mac", + nc->rings.remote_mac); + if (err == 0) + err = read_mac_address(xd->nodename, "mac", netdev->dev_addr); + if (err == 0) + err = register_netdev(netdev); + + if (err != 0) { + nc2_release(nc); + return NULL; + } + + return nc; +} + +/* Release a netchannel2 structure previously allocated with + * nc2_new(). Call with no locks held. The rings will be + * automatically detach if necessary. */ +void nc2_release(struct netchannel2 *nc) +{ + netif_carrier_off(nc->net_device); + + unregister_netdev(nc->net_device); + + nc2_detach_rings(nc); + + /* Unregistering the net device stops any netdev methods from + running, and detaching the rings stops the napi methods, so + we're now the only thing accessing this netchannel2 + structure and we can tear it down with impunity. */ + + cleanup_ring_pair(&nc->rings); + + nc2_queue_purge(&nc->rings, &nc->pending_skbs); + + free_netdev(nc->net_device); +} + +static void _nc2_attach_rings(struct netchannel2_ring_pair *ncrp, + struct netchannel2_sring_cons *cons_sring, + const volatile void *cons_payload, + size_t cons_size, + struct netchannel2_sring_prod *prod_sring, + void *prod_payload, + size_t prod_size, + domid_t otherend_id) +{ + BUG_ON(prod_sring == NULL); + BUG_ON(cons_sring == NULL); + + ncrp->prod_ring.sring = prod_sring; + ncrp->prod_ring.payload_bytes = prod_size; + ncrp->prod_ring.prod_pvt = 0; + ncrp->prod_ring.payload = prod_payload; + + ncrp->cons_ring.sring = cons_sring; + ncrp->cons_ring.payload_bytes = cons_size; + ncrp->cons_ring.sring->prod_event = ncrp->cons_ring.sring->prod + 1; + ncrp->cons_ring.cons_pvt = 0; + ncrp->cons_ring.payload = cons_payload; + + ncrp->otherend_id = otherend_id; + + ncrp->is_attached = 1; + + ncrp->need_advertise_max_packets = 1; +} + +/* Attach a netchannel2 structure to a ring pair. The endpoint is + also expected to set up an event channel after calling this before + using the interface. Returns 0 on success or <0 on error. */ +int nc2_attach_rings(struct netchannel2 *nc, + struct netchannel2_sring_cons *cons_sring, + const volatile void *cons_payload, + size_t cons_size, + struct netchannel2_sring_prod *prod_sring, + void *prod_payload, + size_t prod_size, + domid_t otherend_id) +{ + spin_lock_bh(&nc->rings.lock); + _nc2_attach_rings(&nc->rings, cons_sring, cons_payload, cons_size, + prod_sring, prod_payload, prod_size, otherend_id); + + spin_unlock_bh(&nc->rings.lock); + + netif_carrier_on(nc->net_device); + + /* Kick it to get it going. */ + nc2_kick(&nc->rings); + + return 0; +} + +static void _detach_rings(struct netchannel2_ring_pair *ncrp) +{ + spin_lock_bh(&ncrp->lock); + /* We need to release all of the pending transmission packets, + because they're never going to complete now that we've lost + the ring. */ + drop_pending_tx_packets(ncrp); + + disable_irq(ncrp->irq); + + BUG_ON(ncrp->nr_tx_packets_outstanding); + ncrp->max_tx_packets_outstanding = 0; + + /* No way of sending pending finish messages now; drop + * them. */ + ncrp->pending_finish.prod = 0; + ncrp->pending_finish.cons = 0; + + ncrp->cons_ring.sring = NULL; + ncrp->prod_ring.sring = NULL; + ncrp->is_attached = 0; + + spin_unlock_bh(&ncrp->lock); +} + +/* Detach from the rings. This includes unmapping them and stopping + the interrupt. */ +/* Careful: the netdev methods may still be running at this point. */ +/* This is not allowed to wait for the other end, because it might + have gone away (e.g. over suspend/resume). */ +static void nc2_detach_ring(struct netchannel2_ring_pair *ncrp) +{ + if (!ncrp->is_attached) + return; + napi_disable(&ncrp->napi); + _detach_rings(ncrp); +} + +/* Trivial wrapper around nc2_detach_ring(). Make the ring no longer + used. */ +void nc2_detach_rings(struct netchannel2 *nc) +{ + nc2_detach_ring(&nc->rings); + + /* Okay, all async access to the ring is stopped. Kill the + irqhandlers. (It might be better to do this from the + _detach_ring() functions, but you're not allowed to + free_irq() from interrupt context, and tasklets are close + enough to cause problems). */ + + if (nc->rings.irq >= 0) + unbind_from_irqhandler(nc->rings.irq, &nc->rings); + nc->rings.irq = -1; +} + +#if defined(CONFIG_XEN_NETDEV2_BACKEND) +/* Connect to an event channel port in a remote domain. Returns 0 on + success or <0 on error. The port is automatically disconnected + when the channel is released or if the rings are detached. This + should not be called if the port is already open. */ +int nc2_connect_evtchn(struct netchannel2 *nc, domid_t domid, + int evtchn) +{ + int err; + + BUG_ON(nc->rings.irq >= 0); + + err = bind_interdomain_evtchn_to_irqhandler(domid, + evtchn, + nc2_int, + IRQF_SAMPLE_RANDOM, + "netchannel2", + &nc->rings); + if (err >= 0) { + nc->rings.irq = err; + return 0; + } else { + return err; + } +} +#endif + +#if defined(CONFIG_XEN_NETDEV2_FRONTEND) +/* Listen for incoming event channel connections from domain domid. + Similar semantics to nc2_connect_evtchn(). */ +int nc2_listen_evtchn(struct netchannel2 *nc, domid_t domid) +{ + int err; + + BUG_ON(nc->rings.irq >= 0); + + err = xen_alloc_evtchn(domid, &nc->rings.evtchn); + if (err) + return err; + + err = bind_evtchn_to_irqhandler(nc->rings.evtchn, nc2_int, + IRQF_SAMPLE_RANDOM, "netchannel2", + &nc->rings); + if (err >= 0) { + nc->rings.irq = err; + return 0; + } else { + return err; + } +} +#endif + +/* Find the local event channel port which was allocated by + * nc2_listen_evtchn() or nc2_connect_evtchn(). It is an error to + * call this when there is no event channel connected. */ +int nc2_get_evtchn_port(struct netchannel2 *nc) +{ + BUG_ON(nc->rings.irq < 0); + return nc->rings.evtchn; +} + +/* @ncrp has been recently nc2_kick()ed. Do all of the necessary + stuff. */ +static int process_ring(struct napi_struct *napi, + int work_avail) +{ + struct netchannel2_ring_pair *ncrp = + container_of(napi, struct netchannel2_ring_pair, napi); + struct netchannel2 *nc = ncrp->interface; + struct sk_buff *skb; + int work_done; + struct sk_buff_head rx_queue; + + skb_queue_head_init(&rx_queue); + + spin_lock(&ncrp->lock); + + /* Pick up incoming messages. */ + work_done = nc2_poll(ncrp, work_avail, &rx_queue); + + /* Transmit pending packets. */ + if (!skb_queue_empty(&ncrp->pending_tx_queue)) { + skb = __skb_dequeue(&ncrp->pending_tx_queue); + do { + if (!nc2_really_start_xmit(ncrp, skb)) { + /* Requeue the packet so that we will try + when the ring is less busy */ + __skb_queue_head(&ncrp->pending_tx_queue, skb); + break; + } + skb = __skb_dequeue(&ncrp->pending_tx_queue); + } while (skb != NULL); + + flush_rings(ncrp); + + while ((skb = __skb_dequeue(&ncrp->release_on_flush_batcher))) + release_tx_packet(ncrp, skb); + } + + if (nc->is_stopped) { + /* If the other end has processed some messages, there + may be space on the ring for a delayed send from + earlier. Process it now. */ + while (1) { + skb = skb_peek_tail(&nc->pending_skbs); + if (!skb) + break; + if (prepare_xmit_allocate_resources(nc, skb) < 0) { + /* Still stuck */ + break; + } + __skb_unlink(skb, &nc->pending_skbs); + queue_packet_to_interface(skb, ncrp); + } + if (skb_queue_empty(&nc->pending_skbs)) { + nc->is_stopped = 0; + netif_wake_queue(nc->net_device); + } + } + + spin_unlock(&ncrp->lock); + + receive_pending_skbs(&rx_queue); + + return work_done; +} diff --git a/drivers/net/xen-netchannel2/netback2.c b/drivers/net/xen-netchannel2/netback2.c new file mode 100644 index 0000000..bd33786 --- /dev/null +++ b/drivers/net/xen-netchannel2/netback2.c @@ -0,0 +1,354 @@ +#include <linux/kernel.h> +#include <linux/gfp.h> +#include <linux/vmalloc.h> +#include <xen/grant_table.h> +#include <xen/xenbus.h> +#include <xen/interface/io/netchannel2.h> + +#include "netchannel2_core.h" +#include "netchannel2_endpoint.h" + +#define NETBACK2_MAGIC 0xb5e99485 +struct netback2 { + unsigned magic; + struct xenbus_device *xenbus_device; + + struct netchannel2 *chan; + + struct grant_mapping b2f_mapping; + struct grant_mapping f2b_mapping; + struct grant_mapping control_mapping; + + int attached; + + struct xenbus_watch shutdown_watch; + int have_shutdown_watch; +}; + +static struct netback2 *xenbus_device_to_nb2(struct xenbus_device *xd) +{ + struct netback2 *nb = xd->dev.driver_data; + BUG_ON(nb->magic != NETBACK2_MAGIC); + return nb; +} + +/* Read a range of grants out of xenstore and map them in gm. Any + existing mapping in gm is released. Returns 0 on success or <0 on + error. On error, gm is preserved, and xenbus_dev_fatal() is + called. */ +static int map_grants(struct netback2 *nd, const char *prefix, + struct grant_mapping *gm) +{ + struct xenbus_device *xd = nd->xenbus_device; + int err; + char buf[32]; + int i; + unsigned nr_pages; + grant_ref_t grefs[MAX_GRANT_MAP_PAGES]; + + sprintf(buf, "%s-nr-pages", prefix); + err = xenbus_scanf(XBT_NIL, xd->otherend, buf, "%u", &nr_pages); + if (err == -ENOENT) { + nr_pages = 1; + } else if (err != 1) { + if (err < 0) { + xenbus_dev_fatal(xd, err, "reading %s", buf); + return err; + } else { + xenbus_dev_fatal(xd, err, "reading %s as integer", + buf); + return -EINVAL; + } + } + + for (i = 0; i < nr_pages; i++) { + sprintf(buf, "%s-ref-%d", prefix, i); + err = xenbus_scanf(XBT_NIL, xd->otherend, buf, "%u", + &grefs[i]); + if (err != 1) { + if (err < 0) { + xenbus_dev_fatal(xd, + err, + "reading gref %d from %s/%s", + i, + xd->otherend, + buf); + } else { + xenbus_dev_fatal(xd, + -EINVAL, + "expected an integer at %s/%s", + xd->otherend, + buf); + err = -EINVAL; + } + return err; + } + } + + err = nc2_map_grants(gm, grefs, nr_pages, xd->otherend_id); + if (err < 0) + xenbus_dev_fatal(xd, err, "mapping ring %s from %s", + prefix, xd->otherend); + return err; +} + +/* Undo the effects of attach_to_frontend */ +static void detach_from_frontend(struct netback2 *nb) +{ + if (!nb->attached) + return; + nc2_detach_rings(nb->chan); + nc2_unmap_grants(&nb->b2f_mapping); + nc2_unmap_grants(&nb->f2b_mapping); + nc2_unmap_grants(&nb->control_mapping); + nb->attached = 0; +} + +static int attach_to_frontend(struct netback2 *nd) +{ + int err; + int evtchn; + struct xenbus_device *xd = nd->xenbus_device; + struct netchannel2 *nc = nd->chan; + struct netchannel2_backend_shared *nbs; + + if (nd->attached) + return 0; + + /* Attach the shared memory bits */ + err = map_grants(nd, "b2f-ring", &nd->b2f_mapping); + if (err) + return err; + err = map_grants(nd, "f2b-ring", &nd->f2b_mapping); + if (err) + return err; + err = map_grants(nd, "control", &nd->control_mapping); + if (err) + return err; + nbs = nd->control_mapping.mapping->addr; + err = nc2_attach_rings(nc, + &nbs->cons, + nd->f2b_mapping.mapping->addr, + nd->f2b_mapping.nr_pages * PAGE_SIZE, + &nbs->prod, + nd->b2f_mapping.mapping->addr, + nd->b2f_mapping.nr_pages * PAGE_SIZE, + xd->otherend_id); + if (err < 0) { + xenbus_dev_fatal(xd, err, "attaching to rings"); + return err; + } + + /* Connect the event channel. */ + err = xenbus_scanf(XBT_NIL, xd->otherend, "event-channel", "%u", + &evtchn); + if (err < 0) { + xenbus_dev_fatal(xd, err, + "reading %s/event-channel or {t,r}x-sring-pages", + xd->otherend); + return err; + } + err = nc2_connect_evtchn(nd->chan, xd->otherend_id, evtchn); + if (err < 0) { + xenbus_dev_fatal(xd, err, "binding to event channel"); + return err; + } + + /* All done */ + nd->attached = 1; + + return 0; +} + +static void frontend_changed(struct xenbus_device *xd, + enum xenbus_state frontend_state) +{ + struct netback2 *nb = xenbus_device_to_nb2(xd); + int err; + + switch (frontend_state) { + case XenbusStateInitialising: + /* If the frontend does a kexec following a crash, we + can end up bounced back here even though we're + attached. Try to recover by detaching from the old + rings. */ + /* (A normal shutdown, and even a normal kexec, would + * have gone through Closed first, so we'll already be + * detached, and this is pointless but harmless.) */ + detach_from_frontend(nb); + + /* Tell the frontend what sort of rings we're willing + to accept. */ + xenbus_printf(XBT_NIL, nb->xenbus_device->nodename, + "max-sring-pages", "%d", MAX_GRANT_MAP_PAGES); + + /* Start the device bring-up bit of the state + * machine. */ + xenbus_switch_state(nb->xenbus_device, XenbusStateInitWait); + break; + + case XenbusStateInitWait: + /* Frontend doesn't use this state */ + xenbus_dev_fatal(xd, EINVAL, + "unexpected frontend state InitWait"); + break; + + case XenbusStateInitialised: + case XenbusStateConnected: + /* Frontend has advertised its rings to us */ + err = attach_to_frontend(nb); + if (err >= 0) + xenbus_switch_state(xd, XenbusStateConnected); + break; + + case XenbusStateClosing: + detach_from_frontend(nb); + xenbus_switch_state(xd, XenbusStateClosed); + break; + + case XenbusStateClosed: + detach_from_frontend(nb); + xenbus_switch_state(xd, XenbusStateClosed); + if (!xenbus_dev_is_online(xd)) + device_unregister(&xd->dev); + break; + + case XenbusStateUnknown: + detach_from_frontend(nb); + xenbus_switch_state(xd, XenbusStateClosed); + device_unregister(&xd->dev); + break; + + default: + /* Ignore transitions to unknown states */ + break; + } +} + +static int netback2_uevent(struct xenbus_device *xd, + struct kobj_uevent_env *env) +{ + struct netback2 *nb = xenbus_device_to_nb2(xd); + + add_uevent_var(env, "vif=%s", nb->chan->net_device->name); + + return 0; +} + +static void netback2_shutdown(struct xenbus_device *xd) +{ + xenbus_switch_state(xd, XenbusStateClosing); +} + +static void shutdown_watch_callback(struct xenbus_watch *watch, + const char **vec, + unsigned int len) +{ + struct netback2 *nb = + container_of(watch, struct netback2, shutdown_watch); + char *type; + + type = xenbus_read(XBT_NIL, nb->xenbus_device->nodename, + "shutdown-request", NULL); + if (IS_ERR(type)) { + if (PTR_ERR(type) != -ENOENT) + printk(KERN_WARNING "Cannot read %s/%s: %ld\n", + nb->xenbus_device->nodename, "shutdown-request", + PTR_ERR(type)); + return; + } + if (strcmp(type, "force") == 0) { + detach_from_frontend(nb); + xenbus_switch_state(nb->xenbus_device, XenbusStateClosed); + } else if (strcmp(type, "normal") == 0) { + netback2_shutdown(nb->xenbus_device); + } else { + printk(KERN_WARNING "Unrecognised shutdown request %s from tools\n", + type); + } + xenbus_rm(XBT_NIL, nb->xenbus_device->nodename, "shutdown-request"); + kfree(type); +} + +static int netback2_probe(struct xenbus_device *xd, + const struct xenbus_device_id *id) +{ + struct netback2 *nb; + + nb = kzalloc(sizeof(*nb), GFP_KERNEL); + if (nb == NULL) + goto err; + nb->magic = NETBACK2_MAGIC; + nb->xenbus_device = xd; + + nb->shutdown_watch.node = kasprintf(GFP_KERNEL, "%s/shutdown-request", + xd->nodename); + if (nb->shutdown_watch.node == NULL) + goto err; + nb->shutdown_watch.callback = shutdown_watch_callback; + if (register_xenbus_watch(&nb->shutdown_watch)) + goto err; + nb->have_shutdown_watch = 1; + + nb->chan = nc2_new(xd); + if (!nb->chan) + goto err; + + xd->dev.driver_data = nb; + + kobject_uevent(&xd->dev.kobj, KOBJ_ONLINE); + + return 0; + +err: + if (nb != NULL) { + if (nb->have_shutdown_watch) + unregister_xenbus_watch(&nb->shutdown_watch); + kfree(nb->shutdown_watch.node); + kfree(nb); + } + xenbus_dev_fatal(xd, ENOMEM, "probing netdev"); + return -ENOMEM; +} + +static int netback2_remove(struct xenbus_device *xd) +{ + struct netback2 *nb = xenbus_device_to_nb2(xd); + kobject_uevent(&xd->dev.kobj, KOBJ_OFFLINE); + if (nb->chan != NULL) + nc2_release(nb->chan); + if (nb->have_shutdown_watch) + unregister_xenbus_watch(&nb->shutdown_watch); + kfree(nb->shutdown_watch.node); + nc2_unmap_grants(&nb->b2f_mapping); + nc2_unmap_grants(&nb->f2b_mapping); + nc2_unmap_grants(&nb->control_mapping); + kfree(nb); + return 0; +} + +static const struct xenbus_device_id netback2_ids[] = { + { "vif2" }, + { "" } +}; + +static struct xenbus_driver netback2 = { + .name = "vif2", + .ids = netback2_ids, + .probe = netback2_probe, + .remove = netback2_remove, + .otherend_changed = frontend_changed, + .uevent = netback2_uevent, +}; + +int __init netback2_init(void) +{ + int r; + + r = xenbus_register_backend(&netback2); + if (r < 0) { + printk(KERN_ERR "error %d registering backend driver.\n", + r); + } + return r; +} diff --git a/drivers/net/xen-netchannel2/netchan2.c b/drivers/net/xen-netchannel2/netchan2.c new file mode 100644 index 0000000..b23b7e4 --- /dev/null +++ b/drivers/net/xen-netchannel2/netchan2.c @@ -0,0 +1,32 @@ +#include <linux/kernel.h> +#include <linux/module.h> +#include "netchannel2_endpoint.h" + +static int __init netchan2_init(void) +{ + int r; + + r = nc2_init(); + if (r < 0) + return r; + r = netfront2_init(); + if (r < 0) + return r; + r = netback2_init(); + if (r < 0) + netfront2_exit(); + return r; +} +module_init(netchan2_init); + +/* We can't unload if we're acting as a backend. */ +#ifndef CONFIG_XEN_NETDEV2_BACKEND +static void __exit netchan2_exit(void) +{ + netfront2_exit(); + nc2_exit(); +} +module_exit(netchan2_exit); +#endif + +MODULE_LICENSE("GPL"); diff --git a/drivers/net/xen-netchannel2/netchannel2_core.h b/drivers/net/xen-netchannel2/netchannel2_core.h new file mode 100644 index 0000000..4b324fc --- /dev/null +++ b/drivers/net/xen-netchannel2/netchannel2_core.h @@ -0,0 +1,352 @@ +#ifndef NETCHANNEL2_CORE_H__ +#define NETCHANNEL2_CORE_H__ + +#include <xen/interface/xen.h> +#include <xen/grant_table.h> +#include <xen/interface/io/netchannel2.h> +#include <asm/xen/hypercall.h> +#include <linux/skbuff.h> +#include <linux/netdevice.h> + +/* After we send this number of frags, we request the other end to + * notify us when sending the corresponding finish packet message */ +#define MAX_MAX_COUNT_FRAGS_NO_EVENT 192 + +/* Very small packets (e.g. TCP pure acks) are sent inline in the + * ring, to avoid the hypercall overhead. This is the largest packet + * which will be sent small, in bytes. It should be big enough to + * cover the normal headers (i.e. ethernet + IP + TCP = 66 bytes) plus + * a little bit of slop for options etc. */ +#define PACKET_PREFIX_SIZE 96 + +/* How many packets can we have outstanding at any one time? This + * must be small enough that it won't be confused with an sk_buff + * pointer; see the txp_slot stuff later. */ +#define NR_TX_PACKETS 256 + +/* A way of keeping track of a mapping of a bunch of grant references + into a contigous chunk of virtual address space. This is used for + things like multi-page rings. */ +#define MAX_GRANT_MAP_PAGES 4 +struct grant_mapping { + unsigned nr_pages; + grant_handle_t handles[MAX_GRANT_MAP_PAGES]; + struct vm_struct *mapping; +}; + +enum transmit_policy { + transmit_policy_unknown = 0, + transmit_policy_first = 0xf001, + transmit_policy_grant = transmit_policy_first, + transmit_policy_small, + transmit_policy_last = transmit_policy_small +}; + +/* When we send a packet message, we need to tag it with an ID. That + ID is an index into the TXP slot array. Each slot contains either + a pointer to an sk_buff (if it's in use), or the index of the next + free slot (if it isn't). A slot is in use if the contents is > + NR_TX_PACKETS, and free otherwise. */ +struct txp_slot { + unsigned long __contents; +}; + +typedef uint32_t nc2_txp_index_t; + +#define INVALID_TXP_INDEX ((nc2_txp_index_t)NR_TX_PACKETS) + +static inline int txp_slot_in_use(struct txp_slot *slot) +{ + if (slot->__contents <= NR_TX_PACKETS) + return 0; + else + return 1; +} + +static inline void txp_set_skb(struct txp_slot *slot, struct sk_buff *skb) +{ + slot->__contents = (unsigned long)skb; +} + +static inline struct sk_buff *txp_get_skb(struct txp_slot *slot) +{ + if (txp_slot_in_use(slot)) + return (struct sk_buff *)slot->__contents; + else + return NULL; +} + +static inline void txp_set_next_free(struct txp_slot *slot, + nc2_txp_index_t idx) +{ + slot->__contents = idx; +} + +static inline nc2_txp_index_t txp_get_next_free(struct txp_slot *slot) +{ + return (nc2_txp_index_t)slot->__contents; +} + +/* This goes in struct sk_buff::cb */ +struct skb_cb_overlay { + struct txp_slot *tp; + unsigned nr_fragments; + grant_ref_t gref_pool; + enum transmit_policy policy; + uint8_t failed; + uint8_t expecting_finish; + uint8_t type; + uint16_t inline_prefix_size; +}; + +#define CASSERT(x) typedef unsigned __cassert_ ## __LINE__ [(x)-1] +CASSERT(sizeof(struct skb_cb_overlay) <= sizeof(((struct sk_buff *)0)->cb)); + +static inline struct skb_cb_overlay *get_skb_overlay(struct sk_buff *skb) +{ + return (struct skb_cb_overlay *)skb->cb; +} + + +/* Packets for which we need to send FINISH_PACKET messages for as + soon as possible. */ +struct pending_finish_packets { +#define MAX_PENDING_FINISH_PACKETS 256 + uint32_t ids[MAX_PENDING_FINISH_PACKETS]; + RING_IDX prod; + RING_IDX cons; +}; + +#define RX_GRANT_COPY_BATCH 32 +struct hypercall_batcher { + unsigned nr_pending_gops; + struct gnttab_copy gops[RX_GRANT_COPY_BATCH]; + void *ctxt[RX_GRANT_COPY_BATCH]; +}; + +struct netchannel2_ring_pair { + struct netchannel2 *interface; + /* Main ring lock. Acquired from bottom halves. */ + spinlock_t lock; + + struct napi_struct napi; + + /* Protected by the lock. Initialised at attach_ring() time + and de-initialised at detach_ring() time. */ + struct netchannel2_prod_ring prod_ring; + struct netchannel2_cons_ring cons_ring; + uint8_t is_attached; /* True if the rings are currently safe to + access. */ + + unsigned max_count_frags_no_event; + unsigned expected_finish_messages; + + domid_t otherend_id; + + grant_ref_t gref_pool; + + /* The IRQ corresponding to the event channel which is + connected to the other end. This only changes from the + xenbus state change handler. It is notified from lots of + other places. Fortunately, it's safe to notify on an irq + after it's been released, so the lack of synchronisation + doesn't matter. */ + int irq; + int evtchn; + + /* The MAC address of our peer. */ + unsigned char remote_mac[ETH_ALEN]; + + /* Set if we need to check the source MAC address on incoming + packets. */ + int filter_mac; + + /* A pool of free transmitted_packet structures, threaded on + the list member. Protected by the lock. */ + nc2_txp_index_t head_free_tx_packet; + + /* Total number of packets on the allocated list. Protected + by the lock. */ + unsigned nr_tx_packets_outstanding; + /* Maximum number of packets which the other end will allow us + to keep outstanding at one time. Valid whenever + is_attached is set. */ + unsigned max_tx_packets_outstanding; + + /* Count number of frags that we have sent to the other side + When we reach a max value we request that the other end + send an event when sending the corresponding finish message */ + unsigned count_frags_no_event; + + /* Set if we need to send a SET_MAX_PACKETS message. + Protected by the lock. */ + uint8_t need_advertise_max_packets; + + /* Set if there are messages on the ring which are considered + time-sensitive, so that it's necessary to notify the remote + endpoint as soon as possible. */ + uint8_t pending_time_sensitive_messages; + + /* Set if we've previously suppressed a remote notification + because none of the messages pending at the time of the + flush were time-sensitive. The remote should be notified + as soon as the ring is flushed, even if the normal + filtering rules would suppress the event. */ + uint8_t delayed_kick; + + /* A list of packet IDs which we need to return to the other + end as soon as there is space on the ring. Protected by + the lock. */ + struct pending_finish_packets pending_finish; + + /* transmitted_packet structures which are to be transmitted + next time the TX tasklet looks at this interface. + Protected by the lock. */ + struct sk_buff_head pending_tx_queue; + + /* Packets which we'll have finished transmitting as soon as + we flush the hypercall batcher. Protected by the lock. */ + struct sk_buff_head release_on_flush_batcher; + + struct hypercall_batcher pending_rx_hypercalls; + + /* A pre-allocated pool of TX packets. The + allocated_tx_packets and free_tx_packets linked lists + contain elements of this array, and it can also be directly + indexed by packet ID. Protected by the lock. */ + struct txp_slot tx_packets[NR_TX_PACKETS]; +}; + +struct netchannel2 { +#define NETCHANNEL2_MAGIC 0x57c68c1d + unsigned magic; + + /* Set when the structure is created and never changed */ + struct net_device *net_device; + struct xenbus_device *xenbus_device; + + /* Set if we trust the remote endpoint. */ + int remote_trusted; + /* Set if the remote endpoint is expected to trust us. + There's no guarantee that this is actually correct, but + it's useful for optimisation. */ + int local_trusted; + + struct netchannel2_ring_pair rings; + + /* Packets which we need to transmit soon */ + struct sk_buff_head pending_skbs; + + /* Flag to indicate that the interface is stopped + When the interface is stopped we need to run the tasklet + after we receive an interrupt so that we can wake it up */ + uint8_t is_stopped; + + /* Updates are protected by the lock. This can be read at any + * time without holding any locks, and the rest of Linux is + * expected to cope. */ + struct net_device_stats stats; +}; + +static inline void flush_prepared_grant_copies(struct hypercall_batcher *hb, + void (*on_fail)(void *ctxt, + struct gnttab_copy *gop)) +{ + unsigned x; + + if (hb->nr_pending_gops == 0) + return; + if (HYPERVISOR_grant_table_op(GNTTABOP_copy, hb->gops, + hb->nr_pending_gops)) + BUG(); + for (x = 0; x < hb->nr_pending_gops; x++) + if (hb->gops[x].status != GNTST_okay) + on_fail(hb->ctxt[x], &hb->gops[x]); + hb->nr_pending_gops = 0; +} + +static inline struct gnttab_copy *hypercall_batcher_grant_copy( + struct hypercall_batcher *hb, + void *ctxt, + void (*on_fail)(void *, struct gnttab_copy *gop)) +{ + if (hb->nr_pending_gops == ARRAY_SIZE(hb->gops)) + flush_prepared_grant_copies(hb, on_fail); + hb->ctxt[hb->nr_pending_gops] = ctxt; + return &hb->gops[hb->nr_pending_gops++]; +} + +static inline void flush_hypercall_batcher( + struct hypercall_batcher *hb, + void (*on_fail)(void *, struct gnttab_copy *gop)) +{ + flush_prepared_grant_copies(hb, on_fail); +} + +struct sk_buff *handle_receiver_copy_packet(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_packet *msg, + struct netchannel2_msg_hdr *hdr, + unsigned nr_frags, + unsigned frags_off); + +int prepare_xmit_allocate_small(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb); +int prepare_xmit_allocate_grant(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb); +void xmit_grant(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb, + volatile void *msg); + +void queue_finish_packet_message(struct netchannel2_ring_pair *ncrp, + uint32_t id, uint8_t flags); + +int allocate_txp_slot(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb); +void release_txp_slot(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb); +/* Releases the txp slot, the grant pool, and the skb */ +void release_tx_packet(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb); + +void fetch_fragment(struct netchannel2_ring_pair *ncrp, + unsigned idx, + struct netchannel2_fragment *frag, + unsigned off); + +void nc2_kick(struct netchannel2_ring_pair *ncrp); + +int nc2_map_grants(struct grant_mapping *gm, + const grant_ref_t *grefs, + unsigned nr_grefs, + domid_t remote_domain); +void nc2_unmap_grants(struct grant_mapping *gm); + +void queue_packet_to_interface(struct sk_buff *skb, + struct netchannel2_ring_pair *ncrp); + +void nc2_rscb_on_gntcopy_fail(void *ctxt, struct gnttab_copy *gop); + +int nc2_start_xmit(struct sk_buff *skb, struct net_device *dev); +int nc2_really_start_xmit(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb); +int prepare_xmit_allocate_resources(struct netchannel2 *nc, + struct sk_buff *skb); +void nc2_handle_finish_packet_msg(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr); +void nc2_handle_set_max_packets_msg(struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr); +void drop_pending_tx_packets(struct netchannel2_ring_pair *ncrp); + +void send_finish_packet_messages(struct netchannel2_ring_pair *ncrp); +void nc2_handle_packet_msg(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr, + struct sk_buff_head *pending_rx_queue); +void advertise_max_packets(struct netchannel2_ring_pair *ncrp); +void receive_pending_skbs(struct sk_buff_head *rx_queue); +void nc2_queue_purge(struct netchannel2_ring_pair *ncrp, + struct sk_buff_head *queue); + +#endif /* !NETCHANNEL2_CORE_H__ */ diff --git a/drivers/net/xen-netchannel2/netchannel2_endpoint.h b/drivers/net/xen-netchannel2/netchannel2_endpoint.h new file mode 100644 index 0000000..2525f23 --- /dev/null +++ b/drivers/net/xen-netchannel2/netchannel2_endpoint.h @@ -0,0 +1,63 @@ +/* Interface between the endpoint implementations (netfront2.c, + netback2.c) and the netchannel2 core (chan.c and the various + transmission modes). */ +#ifndef NETCHANNEL2_ENDPOINT_H__ +#define NETCHANNEL2_ENDPOINT_H__ + +#include <linux/init.h> +#include <xen/interface/xen.h> + +struct netchannel2_sring_prod; +struct netchannel2_sring_cons; +struct netchannel2; +struct xenbus_device; + +struct netchannel2 *nc2_new(struct xenbus_device *xd); +void nc2_release(struct netchannel2 *nc); + +int nc2_attach_rings(struct netchannel2 *nc, + struct netchannel2_sring_cons *cons_sring, + const volatile void *cons_payload, + size_t cons_size, + struct netchannel2_sring_prod *prod_sring, + void *prod_payload, + size_t prod_size, + domid_t otherend_id); +void nc2_detach_rings(struct netchannel2 *nc); +#if defined(CONFIG_XEN_NETDEV2_FRONTEND) +int nc2_listen_evtchn(struct netchannel2 *nc, domid_t dom); +#endif +#if defined(CONFIG_XEN_NETDEV2_BACKEND) +int nc2_connect_evtchn(struct netchannel2 *nc, domid_t domid, + int evtchn); +#endif +int nc2_get_evtchn_port(struct netchannel2 *nc); +void nc2_suspend(struct netchannel2 *nc); + +void nc2_set_nr_tx_buffers(struct netchannel2 *nc, unsigned nr_buffers); + +/* Interface which the endpoints provide to the core. */ +#ifdef CONFIG_XEN_NETDEV2_FRONTEND +int __init netfront2_init(void); +void __exit netfront2_exit(void); +#else +static inline int netfront2_init(void) +{ + return 0; +} +static inline void netfront2_exit(void) +{ +} +#endif +#ifdef CONFIG_XEN_NETDEV2_BACKEND +int __init netback2_init(void); +#else +static inline int netback2_init(void) +{ + return 0; +} +#endif +int __init nc2_init(void); +void __exit nc2_exit(void); + +#endif /* NETCHANNEL2_ENDPOINT_H__ */ diff --git a/drivers/net/xen-netchannel2/netfront2.c b/drivers/net/xen-netchannel2/netfront2.c new file mode 100644 index 0000000..6c88484 --- /dev/null +++ b/drivers/net/xen-netchannel2/netfront2.c @@ -0,0 +1,489 @@ +#include <linux/kernel.h> +#include <linux/gfp.h> +#include <linux/version.h> +#include <xen/grant_table.h> +#include <xen/xenbus.h> +#include <asm/xen/page.h> + +#include "netchannel2_core.h" +#include "netchannel2_endpoint.h" + +#define MAX_SRING_PAGES 4 + +struct netfront2 { +#define NETFRONT2_MAGIC 0x9268e704 + unsigned magic; + struct xenbus_device *xenbus_device; + + void *f2b_sring; + grant_ref_t f2b_grefs[MAX_SRING_PAGES]; + void *b2f_sring; + grant_ref_t b2f_grefs[MAX_SRING_PAGES]; + + struct netchannel2_frontend_shared *control_shared; + grant_ref_t control_shared_gref; + + int nr_sring_pages; + int sring_order; + + grant_ref_t rings_gref_pool; /* Some pre-allocated grant + references to cover the shared + rings. */ + + struct netchannel2 *chan; + + int attached; /* True if the shared rings are ready to go. */ +}; + +static struct netfront2 *xenbus_device_to_nf2(struct xenbus_device *xd) +{ + struct netfront2 *work = xd->dev.driver_data; + BUG_ON(work->magic != NETFRONT2_MAGIC); + return work; +} + +/* Try to revoke a bunch of grant references and return the grefs to + the rings grefs pool. Any cleared grefs are set to 0. Returns 0 + on success or <0 on error. Ignores zero entries in the @grefs + list, and zeroes any entries which are successfully ended. */ +static int ungrant_access_to_ring(struct netfront2 *nf, + grant_ref_t *grefs, + int nr_pages) +{ + int i; + int succ; + int failed; + + failed = 0; + + for (i = 0; i < nr_pages; i++) { + if (grefs[i]) { + succ = gnttab_end_foreign_access_ref(grefs[i], 0); + if (!succ) { + /* XXX we can't recover when this + * happens. Try to do something + * vaguely plausible, but the device + * is pretty much doomed. */ + printk(KERN_WARNING "Failed to end access to gref %d\n", + i); + failed = 1; + continue; + } + gnttab_release_grant_reference(&nf->rings_gref_pool, + grefs[i]); + grefs[i] = 0; + } + } + + if (failed) + return -EBUSY; + else + return 0; +} + +/* Allocate and initialise grant references to cover a bunch of pages. + @ring should be in the direct-mapped region. The rings_gref_pool + on nf should contain at least @nr_pages references. + Already-populated slots in the @grefs list are left unchanged. */ +static void grant_access_to_ring(struct netfront2 *nf, + domid_t otherend, + void *ring, + int *grefs, + int nr_pages) +{ + void *p; + int i; + grant_ref_t ref; + + for (i = 0; i < nr_pages; i++) { + + if (grefs[i] != 0) + continue; + + p = (void *)((unsigned long)ring + PAGE_SIZE * i); + + ref = gnttab_claim_grant_reference(&nf->rings_gref_pool); + /* There should be enough grefs in the pool to handle + the rings. */ + BUG_ON(ref < 0); + gnttab_grant_foreign_access_ref(ref, + otherend, + virt_to_mfn(p), + 0); + grefs[i] = ref; + } +} + +/* Push an already-granted ring into xenstore. */ +static int publish_ring(struct xenbus_transaction xbt, + struct netfront2 *nf, + const char *prefix, + const int *grefs, + int nr_grefs) +{ + int i; + char buf[32]; + int err; + + sprintf(buf, "%s-nr-pages", prefix); + err = xenbus_printf(xbt, nf->xenbus_device->nodename, buf, + "%u", nr_grefs); + if (err) + return err; + + for (i = 0; i < nr_grefs; i++) { + BUG_ON(grefs[i] == 0); + sprintf(buf, "%s-ref-%u", prefix, i); + err = xenbus_printf(xbt, nf->xenbus_device->nodename, + buf, "%u", grefs[i]); + if (err) + return err; + } + return 0; +} + +static int publish_rings(struct netfront2 *nf) +{ + int err; + struct xenbus_transaction xbt; + const char *msg; + +again: + err = xenbus_transaction_start(&xbt); + if (err) { + xenbus_dev_fatal(nf->xenbus_device, err, + "starting transaction"); + return err; + } + + err = publish_ring(xbt, nf, "f2b-ring", nf->f2b_grefs, + nf->nr_sring_pages); + if (err) { + msg = "publishing f2b-ring"; + goto abort; + } + err = publish_ring(xbt, nf, "b2f-ring", nf->b2f_grefs, + nf->nr_sring_pages); + if (err) { + msg = "publishing b2f-ring"; + goto abort; + } + err = publish_ring(xbt, nf, "control", &nf->control_shared_gref, 1); + if (err) { + msg = "publishing control"; + goto abort; + } + err = xenbus_printf(xbt, nf->xenbus_device->nodename, + "event-channel", "%u", + nc2_get_evtchn_port(nf->chan)); + if (err) { + msg = "publishing event channel"; + goto abort; + } + + err = xenbus_transaction_end(xbt, 0); + if (err) { + if (err == -EAGAIN) + goto again; + xenbus_dev_fatal(nf->xenbus_device, err, + "completing transaction"); + } + + return err; + +abort: + xenbus_transaction_end(xbt, 1); + xenbus_dev_fatal(nf->xenbus_device, err, msg); + return err; +} + +/* Release the rings. WARNING: This will leak memory if the other end + still has the rings mapped. There isn't really anything we can do + about that; the alternative (giving the other end access to + whatever Linux puts in the memory after we released it) is probably + worse. */ +static void release_rings(struct netfront2 *nf) +{ + int have_outstanding_grants; + + have_outstanding_grants = 0; + + if (nf->f2b_sring) { + if (ungrant_access_to_ring(nf, nf->f2b_grefs, + nf->nr_sring_pages) >= 0) { + free_pages((unsigned long)nf->f2b_sring, + nf->sring_order); + } else { + have_outstanding_grants = 1; + } + nf->f2b_sring = NULL; + } + + if (nf->b2f_sring) { + if (ungrant_access_to_ring(nf, nf->b2f_grefs, + nf->nr_sring_pages) >= 0) { + free_pages((unsigned long)nf->b2f_sring, + nf->sring_order); + } else { + have_outstanding_grants = 1; + } + nf->b2f_sring = NULL; + } + + if (nf->control_shared) { + if (ungrant_access_to_ring(nf, &nf->control_shared_gref, + 1) >= 0) { + free_page((unsigned long)nf->control_shared); + } else { + have_outstanding_grants = 1; + } + nf->control_shared = NULL; + } + + if (have_outstanding_grants != 0) { + printk(KERN_WARNING + "Released shared rings while the backend still had them mapped; leaking memory\n"); + } + + /* We can't release the gref pool if there are still + references outstanding against it. */ + if (!have_outstanding_grants) { + if (nf->rings_gref_pool) + gnttab_free_grant_references(nf->rings_gref_pool); + nf->rings_gref_pool = 0; + } + + nf->attached = 0; +} + +static int allocate_rings(struct netfront2 *nf, domid_t otherend) +{ + int err; + int max_sring_pages; + int sring_order; + int nr_sring_pages; + size_t sring_size; + + /* Figure out how big our shared rings are going to be. */ + err = xenbus_scanf(XBT_NIL, nf->xenbus_device->otherend, + "max-sring-pages", "%d", &max_sring_pages); + if (err < 0) { + xenbus_dev_fatal(nf->xenbus_device, err, + "reading %s/max-sring-pages", + nf->xenbus_device->otherend); + return err; + } + if (max_sring_pages > MAX_SRING_PAGES) + max_sring_pages = MAX_SRING_PAGES; + sring_order = order_base_2(max_sring_pages); + nr_sring_pages = 1 << sring_order; + sring_size = nr_sring_pages * PAGE_SIZE; + + release_rings(nf); + + nf->nr_sring_pages = nr_sring_pages; + nf->sring_order = sring_order; + + nf->f2b_sring = (void *)__get_free_pages(GFP_KERNEL, sring_order); + if (!nf->f2b_sring) + return -ENOMEM; + memset(nf->f2b_sring, 0, sring_size); + + nf->b2f_sring = (void *)__get_free_pages(GFP_KERNEL, sring_order); + if (!nf->b2f_sring) + return -ENOMEM; + memset(nf->b2f_sring, 0, sring_size); + + nf->control_shared = (void *)get_zeroed_page(GFP_KERNEL); + if (!nf->control_shared) + return -ENOMEM; + + /* Pre-allocate enough grant references to be sure that we can + grant access to both rings without an error. */ + err = gnttab_alloc_grant_references(nr_sring_pages * 2 + 1, + &nf->rings_gref_pool); + if (err < 0) + return err; + + grant_access_to_ring(nf, + otherend, + nf->b2f_sring, + nf->b2f_grefs, + nr_sring_pages); + grant_access_to_ring(nf, + otherend, + nf->f2b_sring, + nf->f2b_grefs, + nr_sring_pages); + grant_access_to_ring(nf, + otherend, + nf->control_shared, + &nf->control_shared_gref, + 1); + err = nc2_listen_evtchn(nf->chan, otherend); + if (err < 0) + return err; + + nf->attached = 1; + + return 0; +} + +static void backend_changed(struct xenbus_device *xd, + enum xenbus_state backend_state) +{ + struct netfront2 *nf = xenbus_device_to_nf2(xd); + int err; + + switch (backend_state) { + case XenbusStateInitialising: + /* Backend isn't ready yet, don't do anything. */ + break; + + case XenbusStateInitWait: + /* Backend has advertised the ring protocol. Allocate + the rings, and tell the backend about them. */ + + err = 0; + if (!nf->attached) + err = allocate_rings(nf, xd->otherend_id); + if (err < 0) { + xenbus_dev_fatal(xd, err, "allocating shared rings"); + break; + } + err = publish_rings(nf); + if (err >= 0) + xenbus_switch_state(xd, XenbusStateInitialised); + break; + + case XenbusStateInitialised: + /* Backend isn't supposed to use this state. */ + xenbus_dev_fatal(xd, EINVAL, + "unexpected backend state Initialised"); + break; + + case XenbusStateConnected: + /* All ready */ + err = nc2_attach_rings(nf->chan, + &nf->control_shared->cons, + nf->b2f_sring, + nf->nr_sring_pages * PAGE_SIZE, + &nf->control_shared->prod, + nf->f2b_sring, + nf->nr_sring_pages * PAGE_SIZE, + nf->xenbus_device->otherend_id); + if (err < 0) { + xenbus_dev_fatal(xd, err, + "failed to attach to rings"); + } else { + xenbus_switch_state(xd, XenbusStateConnected); + } + break; + + case XenbusStateClosing: + xenbus_switch_state(xd, XenbusStateClosing); + break; + + case XenbusStateClosed: + /* Tell the tools that it's safe to remove the device + from the bus. */ + xenbus_frontend_closed(xd); + /* Note that we don't release the rings here. This + means that if the backend moves to a different + domain, we won't be able to reconnect, but it also + limits the amount of memory which can be wasted in + the release_rings() leak if the backend is faulty + or malicious. It's not obvious which is more + useful, and so I choose the safer but less + featureful approach. */ + /* This is only a problem if you're using driver + domains and trying to recover from a driver error + by rebooting the backend domain. The rest of the + tools don't support that, so it's a bit + theoretical. The memory leaks aren't, though. */ + break; + + case XenbusStateUnknown: + /* The tools have removed the device area from the + store. Do nothing and rely on xenbus core to call + our remove method. */ + break; + + default: + /* Ignore transitions to unknown states */ + break; + } +} + +static int __devinit netfront_probe(struct xenbus_device *xd, + const struct xenbus_device_id *id) +{ + struct netfront2 *nf; + + nf = kzalloc(sizeof(*nf), GFP_KERNEL); + if (nf == NULL) + goto err; + nf->magic = NETFRONT2_MAGIC; + nf->xenbus_device = xd; + nf->chan = nc2_new(xd); + if (nf->chan == NULL) + goto err; + + xd->dev.driver_data = nf; + + return 0; + +err: + kfree(nf); + xenbus_dev_fatal(xd, ENOMEM, "probing netdev"); + return -ENOMEM; +} + +static int netfront_resume(struct xenbus_device *xd) +{ + /* We've been suspended and come back. The rings are + therefore dead. Tear them down. */ + /* We rely on the normal xenbus state machine to bring them + back to life. */ + struct netfront2 *nf = xenbus_device_to_nf2(xd); + + nc2_detach_rings(nf->chan); + release_rings(nf); + + return 0; +} + +static int __devexit netfront_remove(struct xenbus_device *xd) +{ + struct netfront2 *nf = xenbus_device_to_nf2(xd); + if (nf->chan != NULL) + nc2_release(nf->chan); + release_rings(nf); + kfree(nf); + return 0; +} + +static const struct xenbus_device_id netfront_ids[] = { + { "vif2" }, + { "" } +}; +MODULE_ALIAS("xen:vif2"); + +static struct xenbus_driver netfront2 = { + .name = "vif2", + .ids = netfront_ids, + .probe = netfront_probe, + .remove = __devexit_p(netfront_remove), + .otherend_changed = backend_changed, + .resume = netfront_resume, +}; + +int __init netfront2_init(void) +{ + return xenbus_register_frontend(&netfront2); +} + +void __exit netfront2_exit(void) +{ + xenbus_unregister_driver(&netfront2); +} diff --git a/drivers/net/xen-netchannel2/recv_packet.c b/drivers/net/xen-netchannel2/recv_packet.c new file mode 100644 index 0000000..4678c28 --- /dev/null +++ b/drivers/net/xen-netchannel2/recv_packet.c @@ -0,0 +1,216 @@ +/* Support for receiving individual packets, and all the stuff which + * goes with that. */ +#include <linux/kernel.h> +#include <linux/etherdevice.h> +#include <linux/version.h> +#include "netchannel2_core.h" + +/* Send as many finish packet messages as will fit on the ring. */ +void send_finish_packet_messages(struct netchannel2_ring_pair *ncrp) +{ + struct pending_finish_packets *pfp = &ncrp->pending_finish; + struct netchannel2_msg_finish_packet msg; + RING_IDX cons; + + while (pfp->prod != pfp->cons && + nc2_can_send_payload_bytes(&ncrp->prod_ring, sizeof(msg))) { + cons = pfp->cons; + msg.id = pfp->ids[pfp->cons % MAX_PENDING_FINISH_PACKETS]; + pfp->cons++; + nc2_send_message(&ncrp->prod_ring, + NETCHANNEL2_MSG_FINISH_PACKET, + 0, + &msg, + sizeof(msg)); + } +} + +/* Add a packet ID to the finish packet queue. The caller should + arrange that send_finish_packet_messages is sent soon to flush the + requests out. */ +void queue_finish_packet_message(struct netchannel2_ring_pair *ncrp, + uint32_t id, uint8_t flags) +{ + struct pending_finish_packets *pfp = &ncrp->pending_finish; + RING_IDX prod; + + prod = pfp->prod; + pfp->ids[prod % MAX_PENDING_FINISH_PACKETS] = id; + pfp->prod++; + + if (flags & NC2_PACKET_FLAG_need_event) + ncrp->pending_time_sensitive_messages = 1; +} + +/* Handle a packet message from the other end. On success, queues the + new skb to the pending skb list. If the packet is invalid, it is + discarded without generating a FINISH message. */ +/* Caution: this drops and re-acquires the ring lock. */ +void nc2_handle_packet_msg(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr, + struct sk_buff_head *pending_rx_queue) +{ + unsigned nr_frags; + struct netchannel2_msg_packet msg; + struct sk_buff *skb; + const unsigned frags_off = sizeof(msg); + unsigned frags_bytes; + + if (ncrp->pending_finish.prod - ncrp->pending_finish.cons == + MAX_PENDING_FINISH_PACKETS) { + pr_debug("Remote endpoint sent too many packets!\n"); + nc->stats.rx_errors++; + return; + } + + if (hdr->size < sizeof(msg)) { + pr_debug("Packet message too small (%d < %zd)\n", hdr->size, + sizeof(msg)); + nc->stats.rx_errors++; + return; + } + + if (hdr->size & 7) { + pr_debug("Packet size in ring not multiple of 8: %d\n", + hdr->size); + nc->stats.rx_errors++; + return; + } + + nc2_copy_from_ring(&ncrp->cons_ring, &msg, sizeof(msg)); + + frags_bytes = hdr->size - sizeof(msg) - msg.prefix_size; + nr_frags = frags_bytes / sizeof(struct netchannel2_fragment); + + switch (msg.type) { + case NC2_PACKET_TYPE_small: + if (nr_frags != 0) { + /* Small packets, by definition, have no + * fragments */ + pr_debug("Received small packet with %d frags?\n", + nr_frags); + nc->stats.rx_errors++; + return; + } + /* Any of the receiver functions can handle small + packets as a trivial special case. Use receiver + copy, since that's the simplest. */ + skb = handle_receiver_copy_packet(nc, ncrp, &msg, hdr, + nr_frags, frags_off); + /* No finish message */ + break; + case NC2_PACKET_TYPE_receiver_copy: + skb = handle_receiver_copy_packet(nc, ncrp, &msg, hdr, + nr_frags, frags_off); + queue_finish_packet_message(ncrp, msg.id, msg.flags); + break; + default: + pr_debug("Unknown packet type %d\n", msg.type); + nc->stats.rx_errors++; + skb = NULL; + break; + } + if (skb != NULL) { + nc->stats.rx_bytes += skb->len; + nc->stats.rx_packets++; + skb->dev = nc->net_device; + + if (ncrp->filter_mac && + skb_headlen(skb) >= sizeof(struct ethhdr) && + memcmp(((struct ethhdr *)skb->data)->h_source, + ncrp->remote_mac, + ETH_ALEN)) { + /* We're in filter MACs mode and the source + MAC on this packet is wrong. Drop it. */ + /* (We know that any packet big enough to + contain an ethernet header at all will + contain it in the head space because we do + a pull_through at the end of the type + handler.) */ + nc->stats.rx_missed_errors++; + goto err; + } + + __skb_queue_tail(pending_rx_queue, skb); + + if (ncrp->pending_rx_hypercalls.nr_pending_gops >= + RX_GRANT_COPY_BATCH) { + flush_prepared_grant_copies(&ncrp->pending_rx_hypercalls, + nc2_rscb_on_gntcopy_fail); + /* since receive could generate ACKs to the + start_xmit() function we need to release + the ring lock */ + spin_unlock(&ncrp->lock); + /* we should receive the packet as soon as the + copy is complete to benefit from cache + locality */ + receive_pending_skbs(pending_rx_queue); + spin_lock(&ncrp->lock); + + } + + } + return; + +err: + /* If the receive succeeded part-way, there may be references + to the skb in the hypercall batcher. Flush them out before + we release it. This is a slow path, so we don't care that + much about performance. */ + flush_prepared_grant_copies(&ncrp->pending_rx_hypercalls, + nc2_rscb_on_gntcopy_fail); + + /* We may need to send a FINISH message here if this was a + receiver-map packet. That should be handled automatically + by the kfree_skb(). */ + kfree_skb(skb); + nc->stats.rx_errors++; + return; +} + +/* If there is space on the ring, tell the other end how many packets + its allowed to send at one time and clear the + need_advertise_max_packets flag. */ +void advertise_max_packets(struct netchannel2_ring_pair *ncrp) +{ + struct netchannel2_msg_set_max_packets msg; + + if (!nc2_can_send_payload_bytes(&ncrp->prod_ring, sizeof(msg))) + return; + msg.max_outstanding_packets = MAX_PENDING_FINISH_PACKETS; + nc2_send_message(&ncrp->prod_ring, + NETCHANNEL2_MSG_SET_MAX_PACKETS, + 0, + &msg, + sizeof(msg)); + ncrp->need_advertise_max_packets = 0; + ncrp->pending_time_sensitive_messages = 1; +} + +void receive_pending_skbs(struct sk_buff_head *pending_rx_queue) +{ + struct sk_buff *skb; + struct skb_cb_overlay *sco; + while (!skb_queue_empty(pending_rx_queue)) { + skb = __skb_dequeue(pending_rx_queue); + sco = get_skb_overlay(skb); + if (unlikely(sco->failed)) + kfree_skb(skb); + else { + skb->protocol = eth_type_trans(skb, skb->dev); + netif_receive_skb(skb); + } + } +} + + +/* These don't really belong here, but it's as good a place as any. */ +int __init nc2_init(void) +{ + return 0; +} + +void __exit nc2_exit(void) +{ +} diff --git a/drivers/net/xen-netchannel2/rscb.c b/drivers/net/xen-netchannel2/rscb.c new file mode 100644 index 0000000..5cc8cd9 --- /dev/null +++ b/drivers/net/xen-netchannel2/rscb.c @@ -0,0 +1,386 @@ +/* Receiver-side copy buffer support */ +#include <linux/kernel.h> +#include <linux/skbuff.h> +#include <linux/netdevice.h> +#include <linux/version.h> +#include <xen/grant_table.h> +#include <xen/live_maps.h> +#include <asm/xen/page.h> + +#include "netchannel2_core.h" + +/* -------------------------- Receive -------------------------------- */ + +/* This is called whenever an RSCB grant copy fails. */ +void nc2_rscb_on_gntcopy_fail(void *ctxt, struct gnttab_copy *gop) +{ + struct sk_buff *skb = ctxt; + struct skb_cb_overlay *sco = get_skb_overlay(skb); + if (!sco->failed && net_ratelimit()) + printk(KERN_WARNING "Dropping RX packet because of copy error\n"); + sco->failed = 1; +} + + +/* Copy @size bytes from @offset in grant ref @gref against domain + @domid and shove them on the end of @skb. Fails if it the head + does not have enough space or if the copy would span multiple + pages. */ +static int nc2_grant_copy(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb, + unsigned offset, + unsigned size, + grant_ref_t gref, + domid_t domid) +{ + struct gnttab_copy *gop; + void *tail; + void *end; + + if (size > PAGE_SIZE) + return 0; + + tail = skb_tail_pointer(skb); + end = skb_end_pointer(skb); + + if (unlikely(size > (end-tail))) + return 0; + + if (unlikely(offset_in_page(tail) + size > PAGE_SIZE)) { + unsigned f1 = PAGE_SIZE - offset_in_page(tail); + /* Recursive, but only ever to depth 1, so okay */ + if (!nc2_grant_copy(ncrp, skb, offset, f1, gref, domid)) + return 0; + offset += f1; + size -= f1; + tail += f1; + } + + /* Copy this fragment into the header. */ + gop = hypercall_batcher_grant_copy(&ncrp->pending_rx_hypercalls, + skb, + nc2_rscb_on_gntcopy_fail); + gop->flags = GNTCOPY_source_gref; + gop->source.domid = domid; + gop->source.offset = offset; + gop->source.u.ref = gref; + gop->dest.domid = DOMID_SELF; + gop->dest.offset = offset_in_page(tail); + gop->dest.u.gmfn = virt_to_mfn(tail); + gop->len = size; + + skb_put(skb, size); + + return 1; +} + +/* We've received a receiver-copy packet message from the remote. + Parse it up, build an sk_buff, and return it. Returns NULL on + error. */ +struct sk_buff *handle_receiver_copy_packet(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_packet *msg, + struct netchannel2_msg_hdr *hdr, + unsigned nr_frags, + unsigned frags_off) +{ + struct netchannel2_fragment frag; + unsigned nr_bytes; + unsigned x; + struct sk_buff *skb; + unsigned skb_headsize; + int first_frag, first_frag_size; + struct gnttab_copy *gop; + struct skb_shared_info *shinfo; + struct page *new_page; + + if (msg->prefix_size > NETCHANNEL2_MAX_INLINE_BYTES) { + pr_debug("Inline prefix too big! (%d > %d)\n", + msg->prefix_size, NETCHANNEL2_MAX_INLINE_BYTES); + return NULL; + } + + /* Count the number of bytes in the packet. Be careful: the + other end can still access the packet on the ring, so the + size could change later. */ + nr_bytes = msg->prefix_size; + for (x = 0; x < nr_frags; x++) { + fetch_fragment(ncrp, x, &frag, frags_off); + nr_bytes += frag.size; + } + if (nr_bytes > NETCHANNEL2_MAX_PACKET_BYTES) { + pr_debug("Packet too big! (%d > %d)\n", nr_bytes, + NETCHANNEL2_MAX_PACKET_BYTES); + return NULL; + } + if (nr_bytes < 64) { + /* Linux sometimes has problems with very small SKBs. + Impose a minimum size of 64 bytes. */ + nr_bytes = 64; + } + + first_frag = 0; + if (nr_frags > 0) { + fetch_fragment(ncrp, 0, &frag, frags_off); + first_frag_size = frag.size; + first_frag = 1; + } else { + first_frag_size = 0; + first_frag = 0; + } + + /* We try to have both prefix and the first frag in the skb head + if they do not exceed the page size */ + skb_headsize = msg->prefix_size + first_frag_size + NET_IP_ALIGN; + if (skb_headsize > + ((PAGE_SIZE - sizeof(struct skb_shared_info) - NET_SKB_PAD) & + ~(SMP_CACHE_BYTES - 1))) { + skb_headsize = msg->prefix_size + NET_IP_ALIGN; + first_frag = 0; + } + + skb = dev_alloc_skb(skb_headsize); + if (!skb) { + /* Drop the packet. */ + pr_debug("Couldn't allocate a %d byte skb.\n", nr_bytes); + nc->stats.rx_dropped++; + return NULL; + } + + /* Arrange that the IP header is nicely aligned in memory. */ + skb_reserve(skb, NET_IP_ALIGN); + + /* The inline prefix should always fit in the SKB head. */ + nc2_copy_from_ring_off(&ncrp->cons_ring, + skb_put(skb, msg->prefix_size), + msg->prefix_size, + frags_off + nr_frags * sizeof(frag)); + + /* copy first frag into skb head if it does not cross a + page boundary */ + if (first_frag == 1) { + fetch_fragment(ncrp, 0, &frag, frags_off); + if (!nc2_grant_copy(ncrp, skb, frag.off, frag.size, + frag.receiver_copy.gref, + ncrp->otherend_id)) { + get_skb_overlay(skb)->failed = 1; + return skb; + } + } + + shinfo = skb_shinfo(skb); + for (x = first_frag; x < nr_frags; x++) { + fetch_fragment(ncrp, x, &frag, frags_off); + + /* Allocate a new page for the fragment */ + new_page = alloc_page(GFP_ATOMIC); + if (!new_page) { + get_skb_overlay(skb)->failed = 1; + return skb; + } + + gop = hypercall_batcher_grant_copy(&ncrp->pending_rx_hypercalls, + skb, + nc2_rscb_on_gntcopy_fail); + gop->flags = GNTCOPY_source_gref; + gop->source.domid = ncrp->otherend_id; + gop->source.offset = frag.off; + gop->source.u.ref = frag.receiver_copy.gref; + gop->dest.domid = DOMID_SELF; + gop->dest.offset = 0; + gop->dest.u.gmfn = pfn_to_mfn(page_to_pfn(new_page)); + gop->len = frag.size; + + shinfo->frags[x-first_frag].page = new_page; + shinfo->frags[x-first_frag].page_offset = 0; + shinfo->frags[x-first_frag].size = frag.size; + shinfo->nr_frags++; + + skb->truesize += frag.size; + skb->data_len += frag.size; + skb->len += frag.size; + } + return skb; +} + + + +/* ------------------------------- Transmit ---------------------------- */ + +struct grant_packet_plan { + volatile struct netchannel2_fragment *out_fragment; + grant_ref_t gref_pool; + unsigned prefix_avail; +}; + +static inline int nfrags_skb(struct sk_buff *skb, int prefix_size) +{ + unsigned long start_grant; + unsigned long end_grant; + + if (skb_headlen(skb) <= prefix_size) + return skb_shinfo(skb)->nr_frags; + + start_grant = ((unsigned long)skb->data + prefix_size) & + ~(PAGE_SIZE-1); + end_grant = ((unsigned long)skb->data + + skb_headlen(skb) + PAGE_SIZE - 1) & + ~(PAGE_SIZE-1); + return ((end_grant - start_grant) >> PAGE_SHIFT) + + skb_shinfo(skb)->nr_frags; +} + +int prepare_xmit_allocate_grant(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb) +{ + struct skb_cb_overlay *skb_co = get_skb_overlay(skb); + unsigned nr_fragments; + grant_ref_t gref_pool; + int err; + unsigned inline_prefix_size; + + if (allocate_txp_slot(ncrp, skb) < 0) + return -1; + + /* We're going to have to get the remote to issue a grant copy + hypercall anyway, so there's no real benefit to shoving the + headers inline. */ + /* (very small packets won't go through here, so there's no + chance that we could completely eliminate the grant + copy.) */ + inline_prefix_size = sizeof(struct ethhdr); + + if (skb_co->nr_fragments == 0) { + nr_fragments = nfrags_skb(skb, inline_prefix_size); + + /* No-fragments packets should be policy small, not + * policy grant. */ + BUG_ON(nr_fragments == 0); + + skb_co->nr_fragments = nr_fragments; + } + + /* Grab the grant references. */ + err = gnttab_suballoc_grant_references(skb_co->nr_fragments, + &ncrp->gref_pool, + &gref_pool); + if (err < 0) { + release_txp_slot(ncrp, skb); + /* Leave skb_co->nr_fragments set, so that we don't + have to recompute it next time around. */ + return -1; + } + skb_co->gref_pool = gref_pool; + skb_co->inline_prefix_size = inline_prefix_size; + + skb_co->type = NC2_PACKET_TYPE_receiver_copy; + + return 0; +} + +static void prepare_subpage_grant(struct netchannel2_ring_pair *ncrp, + struct page *page, + unsigned off_in_page, + unsigned size, + struct grant_packet_plan *plan) +{ + volatile struct netchannel2_fragment *frag; + domid_t trans_domid; + grant_ref_t trans_gref; + grant_ref_t gref; + + if (size <= plan->prefix_avail) { + /* This fragment is going to be inline -> nothing to + * do. */ + plan->prefix_avail -= size; + return; + } + if (plan->prefix_avail > 0) { + /* Part inline, part in payload. */ + size -= plan->prefix_avail; + off_in_page += plan->prefix_avail; + plan->prefix_avail = 0; + } + frag = plan->out_fragment; + gref = gnttab_claim_grant_reference(&plan->gref_pool); + frag->receiver_copy.gref = gref; + if (page_is_tracked(page)) { + lookup_tracker_page(page, &trans_domid, &trans_gref); + gnttab_grant_foreign_access_ref_trans(gref, + ncrp->otherend_id, + GTF_readonly, + trans_domid, + trans_gref); + } else { + gnttab_grant_foreign_access_ref_subpage(gref, + ncrp->otherend_id, + virt_to_mfn(page_address(page)), + GTF_readonly, + off_in_page, + size); + } + + frag->off = off_in_page; + frag->size = size; + plan->out_fragment++; +} + +static int grant_data_area(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb, + struct grant_packet_plan *plan) +{ + void *ptr = skb->data; + unsigned len = skb_headlen(skb); + unsigned off; + unsigned this_time; + + for (off = 0; off < len; off += this_time) { + this_time = len - off; + if (this_time + offset_in_page(ptr + off) > PAGE_SIZE) + this_time = PAGE_SIZE - offset_in_page(ptr + off); + prepare_subpage_grant(ncrp, + virt_to_page(ptr + off), + offset_in_page(ptr + off), + this_time, + plan); + } + return 0; +} + +void xmit_grant(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb, + volatile void *msg_buf) +{ + volatile struct netchannel2_msg_packet *msg = msg_buf; + struct skb_cb_overlay *skb_co = get_skb_overlay(skb); + struct grant_packet_plan plan; + unsigned x; + struct skb_shared_info *shinfo; + skb_frag_t *frag; + + memset(&plan, 0, sizeof(plan)); + plan.prefix_avail = skb_co->inline_prefix_size; + plan.out_fragment = msg->frags; + plan.gref_pool = skb_co->gref_pool; + + ncrp->count_frags_no_event += skb_co->nr_fragments; + if (ncrp->count_frags_no_event >= ncrp->max_count_frags_no_event) { + msg->flags |= NC2_PACKET_FLAG_need_event; + ncrp->count_frags_no_event = 0; + } + + grant_data_area(ncrp, skb, &plan); + + shinfo = skb_shinfo(skb); + for (x = 0; x < shinfo->nr_frags; x++) { + frag = &shinfo->frags[x]; + prepare_subpage_grant(ncrp, + frag->page, + frag->page_offset, + frag->size, + &plan); + } + + skb_co->nr_fragments = plan.out_fragment - msg->frags; +} + diff --git a/drivers/net/xen-netchannel2/util.c b/drivers/net/xen-netchannel2/util.c new file mode 100644 index 0000000..57e6aed --- /dev/null +++ b/drivers/net/xen-netchannel2/util.c @@ -0,0 +1,227 @@ +#include <linux/kernel.h> +#include <linux/list.h> +#include <linux/skbuff.h> +#include <linux/version.h> +#include <xen/grant_table.h> +#include "netchannel2_core.h" + +int allocate_txp_slot(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb) +{ + struct skb_cb_overlay *skb_co = get_skb_overlay(skb); + struct txp_slot *tp; + + BUG_ON(skb_co->tp); + + if (ncrp->head_free_tx_packet == INVALID_TXP_INDEX || + ncrp->nr_tx_packets_outstanding == + ncrp->max_tx_packets_outstanding) { + return -1; + } + + tp = &ncrp->tx_packets[ncrp->head_free_tx_packet]; + ncrp->head_free_tx_packet = txp_get_next_free(tp); + + txp_set_skb(tp, skb); + skb_co->tp = tp; + ncrp->nr_tx_packets_outstanding++; + return 0; +} + +static void nc2_free_skb(struct netchannel2 *nc, + struct sk_buff *skb) +{ + dev_kfree_skb(skb); +} + +void release_txp_slot(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb) +{ + struct skb_cb_overlay *skb_co = get_skb_overlay(skb); + struct txp_slot *tp = skb_co->tp; + + BUG_ON(txp_get_skb(tp) != skb); + + /* Try to keep the free TX packet list in order as far as + * possible, since that gives slightly better cache behaviour. + * It's not worth spending a lot of effort getting this right, + * though, so just use a simple heuristic: if we're freeing a + * packet, and the previous packet is already free, chain this + * packet directly after it, rather than putting it at the + * head of the list. This isn't perfect by any means, but + * it's enough that you get nice long runs of contiguous + * packets in the free list, and that's all we really need. + * Runs much bigger than a cache line aren't really very + * useful, anyway. */ + if (tp != ncrp->tx_packets && !txp_slot_in_use(tp - 1)) { + txp_set_next_free(tp, txp_get_next_free(tp - 1)); + txp_set_next_free(tp - 1, tp - ncrp->tx_packets); + } else { + txp_set_next_free(tp, ncrp->head_free_tx_packet); + ncrp->head_free_tx_packet = tp - ncrp->tx_packets; + } + skb_co->tp = NULL; + ncrp->nr_tx_packets_outstanding--; +} + +void release_tx_packet(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb) +{ + struct skb_cb_overlay *skb_co = get_skb_overlay(skb); + struct txp_slot *tp = skb_co->tp; + grant_ref_t gref; + int r; + unsigned cntr; + + if (skb_co->type == NC2_PACKET_TYPE_receiver_copy) { + while (1) { + r = gnttab_claim_grant_reference(&skb_co->gref_pool); + if (r == -ENOSPC) + break; + gref = (grant_ref_t)r; + /* It's a subpage grant reference, so Xen + guarantees to release it quickly. Sit and + wait for it to do so. */ + cntr = 0; + while (!gnttab_end_foreign_access_ref(gref, 0)) { + cpu_relax(); + if (++cntr % 65536 == 0) + printk(KERN_WARNING "Having trouble ending gref %d for receiver copy.\n", + gref); + } + gnttab_release_grant_reference(&ncrp->gref_pool, gref); + } + } else if (skb_co->gref_pool != 0) { + gnttab_subfree_grant_references(skb_co->gref_pool, + &ncrp->gref_pool); + } + + if (tp != NULL) + release_txp_slot(ncrp, skb); + + nc2_free_skb(ncrp->interface, skb); +} + +void fetch_fragment(struct netchannel2_ring_pair *ncrp, + unsigned idx, + struct netchannel2_fragment *frag, + unsigned off) +{ + nc2_copy_from_ring_off(&ncrp->cons_ring, + frag, + sizeof(*frag), + off + idx * sizeof(*frag)); +} + +/* Copy @count bytes from the skb's data area into its head, updating + * the pointers as appropriate. The caller should ensure that there + * is actually enough space in the head. */ +void pull_through(struct sk_buff *skb, unsigned count) +{ + unsigned frag = 0; + unsigned this_frag; + void *buf; + void *va; + + while (count != 0 && frag < skb_shinfo(skb)->nr_frags) { + this_frag = skb_shinfo(skb)->frags[frag].size; + if (this_frag > count) + this_frag = count; + va = page_address(skb_shinfo(skb)->frags[frag].page); + buf = skb->tail; + memcpy(buf, va + skb_shinfo(skb)->frags[frag].page_offset, + this_frag); + skb->tail += this_frag; + BUG_ON(skb->tail > skb->end); + skb_shinfo(skb)->frags[frag].size -= this_frag; + skb_shinfo(skb)->frags[frag].page_offset += this_frag; + skb->data_len -= this_frag; + count -= this_frag; + frag++; + } + for (frag = 0; + frag < skb_shinfo(skb)->nr_frags && + skb_shinfo(skb)->frags[frag].size == 0; + frag++) { + put_page(skb_shinfo(skb)->frags[frag].page); + } + skb_shinfo(skb)->nr_frags -= frag; + memmove(skb_shinfo(skb)->frags, + skb_shinfo(skb)->frags+frag, + sizeof(skb_shinfo(skb)->frags[0]) * + skb_shinfo(skb)->nr_frags); +} + +#ifdef CONFIG_XEN_NETDEV2_BACKEND + +/* Zap a grant_mapping structure, releasing all mappings and the + reserved virtual address space. Prepare the grant_mapping for + re-use. */ +void nc2_unmap_grants(struct grant_mapping *gm) +{ + struct gnttab_unmap_grant_ref op[MAX_GRANT_MAP_PAGES]; + int i; + + if (gm->mapping == NULL) + return; + for (i = 0; i < gm->nr_pages; i++) { + gnttab_set_unmap_op(&op[i], + (unsigned long)gm->mapping->addr + + i * PAGE_SIZE, + GNTMAP_host_map, + gm->handles[i]); + } + if (HYPERVISOR_grant_table_op(GNTTABOP_unmap_grant_ref, op, i)) + BUG(); + free_vm_area(gm->mapping); + memset(gm, 0, sizeof(*gm)); +} + +int nc2_map_grants(struct grant_mapping *gm, + const grant_ref_t *grefs, + unsigned nr_grefs, + domid_t remote_domain) +{ + struct grant_mapping work; + struct gnttab_map_grant_ref op[MAX_GRANT_MAP_PAGES]; + int i; + + memset(&work, 0, sizeof(work)); + + if (nr_grefs > MAX_GRANT_MAP_PAGES || nr_grefs == 0) + return -EINVAL; + + if (nr_grefs & (nr_grefs-1)) { + /* Must map a power-of-two number of pages. */ + return -EINVAL; + } + + work.nr_pages = nr_grefs; + work.mapping = alloc_vm_area(PAGE_SIZE * work.nr_pages); + if (!work.mapping) + return -ENOMEM; + for (i = 0; i < nr_grefs; i++) + gnttab_set_map_op(&op[i], + (unsigned long)work.mapping->addr + + i * PAGE_SIZE, + GNTMAP_host_map, + grefs[i], + remote_domain); + + if (HYPERVISOR_grant_table_op(GNTTABOP_map_grant_ref, op, nr_grefs)) + BUG(); + + for (i = 0; i < nr_grefs; i++) { + if (op[i].status) { + work.nr_pages = i; + nc2_unmap_grants(&work); + return -EFAULT; + } + work.handles[i] = op[i].handle; + } + + nc2_unmap_grants(gm); + *gm = work; + return 0; +} +#endif diff --git a/drivers/net/xen-netchannel2/xmit_packet.c b/drivers/net/xen-netchannel2/xmit_packet.c new file mode 100644 index 0000000..92fbabf --- /dev/null +++ b/drivers/net/xen-netchannel2/xmit_packet.c @@ -0,0 +1,318 @@ +/* Things related to actually sending packet messages, and which is + shared across all transmit modes. */ +#include <linux/kernel.h> +#include <linux/version.h> +#include "netchannel2_core.h" + +/* We limit the number of transmitted packets which can be in flight + at any one time, as a somewhat paranoid safety catch. */ +#define MAX_TX_PACKETS MAX_PENDING_FINISH_PACKETS + +static enum transmit_policy transmit_policy(struct netchannel2 *nc, + struct sk_buff *skb) +{ + if (skb->len <= PACKET_PREFIX_SIZE && !skb_is_nonlinear(skb)) + return transmit_policy_small; + else + return transmit_policy_grant; +} + +/* Allocate resources for a small packet. The entire thing will be + transmitted in the ring. This is only called for small, linear + SKBs. It always succeeds, but has an int return type for symmetry + with the other prepare_xmit_*() functions. */ +int prepare_xmit_allocate_small(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb) +{ + struct skb_cb_overlay *skb_co = get_skb_overlay(skb); + + BUG_ON(skb_is_nonlinear(skb)); + BUG_ON(skb->len > NETCHANNEL2_MAX_INLINE_BYTES); + + skb_co->type = NC2_PACKET_TYPE_small; + skb_co->gref_pool = 0; + skb_co->inline_prefix_size = skb->len; + + return 0; +} + +/* Figure out how much space @tp will take up on the ring. */ +static unsigned get_transmitted_packet_msg_size(struct sk_buff *skb) +{ + struct skb_cb_overlay *skb_co = get_skb_overlay(skb); + return (sizeof(struct netchannel2_msg_packet) + + sizeof(struct netchannel2_fragment) * skb_co->nr_fragments + + skb_co->inline_prefix_size + 7) & ~7; +} + +/* Do the minimum amount of work to be certain that when we come to + transmit this packet we won't run out of resources. This includes + figuring out how we're going to fragment the packet for + transmission, which buffers we're going to use, etc. Return <0 if + insufficient resources are available right now, or 0 if we + succeed. */ +/* Careful: this may allocate e.g. a TXP slot and then discover that + it can't reserve ring space. In that case, the TXP remains + allocated. The expected case is that the caller will arrange for + us to retry the allocation later, in which case we'll pick up the + already-allocated buffers. */ +int prepare_xmit_allocate_resources(struct netchannel2 *nc, + struct sk_buff *skb) +{ + struct skb_cb_overlay *skb_co = get_skb_overlay(skb); + enum transmit_policy policy; + unsigned msg_size; + int r; + + if (skb_co->policy == transmit_policy_unknown) { + policy = transmit_policy(nc, skb); + switch (policy) { + case transmit_policy_small: + r = prepare_xmit_allocate_small(&nc->rings, skb); + break; + case transmit_policy_grant: + r = prepare_xmit_allocate_grant(&nc->rings, skb); + break; + default: + BUG(); + /* Shut the compiler up. */ + r = -1; + } + if (r < 0) + return r; + skb_co->policy = policy; + } + + msg_size = get_transmitted_packet_msg_size(skb); + if (nc2_reserve_payload_bytes(&nc->rings.prod_ring, msg_size)) + return 0; + + return -1; +} + +/* Transmit a packet which has previously been prepared with + prepare_xmit_allocate_resources(). */ +/* Once this has been called, the ring must not be flushed until the + TX hypercall batcher is (assuming this ring has a hypercall + batcher). */ +int nc2_really_start_xmit(struct netchannel2_ring_pair *ncrp, + struct sk_buff *skb) +{ + struct skb_cb_overlay *skb_co = get_skb_overlay(skb); + struct netchannel2 *nc = ncrp->interface; + unsigned msg_size; + volatile struct netchannel2_msg_packet *msg; + + msg_size = get_transmitted_packet_msg_size(skb); + /* Un-reserve the space we reserved for the packet. */ + BUG_ON(ncrp->prod_ring.reserve < msg_size); + ncrp->prod_ring.reserve -= msg_size; + if (!nc2_can_send_payload_bytes(&ncrp->prod_ring, msg_size)) { + /* Aw, crud. We had to transmit a PAD message at just + the wrong time, and our attempt to reserve ring + space failed. Delay transmiting this packet + Make sure we redo the space reserve */ + ncrp->prod_ring.reserve += msg_size; + return 0; + } + __nc2_avoid_ring_wrap(&ncrp->prod_ring, msg_size); + + /* Set up part of the message. We do the message header + itself and the inline prefix. The individual xmit_* + methods are responsible for the fragments. They may also + set some more msg flags. */ + msg = __nc2_get_message_ptr(&ncrp->prod_ring); + msg->hdr.type = NETCHANNEL2_MSG_PACKET; + msg->hdr.flags = 0; + msg->id = skb_co->tp - ncrp->tx_packets; + msg->type = skb_co->type; + msg->flags = 0; + msg->prefix_size = skb_co->inline_prefix_size; + + /* We cast away the volatile to avoid compiler warnings, and + then use barrier()s to discourage gcc from using msg->frags + in CSE or somesuch. It's kind of unlikely that it would, + but better to make sure. */ + barrier(); + memcpy((void *)(msg->frags + skb_co->nr_fragments), + skb->data, + skb_co->inline_prefix_size); + barrier(); + + switch (skb_co->policy) { + case transmit_policy_small: + /* Nothing to do */ + break; + case transmit_policy_grant: + xmit_grant(ncrp, skb, msg); + break; + default: + BUG(); + } + + /* The transmission method may have decided not to use all the + fragments it reserved, which changes the message size. */ + msg_size = get_transmitted_packet_msg_size(skb); + msg->hdr.size = msg_size; + + ncrp->prod_ring.prod_pvt += msg_size; + + BUG_ON(ncrp->prod_ring.bytes_available < msg_size); + + ncrp->prod_ring.bytes_available -= msg_size; + + ncrp->pending_time_sensitive_messages = 1; + + if (skb_co->tp) { + ncrp->expected_finish_messages++; + /* We're now ready to accept a FINISH message for this + packet. */ + skb_co->expecting_finish = 1; + } else { + /* This packet doesn't need a FINISH message. Queue + it up to be released as soon as we flush the + hypercall batcher and the ring. */ + nc->stats.tx_bytes += skb->len; + nc->stats.tx_packets++; + __skb_queue_tail(&ncrp->release_on_flush_batcher, skb); + } + + return 1; +} + +/* Arrange that @skb will be sent on ring @ncrp soon. Assumes that + prepare_xmit_allocate_resources() has been successfully called on + @skb already. */ +void queue_packet_to_interface(struct sk_buff *skb, + struct netchannel2_ring_pair *ncrp) +{ + __skb_queue_tail(&ncrp->pending_tx_queue, skb); + if (ncrp->pending_tx_queue.qlen == 1) + nc2_kick(ncrp); +} + +int nc2_start_xmit(struct sk_buff *skb, struct net_device *dev) +{ + struct netchannel2 *nc = netdev_priv(dev); + struct skb_cb_overlay *sco = get_skb_overlay(skb); + int r; + + memset(sco, 0, sizeof(*sco)); + + spin_lock_bh(&nc->rings.lock); + + if (!nc->rings.is_attached) { + spin_unlock_bh(&nc->rings.lock); + dev_kfree_skb(skb); + nc->stats.tx_dropped++; + return NETDEV_TX_OK; + } + + r = prepare_xmit_allocate_resources(nc, skb); + if (r < 0) + goto out_busy; + queue_packet_to_interface(skb, &nc->rings); + spin_unlock_bh(&nc->rings.lock); + + return NETDEV_TX_OK; + +out_busy: + /* Some more buffers may have arrived, so kick the worker + * thread to go and have a look. */ + nc2_kick(&nc->rings); + + __skb_queue_tail(&nc->pending_skbs, skb); + nc->is_stopped = 1; + netif_stop_queue(dev); + spin_unlock_bh(&nc->rings.lock); + return NETDEV_TX_OK; +} + + +void nc2_handle_finish_packet_msg(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr) +{ + struct skb_cb_overlay *sco; + struct netchannel2_msg_finish_packet msg; + struct txp_slot *tp; + struct sk_buff *skb; + + if (hdr->size < sizeof(msg)) { + pr_debug("Packet finish message had strange size %d\n", + hdr->size); + return; + } + nc2_copy_from_ring(&ncrp->cons_ring, &msg, sizeof(msg)); + if (msg.id > NR_TX_PACKETS) { + pr_debug("Other end tried to end bad packet id %d\n", + msg.id); + return; + } + tp = &ncrp->tx_packets[msg.id]; + skb = txp_get_skb(tp); + if (!skb) { + pr_debug("Other end tried to end packet id %d which wasn't in use\n", + msg.id); + return; + } + sco = get_skb_overlay(skb); + /* Careful: if the remote is malicious, they may try to end a + packet after we allocate it but before we send it (e.g. if + we've had to back out because we didn't have enough ring + space). */ + if (!sco->expecting_finish) { + pr_debug("Other end finished packet before we sent it?\n"); + return; + } + nc->stats.tx_bytes += skb->len; + nc->stats.tx_packets++; + release_tx_packet(ncrp, skb); + ncrp->expected_finish_messages--; +} + + +/* ------------------------ Control-path operations ---------------------- */ +void nc2_handle_set_max_packets_msg(struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr) +{ + struct netchannel2_msg_set_max_packets msg; + + if (hdr->size != sizeof(msg)) { + pr_debug("Set max packets message had strange size %d\n", + hdr->size); + return; + } + if (ncrp->max_tx_packets_outstanding != 0) { + pr_debug("Other end tried to change number of outstanding packets from %d.\n", + ncrp->max_tx_packets_outstanding); + return; + } + nc2_copy_from_ring(&ncrp->cons_ring, &msg, sizeof(msg)); + /* Limit the number of outstanding packets to something sane. + This is a little bit paranoid (it should be safe to set + this arbitrarily high), but limiting it avoids nasty + surprises in untested configurations. */ + if (msg.max_outstanding_packets > MAX_TX_PACKETS) { + pr_debug("Other end tried to set max outstanding to %d, limiting to %d.\n", + msg.max_outstanding_packets, MAX_TX_PACKETS); + ncrp->max_tx_packets_outstanding = MAX_TX_PACKETS; + } else { + ncrp->max_tx_packets_outstanding = msg.max_outstanding_packets; + } +} + +/* Release all packets on the transmitted and pending_tx lists. */ +void drop_pending_tx_packets(struct netchannel2_ring_pair *ncrp) +{ + struct sk_buff *skb; + unsigned x; + + nc2_queue_purge(ncrp, &ncrp->pending_tx_queue); + for (x = 0; x < NR_TX_PACKETS; x++) { + skb = txp_get_skb(&ncrp->tx_packets[x]); + if (skb) + release_tx_packet(ncrp, skb); + } +} + diff --git a/drivers/xen/events.c b/drivers/xen/events.c index 68c287c..d3c267f 100644 --- a/drivers/xen/events.c +++ b/drivers/xen/events.c @@ -752,7 +752,6 @@ static int bind_interdomain_evtchn_to_irq(unsigned int remote_domain, return err ? : bind_evtchn_to_irq(bind_interdomain.local_port); } - int bind_virq_to_irq(unsigned int virq, unsigned int cpu) { struct evtchn_bind_virq bind_virq; @@ -867,6 +866,22 @@ int bind_interdomain_evtchn_to_irqhandler(unsigned int remote_domain, } EXPORT_SYMBOL_GPL(bind_interdomain_evtchn_to_irqhandler); +int xen_alloc_evtchn(domid_t domid, int *port) +{ + struct evtchn_alloc_unbound alloc_unbound; + int err; + + alloc_unbound.dom = DOMID_SELF; + alloc_unbound.remote_dom = domid; + + err = HYPERVISOR_event_channel_op(EVTCHNOP_alloc_unbound, + &alloc_unbound); + if (err == 0) + *port = alloc_unbound.port; + return err; +} +EXPORT_SYMBOL_GPL(xen_alloc_evtchn); + int bind_virq_to_irqhandler(unsigned int virq, unsigned int cpu, irq_handler_t handler, unsigned long irqflags, const char *devname, void *dev_id) diff --git a/drivers/xen/xenbus/xenbus_client.c b/drivers/xen/xenbus/xenbus_client.c index 92a1ef8..188e77c 100644 --- a/drivers/xen/xenbus/xenbus_client.c +++ b/drivers/xen/xenbus/xenbus_client.c @@ -309,19 +309,10 @@ EXPORT_SYMBOL_GPL(xenbus_grant_ring); */ int xenbus_alloc_evtchn(struct xenbus_device *dev, int *port) { - struct evtchn_alloc_unbound alloc_unbound; int err; - - alloc_unbound.dom = DOMID_SELF; - alloc_unbound.remote_dom = dev->otherend_id; - - err = HYPERVISOR_event_channel_op(EVTCHNOP_alloc_unbound, - &alloc_unbound); - if (err) + err = xen_alloc_evtchn(dev->otherend_id, port); + if (err < 0) xenbus_dev_fatal(dev, err, "allocating event channel"); - else - *port = alloc_unbound.port; - return err; } EXPORT_SYMBOL_GPL(xenbus_alloc_evtchn); diff --git a/include/xen/events.h b/include/xen/events.h index 0814763..bb654f2 100644 --- a/include/xen/events.h +++ b/include/xen/events.h @@ -30,6 +30,7 @@ int bind_interdomain_evtchn_to_irqhandler(unsigned int remote_domain, unsigned long irqflags, const char *devname, void *dev_id); +int xen_alloc_evtchn(domid_t domid, int *port); /* * Common unbind function for all event sources. Takes IRQ to unbind from. diff --git a/include/xen/interface/io/netchannel2.h b/include/xen/interface/io/netchannel2.h new file mode 100644 index 0000000..c45963e --- /dev/null +++ b/include/xen/interface/io/netchannel2.h @@ -0,0 +1,106 @@ +#ifndef __NETCHANNEL2_H__ +#define __NETCHANNEL2_H__ + +#include <xen/interface/io/uring.h> + +/* Tell the other end how many packets its allowed to have + * simultaneously outstanding for transmission. An endpoint must not + * send PACKET messages which would take it over this limit. + * + * The SET_MAX_PACKETS message must be sent before any PACKET + * messages. It should only be sent once, unless the ring is + * disconnected and reconnected. + */ +#define NETCHANNEL2_MSG_SET_MAX_PACKETS 1 +struct netchannel2_msg_set_max_packets { + struct netchannel2_msg_hdr hdr; + uint32_t max_outstanding_packets; +}; + +/* Pass a packet to the other end. The packet consists of a header, + * followed by a bunch of fragment descriptors, followed by an inline + * packet prefix. Every fragment descriptor in a packet must be the + * same type, and the type is determined by the header. The receiving + * endpoint should respond with a finished_packet message as soon as + * possible. The prefix may be no more than + * NETCHANNEL2_MAX_INLINE_BYTES. Packets may contain no more than + * NETCHANNEL2_MAX_PACKET_BYTES bytes of data, including all fragments + * and the prefix. + */ +#define NETCHANNEL2_MSG_PACKET 2 +#define NETCHANNEL2_MAX_PACKET_BYTES 65536 +#define NETCHANNEL2_MAX_INLINE_BYTES 256 +struct netchannel2_fragment { + uint16_t size; + /* The offset is always relative to the start of the page. + For pre_posted packet types, it is not relative to the + start of the buffer (although the fragment range will + obviously be within the buffer range). */ + uint16_t off; + union { + struct { + grant_ref_t gref; + } receiver_copy; + }; +}; +struct netchannel2_msg_packet { + struct netchannel2_msg_hdr hdr; + uint32_t id; /* Opaque ID which is echoed into the finished + packet message. */ + uint8_t type; + uint8_t flags; + uint8_t pad0; + uint8_t pad1; + uint16_t prefix_size; + uint16_t pad2; + uint16_t pad3; + uint16_t pad4; + /* Variable-size array. The number of elements is determined + by the size of the message. */ + /* Until we support scatter-gather, this will be either 0 or 1 + element. */ + struct netchannel2_fragment frags[0]; +}; + +/* If set, the transmitting domain requires an event urgently when + * this packet's finish message is sent. Otherwise, the event can be + * delayed. */ +#define NC2_PACKET_FLAG_need_event 8 + +/* The mechanism which should be used to receive the data part of + * a packet: + * + * receiver_copy -- The transmitting domain has granted the receiving + * domain access to the original RX buffers using + * copy-only grant references. The receiving domain + * should copy the data out of the buffers and issue + * a FINISH message. + * + * Due to backend bugs, it is in not safe to use this + * packet type except on bypass rings. + * + * small -- The packet does not have any fragment descriptors + * (i.e. the entire thing is inline in the ring). The receiving + * domain should simply the copy the packet out of the ring + * into a locally allocated buffer. No FINISH message is required + * or allowed. + * + * This packet type may be used on any ring. + * + * All endpoints must be able to receive all packet types, but note + * that it is correct to treat receiver_map and small packets as + * receiver_copy ones. */ +#define NC2_PACKET_TYPE_receiver_copy 1 +#define NC2_PACKET_TYPE_small 4 + +/* Tell the other end that we're finished with a message it sent us, + and it can release the transmit buffers etc. This must be sent in + response to receiver_copy and receiver_map packets. It must not be + sent in response to pre_posted or small packets. */ +#define NETCHANNEL2_MSG_FINISH_PACKET 3 +struct netchannel2_msg_finish_packet { + struct netchannel2_msg_hdr hdr; + uint32_t id; +}; + +#endif /* !__NETCHANNEL2_H__ */ diff --git a/include/xen/interface/io/uring.h b/include/xen/interface/io/uring.h new file mode 100644 index 0000000..663c3d7 --- /dev/null +++ b/include/xen/interface/io/uring.h @@ -0,0 +1,426 @@ +#ifndef __XEN_PUBLIC_IO_URING_H__ +#define __XEN_PUBLIC_IO_URING_H__ + +#include <linux/kernel.h> +#include <linux/module.h> +#include <asm/system.h> + +typedef unsigned RING_IDX; + +#define NETCHANNEL2_MSG_PAD 255 + +/* The sring structures themselves. The _cons and _prod variants are + different views of the same bit of shared memory, and are supposed + to provide better checking of the expected use patterns. Fields in + the shared ring are owned by either the producer end or the + consumer end. If a field is owned by your end, the other end will + never modify it. If it's owned by the other end, the other end is + allowed to modify it whenever it likes, and you can never do so. + + Fields owned by the other end are always const (because you can't + change them). They're also volatile, because there are a bunch + of places where we go: + + local_x = sring->x; + validate(local_x); + use(local_x); + + and it would be very bad if the compiler turned that into: + + local_x = sring->x; + validate(sring->x); + use(local_x); + + because that contains a potential TOCTOU race (hard to exploit, but + still present). The compiler is only allowed to do that + optimisation because it knows that local_x == sring->x at the start + of the call to validate(), and it only knows that if it can reorder + the read of sring->x over the sequence point at the end of the + first statement. In other words, it can only do the bad + optimisation if it knows that reads of sring->x are side-effect + free. volatile stops it from making that assumption. + + We don't need a full memory barrier here, because it's sufficient + to copy the volatile data into stable guest-local storage, and + volatile achieves that. i.e. we don't need local_x to be precisely + sring->x, but we do need it to be a stable snapshot of some + previous valud of sring->x. + + Note that there are still plenty of other places where we *do* need + full barriers. volatile just deals with this one, specific, case. + + We could also deal with it by putting compiler barriers in all over + the place. The downside of that approach is that you need to put + the barrier()s in lots of different places (basically, everywhere + which needs to access these fields), and it's easy to forget one. + barrier()s also have somewhat heavier semantics than volatile + (because they prevent all reordering, rather than just reordering + on this one field), although that's pretty much irrelevant because + gcc usually treats pretty much any volatile access as a call to + barrier(). +*/ + +/* Messages are sent over sring pairs. Each sring in a pair provides + * a unidirectional byte stream which can generate events when either + * the producer or consumer pointers cross a particular threshold. + * + * We define both sring_prod and sring_cons structures. The two + * structures will always map onto the same physical bytes in memory, + * but they provide different views of that memory which are + * appropriate to either producers or consumers. + * + * Obviously, the endpoints need to agree on which end produces + * messages on which ring. The endpoint which provided the memory + * backing the ring always produces on the first sring, and the one + * which just mapped the ring produces on the second. By convention, + * these are known as the frontend and backend, respectively. + */ + +/* For both rings, the producer (consumer) pointers point at the + * *next* byte which is going to be produced (consumed). An endpoint + * must generate an event on the event channel port if it moves the + * producer pointer (consumer pointer) across prod_event (cons_event). + * + * i.e if an endpoint ever updates a pointer so that the old pointer + * is strictly less than the event, and the new pointer is greater + * than or equal to the event then the remote must be notified. If + * the pointer overflows the ring, treat the new value as if it were + * (actual new value) + (1 << 32). + */ +struct netchannel2_sring_prod { + RING_IDX prod; + volatile const RING_IDX cons; + volatile const RING_IDX prod_event; + RING_IDX cons_event; + unsigned char pad[48]; +}; + +struct netchannel2_sring_cons { + volatile const RING_IDX prod; + RING_IDX cons; + RING_IDX prod_event; + volatile const RING_IDX cons_event; + unsigned char pad[48]; +}; + +struct netchannel2_frontend_shared { + struct netchannel2_sring_prod prod; + struct netchannel2_sring_cons cons; +}; + +struct netchannel2_backend_shared { + struct netchannel2_sring_cons cons; + struct netchannel2_sring_prod prod; +}; + +struct netchannel2_prod_ring { + struct netchannel2_sring_prod *sring; + void *payload; + RING_IDX prod_pvt; + /* This is the number of bytes available after prod_pvt last + time we checked, minus the number of bytes which we've + consumed since then. It's used to a avoid a bunch of + memory barriers when checking for ring space. */ + unsigned bytes_available; + /* Number of bytes reserved by nc2_reserve_payload_bytes() */ + unsigned reserve; + size_t payload_bytes; +}; + +struct netchannel2_cons_ring { + struct netchannel2_sring_cons *sring; + const volatile void *payload; + RING_IDX cons_pvt; + size_t payload_bytes; +}; + +/* A message header. There is one of these at the start of every + * message. @type is one of the #define's below, and @size is the + * size of the message, including the header and any padding. + * size should be a multiple of 8 so we avoid unaligned memory copies. + * structs defining message formats should have sizes multiple of 8 + * bytes and should use paddding fields if needed. + */ +struct netchannel2_msg_hdr { + uint8_t type; + uint8_t flags; + uint16_t size; +}; + +/* Copy some bytes from the shared ring to a stable local buffer, + * starting at the private consumer pointer. Does not update the + * private consumer pointer. + */ +static inline void nc2_copy_from_ring_off(struct netchannel2_cons_ring *ring, + void *buf, + size_t nbytes, + unsigned off) +{ + unsigned start, end; + + start = (ring->cons_pvt + off) & (ring->payload_bytes-1); + end = (ring->cons_pvt + nbytes + off) & (ring->payload_bytes-1); + /* We cast away the volatile modifier to get rid of an + irritating compiler warning, and compensate with a + barrier() at the end. */ + memcpy(buf, (const void *)ring->payload + start, nbytes); + barrier(); +} + +static inline void nc2_copy_from_ring(struct netchannel2_cons_ring *ring, + void *buf, + size_t nbytes) +{ + nc2_copy_from_ring_off(ring, buf, nbytes, 0); +} + + +/* Copy some bytes to the shared ring, starting at the private + * producer pointer. Does not update the private pointer. + */ +static inline void nc2_copy_to_ring_off(struct netchannel2_prod_ring *ring, + const void *src, + unsigned nr_bytes, + unsigned off) +{ + unsigned start, end; + + start = (ring->prod_pvt + off) & (ring->payload_bytes-1); + end = (ring->prod_pvt + nr_bytes + off) & (ring->payload_bytes-1); + memcpy(ring->payload + start, src, nr_bytes); +} + +static inline void nc2_copy_to_ring(struct netchannel2_prod_ring *ring, + const void *src, + unsigned nr_bytes) +{ + nc2_copy_to_ring_off(ring, src, nr_bytes, 0); +} + +static inline void __nc2_send_pad(struct netchannel2_prod_ring *ring, + unsigned nr_bytes) +{ + struct netchannel2_msg_hdr msg; + msg.type = NETCHANNEL2_MSG_PAD; + msg.flags = 0; + msg.size = nr_bytes; + nc2_copy_to_ring(ring, &msg, sizeof(msg)); + ring->prod_pvt += nr_bytes; + ring->bytes_available -= nr_bytes; +} + +static inline int __nc2_ring_would_wrap(struct netchannel2_prod_ring *ring, + unsigned nr_bytes) +{ + RING_IDX mask; + mask = ~(ring->payload_bytes - 1); + return (ring->prod_pvt & mask) != ((ring->prod_pvt + nr_bytes) & mask); +} + +static inline unsigned __nc2_pad_needed(struct netchannel2_prod_ring *ring) +{ + return ring->payload_bytes - + (ring->prod_pvt & (ring->payload_bytes - 1)); +} + +static inline void __nc2_avoid_ring_wrap(struct netchannel2_prod_ring *ring, + unsigned nr_bytes) +{ + if (!__nc2_ring_would_wrap(ring, nr_bytes)) + return; + __nc2_send_pad(ring, __nc2_pad_needed(ring)); + +} + +/* Prepare a message for the other end and place it on the shared + * ring, updating the private producer pointer. You need to call + * nc2_flush_messages() before the message is actually made visible to + * the other end. It is permissible to send several messages in a + * batch and only flush them once. + */ +static inline void nc2_send_message(struct netchannel2_prod_ring *ring, + unsigned type, + unsigned flags, + const void *msg, + size_t size) +{ + struct netchannel2_msg_hdr *hdr = (struct netchannel2_msg_hdr *)msg; + + __nc2_avoid_ring_wrap(ring, size); + + hdr->type = type; + hdr->flags = flags; + hdr->size = size; + + nc2_copy_to_ring(ring, msg, size); + ring->prod_pvt += size; + BUG_ON(ring->bytes_available < size); + ring->bytes_available -= size; +} + +static inline volatile void *__nc2_get_message_ptr(struct netchannel2_prod_ring *ncrp) +{ + return (volatile void *)ncrp->payload + + (ncrp->prod_pvt & (ncrp->payload_bytes-1)); +} + +/* Copy the private producer pointer to the shared producer pointer, + * with a suitable memory barrier such that all messages placed on the + * ring are stable before we do the copy. This effectively pushes any + * messages which we've just sent out to the other end. Returns 1 if + * we need to notify the other end and 0 otherwise. + */ +static inline int nc2_flush_ring(struct netchannel2_prod_ring *ring) +{ + RING_IDX old_prod, new_prod; + + old_prod = ring->sring->prod; + new_prod = ring->prod_pvt; + + wmb(); + + ring->sring->prod = new_prod; + + /* We need the update to prod to happen before we read + * event. */ + mb(); + + /* We notify if the producer pointer moves across the event + * pointer. */ + if ((RING_IDX)(new_prod - ring->sring->prod_event) < + (RING_IDX)(new_prod - old_prod)) + return 1; + else + return 0; +} + +/* Copy the private consumer pointer to the shared consumer pointer, + * with a memory barrier so that any previous reads from the ring + * complete before the pointer is updated. This tells the other end + * that we're finished with the messages, and that it can re-use the + * ring space for more messages. Returns 1 if we need to notify the + * other end and 0 otherwise. + */ +static inline int nc2_finish_messages(struct netchannel2_cons_ring *ring) +{ + RING_IDX old_cons, new_cons; + + old_cons = ring->sring->cons; + new_cons = ring->cons_pvt; + + /* Need to finish reading from the ring before updating + cons */ + mb(); + ring->sring->cons = ring->cons_pvt; + + /* Need to publish our new consumer pointer before checking + event. */ + mb(); + if ((RING_IDX)(new_cons - ring->sring->cons_event) < + (RING_IDX)(new_cons - old_cons)) + return 1; + else + return 0; +} + +/* Check whether there are any unconsumed messages left on the shared + * ring. Returns 1 if there are, and 0 if there aren't. If there are + * no more messages, set the producer event so that we'll get a + * notification as soon as another one gets sent. It is assumed that + * all messages up to @prod have been processed, and none of the ones + * after it have been. */ +static inline int nc2_final_check_for_messages(struct netchannel2_cons_ring *ring, + RING_IDX prod) +{ + if (prod != ring->sring->prod) + return 1; + /* Request an event when more stuff gets poked on the ring. */ + ring->sring->prod_event = prod + 1; + + /* Publish event before final check for responses. */ + mb(); + if (prod != ring->sring->prod) + return 1; + else + return 0; +} + +/* Can we send a message with @nr_bytes payload bytes? Returns 1 if + * we can or 0 if we can't. If there isn't space right now, set the + * consumer event so that we'll get notified when space is + * available. */ +static inline int nc2_can_send_payload_bytes(struct netchannel2_prod_ring *ring, + unsigned nr_bytes) +{ + unsigned space; + RING_IDX cons; + BUG_ON(ring->bytes_available > ring->payload_bytes); + /* Times 2 because we might need to send a pad message */ + if (likely(ring->bytes_available > nr_bytes * 2 + ring->reserve)) + return 1; + if (__nc2_ring_would_wrap(ring, nr_bytes)) + nr_bytes += __nc2_pad_needed(ring); +retry: + cons = ring->sring->cons; + space = ring->payload_bytes - (ring->prod_pvt - cons); + if (likely(space >= nr_bytes + ring->reserve)) { + /* We have enough space to send the message. */ + + /* Need to make sure that the read of cons happens + before any following memory writes. */ + mb(); + + ring->bytes_available = space; + + return 1; + } else { + /* Not enough space available. Set an event pointer + when cons changes. We need to be sure that the + @cons used here is the same as the cons used to + calculate @space above, and the volatile modifier + on sring->cons achieves that. */ + ring->sring->cons_event = cons + 1; + + /* Check whether more space became available while we + were messing about. */ + + /* Need the event pointer to be stable before we do + the check. */ + mb(); + if (unlikely(cons != ring->sring->cons)) { + /* Cons pointer changed. Try again. */ + goto retry; + } + + /* There definitely isn't space on the ring now, and + an event has been set such that we'll be notified + if more space becomes available. */ + /* XXX we get a notification as soon as any more space + becomes available. We could maybe optimise by + setting the event such that we only get notified + when we know that enough space is available. The + main complication is handling the case where you + try to send a message of size A, fail due to lack + of space, and then try to send one of size B, where + B < A. It's not clear whether you want to set the + event for A bytes or B bytes. The obvious answer + is B, but that means moving the event pointer + backwards, and it's not clear that that's always + safe. Always setting for a single byte is safe, so + stick with that for now. */ + return 0; + } +} + +static inline int nc2_reserve_payload_bytes(struct netchannel2_prod_ring *ring, + unsigned nr_bytes) +{ + if (nc2_can_send_payload_bytes(ring, nr_bytes)) { + ring->reserve += nr_bytes; + return 1; + } else { + return 0; + } +} + +#endif /* __XEN_PUBLIC_IO_URING_H__ */ -- 1.6.3.1 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |