[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH 15/22] Add support for receiver-map mode.



In this mode of operation, the receiving domain maps the sending
domain's buffers, rather than grant-copying them into local memory.
This is marginally faster, but requires the receiving domain to be
somewhat trusted, because:

a) It can see anything else which happens to be on the same page
   as the transmit buffer, and
b) It can just hold onto the pages indefinitely, causing a memory leak
   in the transmitting domain.

It's therefore only really suitable for talking to a trusted peer, and
we use it in that way.

Signed-off-by: Steven Smith <steven.smith@xxxxxxxxxx>
---
 drivers/xen/netchannel2/Makefile           |    3 +-
 drivers/xen/netchannel2/chan.c             |   14 +
 drivers/xen/netchannel2/netchannel2_core.h |   17 +-
 drivers/xen/netchannel2/receiver_map.c     |  786 ++++++++++++++++++++++++++++
 drivers/xen/netchannel2/recv_packet.c      |   23 +
 drivers/xen/netchannel2/rscb.c             |   46 ++-
 drivers/xen/netchannel2/util.c             |   14 +
 drivers/xen/netchannel2/xmit_packet.c      |   12 +-
 include/xen/interface/io/netchannel2.h     |   20 +
 9 files changed, 919 insertions(+), 16 deletions(-)
 create mode 100644 drivers/xen/netchannel2/receiver_map.c

diff --git a/drivers/xen/netchannel2/Makefile b/drivers/xen/netchannel2/Makefile
index 565ba89..d6fb796 100644
--- a/drivers/xen/netchannel2/Makefile
+++ b/drivers/xen/netchannel2/Makefile
@@ -1,7 +1,8 @@
 obj-$(CONFIG_XEN_NETCHANNEL2) += netchannel2.o
 
 netchannel2-objs := chan.o netchan2.o rscb.o util.o \
-       xmit_packet.o offload.o recv_packet.o poll.o
+       xmit_packet.o offload.o recv_packet.o poll.o \
+       receiver_map.o
 
 ifeq ($(CONFIG_XEN_NETDEV2_BACKEND),y)
 netchannel2-objs += netback2.o
diff --git a/drivers/xen/netchannel2/chan.c b/drivers/xen/netchannel2/chan.c
index 9bb7ce7..47e1c5e 100644
--- a/drivers/xen/netchannel2/chan.c
+++ b/drivers/xen/netchannel2/chan.c
@@ -395,6 +395,13 @@ struct netchannel2 *nc2_new(struct xenbus_device *xd)
                return NULL;
        }
 
+       if (local_trusted) {
+               if (init_receive_map_mode() < 0) {
+                       nc2_release(nc);
+                       return NULL;
+               }
+       }
+
        netdev->open = nc2_open;
        netdev->stop = nc2_stop;
        netdev->hard_start_xmit = nc2_start_xmit;
@@ -499,6 +506,8 @@ int nc2_attach_rings(struct netchannel2 *nc,
 
        spin_unlock_bh(&nc->rings.lock);
 
+       resume_receive_map_mode();
+
        netif_carrier_on(nc->net_device);
 
        /* Kick it to get it going. */
@@ -630,6 +639,11 @@ int nc2_get_evtchn_port(struct netchannel2 *nc)
        return nc->rings.evtchn;
 }
 
+void nc2_suspend(struct netchannel2 *nc)
+{
+       suspend_receive_map_mode();
+}
+
 /* @ncrp has been recently nc2_kick()ed.  Do all of the necessary
    stuff. */
 static int process_ring(struct napi_struct *napi,
diff --git a/drivers/xen/netchannel2/netchannel2_core.h b/drivers/xen/netchannel2/netchannel2_core.h
index 7be97ea..c4de063 100644
--- a/drivers/xen/netchannel2/netchannel2_core.h
+++ b/drivers/xen/netchannel2/netchannel2_core.h
@@ -37,6 +37,7 @@ enum transmit_policy {
        transmit_policy_unknown = 0,
        transmit_policy_first = 0xf001,
        transmit_policy_grant = transmit_policy_first,
+       transmit_policy_map,
        transmit_policy_small,
        transmit_policy_last = transmit_policy_small
 };
@@ -320,6 +321,11 @@ struct sk_buff *handle_receiver_copy_packet(struct netchannel2 *nc,
                                            struct netchannel2_msg_hdr *hdr,
                                            unsigned nr_frags,
                                            unsigned frags_off);
+struct sk_buff *handle_receiver_map_packet(struct netchannel2 *nc,
+                                          struct netchannel2_msg_packet *msg,
+                                          struct netchannel2_msg_hdr *hdr,
+                                          unsigned nr_frags,
+                                          unsigned frags_off);
 
 enum prepare_xmit_result {
        PREP_XMIT_OKAY = 0,
@@ -332,9 +338,11 @@ enum prepare_xmit_result prepare_xmit_allocate_small(
        struct sk_buff *skb);
 enum prepare_xmit_result prepare_xmit_allocate_grant(
        struct netchannel2_ring_pair *ncrp,
-       struct sk_buff *skb);
+       struct sk_buff *skb,
+       int use_subpage_grants);
 void xmit_grant(struct netchannel2_ring_pair *ncrp,
                struct sk_buff *skb,
+               int use_subpage_grants,
                volatile void *msg);
 
 void queue_finish_packet_message(struct netchannel2_ring_pair *ncrp,
@@ -353,6 +361,8 @@ void fetch_fragment(struct netchannel2_ring_pair *ncrp,
                    struct netchannel2_fragment *frag,
                    unsigned off);
 
+void pull_through(struct sk_buff *skb, unsigned count);
+
 void nc2_kick(struct netchannel2_ring_pair *ncrp);
 
 int nc2_map_grants(struct grant_mapping *gm,
@@ -366,6 +376,11 @@ void queue_packet_to_interface(struct sk_buff *skb,
 
 void nc2_rscb_on_gntcopy_fail(void *ctxt, gnttab_copy_t *gop);
 
+int init_receive_map_mode(void);
+void deinit_receive_map_mode(void);
+void suspend_receive_map_mode(void);
+void resume_receive_map_mode(void);
+
 int nc2_start_xmit(struct sk_buff *skb, struct net_device *dev);
 int nc2_really_start_xmit(struct netchannel2_ring_pair *ncrp,
                          struct sk_buff *skb);
diff --git a/drivers/xen/netchannel2/receiver_map.c b/drivers/xen/netchannel2/receiver_map.c
new file mode 100644
index 0000000..e5c4ed1
--- /dev/null
+++ b/drivers/xen/netchannel2/receiver_map.c
@@ -0,0 +1,786 @@
+/* Support for mapping packets into the local domain, rather than
+   copying them or using pre-posted buffers.  We only implement
+   receive-side support here; for transmit-side, we use the rscb.c
+   implementation. */
+#include <linux/kernel.h>
+#include <linux/delay.h>
+#include <linux/skbuff.h>
+#include <linux/netdevice.h>
+#include <xen/live_maps.h>
+#include <xen/gnttab.h>
+#include <xen/balloon.h>
+#include <xen/evtchn.h>
+#include "netchannel2_core.h"
+
+#define MAX_MAPPED_FRAGS 1024
+#define MAX_MAPPED_PACKETS MAX_PENDING_FINISH_PACKETS
+#define SKB_MIN_PAYLOAD_SIZE 128
+
+static DEFINE_SPINLOCK(global_map_lock);
+static struct receive_mapper *receive_mapper;
+
+/* How long do we leave the packets in the Linux stack before trying
+   to copy them, in jiffies? */
+#define PACKET_TIMEOUT (HZ/2)
+
+/* A slot into which we could map a fragment. */
+struct rx_map_fragment {
+       struct list_head list;
+       struct rx_map_packet *packet;
+       grant_handle_t handle; /* -1 if the fragment isn't currently
+                               * mapped */
+       struct netchannel2_fragment nc_frag;
+};
+
+struct rx_map_packet {
+       struct list_head list;
+       struct list_head frags;
+       /* We take a reference for every mapped fragment associated
+          with the packet.  When the refcnt goes to zero, the packet
+          is finished, and can be moved to the receive mapper's
+          finished_packets list. */
+       atomic_t refcnt;
+       unsigned id;
+       unsigned long expires; /* We expect Linux to have finished
+                                 with the packet by this time (in
+                                 jiffies), or we try to copy it. */
+       struct netchannel2 *nc;
+       uint8_t flags;
+};
+
+struct receive_mapper {
+       struct page_foreign_tracker *tracker;
+
+       struct page **pages;
+
+       /* Nests inside the netchannel2 lock.  The
+          finished_packets_lock nests inside this. */
+       spinlock_t rm_lock;
+
+       /* Packet fragments which we've mapped, or slots into which we
+          could map packets.  The free list and count are protected
+          by @rm_lock. */
+       struct rx_map_fragment frags[MAX_MAPPED_FRAGS];
+       struct list_head free_frags;
+
+       struct rx_map_packet packets[MAX_MAPPED_PACKETS];
+       struct list_head free_packets;
+       struct list_head active_packets;
+       unsigned nr_free_packets;
+
+       /* Packets which Linux has finished with but which we haven't
+          returned to the other endpoint yet. */
+       spinlock_t finished_packets_lock; /* BH-safe leaf lock,
+                                          * acquired from the page
+                                          * free callback.  Nests
+                                          * inside the rm_lock. */
+       struct list_head finished_packets;
+
+       struct tasklet_struct gc_tasklet;
+
+       struct timer_list expire_timer;
+
+       /* Set if we're trying to run the mapper down prior to
+          suspending the domain. */
+       uint8_t suspending;
+};
+
+static void suspend_receive_mapper(struct receive_mapper *rm);
+
+static unsigned fragment_idx(const struct rx_map_fragment *frag)
+{
+       return frag - receive_mapper->frags;
+}
+
+static int alloc_rx_frags_for_packet(unsigned nr_frags,
+                                    struct rx_map_packet *packet)
+{
+       struct rx_map_fragment *rmf;
+       unsigned x;
+
+       INIT_LIST_HEAD(&packet->frags);
+       for (x = 0; x < nr_frags; x++) {
+               if (list_empty(&receive_mapper->free_frags))
+                       goto err;
+               rmf = list_entry(receive_mapper->free_frags.next,
+                                struct rx_map_fragment,
+                                list);
+               rmf->packet = packet;
+               rmf->handle = -1;
+               list_move(&rmf->list, &packet->frags);
+       }
+       return 0;
+
+err:
+       list_splice_init(&packet->frags, &receive_mapper->free_frags);
+       return -EBUSY;
+}
+
+static struct rx_map_packet *alloc_rx_packet(struct netchannel2 *nc,
+                                            unsigned nr_frags)
+{
+       struct rx_map_packet *rmp;
+
+       spin_lock(&receive_mapper->rm_lock);
+       if (list_empty(&receive_mapper->free_packets) ||
+           receive_mapper->suspending) {
+               spin_unlock(&receive_mapper->rm_lock);
+               return NULL;
+       }
+       rmp = list_entry(receive_mapper->free_packets.next,
+                        struct rx_map_packet, list);
+
+       if (alloc_rx_frags_for_packet(nr_frags, rmp) < 0) {
+               spin_unlock(&receive_mapper->rm_lock);
+               return NULL;
+       }
+       list_del(&rmp->list);
+       atomic_set(&rmp->refcnt, nr_frags);
+       rmp->nc = nc;
+       receive_mapper->nr_free_packets--;
+
+       spin_unlock(&receive_mapper->rm_lock);
+
+       return rmp;
+}
+
+struct grant_unmapper {
+       unsigned nr_gops;
+       gnttab_unmap_grant_ref_t gop_queue[32];
+};
+
+static void do_unmaps(struct grant_unmapper *unmapper)
+{
+       int ret;
+       unsigned x;
+
+       if (unmapper->nr_gops != 0) {
+               ret = HYPERVISOR_grant_table_op(GNTTABOP_unmap_grant_ref,
+                                               unmapper->gop_queue,
+                                               unmapper->nr_gops);
+               BUG_ON(ret);
+               for (x = 0; x < unmapper->nr_gops; x++) {
+                       set_phys_to_machine(
+                               __pa(unmapper->gop_queue[x].host_addr) >>
+                                       PAGE_SHIFT,
+                               INVALID_P2M_ENTRY);
+               }
+       }
+       unmapper->nr_gops = 0;
+}
+
+static void grant_unmap(struct grant_unmapper *unmapper,
+                       void *va,
+                       int handle)
+{
+       gnttab_unmap_grant_ref_t *gop;
+       if (unmapper->nr_gops == ARRAY_SIZE(unmapper->gop_queue))
+               do_unmaps(unmapper);
+       gop = &unmapper->gop_queue[unmapper->nr_gops];
+       gnttab_set_unmap_op(gop, (unsigned long)va, GNTMAP_host_map, handle);
+       unmapper->nr_gops++;
+}
+
+/* A tasklet which is invoked shortly after a packet is released so
+   that we can send the FINISH_PACKET message. */
+static void gc_tasklet(unsigned long _rm)
+{
+       struct list_head packets;
+       struct rx_map_packet *packet;
+       struct rx_map_fragment *rx_frag;
+       struct list_head released_fragments;
+       unsigned nr_released_packets;
+       unsigned idx;
+       struct grant_unmapper unmapper;
+       struct page *page;
+       struct netchannel2 *locked_nc;
+
+       INIT_LIST_HEAD(&packets);
+
+       spin_lock(&receive_mapper->finished_packets_lock);
+       list_splice_init(&receive_mapper->finished_packets, &packets);
+       spin_unlock(&receive_mapper->finished_packets_lock);
+
+       /* Unmap the fragments. */
+       unmapper.nr_gops = 0;
+       BUG_ON(packets.next == NULL);
+       list_for_each_entry(packet, &packets, list) {
+               BUG_ON(packet->list.next == NULL);
+               BUG_ON(atomic_read(&packet->refcnt) != 0);
+               BUG_ON(packet->frags.next == NULL);
+               list_for_each_entry(rx_frag, &packet->frags, list) {
+                       BUG_ON(rx_frag->list.next == NULL);
+                       if (rx_frag->handle == -1)
+                               continue;
+                       idx = fragment_idx(rx_frag);
+                       page = receive_mapper->pages[idx];
+                       stop_tracking_page(page);
+                       grant_unmap(&unmapper, page_address(page),
+                                   rx_frag->handle);
+               }
+       }
+       do_unmaps(&unmapper);
+
+       /* Tell the other end that the packets are finished, and
+          accumulate the fragments into a local free list. */
+       INIT_LIST_HEAD(&released_fragments);
+       nr_released_packets = 0;
+
+       locked_nc = NULL;
+       list_for_each_entry(packet, &packets, list) {
+               if (locked_nc != packet->nc) {
+                       if (locked_nc) {
+                               spin_unlock(&locked_nc->rings.lock);
+                               nc2_kick(&locked_nc->rings);
+                       }
+                       spin_lock(&packet->nc->rings.lock);
+                       locked_nc = packet->nc;
+               }
+               BUG_ON(packet->frags.next == NULL);
+               list_for_each_entry(rx_frag, &packet->frags, list) {
+                       BUG_ON(rx_frag->list.next == NULL);
+                       idx = fragment_idx(rx_frag);
+                       gnttab_reset_grant_page(receive_mapper->pages[idx]);
+               }
+               nr_released_packets++;
+               list_splice_init(&packet->frags, &released_fragments);
+               queue_finish_packet_message(&locked_nc->rings, packet->id,
+                                           packet->flags);
+       }
+
+       if (locked_nc) {
+               spin_unlock(&locked_nc->rings.lock);
+               nc2_kick(&locked_nc->rings);
+               locked_nc = NULL;
+
+               spin_lock(&receive_mapper->rm_lock);
+               list_splice(&packets, &receive_mapper->free_packets);
+               list_splice(&released_fragments, &receive_mapper->free_frags);
+               receive_mapper->nr_free_packets += nr_released_packets;
+
+               /* Reprogram the expire timer. */
+               if (!list_empty(&receive_mapper->active_packets)) {
+                       mod_timer(&receive_mapper->expire_timer,
+                                 list_entry(receive_mapper->active_packets.next,
+                                            struct rx_map_packet,
+                                            list)->expires);
+               }
+               spin_unlock(&receive_mapper->rm_lock);
+       }
+}
+
+/* Decrement the refcnt on @rmp and, if necessary, move it to the
+   finished packets list and schedule the GC tasklet. */
+static void put_rx_map_packet(struct rx_map_packet *rmp)
+{
+       if (atomic_dec_and_test(&rmp->refcnt)) {
+               /* Remove it from the active list. */
+               spin_lock_bh(&receive_mapper->rm_lock);
+               list_del(&rmp->list);
+               spin_unlock_bh(&receive_mapper->rm_lock);
+
+               /* Add it to the finished list. */
+               spin_lock_bh(&receive_mapper->finished_packets_lock);
+               list_add_tail(&rmp->list, &receive_mapper->finished_packets);
+               spin_unlock_bh(&receive_mapper->finished_packets_lock);
+
+               tasklet_schedule(&receive_mapper->gc_tasklet);
+       }
+}
+
+
+/* The page @page, which was previously part of a receiver-mapped SKB,
+ * has been released.  If it was the last page involved in its SKB,
+ * the packet is finished and we can tell the other end that it's
+ * finished.
+ */
+static void netchan2_page_release(struct page *page, unsigned order)
+{
+       struct rx_map_fragment *frag;
+       struct rx_map_packet *rmp;
+
+       BUG_ON(order != 0);
+
+       frag = (struct rx_map_fragment *)page->mapping;
+       rmp = frag->packet;
+
+       put_rx_map_packet(rmp);
+}
+
+/* Unmap the packet, removing all other references to it.  The caller
+ * should take an additional reference to the packet before calling
+ * this, to stop it disappearing underneath us.  The only way of
+ * checking whether this succeeded is to look at the packet's
+ * reference count after it returns.
+ */
+static void unmap_this_packet(struct rx_map_packet *rmp)
+{
+       struct rx_map_fragment *rx_frag;
+       unsigned idx;
+       int r;
+       int cnt;
+
+       /* Unmap every fragment in the packet. We don't fail the whole
+          function just because gnttab_copy_grant_page() failed,
+          because success or failure will be inferable from the
+          reference count on the packet (this makes it easier to
+          handle the case where some pages have already been copied,
+          for instance). */
+       cnt = 0;
+       list_for_each_entry(rx_frag, &rmp->frags, list) {
+               idx = fragment_idx(rx_frag);
+               if (rx_frag->handle != -1) {
+                       r = gnttab_copy_grant_page(rx_frag->handle,
+                                                  &receive_mapper->pages[idx]);
+                       if (r == 0) {
+                               /* We copied the page, so it's not really
+                                  mapped any more. */
+                               rx_frag->handle = -1;
+                               atomic_dec(&rmp->refcnt);
+                       }
+               }
+               cnt++;
+       }
+
+       /* Caller should hold a reference. */
+       BUG_ON(atomic_read(&rmp->refcnt) == 0);
+}
+
+static void unmap_all_packets(void)
+{
+       struct rx_map_packet *rmp;
+       struct rx_map_packet *next;
+       struct list_head finished_packets;
+       int need_tasklet;
+
+       INIT_LIST_HEAD(&finished_packets);
+
+       spin_lock_bh(&receive_mapper->rm_lock);
+
+       list_for_each_entry_safe(rmp, next, &receive_mapper->active_packets,
+                                list) {
+               atomic_inc(&rmp->refcnt);
+               unmap_this_packet(rmp);
+               if (atomic_dec_and_test(&rmp->refcnt))
+                       list_move(&rmp->list, finished_packets.prev);
+       }
+       spin_unlock_bh(&receive_mapper->rm_lock);
+
+       need_tasklet = !list_empty(&finished_packets);
+
+       spin_lock_bh(&receive_mapper->finished_packets_lock);
+       list_splice(&finished_packets, receive_mapper->finished_packets.prev);
+       spin_unlock_bh(&receive_mapper->finished_packets_lock);
+
+       if (need_tasklet)
+               tasklet_schedule(&receive_mapper->gc_tasklet);
+}
+
+static void free_receive_mapper(struct receive_mapper *rm)
+{
+       unsigned x;
+
+       /* Get rid of any packets which are currently mapped. */
+       suspend_receive_mapper(rm);
+
+       /* Stop the expiry timer.  We know it won't get requeued
+        * because there are no packets outstanding and rm->suspending
+        * is set (because of suspend_receive_mapper()). */
+       del_timer_sync(&rm->expire_timer);
+
+       /* Wait for any last instances of the tasklet to finish. */
+       tasklet_kill(&rm->gc_tasklet);
+
+       if (rm->pages != NULL) {
+               for (x = 0; x < MAX_MAPPED_FRAGS; x++) {
+                       if (PageForeign(rm->pages[x]))
+                               ClearPageForeign(rm->pages[x]);
+                       rm->pages[x]->mapping = NULL;
+               }
+               free_empty_pages_and_pagevec(rm->pages, MAX_MAPPED_FRAGS);
+       }
+       if (rm->tracker != NULL)
+               free_page_foreign_tracker(rm->tracker);
+       kfree(rm);
+}
+
+/* Timer invoked shortly after a packet expires, so that we can copy
+   the data and get it back from Linux.  This is necessary if a packet
+   gets stuck in a socket RX queue somewhere, or you risk a
+   deadlock. */
+static void expire_timer(unsigned long data)
+{
+       struct rx_map_packet *rmp, *next;
+       struct list_head finished_packets;
+       int need_tasklet;
+
+       INIT_LIST_HEAD(&finished_packets);
+
+       spin_lock(&receive_mapper->rm_lock);
+       list_for_each_entry_safe(rmp, next, &receive_mapper->active_packets,
+                                list) {
+               if (time_after(rmp->expires, jiffies)) {
+                       mod_timer(&receive_mapper->expire_timer, rmp->expires);
+                       break;
+               }
+               atomic_inc(&rmp->refcnt);
+               unmap_this_packet(rmp);
+               if (atomic_dec_and_test(&rmp->refcnt)) {
+                       list_move(&rmp->list, finished_packets.prev);
+               } else {
+                       /* Couldn't unmap the packet, either because
+                          it's in use by real hardware or we've run
+                          out of memory.  Send the packet to the end
+                          of the queue and update the expiry time so
+                          that we try again later. */
+                       /* Note that this can make the active packet
+                          list slightly out of order.  Oh well; it
+                          won't be by more than a few jiffies, and it
+                          doesn't really matter that much. */
+                       rmp->expires = jiffies + PACKET_TIMEOUT;
+                       list_move(&rmp->list,
+                                 receive_mapper->active_packets.prev);
+               }
+       }
+       spin_unlock(&receive_mapper->rm_lock);
+
+       need_tasklet = !list_empty(&finished_packets);
+
+       spin_lock(&receive_mapper->finished_packets_lock);
+       list_splice(&finished_packets, receive_mapper->finished_packets.prev);
+       spin_unlock(&receive_mapper->finished_packets_lock);
+
+       if (need_tasklet)
+               tasklet_schedule(&receive_mapper->gc_tasklet);
+}
+
+static struct receive_mapper *new_receive_mapper(void)
+{
+       struct receive_mapper *rm;
+       unsigned x;
+
+       rm = kzalloc(sizeof(*rm), GFP_KERNEL);
+       if (!rm)
+               goto err;
+       INIT_LIST_HEAD(&rm->free_frags);
+       INIT_LIST_HEAD(&rm->free_packets);
+       INIT_LIST_HEAD(&rm->active_packets);
+       INIT_LIST_HEAD(&rm->finished_packets);
+       spin_lock_init(&rm->rm_lock);
+       spin_lock_init(&rm->finished_packets_lock);
+       for (x = 0; x < MAX_MAPPED_FRAGS; x++)
+               list_add_tail(&rm->frags[x].list, &rm->free_frags);
+       for (x = 0; x < MAX_MAPPED_PACKETS; x++)
+               list_add_tail(&rm->packets[x].list, &rm->free_packets);
+       rm->nr_free_packets = MAX_MAPPED_PACKETS;
+
+       setup_timer(&rm->expire_timer, expire_timer, 0);
+       tasklet_init(&rm->gc_tasklet, gc_tasklet, 0);
+
+       rm->tracker = alloc_page_foreign_tracker(MAX_MAPPED_FRAGS);
+       if (!rm->tracker)
+               goto err;
+       rm->pages = alloc_empty_pages_and_pagevec(MAX_MAPPED_FRAGS);
+       if (!rm->pages)
+               goto err;
+       for (x = 0; x < MAX_MAPPED_FRAGS; x++) {
+               SetPageForeign(rm->pages[x], netchan2_page_release);
+               rm->pages[x]->mapping = (void *)&rm->frags[x];
+       }
+
+       return rm;
+
+err:
+       if (rm != NULL)
+               free_receive_mapper(rm);
+       return NULL;
+}
+
+static void attach_frag_to_skb(struct sk_buff *skb,
+                              struct rx_map_fragment *frag)
+{
+       unsigned idx;
+       struct skb_shared_info *shinfo;
+       skb_frag_t *sk_frag;
+
+       shinfo = skb_shinfo(skb);
+       sk_frag = &shinfo->frags[shinfo->nr_frags];
+       idx = fragment_idx(frag);
+       sk_frag->page = receive_mapper->pages[idx];
+       sk_frag->page_offset = frag->nc_frag.off;
+       sk_frag->size = frag->nc_frag.size;
+       shinfo->nr_frags++;
+}
+
+struct rx_plan {
+       int is_failed;
+       unsigned nr_mops;
+       gnttab_map_grant_ref_t mops[8];
+       struct rx_map_fragment *frags[8];
+};
+
+static void flush_grant_operations(struct rx_plan *rp)
+{
+       unsigned x;
+       int ret;
+       gnttab_map_grant_ref_t *mop;
+
+       if (rp->nr_mops == 0)
+               return;
+       if (!rp->is_failed) {
+               ret = HYPERVISOR_grant_table_op(GNTTABOP_map_grant_ref,
+                                               rp->mops,
+                                               rp->nr_mops);
+               BUG_ON(ret);
+               for (x = 0; x < rp->nr_mops; x++) {
+                       mop = &rp->mops[x];
+                       if (mop->status != 0) {
+                               rp->is_failed = 1;
+                       } else {
+                               rp->frags[x]->handle = mop->handle;
+                               set_phys_to_machine(
+                                       __pa(mop->host_addr) >> PAGE_SHIFT,
+                                       FOREIGN_FRAME(mop->dev_bus_addr >>
+                                                     PAGE_SHIFT));
+                       }
+               }
+       }
+       rp->nr_mops = 0;
+}
+
+static void map_fragment(struct rx_plan *rp,
+                        struct rx_map_fragment *rx_frag,
+                        struct netchannel2 *nc)
+{
+       unsigned idx = fragment_idx(rx_frag);
+       gnttab_map_grant_ref_t *mop;
+
+       if (rp->nr_mops == ARRAY_SIZE(rp->mops))
+               flush_grant_operations(rp);
+       mop = &rp->mops[rp->nr_mops];
+       gnttab_set_map_op(mop,
+                         (unsigned long)page_address(receive_mapper->pages[idx]),
+                         GNTMAP_host_map | GNTMAP_readonly,
+                         rx_frag->nc_frag.receiver_map.gref,
+                         nc->rings.otherend_id);
+       rp->frags[rp->nr_mops] = rx_frag;
+       rp->nr_mops++;
+}
+
+/* Unmap a packet which has been half-mapped. */
+static void unmap_partial_packet(struct rx_map_packet *rmp)
+{
+       unsigned idx;
+       struct rx_map_fragment *rx_frag;
+       struct grant_unmapper unmapper;
+
+       unmapper.nr_gops = 0;
+       list_for_each_entry(rx_frag, &rmp->frags, list) {
+               if (rx_frag->handle == -1)
+                       continue;
+               idx = fragment_idx(rx_frag);
+               grant_unmap(&unmapper,
+                           page_address(receive_mapper->pages[idx]),
+                           rx_frag->handle);
+       }
+       do_unmaps(&unmapper);
+}
+
+struct sk_buff *handle_receiver_map_packet(struct netchannel2 *nc,
+                                          struct netchannel2_msg_packet *msg,
+                                          struct netchannel2_msg_hdr *hdr,
+                                          unsigned nr_frags,
+                                          unsigned frags_off)
+{
+       struct sk_buff *skb;
+       struct rx_map_fragment *rx_frag;
+       unsigned x;
+       unsigned len;
+       struct rx_map_packet *rmp;
+       unsigned idx;
+       struct rx_plan plan;
+       unsigned prefix_size;
+
+       memset(&plan, 0, sizeof(plan));
+
+       rmp = alloc_rx_packet(nc, nr_frags);
+       if (rmp == NULL)
+               return NULL;
+
+       if (msg->prefix_size < SKB_MIN_PAYLOAD_SIZE)
+               prefix_size = SKB_MIN_PAYLOAD_SIZE;
+       else
+               prefix_size = msg->prefix_size;
+       /* As in posted_buffers.c, we don't limit the total size of
+          the packet, because we don't need to allocate more memory
+          for very large packets.  The prefix is safe because it's
+          only a 16 bit number.  A 64k allocation won't always
+          succeed, but it's unlikely to trigger the OOM killer or
+          otherwise interfere with the normal operation of the local
+          domain. */
+       skb = dev_alloc_skb(prefix_size + NET_IP_ALIGN);
+       if (skb == NULL) {
+               spin_lock(&receive_mapper->rm_lock);
+               list_splice(&rmp->frags, &receive_mapper->free_frags);
+               list_add(&rmp->list, &receive_mapper->free_packets);
+               receive_mapper->nr_free_packets++;
+               spin_unlock(&receive_mapper->rm_lock);
+               return NULL;
+       }
+       skb_reserve(skb, NET_IP_ALIGN);
+
+       rmp->id = msg->id;
+       rmp->flags = msg->flags;
+
+       rx_frag = list_entry(rmp->frags.next, struct rx_map_fragment, list);
+       for (x = 0; x < nr_frags; x++) {
+               fetch_fragment(&nc->rings, x, &rx_frag->nc_frag, frags_off);
+               if (rx_frag->nc_frag.size > PAGE_SIZE ||
+                   rx_frag->nc_frag.off >= PAGE_SIZE ||
+                   rx_frag->nc_frag.size + rx_frag->nc_frag.off > PAGE_SIZE) {
+                       plan.is_failed = 1;
+                       break;
+               }
+               map_fragment(&plan, rx_frag, nc);
+               rx_frag = list_entry(rx_frag->list.next,
+                                    struct rx_map_fragment,
+                                    list);
+       }
+
+       flush_grant_operations(&plan);
+       if (plan.is_failed)
+               goto fail_and_unmap;
+
+       /* Grab the prefix off of the ring. */
+       nc2_copy_from_ring_off(&nc->rings.cons_ring,
+                              skb_put(skb, msg->prefix_size),
+                              msg->prefix_size,
+                              frags_off +
+                              nr_frags * sizeof(struct netchannel2_fragment));
+
+       /* All fragments mapped, so we know that this is going to
+          work.  Transfer the receive slots into the SKB. */
+       len = 0;
+       list_for_each_entry(rx_frag, &rmp->frags, list) {
+               attach_frag_to_skb(skb, rx_frag);
+               idx = fragment_idx(rx_frag);
+               start_tracking_page(receive_mapper->tracker,
+                                   receive_mapper->pages[idx],
+                                   nc->rings.otherend_id,
+                                   rx_frag->nc_frag.receiver_map.gref,
+                                   idx,
+                                   nc);
+               len += rx_frag->nc_frag.size;
+       }
+
+       skb->len += len;
+       skb->data_len += len;
+       skb->truesize += len;
+
+       spin_lock(&receive_mapper->rm_lock);
+       list_add_tail(&rmp->list, &receive_mapper->active_packets);
+       rmp->expires = jiffies + PACKET_TIMEOUT;
+       if (rmp == list_entry(receive_mapper->active_packets.next,
+                             struct rx_map_packet,
+                             list))
+               mod_timer(&receive_mapper->expire_timer, rmp->expires);
+       spin_unlock(&receive_mapper->rm_lock);
+
+       if (skb_headlen(skb) < SKB_MIN_PAYLOAD_SIZE)
+               pull_through(skb,
+                            SKB_MIN_PAYLOAD_SIZE - skb_headlen(skb));
+
+       return skb;
+
+fail_and_unmap:
+       pr_debug("Failed to map received packet!\n");
+       unmap_partial_packet(rmp);
+
+       spin_lock(&receive_mapper->rm_lock);
+       list_splice(&rmp->frags, &receive_mapper->free_frags);
+       list_add_tail(&rmp->list, &receive_mapper->free_packets);
+       receive_mapper->nr_free_packets++;
+       spin_unlock(&receive_mapper->rm_lock);
+
+       kfree_skb(skb);
+       return NULL;
+}
+
+static void suspend_receive_mapper(struct receive_mapper *rm)
+{
+       spin_lock_bh(&rm->rm_lock);
+       /* Stop any more packets coming in. */
+       rm->suspending = 1;
+
+       /* Wait for Linux to give back all of the SKBs which we've
+          given it. */
+       while (rm->nr_free_packets != MAX_MAPPED_PACKETS) {
+               spin_unlock_bh(&rm->rm_lock);
+               unmap_all_packets();
+               msleep(100);
+               spin_lock_bh(&rm->rm_lock);
+       }
+       spin_unlock_bh(&rm->rm_lock);
+}
+
+static void resume_receive_mapper(void)
+{
+       spin_lock_bh(&receive_mapper->rm_lock);
+       receive_mapper->suspending = 0;
+       spin_unlock_bh(&receive_mapper->rm_lock);
+}
+
+
+int init_receive_map_mode(void)
+{
+       struct receive_mapper *new_rm;
+       spin_lock(&global_map_lock);
+       while (receive_mapper == NULL) {
+               spin_unlock(&global_map_lock);
+               new_rm = new_receive_mapper();
+               if (new_rm == NULL)
+                       return -ENOMEM;
+               spin_lock(&global_map_lock);
+               if (receive_mapper == NULL) {
+                       receive_mapper = new_rm;
+               } else {
+                       spin_unlock(&global_map_lock);
+                       free_receive_mapper(new_rm);
+                       spin_lock(&global_map_lock);
+               }
+       }
+       spin_unlock(&global_map_lock);
+       return 0;
+}
+
+void deinit_receive_map_mode(void)
+{
+       if (!receive_mapper)
+               return;
+       BUG_ON(spin_is_locked(&global_map_lock));
+       free_receive_mapper(receive_mapper);
+       receive_mapper = NULL;
+}
+
+void suspend_receive_map_mode(void)
+{
+       if (!receive_mapper)
+               return;
+       suspend_receive_mapper(receive_mapper);
+}
+
+void resume_receive_map_mode(void)
+{
+       if (!receive_mapper)
+               return;
+       resume_receive_mapper();
+}
+
+struct netchannel2 *nc2_get_interface_for_page(struct page *p)
+{
+       BUG_ON(!page_is_tracked(p));
+       if (!receive_mapper ||
+           tracker_for_page(p) != receive_mapper->tracker)
+               return NULL;
+       return get_page_tracker_ctxt(p);
+}
diff --git a/drivers/xen/netchannel2/recv_packet.c b/drivers/xen/netchannel2/recv_packet.c
index 80c5d5d..8c38788 100644
--- a/drivers/xen/netchannel2/recv_packet.c
+++ b/drivers/xen/netchannel2/recv_packet.c
@@ -112,6 +112,28 @@ void nc2_handle_packet_msg(struct netchannel2 *nc,
                                                  nr_frags, frags_off);
                queue_finish_packet_message(ncrp, msg.id, msg.flags);
                break;
+       case NC2_PACKET_TYPE_receiver_map:
+               if (!nc->local_trusted) {
+                       /* The remote doesn't trust us, so they
+                          shouldn't be sending us receiver-map
+                          packets.  Just treat it as an RSCB
+                          packet. */
+                       skb = NULL;
+               } else {
+                       skb = handle_receiver_map_packet(nc, &msg, hdr,
+                                                        nr_frags,
+                                                        frags_off);
+                       /* Finish message will be sent when we unmap
+                        * the packet. */
+               }
+               if (skb == NULL) {
+                       /* We can't currently map this skb.  Use a
+                          receiver copy instead. */
+                       skb = handle_receiver_copy_packet(nc, ncrp, &msg, hdr,
+                                                         nr_frags, frags_off);
+                       queue_finish_packet_message(ncrp, msg.id, msg.flags);
+               }
+               break;
        default:
                pr_debug("Unknown packet type %d\n", msg.type);
                nc->stats.rx_errors++;
@@ -285,4 +307,5 @@ int __init nc2_init(void)
 
 void __exit nc2_exit(void)
 {
+       deinit_receive_map_mode();
 }
diff --git a/drivers/xen/netchannel2/rscb.c b/drivers/xen/netchannel2/rscb.c
index 8ad5454..cdcb116 100644
--- a/drivers/xen/netchannel2/rscb.c
+++ b/drivers/xen/netchannel2/rscb.c
@@ -209,6 +209,7 @@ struct sk_buff *handle_receiver_copy_packet(struct netchannel2 *nc,
 struct grant_packet_plan {
        volatile struct netchannel2_fragment *out_fragment;
        grant_ref_t gref_pool;
+       int use_subpage_grants;
        unsigned prefix_avail;
 };
 
@@ -223,14 +224,15 @@ static inline int nfrags_skb(struct sk_buff *skb, int prefix_size)
        start_grant = ((unsigned long)skb->data + prefix_size) &
                ~(PAGE_SIZE-1);
        end_grant = ((unsigned long)skb->data +
-                    skb_headlen(skb) +  PAGE_SIZE - 1) &
+                    skb_headlen(skb) + PAGE_SIZE - 1) &
                ~(PAGE_SIZE-1);
        return ((end_grant - start_grant) >> PAGE_SHIFT)
                + skb_shinfo(skb)->nr_frags;
 }
 
 enum prepare_xmit_result prepare_xmit_allocate_grant(struct netchannel2_ring_pair *ncrp,
-                                                    struct sk_buff *skb)
+                                                    struct sk_buff *skb,
+                                                    int use_subpage_grants)
 {
        struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
        unsigned nr_fragments;
@@ -241,13 +243,23 @@ enum prepare_xmit_result prepare_xmit_allocate_grant(struct netchannel2_ring_pai
        if (allocate_txp_slot(ncrp, skb) < 0)
                return PREP_XMIT_BUSY;
 
-       /* We're going to have to get the remote to issue a grant copy
-          hypercall anyway, so there's no real benefit to shoving the
-          headers inline. */
-       /* (very small packets won't go through here, so there's no
-          chance that we could completely eliminate the grant
-          copy.) */
-       inline_prefix_size = sizeof(struct ethhdr);
+       if (use_subpage_grants) {
+               /* We're going to have to get the remote to issue a
+                  grant copy hypercall anyway, so there's no real
+                  benefit to shoving the headers inline. */
+               /* (very small packets won't go through here, so
+                  there's no chance that we could completely
+                  eliminate the grant copy.) */
+               inline_prefix_size = sizeof(struct ethhdr);
+       } else {
+               /* If we're going off-box (and we probably are, if the
+                  remote is trusted), putting the header in the ring
+                  potentially saves a TLB miss in the bridge, which
+                  is worth doing. */
+               inline_prefix_size = PACKET_PREFIX_SIZE;
+               if (skb_headlen(skb) < inline_prefix_size)
+                       inline_prefix_size = skb_headlen(skb);
+       }
 
        if (skb_co->nr_fragments == 0) {
                nr_fragments = nfrags_skb(skb, inline_prefix_size);
@@ -277,10 +289,14 @@ enum prepare_xmit_result prepare_xmit_allocate_grant(struct netchannel2_ring_pai
                   have to recompute it next time around. */
                return PREP_XMIT_BUSY;
        }
+
        skb_co->gref_pool = gref_pool;
        skb_co->inline_prefix_size = inline_prefix_size;
 
-       skb_co->type = NC2_PACKET_TYPE_receiver_copy;
+       if (use_subpage_grants)
+               skb_co->type = NC2_PACKET_TYPE_receiver_copy;
+       else
+               skb_co->type = NC2_PACKET_TYPE_receiver_map;
 
        return PREP_XMIT_OKAY;
 }
@@ -318,15 +334,19 @@ static void prepare_subpage_grant(struct netchannel2_ring_pair *ncrp,
                                                      GTF_readonly,
                                                      trans_domid,
                                                      trans_gref);
-       } else {
+       } else if (plan->use_subpage_grants) {
                gnttab_grant_foreign_access_ref_subpage(gref,
                                                        ncrp->otherend_id,
                                                        
virt_to_mfn(page_address(page)),
                                                        GTF_readonly,
                                                        off_in_page,
                                                        size);
+       } else {
+               gnttab_grant_foreign_access_ref(gref,
+                                               ncrp->otherend_id,
+                                               virt_to_mfn(page_address(page)),
+                                               GTF_readonly);
        }
-
        frag->off = off_in_page;
        frag->size = size;
        plan->out_fragment++;
@@ -356,6 +376,7 @@ static int grant_data_area(struct netchannel2_ring_pair *ncrp,
 
 void xmit_grant(struct netchannel2_ring_pair *ncrp,
                struct sk_buff *skb,
+               int use_subpage_grants,
                volatile void *msg_buf)
 {
        volatile struct netchannel2_msg_packet *msg = msg_buf;
@@ -366,6 +387,7 @@ void xmit_grant(struct netchannel2_ring_pair *ncrp,
        skb_frag_t *frag;
 
        memset(&plan, 0, sizeof(plan));
+       plan.use_subpage_grants = use_subpage_grants;
        plan.prefix_avail = skb_co->inline_prefix_size;
        plan.out_fragment = msg->frags;
        plan.gref_pool = skb_co->gref_pool;
diff --git a/drivers/xen/netchannel2/util.c b/drivers/xen/netchannel2/util.c
index 302dfc1..79d9f09 100644
--- a/drivers/xen/netchannel2/util.c
+++ b/drivers/xen/netchannel2/util.c
@@ -94,6 +94,20 @@ void release_tx_packet(struct netchannel2_ring_pair *ncrp,
                        }
                        gnttab_release_grant_reference(&ncrp->gref_pool, gref);
                }
+       } else if (skb_co->type == NC2_PACKET_TYPE_receiver_map) {
+               while (1) {
+                       r = gnttab_claim_grant_reference(&skb_co->gref_pool);
+                       if (r == -ENOSPC)
+                               break;
+                       gref = (grant_ref_t)r;
+                       r = gnttab_end_foreign_access_ref(gref);
+                       if (r == 0) {
+                               printk(KERN_WARNING "Failed to end remote access to packet memory.\n");
+                       } else {
+                               gnttab_release_grant_reference(&ncrp->gref_pool,
+                                                              gref);
+                       }
+               }
        } else if (skb_co->gref_pool != 0) {
                gnttab_subfree_grant_references(skb_co->gref_pool,
                                                &ncrp->gref_pool);
diff --git a/drivers/xen/netchannel2/xmit_packet.c b/drivers/xen/netchannel2/xmit_packet.c
index 7eb845d..d95ad09 100644
--- a/drivers/xen/netchannel2/xmit_packet.c
+++ b/drivers/xen/netchannel2/xmit_packet.c
@@ -13,6 +13,8 @@ static enum transmit_policy transmit_policy(struct netchannel2 *nc,
 {
        if (skb->len <= PACKET_PREFIX_SIZE && !skb_is_nonlinear(skb))
                return transmit_policy_small;
+       else if (nc->remote_trusted)
+               return transmit_policy_map;
        else
                return transmit_policy_grant;
 }
@@ -72,7 +74,10 @@ enum prepare_xmit_result prepare_xmit_allocate_resources(struct netchannel2 *nc,
                        r = prepare_xmit_allocate_small(&nc->rings, skb);
                        break;
                case transmit_policy_grant:
-                       r = prepare_xmit_allocate_grant(&nc->rings, skb);
+                       r = prepare_xmit_allocate_grant(&nc->rings, skb, 1);
+                       break;
+               case transmit_policy_map:
+                       r = prepare_xmit_allocate_grant(&nc->rings, skb, 0);
                        break;
                default:
                        BUG();
@@ -170,7 +175,10 @@ int nc2_really_start_xmit(struct netchannel2_ring_pair *ncrp,
                /* Nothing to do */
                break;
        case transmit_policy_grant:
-               xmit_grant(ncrp, skb, msg);
+               xmit_grant(ncrp, skb, 1, msg);
+               break;
+       case transmit_policy_map:
+               xmit_grant(ncrp, skb, 0, msg);
                break;
        default:
                BUG();
diff --git a/include/xen/interface/io/netchannel2.h b/include/xen/interface/io/netchannel2.h
index 1cca607..f264995 100644
--- a/include/xen/interface/io/netchannel2.h
+++ b/include/xen/interface/io/netchannel2.h
@@ -46,6 +46,9 @@ struct netchannel2_fragment {
                struct {
                        grant_ref_t gref;
                } receiver_copy;
+               struct {
+                       grant_ref_t gref;
+               } receiver_map;
        };
 };
 struct netchannel2_msg_packet {
@@ -98,6 +101,22 @@ struct netchannel2_msg_packet {
  *                 Due to backend bugs, it is in not safe to use this
  *                 packet type except on bypass rings.
  *
+ * receiver_map -- The transmitting domain has granted the receiving
+ *                 domain access to the original RX buffers using
+ *                 full (mappable) grant references.  This can be
+ *                 treated the same way as receiver_copy, but the
+ *                 receiving domain also has the option of mapping
+ *                 the fragments, rather than copying them.  If it
+ *                 decides to do so, it should ensure that the fragments
+ *                 will be unmapped in a reasonably timely fashion,
+ *                 and don't e.g. become stuck in a receive buffer
+ *                 somewhere.  In general, anything longer than about
+ *                 a second is likely to cause problems.  Once all
+ *                 grant references have been unmapped, the receiving
+ *                 domain should send a FINISH message.
+ *
+ *                 This packet type may not be used on bypass rings.
+ *
  * small -- The packet does not have any fragment descriptors
  *         (i.e. the entire thing is inline in the ring).  The receiving
  *         domain should simply the copy the packet out of the ring
@@ -110,6 +129,7 @@ struct netchannel2_msg_packet {
  * that it is correct to treat receiver_map and small packets as
  * receiver_copy ones. */
 #define NC2_PACKET_TYPE_receiver_copy 1
+#define NC2_PACKET_TYPE_receiver_map 3
 #define NC2_PACKET_TYPE_small 4
 
 #define NC2_PACKET_SEGMENTATION_TYPE_none  0
-- 
1.6.3.1


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.