[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH 20/22] Posted buffer mode support.
In this mode, domains are expected to pre-post a number of receive buffers to their peer, and the peer will then copy packets into those buffers when it wants to transmit. This is similar to the way netchannel1 worked. This isn't particularly useful by itself, because the software-only implementation is slower than the other transmission modes, and is disabled unless you set a #define, but it's necessary for VMQ support. Signed-off-by: Steven Smith <steven.smith@xxxxxxxxxx> --- drivers/xen/netchannel2/Makefile | 2 +- drivers/xen/netchannel2/chan.c | 37 ++ drivers/xen/netchannel2/netback2.c | 12 + drivers/xen/netchannel2/netchannel2_core.h | 92 ++++ drivers/xen/netchannel2/netfront2.c | 2 + drivers/xen/netchannel2/posted_buffers.c | 781 ++++++++++++++++++++++++++++ drivers/xen/netchannel2/recv_packet.c | 5 + drivers/xen/netchannel2/xmit_packet.c | 21 + include/xen/interface/io/netchannel2.h | 71 +++ 9 files changed, 1022 insertions(+), 1 deletions(-) create mode 100644 drivers/xen/netchannel2/posted_buffers.c diff --git a/drivers/xen/netchannel2/Makefile b/drivers/xen/netchannel2/Makefile index 9c4f97a..11a257e 100644 --- a/drivers/xen/netchannel2/Makefile +++ b/drivers/xen/netchannel2/Makefile @@ -1,7 +1,7 @@ obj-$(CONFIG_XEN_NETCHANNEL2) += netchannel2.o netchannel2-objs := chan.o netchan2.o rscb.o util.o \ - xmit_packet.o offload.o recv_packet.o poll.o \ + posted_buffers.o xmit_packet.o offload.o recv_packet.o poll.o \ receiver_map.o ifeq ($(CONFIG_XEN_NETDEV2_BACKEND),y) diff --git a/drivers/xen/netchannel2/chan.c b/drivers/xen/netchannel2/chan.c index fa52353..060b49b 100644 --- a/drivers/xen/netchannel2/chan.c +++ b/drivers/xen/netchannel2/chan.c @@ -89,6 +89,15 @@ retry: nc2_handle_set_max_fragments_per_packet(nc, ncrp, &hdr); break; + case NETCHANNEL2_MSG_POST_BUFFER: + nc2_handle_post_buffer(nc, ncrp, &hdr); + break; + case NETCHANNEL2_MSG_RETURN_POSTED_BUFFER: + nc2_handle_return_posted_buffer(nc, ncrp, &hdr); + break; + case 
NETCHANNEL2_MSG_SET_NR_POSTED_BUFFERS: + nc2_handle_set_nr_posted_buffers(nc, ncrp, &hdr); + break; case NETCHANNEL2_MSG_BYPASS_FRONTEND: nc2_handle_bypass_frontend(nc, ncrp, &hdr); break; @@ -172,8 +181,12 @@ static void flush_rings(struct netchannel2_ring_pair *ncrp) advertise_max_fragments_per_packet(ncrp); if (ncrp == &nc->rings) { + nc2_replenish_rx_buffers(nc); + nc2_return_pending_posted_buffers(nc); if (nc->need_advertise_offloads) advertise_offloads(nc); + if (nc->need_advertise_tx_buffers) + nc2_advertise_tx_buffers(nc); nc2_advertise_bypasses(nc); nc2_crank_aux_ring_state_machine(nc); nc2_autobypass_make_suggestions(nc); @@ -454,6 +467,13 @@ struct netchannel2 *nc2_new(struct xenbus_device *xd) nc2_release(nc); return NULL; } + INIT_LIST_HEAD(&nc->rx_buffers); + INIT_LIST_HEAD(&nc->unused_rx_buffers); + INIT_LIST_HEAD(&nc->unposted_rx_buffers); + INIT_LIST_HEAD(&nc->avail_tx_buffers); + nc->nr_avail_tx_buffers = 0; + INIT_LIST_HEAD(&nc->unused_tx_buffer_slots); + INIT_LIST_HEAD(&nc->pending_tx_buffer_return); if (local_trusted) { if (init_receive_map_mode() < 0) { @@ -513,8 +533,13 @@ void nc2_release(struct netchannel2 *nc) nc2_queue_purge(&nc->rings, &nc->pending_skbs); + /* Should have been released when we detached. */ + BUG_ON(nc->rx_buffer_structs); + release_bypasses(nc); + unprepare_tx_buffers(nc); + free_netdev(nc->net_device); } @@ -604,6 +629,9 @@ int nc2_attach_rings(struct netchannel2 *nc, static void _detach_rings(struct netchannel2_ring_pair *ncrp) { + if (ncrp == &ncrp->interface->rings) + nc2_posted_buffer_rx_forget(ncrp->interface); + spin_lock_bh(&ncrp->lock); /* We need to release all of the pending transmission packets, because they're never going to complete now that we've lost @@ -795,6 +823,15 @@ static int process_ring(struct napi_struct *napi, skb = __skb_dequeue(&ncrp->pending_tx_queue); } while (skb != NULL); + /* If we've transmitted on the main ring then we may + have made use of the hypercall batcher. Flush it. 
+ This must happen before we flush the rings, since + that's when the PACKET messages will be made + visible to the other end. */ + if (ncrp == &nc->rings) + flush_hypercall_batcher(&nc->batcher, + nc2_posted_on_gntcopy_fail); + flush_rings(ncrp); while ((skb = __skb_dequeue(&ncrp->release_on_flush_batcher))) diff --git a/drivers/xen/netchannel2/netback2.c b/drivers/xen/netchannel2/netback2.c index cf52839..129ef81 100644 --- a/drivers/xen/netchannel2/netback2.c +++ b/drivers/xen/netchannel2/netback2.c @@ -11,6 +11,8 @@ #include "netchannel2_endpoint.h" #include "netchannel2_uspace.h" +#define NR_TX_BUFS 256 + static atomic_t next_handle; /* A list of all currently-live netback2 interfaces. */ static LIST_HEAD(all_netbacks); @@ -172,6 +174,11 @@ static int attach_to_frontend(struct netback2 *nd) return 0; } +static void nb2_shutdown(struct netchannel2 *nc) +{ + nc2_set_nr_tx_buffers(nc, 0); +} + static void frontend_changed(struct xenbus_device *xd, enum xenbus_state frontend_state) { @@ -189,6 +196,8 @@ static void frontend_changed(struct xenbus_device *xd, * detached, and this is pointless but harmless.) */ detach_from_frontend(nb); + nc2_set_nr_tx_buffers(nb->chan, NR_TX_BUFS); + /* Tell the frontend what sort of rings we're willing to accept. 
*/ xenbus_printf(XBT_NIL, nb->xenbus_device->nodename, @@ -222,6 +231,7 @@ static void frontend_changed(struct xenbus_device *xd, break; case XenbusStateClosing: + nb2_shutdown(nb->chan); detach_from_frontend(nb); xenbus_switch_state(xd, XenbusStateClosed); break; @@ -257,6 +267,8 @@ static int netback2_uevent(struct xenbus_device *xd, static void netback2_shutdown(struct xenbus_device *xd) { + struct netback2 *nb = xenbus_device_to_nb2(xd); + nb2_shutdown(nb->chan); xenbus_switch_state(xd, XenbusStateClosing); } diff --git a/drivers/xen/netchannel2/netchannel2_core.h b/drivers/xen/netchannel2/netchannel2_core.h index 2a5ed06..1939cbb 100644 --- a/drivers/xen/netchannel2/netchannel2_core.h +++ b/drivers/xen/netchannel2/netchannel2_core.h @@ -23,6 +23,10 @@ * pointer; see the txp_slot stuff later. */ #define NR_TX_PACKETS 256 +/* No matter what the other end wants, we never post more than this + number of RX buffers to it. */ +#define MAX_POSTED_BUFFERS (2048+256) + /* A way of keeping track of a mapping of a bunch of grant references into a contigous chunk of virtual address space. This is used for things like multi-page rings. */ @@ -37,6 +41,7 @@ enum transmit_policy { transmit_policy_unknown = 0, transmit_policy_first = 0xf001, transmit_policy_grant = transmit_policy_first, + transmit_policy_post, transmit_policy_map, transmit_policy_small, transmit_policy_last = transmit_policy_small @@ -89,6 +94,8 @@ static inline nc2_txp_index_t txp_get_next_free(struct txp_slot *slot) /* This goes in struct sk_buff::cb */ struct skb_cb_overlay { + struct list_head buffers; /* Only if we're using the posted + buffer strategy. */ struct txp_slot *tp; unsigned nr_fragments; grant_ref_t gref_pool; @@ -369,11 +376,67 @@ struct netchannel2 { struct nc2_incoming_bypass_suggestions incoming_bypass_suggestions; #endif + /* Infrastructure for managing buffers which we've posted to + the other end. These are all protected by the lock. 
*/ + /* A list of nc2_rx_buffer structures, threaded on list, which + we've posted to the other end. */ + struct list_head rx_buffers; + /* Buffers which we've allocated but not yet sent to the other + end. */ + struct list_head unposted_rx_buffers; + /* Buffers which are available but not yet allocated. */ + struct list_head unused_rx_buffers; + /* The number of buffers in the rx_buffers list. */ + unsigned nr_rx_buffers; + /* The maximum number of buffers which we can ever have + outstanding, and the size of the rx_buffer_structs + array. */ + unsigned max_nr_rx_buffers; + /* A bunch of nc2_rx_buffer structures which can be used for + RX buffers. */ + struct nc2_rx_buffer *rx_buffer_structs; + /* Set if we're sufficiently far through device shutdown that + posting more RX buffers would be a bad idea. */ + uint8_t dont_post_buffers; + + /* Infrastructure for managing buffers which the other end has + posted to us. Protected by the lock. */ + /* A list of nc2_tx_buffer structures, threaded on list, which + contains all tx buffers which have been posted by the + remote. */ + struct list_head avail_tx_buffers; + /* A list of nc2_tx_buffer structures which the other end + hasn't populated yet. */ + struct list_head unused_tx_buffer_slots; + /* A list of nc2_tx_buffer structures which we need to return + to the other end. */ + struct list_head pending_tx_buffer_return; + /* Some pre-allocated nc2_tx_buffer structures. We have to + pre-allocate, because we always need to be able to respond + to a POST_BUFFER message (up to some limit). */ + struct nc2_tx_buffer *tx_buffers; + /* Non-zero if we need to send the other end a + SET_NR_POSTED_BUFFERS message. */ + uint8_t need_advertise_tx_buffers; + /* Number of tx buffers. This is the actual number of slots + in the @tx_buffers array. */ + uint32_t nr_tx_buffers; + /* Number of available tx buffers. The length of the + * avail_tx_buffers list. */ + uint32_t nr_avail_tx_buffers; + /* ``Configured'' number of tx buffers. 
We only actually + allocate any TX buffers when the local interface is up, but + this is set to the desired number of buffers all the + time. */ + uint32_t configured_nr_tx_buffers; + /* Updates are protected by the lock. This can be read at any * time without holding any locks, and the rest of Linux is * expected to cope. */ struct net_device_stats stats; + struct hypercall_batcher batcher; + #ifdef CONFIG_XEN_NETDEV2_AUTOMATIC_BYPASS struct nc2_auto_bypass auto_bypass; #endif @@ -680,11 +743,26 @@ struct sk_buff *handle_receiver_copy_packet(struct netchannel2 *nc, struct netchannel2_msg_hdr *hdr, unsigned nr_frags, unsigned frags_off); +struct sk_buff *handle_pre_posted_packet(struct netchannel2 *nc, + struct netchannel2_msg_packet *msg, + struct netchannel2_msg_hdr *hdr, + unsigned nr_frags, + unsigned frags_off); struct sk_buff *handle_receiver_map_packet(struct netchannel2 *nc, struct netchannel2_msg_packet *msg, struct netchannel2_msg_hdr *hdr, unsigned nr_frags, unsigned frags_off); +void nc2_handle_return_posted_buffer(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr); +void nc2_handle_post_buffer(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr); +void nc2_handle_set_nr_posted_buffers(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr); +void nc2_advertise_tx_buffers(struct netchannel2 *nc); enum prepare_xmit_result { PREP_XMIT_OKAY = 0, @@ -704,9 +782,20 @@ void xmit_grant(struct netchannel2_ring_pair *ncrp, int use_subpage_grants, volatile void *msg); +int prepare_xmit_allocate_post(struct netchannel2 *nc, + struct sk_buff *skb); +void xmit_post(struct netchannel2 *nc, + struct sk_buff *skb, + volatile void *msg); + +void nc2_replenish_rx_buffers(struct netchannel2 *nc); + void queue_finish_packet_message(struct netchannel2_ring_pair *ncrp, uint32_t id, uint8_t flags); +void nc2_return_pending_posted_buffers(struct 
netchannel2 *nc); +void nc2_posted_buffer_rx_forget(struct netchannel2 *nc); + int allocate_txp_slot(struct netchannel2_ring_pair *ncrp, struct sk_buff *skb); void release_txp_slot(struct netchannel2_ring_pair *ncrp, @@ -715,6 +804,8 @@ void release_txp_slot(struct netchannel2_ring_pair *ncrp, void release_tx_packet(struct netchannel2_ring_pair *ncrp, struct sk_buff *skb); +void unprepare_tx_buffers(struct netchannel2 *nc); + void fetch_fragment(struct netchannel2_ring_pair *ncrp, unsigned idx, struct netchannel2_fragment *frag, @@ -749,6 +840,7 @@ irqreturn_t nc2_int(int irq, void *dev_id); void cleanup_ring_pair(struct netchannel2_ring_pair *ncrp); void nc2_rscb_on_gntcopy_fail(void *ctxt, gnttab_copy_t *gop); +void nc2_posted_on_gntcopy_fail(void *ctxt, gnttab_copy_t *gop); int init_receive_map_mode(void); void deinit_receive_map_mode(void); diff --git a/drivers/xen/netchannel2/netfront2.c b/drivers/xen/netchannel2/netfront2.c index 9b2e2ec..e06fa77 100644 --- a/drivers/xen/netchannel2/netfront2.c +++ b/drivers/xen/netchannel2/netfront2.c @@ -356,6 +356,8 @@ static void backend_changed(struct xenbus_device *xd, /* Backend has advertised the ring protocol. Allocate the rings, and tell the backend about them. 
*/ + nc2_set_nr_tx_buffers(nf->chan, 0); + err = 0; if (!nf->attached) err = allocate_rings(nf, xd->otherend_id); diff --git a/drivers/xen/netchannel2/posted_buffers.c b/drivers/xen/netchannel2/posted_buffers.c new file mode 100644 index 0000000..96de7da --- /dev/null +++ b/drivers/xen/netchannel2/posted_buffers.c @@ -0,0 +1,781 @@ +/* Support for receiver-posted buffers */ +#include <linux/kernel.h> +#include <linux/delay.h> +#include <linux/skbuff.h> +#include <linux/netdevice.h> +#include <xen/evtchn.h> +#include <xen/gnttab.h> +#include <xen/xenbus.h> +#include <xen/live_maps.h> +#include "netchannel2_endpoint.h" +#include "netchannel2_core.h" + +#define POSTED_BUFFER_SIZE PAGE_SIZE + +/* A poison value to make certain buffer management errors more + * obvious. */ +#define RX_BUFFER_BIAS 0xbeef0000 + +static void prepare_tx_buffers(struct netchannel2 *nc); + +/* --------------------------- Receive -------------------------------- */ + +/* A buffer which we have allocated for the other end to send us + packets in. */ +struct nc2_rx_buffer { + struct list_head list; + void *buffer; + grant_ref_t gref; + uint8_t is_posted; /* Set if this buffer is available to the + other end. */ +}; + +/* The other end just sent us a buffer id. Convert it back to an + nc2_rx_buffer structure. Returns NULL if the id is invalid, or if + it isn't currently owned by the other end. */ +static struct nc2_rx_buffer *find_rx_buffer(struct netchannel2 *nc, + uint32_t id) +{ + struct nc2_rx_buffer *rxb; + id -= RX_BUFFER_BIAS; + if (id >= nc->max_nr_rx_buffers) + return NULL; + rxb = &nc->rx_buffer_structs[id]; + if (rxb->is_posted) + return rxb; + else + return NULL; +} + +/* Post a buffer to the other endpoint immediately. Assumes that the + caller has already checked that there is enough space available on + the ring. 
*/ +static void _nc2_post_buffer(struct netchannel2 *nc, + struct nc2_rx_buffer *rxb) +{ + struct netchannel2_msg_post_buffer msg; + + BUG_ON(!nc->remote_trusted); + + msg.id = rxb - nc->rx_buffer_structs + RX_BUFFER_BIAS; + msg.gref = rxb->gref; + msg.off_in_page = offset_in_page(rxb->buffer); + msg.size = POSTED_BUFFER_SIZE; + + nc2_send_message(&nc->rings.prod_ring, NETCHANNEL2_MSG_POST_BUFFER, + 0, &msg, sizeof(msg)); +} + +/* Push out all pending buffer posts, until the ring becomes full or + we run out of buffers to post. Called under the lock. */ +static void push_rx_buffer_posts(struct netchannel2 *nc) +{ + struct nc2_rx_buffer *buf; + + while (!list_empty(&nc->unposted_rx_buffers) && + nc2_can_send_payload_bytes(&nc->rings.prod_ring, + sizeof(struct netchannel2_msg_post_buffer))) { + buf = list_entry(nc->unposted_rx_buffers.next, + struct nc2_rx_buffer, + list); + _nc2_post_buffer(nc, buf); + buf->is_posted = 1; + list_move(&buf->list, &nc->rx_buffers); + nc->nr_rx_buffers++; + + nc->rings.pending_time_sensitive_messages = 1; + } +} + +/* Allocate more RX buffers until we reach our target number of RX + buffers and post them to the other endpoint. Call under the + lock. */ +void nc2_replenish_rx_buffers(struct netchannel2 *nc) +{ + struct nc2_rx_buffer *rb; + + if (nc->dont_post_buffers || !nc->remote_trusted) + return; + + while (!list_empty(&nc->unused_rx_buffers)) { + rb = list_entry(nc->unused_rx_buffers.next, + struct nc2_rx_buffer, + list); + rb->buffer = (void *)__get_free_pages(GFP_ATOMIC|__GFP_NOWARN, + 0); + if (!rb->buffer) + break; + rb->gref = + gnttab_grant_foreign_access(nc->rings.otherend_id, + virt_to_mfn(rb->buffer), + 0); + if ((int)rb->gref < 0) { + free_page((unsigned long)rb->buffer); + break; + } + + list_move(&rb->list, &nc->unposted_rx_buffers); + } + + push_rx_buffer_posts(nc); +} + +/* The other endpoint has used @rxb to transmit part of the packet + which we're going to represent by @skb. 
Attach it to the packet's + fragment list. The caller should make sure that @skb currently has + less than MAX_SKB_FRAGS in its shinfo area, and that @size and + @offset are appropriate for the buffer. @size gives the size of + the fragment, and @offset gives its offset relative to the start of + the receive buffer. */ +/* This effectively transfers ownership of the buffer's page from @rxb + to @skb. */ +static void attach_buffer_to_skb(struct sk_buff *skb, + struct nc2_rx_buffer *rxb, + unsigned size, + unsigned offset) +{ + struct skb_shared_info *shinfo = skb_shinfo(skb); + skb_frag_t *frag = &shinfo->frags[shinfo->nr_frags]; + + BUG_ON(shinfo->nr_frags >= MAX_SKB_FRAGS); + + frag->page = virt_to_page(rxb->buffer); + frag->page_offset = offset_in_page(rxb->buffer) + offset; + frag->size = size; + skb->truesize += size; + skb->data_len += size; + skb->len += size; + + shinfo->nr_frags++; +} + +/* The other end has sent us a packet using pre-posted buffers. Parse + it up and return an skb representing the packet, or NULL on + error. */ +struct sk_buff *handle_pre_posted_packet(struct netchannel2 *nc, + struct netchannel2_msg_packet *msg, + struct netchannel2_msg_hdr *hdr, + unsigned nr_frags, + unsigned frags_off) +{ + struct netchannel2_fragment frag; + struct sk_buff *skb; + unsigned x; + struct nc2_rx_buffer *rxb; + int is_bad; + int dropped; + unsigned prefix_len; + +#define SKB_MIN_PAYLOAD_SIZE 128 + + dropped = 0; + is_bad = 0; + if (msg->prefix_size < SKB_MIN_PAYLOAD_SIZE) + prefix_len = SKB_MIN_PAYLOAD_SIZE; + else + prefix_len = msg->prefix_size; + /* We don't enforce the MAX_PACKET_BYTES limit here. That's + okay, because the amount of memory which the other end can + cause us to allocate is still limited, which is all that's + really needed. 
*/ + skb = dev_alloc_skb(prefix_len + NET_IP_ALIGN); + if (skb == NULL) { + is_bad = 1; + dropped = 1; + } else { + skb_reserve(skb, NET_IP_ALIGN); + nc2_copy_from_ring_off(&nc->rings.cons_ring, + skb_put(skb, msg->prefix_size), + msg->prefix_size, + frags_off + nr_frags * sizeof(frag)); + } + + for (x = 0; x < nr_frags; x++) { + fetch_fragment(&nc->rings, x, &frag, frags_off); + rxb = find_rx_buffer(nc, frag.pre_post.id); + if (rxb == NULL) { + pr_debug("RX in bad frag %d.\n", frag.pre_post.id); + is_bad = 1; + continue; + } + + if (!is_bad && + frag.size <= PAGE_SIZE && + frag.off < PAGE_SIZE && + frag.size + frag.off <= POSTED_BUFFER_SIZE && + gnttab_end_foreign_access_ref(rxb->gref)) { + gnttab_free_grant_reference(rxb->gref); + attach_buffer_to_skb(skb, rxb, frag.size, + frag.off); + + } else { + is_bad = 1; + gnttab_end_foreign_access(rxb->gref, + (unsigned long)rxb->buffer); + } + rxb->gref = 0; + rxb->buffer = NULL; + rxb->is_posted = 0; + nc->nr_rx_buffers--; + list_move(&rxb->list, &nc->unused_rx_buffers); + } + + if (is_bad) { + pr_debug("Received skb is bad!\n"); + if (skb) + kfree_skb(skb); + skb = NULL; + if (dropped) + nc->stats.rx_dropped++; + else + nc->stats.rx_errors++; + } else { + if (skb_headlen(skb) < SKB_MIN_PAYLOAD_SIZE) + pull_through(skb, + SKB_MIN_PAYLOAD_SIZE - skb_headlen(skb)); + } + + return skb; +} + +/* Release a single RX buffer and return it to the unused list. */ +static void release_rx_buffer(struct netchannel2 *nc, + struct nc2_rx_buffer *rxb) +{ + rxb->is_posted = 0; + gnttab_end_foreign_access(rxb->gref, + (unsigned long)rxb->buffer); + nc->nr_rx_buffers--; + list_move(&rxb->list, &nc->unused_rx_buffers); +} + +/* The other endpoint has finished with one of our RX buffers. Do + something suitable with it. 
*/ +void nc2_handle_return_posted_buffer(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr) +{ + struct netchannel2_msg_return_posted_buffer msg; + struct nc2_rx_buffer *rxb; + + if (hdr->size != sizeof(msg)) { + pr_debug("return rx buffer message wrong size %d != %zd\n", + hdr->size, sizeof(msg)); + return; + } + if (ncrp != &nc->rings) { + pr_debug("Return a posted buffer on an ancillary ring!\n"); + return; + } + nc2_copy_from_ring(&nc->rings.cons_ring, &msg, hdr->size); + rxb = find_rx_buffer(nc, msg.id); + if (!rxb) { + pr_debug("Other end returned buffer id %d which we didn't know about.\n", + msg.id); + return; + } + release_rx_buffer(nc, rxb); +} + +/* Tear down any remaining RX buffers. The caller should have done + something to make sure that the other end isn't going to try and + use them any more. */ +void nc2_posted_buffer_rx_forget(struct netchannel2 *nc) +{ + struct nc2_rx_buffer *rxb, *next; + + spin_lock_bh(&nc->rings.lock); + list_for_each_entry_safe(rxb, next, &nc->rx_buffers, list) + release_rx_buffer(nc, rxb); + list_for_each_entry_safe(rxb, next, &nc->unposted_rx_buffers, list) + release_rx_buffer(nc, rxb); + + BUG_ON(!list_empty(&nc->rx_buffers)); + BUG_ON(!list_empty(&nc->unposted_rx_buffers)); + + INIT_LIST_HEAD(&nc->unused_rx_buffers); + kfree(nc->rx_buffer_structs); + nc->rx_buffer_structs = NULL; + nc->max_nr_rx_buffers = 0; + spin_unlock_bh(&nc->rings.lock); +} + +void nc2_handle_set_nr_posted_buffers(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr) +{ + struct netchannel2_msg_set_nr_posted_buffers msg; + struct nc2_rx_buffer *buffer_structs; + unsigned x; + unsigned nr_buffers; + + if (ncrp != &nc->rings) { + pr_debug("set_nr_posted_buffers on an ancillary ring!\n"); + return; + } + if (hdr->size != sizeof(msg)) { + pr_debug("set nr posted buffers message wrong size %d != %zd\n", + hdr->size, sizeof(msg)); + return; + } + if 
(nc->rx_buffer_structs != NULL) { + pr_debug("Other end tried to change posted buffer settings when they were already set.\n"); + return; + } + nc2_copy_from_ring(&nc->rings.cons_ring, &msg, hdr->size); + if (msg.nr_buffers <= MAX_POSTED_BUFFERS) { + nr_buffers = msg.nr_buffers; + } else { + pr_debug("remote recommended %d buffers, using %d\n", + msg.nr_buffers, MAX_POSTED_BUFFERS); + nr_buffers = MAX_POSTED_BUFFERS; + } + + buffer_structs = kzalloc(sizeof(struct nc2_rx_buffer) * nr_buffers, + GFP_ATOMIC); + if (buffer_structs == NULL) { + printk(KERN_WARNING "failed to allocate %d rx buffers", + nr_buffers); + return; + } + + for (x = 0; x < nr_buffers; x++) + list_add_tail(&buffer_structs[x].list, + &nc->unused_rx_buffers); + nc->max_nr_rx_buffers = nr_buffers; + nc->rx_buffer_structs = buffer_structs; + nc->dont_post_buffers = 0; +} + + +/* -------------------------- Transmit ------------------------------- */ + +/* A buffer which the other end has provided us which we can use to + transmit packets to it. */ +struct nc2_tx_buffer { + struct list_head list; + uint32_t id; /* ID assigned by the remote endpoint. */ + grant_ref_t gref; + uint16_t off_in_page; + uint16_t size; + grant_handle_t grant_handle; +}; + +/* A representation of a packet which is halfway through being + prepared for transmission. */ +struct post_packet_plan { + unsigned off_in_cur_buffer; + struct nc2_tx_buffer *cur_buffer; + + /* We assemble the next fragment in work_frag, and then copy + to output_frag once it's done. */ + struct netchannel2_fragment work_frag; + volatile struct netchannel2_fragment *output_frag; +}; + +/* add a buffer slot to list of unused buffer slots after it has been + * returned to other end */ +static void free_tx_buffer(struct netchannel2 *nc, + struct nc2_tx_buffer *buffer) +{ + list_add(&buffer->list, &nc->unused_tx_buffer_slots); +} + +/* A grant copy failed while we were transmitting a packet. That + indicates that the *receiving* domain gave us a bad RX buffer. 
+ We're too late to send them an error, so there isn't really + anything we can do to help them. Oh well, nevermind. */ +void nc2_posted_on_gntcopy_fail(void *ctxt, + gnttab_copy_t *gop) +{ + printk(KERN_WARNING "Grant copy failed for transmit; domain provided bad RX buffer (source %x, %x, %x, dest %x, %x, %x, len %x, flags %x, status %d).\n", + gop->source.u.ref, gop->source.domid, gop->source.offset, + gop->dest.u.ref, gop->dest.domid, gop->dest.offset, + gop->len, gop->flags, gop->status); +} + +/* Advance to the next transmit buffer/fragment in the packet. */ +static void advance_to_next_buffer(struct post_packet_plan *plan) +{ + BUG_ON(plan->off_in_cur_buffer < plan->cur_buffer->size); + plan->cur_buffer = list_entry(plan->cur_buffer->list.next, + struct nc2_tx_buffer, + list); + plan->off_in_cur_buffer = 0; + + *plan->output_frag = plan->work_frag; + plan->output_frag++; + memset(&plan->work_frag, 0, sizeof(plan->work_frag)); + plan->work_frag.pre_post.id = plan->cur_buffer->id; +} + +/* Schedule a copy from a range of bytes in a local page into the + packet we're building in @plan. This cannot cross page or TX + buffer boundaries. */ +static void prepare_grant_copy(struct netchannel2 *nc, + struct post_packet_plan *plan, + struct page *page, + unsigned page_off, + unsigned count, + domid_t domid) +{ + gnttab_copy_t *gop; + + /* XXX: We don't do any error checking on this grant copy. + That's okay. There are only two ways a grant copy can + fail: + + -- The source is bad. But the source is either in our + local memory (so must be good), or something we've + already mapped (so the grant reference must be good, and + must already be pinned so it can't go bad). Therefore, + the source must always be good, and we can't fail + because of a bad source. + + -- The destination is bad. This could happen if the + receiving domain sent us a bad page to use as an RX + buffer. 
In that case, we'll tell the receiving domain + that it received some data in a page when the page is + actually uninitialised. The worst case is that the + receiving domain ends up copying its own uninitialised + memory to its own userspace. That's not a problem for + us (because it can't see *our* uninitialised memory), + and if it's a problem for the receiving domain then it + should have been more careful about what memory it gave + us to use as RX buffers. + + Therefore, the lack of error checking is actually perfectly + safe. + + (Even if it isn't exactly great software engineering + practice.) + */ + gop = hypercall_batcher_grant_copy(&nc->batcher, + NULL, + nc2_posted_on_gntcopy_fail); + gop->flags = GNTCOPY_dest_gref; + if (page_is_tracked(page)) { + lookup_tracker_page(page, + &gop->source.domid, + &gop->source.u.ref); + gop->flags |= GNTCOPY_source_gref; + } else { + gop->source.domid = DOMID_SELF; + gop->source.u.gmfn = virt_to_mfn(page_address(page)); + } + gop->source.offset = page_off; + gop->dest.domid = domid; + gop->dest.offset = + plan->cur_buffer->off_in_page + plan->off_in_cur_buffer; + gop->dest.u.ref = plan->cur_buffer->gref; + gop->len = count; +} + +/* Add the bytes from @ptr to @ptr + @size to the packet we're + preparing in @plan. This cannot handle page-crossing local + buffers, but will correctly handle buffer-crossing operations. 
*/ +static void prepare_subpage_post(struct netchannel2 *nc, + struct page *page, + unsigned off_in_page, + unsigned size, + struct post_packet_plan *plan) +{ + unsigned remaining_in_buffer; + unsigned this_time; + + BUG_ON(off_in_page + size > PAGE_SIZE); + while (size != 0) { + remaining_in_buffer = + plan->cur_buffer->size - + plan->off_in_cur_buffer; + if (remaining_in_buffer == 0) { + advance_to_next_buffer(plan); + remaining_in_buffer = plan->cur_buffer->size; + } + + this_time = size; + if (this_time > remaining_in_buffer) + this_time = remaining_in_buffer; + prepare_grant_copy(nc, + plan, + page, + off_in_page, + this_time, + nc->rings.otherend_id); + plan->work_frag.size += this_time; + plan->off_in_cur_buffer += this_time; + + size -= this_time; + off_in_page += this_time; + } +} + +/* Add @skb->data to @skb->tail to the packet which is being prepared + in @plan. */ +static void prepare_data_area_post(struct netchannel2 *nc, struct sk_buff *skb, + struct post_packet_plan *plan) +{ + void *ptr = skb->data; + unsigned len = skb_headlen(skb); + unsigned off; + unsigned this_time; + + for (off = 0; off < len; off += this_time) { + this_time = len; + if (this_time + offset_in_page(ptr + off) > PAGE_SIZE) + this_time = PAGE_SIZE - offset_in_page(ptr + off); + prepare_subpage_post(nc, + virt_to_page(ptr + off), + offset_in_page(ptr + off), + this_time, + plan); + } +} + +/* Allocate some TX buffers suitable for transmitting @skb out of + @nc's pool. The buffers are chained on @fragments. On success, + returns the number of buffers allocated. Returns -1 if + insufficient buffers are available, in which case no buffers are + allocated. We assume that the packet will be offset by + NET_IP_ALIGN bytes in the first fragment so that everything after + the ethernet header is properly aligned. 
*/ +static int grab_tx_buffers(struct netchannel2 *nc, + struct sk_buff *skb, + struct list_head *fragments) +{ + unsigned bytes_to_transmit; + unsigned bytes_planned; + struct nc2_tx_buffer *current_buffer, *next; + int count; + + INIT_LIST_HEAD(fragments); + bytes_planned = 0; + bytes_to_transmit = skb->len + NET_IP_ALIGN; + count = 0; + list_for_each_entry_safe(current_buffer, next, &nc->avail_tx_buffers, + list) { + count++; + bytes_planned += current_buffer->size; + list_move(&current_buffer->list, fragments); + if (bytes_planned >= bytes_to_transmit) { + BUG_ON(nc->nr_avail_tx_buffers < count); + nc->nr_avail_tx_buffers -= count; + return count; + } + } + BUG_ON(nc->nr_avail_tx_buffers != count); + list_splice_init(fragments, &nc->avail_tx_buffers); + return -1; +} + +int prepare_xmit_allocate_post(struct netchannel2 *nc, struct sk_buff *skb) +{ + struct skb_cb_overlay *scb; + int nr_fragments; + + scb = get_skb_overlay(skb); + nr_fragments = grab_tx_buffers(nc, skb, &scb->buffers); + if (nr_fragments < 0) + return -1; + scb->nr_fragments = nr_fragments; + scb->type = NC2_PACKET_TYPE_pre_posted; + + return 0; +} + +void xmit_post(struct netchannel2 *nc, struct sk_buff *skb, + volatile void *msg_buf) +{ + volatile struct netchannel2_msg_packet *msg = msg_buf; + struct skb_cb_overlay *scb; + struct skb_shared_info *shinfo; + skb_frag_t *frag; + unsigned x; + struct post_packet_plan plan; + + scb = get_skb_overlay(skb); + memset(&plan, 0, sizeof(plan)); + + plan.cur_buffer = list_entry(scb->buffers.next, + struct nc2_tx_buffer, + list); + plan.output_frag = msg->frags; + memset(&plan.work_frag, 0, sizeof(plan.work_frag)); + plan.work_frag.pre_post.id = plan.cur_buffer->id; + + /* Burn a couple of bytes at the start of the packet so as we + get better alignment in the body. 
*/ + plan.work_frag.off = NET_IP_ALIGN; + plan.off_in_cur_buffer = NET_IP_ALIGN; + + prepare_data_area_post(nc, skb, &plan); + shinfo = skb_shinfo(skb); + for (x = 0; x < shinfo->nr_frags; x++) { + frag = &shinfo->frags[x]; + prepare_subpage_post(nc, + frag->page, + frag->page_offset, + frag->size, + &plan); + } + + *plan.output_frag = plan.work_frag; + + /* All of the buffer slots which have been used in + this packet are now available for the other end to + fill with new buffers. */ + list_splice(&scb->buffers, &nc->unused_tx_buffer_slots); +} + +/* The other endpoint has sent us a transmit buffer. Add it to the + list. Called under the lock. */ +void nc2_handle_post_buffer(struct netchannel2 *nc, + struct netchannel2_ring_pair *ncrp, + struct netchannel2_msg_hdr *hdr) +{ + struct netchannel2_msg_post_buffer msg; + struct nc2_tx_buffer *txb; + + if (hdr->size != sizeof(msg)) { + pr_debug("Strange sized rx buffer post %d\n", hdr->size); + return; + } + if (ncrp != &nc->rings) { + pr_debug("Posted buffer on an ancillary ring!\n"); + return; + } + nc2_copy_from_ring(&nc->rings.cons_ring, &msg, sizeof(msg)); + if (list_empty(&nc->unused_tx_buffer_slots) || + msg.size > PAGE_SIZE || + msg.off_in_page > PAGE_SIZE || + msg.size + msg.off_in_page > PAGE_SIZE || + msg.size < 64) { + pr_debug("Other end posted too many buffers, or this buffer was strange (%d,%d)\n", + msg.off_in_page, msg.size); + return; + } + + txb = list_entry(nc->unused_tx_buffer_slots.next, + struct nc2_tx_buffer, + list); + txb->id = msg.id; + txb->gref = msg.gref; + txb->off_in_page = msg.off_in_page; + txb->size = msg.size; + + nc->nr_avail_tx_buffers++; + + list_move(&txb->list, &nc->avail_tx_buffers); +} + +/* Process the pending TX buffer return list and push as many as + possible onto the ring. Called under the lock. Does not + automatically flush the ring; that's the caller's + responsibility. 
*/ +void nc2_return_pending_posted_buffers(struct netchannel2 *nc) +{ + struct netchannel2_msg_return_posted_buffer msg; + struct nc2_tx_buffer *txb; + + memset(&msg, 0, sizeof(msg)); + while (!list_empty(&nc->pending_tx_buffer_return) && + nc2_can_send_payload_bytes(&nc->rings.prod_ring, sizeof(msg))) { + txb = list_entry(nc->pending_tx_buffer_return.next, + struct nc2_tx_buffer, + list); + list_del(&txb->list); + free_tx_buffer(nc, txb); + msg.id = txb->id; + nc2_send_message(&nc->rings.prod_ring, + NETCHANNEL2_MSG_RETURN_POSTED_BUFFER, + 0, + &msg, + sizeof(&msg)); + } +} + +/* If there is space on the ring, tell the other end how many RX + buffers we want it to post (i.e. how many TX buffers we're allowed + to accept). Called under the lock. */ +void nc2_advertise_tx_buffers(struct netchannel2 *nc) +{ + struct netchannel2_msg_set_nr_posted_buffers msg; + + if (!nc2_can_send_payload_bytes(&nc->rings.prod_ring, sizeof(msg))) + return; + msg.nr_buffers = nc->nr_tx_buffers; + nc2_send_message(&nc->rings.prod_ring, + NETCHANNEL2_MSG_SET_NR_POSTED_BUFFERS, + 0, &msg, sizeof(msg)); + nc->need_advertise_tx_buffers = 0; + nc->rings.pending_time_sensitive_messages = 1; +} + +/* Set the target number of TX buffers. */ +void nc2_set_nr_tx_buffers(struct netchannel2 *nc, unsigned nr_buffers) +{ + int changed; + + spin_lock_bh(&nc->rings.lock); + changed = (nc->configured_nr_tx_buffers != nr_buffers); + nc->configured_nr_tx_buffers = nr_buffers; + spin_unlock_bh(&nc->rings.lock); + if (changed) + prepare_tx_buffers(nc); +} + +/* The local ethX interface just came up. Set up the TX buffers. */ +static void prepare_tx_buffers(struct netchannel2 *nc) +{ + struct nc2_tx_buffer *buffers; + unsigned x; + unsigned nr_buffers; + + nr_buffers = nc->configured_nr_tx_buffers; + if (nr_buffers == 0) { + /* Trying to shut down TX in posted buffers. 
*/ + unprepare_tx_buffers(nc); + return; + } + + buffers = kzalloc(sizeof(struct nc2_tx_buffer) * nr_buffers, + GFP_KERNEL); + if (buffers == NULL) { + printk(KERN_ERR "Cannot allocate %d tx buffer slots, posted tx disabled.\n", + nr_buffers); + return; + } + + spin_lock_bh(&nc->rings.lock); + + /* nc->tx_buffers should be NULL, because starting and + stopping the TX buffer management should alternate. */ + BUG_ON(nc->tx_buffers); + + INIT_LIST_HEAD(&nc->avail_tx_buffers); + nc->nr_avail_tx_buffers = 0; + for (x = 0; x < nr_buffers; x++) + list_add_tail(&buffers[x].list, &nc->unused_tx_buffer_slots); + nc->tx_buffers = buffers; + nc->nr_tx_buffers = nr_buffers; + nc->need_advertise_tx_buffers = 1; + spin_unlock_bh(&nc->rings.lock); +} + +/* The local ethX interface is goign down. Release the TX buffers + allocated by prepare_tx_buffers(). Note that the poll() method has + already been stopped, so messages posted by the other end will not + be processed. */ +void unprepare_tx_buffers(struct netchannel2 *nc) +{ + spin_lock_bh(&nc->rings.lock); + INIT_LIST_HEAD(&nc->pending_tx_buffer_return); + INIT_LIST_HEAD(&nc->unused_tx_buffer_slots); + INIT_LIST_HEAD(&nc->avail_tx_buffers); + nc->nr_tx_buffers = 0; + nc->nr_avail_tx_buffers = 0; + nc->need_advertise_tx_buffers = 1; + kfree(nc->tx_buffers); + nc->tx_buffers = NULL; + spin_unlock_bh(&nc->rings.lock); +} diff --git a/drivers/xen/netchannel2/recv_packet.c b/drivers/xen/netchannel2/recv_packet.c index 94aa127..4501723 100644 --- a/drivers/xen/netchannel2/recv_packet.c +++ b/drivers/xen/netchannel2/recv_packet.c @@ -121,6 +121,11 @@ void nc2_handle_packet_msg(struct netchannel2 *nc, nr_frags, frags_off); queue_finish_packet_message(ncrp, msg.id, msg.flags); break; + case NC2_PACKET_TYPE_pre_posted: + skb = handle_pre_posted_packet(nc, &msg, hdr, nr_frags, + frags_off); + /* No finish message */ + break; case NC2_PACKET_TYPE_receiver_map: if (!nc->local_trusted) { /* The remote doesn't trust us, so they diff --git 
a/drivers/xen/netchannel2/xmit_packet.c b/drivers/xen/netchannel2/xmit_packet.c index a24105a..1a879aa 100644 --- a/drivers/xen/netchannel2/xmit_packet.c +++ b/drivers/xen/netchannel2/xmit_packet.c @@ -4,6 +4,11 @@ #include <linux/version.h> #include "netchannel2_core.h" +/* You don't normally want to transmit in posted buffers mode, because + grant mode is usually faster, but it's sometimes useful for testing + the VMQ receiver when you don't have VMQ-capable hardware. */ +#define PREFER_POSTED_BUFFERS 0 + /* We limit the number of transmitted packets which can be in flight at any one time, as a somewhat paranoid safety catch. */ #define MAX_TX_PACKETS MAX_PENDING_FINISH_PACKETS @@ -15,6 +20,16 @@ static enum transmit_policy transmit_policy(struct netchannel2 *nc, return transmit_policy_small; else if (nc->remote_trusted) return transmit_policy_map; + else if (PREFER_POSTED_BUFFERS && + /* We approximate the number of buffers needed by + skb_shinfo(skb)->nr_frags, which isn't entirely + correct, but isn't that far off, either. Getting + it wrong just means we'll delay transmission + waiting for more buffers when we should have gone + ahead with policy grant; not ideal, but hardly a + disaster. 
*/ + nc->nr_avail_tx_buffers > skb_shinfo(skb)->nr_frags) + return transmit_policy_post; else return transmit_policy_grant; } @@ -76,6 +91,9 @@ enum prepare_xmit_result prepare_xmit_allocate_resources(struct netchannel2 *nc, case transmit_policy_grant: r = prepare_xmit_allocate_grant(&nc->rings, skb, 1); break; + case transmit_policy_post: + r = prepare_xmit_allocate_post(nc, skb); + break; case transmit_policy_map: r = prepare_xmit_allocate_grant(&nc->rings, skb, 0); break; @@ -177,6 +195,9 @@ int nc2_really_start_xmit(struct netchannel2_ring_pair *ncrp, case transmit_policy_grant: xmit_grant(ncrp, skb, 1, msg); break; + case transmit_policy_post: + xmit_post(nc, skb, msg); + break; case transmit_policy_map: xmit_grant(ncrp, skb, 0, msg); break; diff --git a/include/xen/interface/io/netchannel2.h b/include/xen/interface/io/netchannel2.h index 075658d..554635c 100644 --- a/include/xen/interface/io/netchannel2.h +++ b/include/xen/interface/io/netchannel2.h @@ -47,6 +47,11 @@ struct netchannel2_fragment { grant_ref_t gref; } receiver_copy; struct { + /* The id of a buffer which was previously posted + in a POST_BUFFER message. */ + uint32_t id; + } pre_post; + struct { grant_ref_t gref; } receiver_map; }; @@ -106,6 +111,13 @@ struct netchannel2_msg_packet { * Due to backend bugs, it is in not safe to use this * packet type except on bypass rings. * + * pre_posted -- The transmitting domain has copied the packet to + * buffers which were previously provided in POST_BUFFER + * messages. No FINISH message is required, and it is + * an error to send one. + * + * This packet type may not be used on bypass rings. + * * receiver_map -- The transmitting domain has granted the receiving * domain access to the original RX buffers using * full (mappable) grant references. This can be @@ -134,6 +146,7 @@ struct netchannel2_msg_packet { * that it is correct to treat receiver_map and small packets as * receiver_copy ones. 
*/ #define NC2_PACKET_TYPE_receiver_copy 1 +#define NC2_PACKET_TYPE_pre_posted 2 #define NC2_PACKET_TYPE_receiver_map 3 #define NC2_PACKET_TYPE_small 4 @@ -193,6 +206,64 @@ struct netchannel2_msg_set_max_fragments_per_packet { uint32_t max_frags_per_packet; }; +/* Provide a buffer to the other end. The buffer is initially empty. + * The other end is expected to either: + * + * -- Put some packet data in it, and return it as part of a + * pre_posted PACKET message, or + * -- Not do anything with it, and return it in a RETURN_POSTED_BUFFER + * message. + * + * The other end is allowed to hold on to the buffer for as long as it + * wants before returning the buffer. Buffers may be used out of + * order. + * + * This message cannot be sent unless the VM has received a + * SET_NR_POSTED_BUFFERS message. The total number of outstanding + * buffers must not exceed the limit specified in the + * SET_NR_POSTED_BUFFERS message. + * + * The grant reference should be a whole-page reference, and not a + * subpage reference, because the receiving domain may need to map it + * in order to make the buffer available to hardware. The current + * Linux implementation doesn't do this, but a future version will. + */ +#define NETCHANNEL2_MSG_POST_BUFFER 6 +struct netchannel2_msg_post_buffer { + struct netchannel2_msg_hdr hdr; + uint32_t id; + grant_ref_t gref; + uint16_t off_in_page; + uint16_t size; +}; + +/* The other end has decided not to use the buffer for some reason + * (usually because it's shutting down). The buffer is returned + * containing no data. + */ +#define NETCHANNEL2_MSG_RETURN_POSTED_BUFFER 7 +struct netchannel2_msg_return_posted_buffer { + struct netchannel2_msg_hdr hdr; + uint32_t id; +}; + +/* The other end is allowing us to post up to @nr_buffers buffers to + * it. If @nr_buffers is 0, the use of posted buffers is disabled. 
+ * + * If there are buffers outstanding, a SET_NR_POSTED_BUFFERS message + * implicitly returns all of them, as if they had been returned with a + * run of RETURN_POSTED_BUFFER messages. This is true even if + * @nr_buffers is unchanged. + * + * @nr_buffers only ever provides an upper bound on the number of + * buffers posted; an endpoint may elect to post less than that. + */ +#define NETCHANNEL2_MSG_SET_NR_POSTED_BUFFERS 8 +struct netchannel2_msg_set_nr_posted_buffers { + struct netchannel2_msg_hdr hdr; + uint32_t nr_buffers; +}; + /* Attach to a bypass ring as a frontend. The receiving domain should * map the bypass ring (which will be in the sending domain's memory) * and attach to it in the same as it attached to the original ring. -- 1.6.3.1 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |