[Xen-devel] [PATCH 15/22] Add support for receiver-map mode.

In this mode of operation, the receiving domain maps the sending
domain's buffers, rather than grant-copying them into local memory.
This is marginally faster, but requires the receiving domain to be
somewhat trusted, because:

a) It can see anything else which happens to be on the same page
   as the transmit buffer, and
b) It can just hold onto the pages indefinitely, causing a memory leak
   in the transmitting domain.

It's therefore only really suitable for talking to a trusted peer, and
we use it in that way.
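
As an illustration (not the driver code itself), here is a minimal,
self-contained sketch of the policy decision this patch introduces.  The
struct chan and choose_policy() names are hypothetical stand-ins; the real
logic lives in transmit_policy() in xmit_packet.c, and the receive side
falls back to handle_receiver_copy_packet() when a packet cannot be mapped:

#include <stdio.h>

enum transmit_policy { policy_small, policy_grant, policy_map };

/* Stand-in for the relevant fields of struct netchannel2. */
struct chan {
        int remote_trusted;     /* peer may map (and hold) our pages */
        int local_trusted;      /* we are allowed to map the peer's pages */
};

/* Mirrors transmit_policy(): tiny packets go inline in the ring, trusted
   peers get mappable grants, everyone else gets a grant copy. */
static enum transmit_policy choose_policy(const struct chan *nc,
                                          unsigned len, int nonlinear)
{
        if (len <= 64 && !nonlinear)    /* stand-in for PACKET_PREFIX_SIZE */
                return policy_small;
        if (nc->remote_trusted)
                return policy_map;
        return policy_grant;
}

int main(void)
{
        struct chan nc = { .remote_trusted = 1, .local_trusted = 1 };
        /* 1500-byte nonlinear skb to a trusted peer -> policy_map (2) */
        printf("policy = %d\n", choose_policy(&nc, 1500, 1));
        return 0;
}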

Signed-off-by: Steven Smith <steven.smith@xxxxxxxxxx>
---
 drivers/xen/netchannel2/Makefile           |    3 +-
 drivers/xen/netchannel2/chan.c             |   14 +
 drivers/xen/netchannel2/netchannel2_core.h |   17 +-
 drivers/xen/netchannel2/receiver_map.c     |  786 ++++++++++++++++++++++++++++
 drivers/xen/netchannel2/recv_packet.c      |   23 +
 drivers/xen/netchannel2/rscb.c             |   46 ++-
 drivers/xen/netchannel2/util.c             |   14 +
 drivers/xen/netchannel2/xmit_packet.c      |   12 +-
 include/xen/interface/io/netchannel2.h     |   20 +
 9 files changed, 919 insertions(+), 16 deletions(-)
 create mode 100644 drivers/xen/netchannel2/receiver_map.c
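
As a further illustration (again, not the driver code): a tiny model of the
receive-side packet lifecycle implemented in receiver_map.c below, where each
mapped fragment holds one reference and the final page release moves the
packet to the finished list, from which the gc tasklet unmaps the grants and
queues the FINISH message.  The model_packet and page_released() names are
hypothetical:

#include <stdio.h>

/* Simplified stand-in for rx_map_packet: one reference per mapped fragment. */
struct model_packet {
        int refcnt;
        int finished;   /* set when the FINISH message would be queued */
};

/* Models netchan2_page_release() + put_rx_map_packet(): invoked once per
   fragment page as Linux releases it. */
static void page_released(struct model_packet *pkt)
{
        if (--pkt->refcnt == 0)
                pkt->finished = 1;      /* gc_tasklet would unmap the grants
                                           and queue FINISH here */
}

int main(void)
{
        struct model_packet pkt = { .refcnt = 3, .finished = 0 }; /* 3 frags */
        page_released(&pkt);
        page_released(&pkt);
        page_released(&pkt);
        printf("finished = %d\n", pkt.finished);        /* prints 1 */
        return 0;
}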

diff --git a/drivers/xen/netchannel2/Makefile b/drivers/xen/netchannel2/Makefile
index 565ba89..d6fb796 100644
--- a/drivers/xen/netchannel2/Makefile
+++ b/drivers/xen/netchannel2/Makefile
@@ -1,7 +1,8 @@
 obj-$(CONFIG_XEN_NETCHANNEL2) += netchannel2.o
 
 netchannel2-objs := chan.o netchan2.o rscb.o util.o \
-       xmit_packet.o offload.o recv_packet.o poll.o
+       xmit_packet.o offload.o recv_packet.o poll.o \
+       receiver_map.o
 
 ifeq ($(CONFIG_XEN_NETDEV2_BACKEND),y)
 netchannel2-objs += netback2.o
diff --git a/drivers/xen/netchannel2/chan.c b/drivers/xen/netchannel2/chan.c
index 9bb7ce7..47e1c5e 100644
--- a/drivers/xen/netchannel2/chan.c
+++ b/drivers/xen/netchannel2/chan.c
@@ -395,6 +395,13 @@ struct netchannel2 *nc2_new(struct xenbus_device *xd)
                return NULL;
        }
 
+       if (local_trusted) {
+               if (init_receive_map_mode() < 0) {
+                       nc2_release(nc);
+                       return NULL;
+               }
+       }
+
        netdev->open = nc2_open;
        netdev->stop = nc2_stop;
        netdev->hard_start_xmit = nc2_start_xmit;
@@ -499,6 +506,8 @@ int nc2_attach_rings(struct netchannel2 *nc,
 
        spin_unlock_bh(&nc->rings.lock);
 
+       resume_receive_map_mode();
+
        netif_carrier_on(nc->net_device);
 
        /* Kick it to get it going. */
@@ -630,6 +639,11 @@ int nc2_get_evtchn_port(struct netchannel2 *nc)
        return nc->rings.evtchn;
 }
 
+void nc2_suspend(struct netchannel2 *nc)
+{
+       suspend_receive_map_mode();
+}
+
 /* @ncrp has been recently nc2_kick()ed.  Do all of the necessary
    stuff. */
 static int process_ring(struct napi_struct *napi,
diff --git a/drivers/xen/netchannel2/netchannel2_core.h b/drivers/xen/netchannel2/netchannel2_core.h
index 7be97ea..c4de063 100644
--- a/drivers/xen/netchannel2/netchannel2_core.h
+++ b/drivers/xen/netchannel2/netchannel2_core.h
@@ -37,6 +37,7 @@ enum transmit_policy {
        transmit_policy_unknown = 0,
        transmit_policy_first = 0xf001,
        transmit_policy_grant = transmit_policy_first,
+       transmit_policy_map,
        transmit_policy_small,
        transmit_policy_last = transmit_policy_small
 };
@@ -320,6 +321,11 @@ struct sk_buff *handle_receiver_copy_packet(struct netchannel2 *nc,
                                            struct netchannel2_msg_hdr *hdr,
                                            unsigned nr_frags,
                                            unsigned frags_off);
+struct sk_buff *handle_receiver_map_packet(struct netchannel2 *nc,
+                                          struct netchannel2_msg_packet *msg,
+                                          struct netchannel2_msg_hdr *hdr,
+                                          unsigned nr_frags,
+                                          unsigned frags_off);
 
 enum prepare_xmit_result {
        PREP_XMIT_OKAY = 0,
@@ -332,9 +338,11 @@ enum prepare_xmit_result prepare_xmit_allocate_small(
        struct sk_buff *skb);
 enum prepare_xmit_result prepare_xmit_allocate_grant(
        struct netchannel2_ring_pair *ncrp,
-       struct sk_buff *skb);
+       struct sk_buff *skb,
+       int use_subpage_grants);
 void xmit_grant(struct netchannel2_ring_pair *ncrp,
                struct sk_buff *skb,
+               int use_subpage_grants,
                volatile void *msg);
 
 void queue_finish_packet_message(struct netchannel2_ring_pair *ncrp,
@@ -353,6 +361,8 @@ void fetch_fragment(struct netchannel2_ring_pair *ncrp,
                    struct netchannel2_fragment *frag,
                    unsigned off);
 
+void pull_through(struct sk_buff *skb, unsigned count);
+
 void nc2_kick(struct netchannel2_ring_pair *ncrp);
 
 int nc2_map_grants(struct grant_mapping *gm,
@@ -366,6 +376,11 @@ void queue_packet_to_interface(struct sk_buff *skb,
 
 void nc2_rscb_on_gntcopy_fail(void *ctxt, gnttab_copy_t *gop);
 
+int init_receive_map_mode(void);
+void deinit_receive_map_mode(void);
+void suspend_receive_map_mode(void);
+void resume_receive_map_mode(void);
+
 int nc2_start_xmit(struct sk_buff *skb, struct net_device *dev);
 int nc2_really_start_xmit(struct netchannel2_ring_pair *ncrp,
                          struct sk_buff *skb);
diff --git a/drivers/xen/netchannel2/receiver_map.c b/drivers/xen/netchannel2/receiver_map.c
new file mode 100644
index 0000000..e5c4ed1
--- /dev/null
+++ b/drivers/xen/netchannel2/receiver_map.c
@@ -0,0 +1,786 @@
+/* Support for mapping packets into the local domain, rather than
+   copying them or using pre-posted buffers.  We only implement
+   receive-side support here; for transmit-side, we use the rscb.c
+   implementation. */
+#include <linux/kernel.h>
+#include <linux/delay.h>
+#include <linux/skbuff.h>
+#include <linux/netdevice.h>
+#include <xen/live_maps.h>
+#include <xen/gnttab.h>
+#include <xen/balloon.h>
+#include <xen/evtchn.h>
+#include "netchannel2_core.h"
+
+#define MAX_MAPPED_FRAGS 1024
+#define MAX_MAPPED_PACKETS MAX_PENDING_FINISH_PACKETS
+#define SKB_MIN_PAYLOAD_SIZE 128
+
+static DEFINE_SPINLOCK(global_map_lock);
+static struct receive_mapper *receive_mapper;
+
+/* How long do we leave the packets in the Linux stack before trying
+   to copy them, in jiffies? */
+#define PACKET_TIMEOUT (HZ/2)
+
+/* A slot into which we could map a fragment. */
+struct rx_map_fragment {
+       struct list_head list;
+       struct rx_map_packet *packet;
+       grant_handle_t handle; /* 0 if the fragment isn't currently
+                               * mapped */
+       struct netchannel2_fragment nc_frag;
+};
+
+struct rx_map_packet {
+       struct list_head list;
+       struct list_head frags;
+       /* We take a reference for every mapped fragment associated
+          with the packet.  When the refcnt goes to zero, the packet
+          is finished, and can be moved to the
+          finished_packets_list. */
+       atomic_t refcnt;
+       unsigned id;
+       unsigned long expires; /* We expect Linux to have finished
+                                 with the packet by this time (in
+                                 jiffies), or we try to copy it. */
+       struct netchannel2 *nc;
+       uint8_t flags;
+};
+
+struct receive_mapper {
+       struct page_foreign_tracker *tracker;
+
+       struct page **pages;
+
+       /* Nests inside the netchannel2 lock.  The
+          finished_packets_lock nests inside this. */
+       spinlock_t rm_lock;
+
+       /* Packet fragments which we've mapped, or slots into which we
+          could map packets.  The free list and count are protected
+          by @rm_lock. */
+       struct rx_map_fragment frags[MAX_MAPPED_FRAGS];
+       struct list_head free_frags;
+
+       struct rx_map_packet packets[MAX_MAPPED_PACKETS];
+       struct list_head free_packets;
+       struct list_head active_packets;
+       unsigned nr_free_packets;
+
+       /* Packets which Linux has finished with but which we haven't
+          returned to the other endpoint yet. */
+       spinlock_t finished_packets_lock; /* BH-safe leaf lock,
+                                          * acquired from the page
+                                          * free callback.  Nests
+                                          * inside the rm_lock. */
+       struct list_head finished_packets;
+
+       struct tasklet_struct gc_tasklet;
+
+       struct timer_list expire_timer;
+
+       /* Set if we're trying to run the mapper down prior to
+          suspending the domain. */
+       uint8_t suspending;
+};
+
+static void suspend_receive_mapper(struct receive_mapper *rm);
+
+static unsigned fragment_idx(const struct rx_map_fragment *frag)
+{
+       return frag - receive_mapper->frags;
+}
+
+static int alloc_rx_frags_for_packet(unsigned nr_frags,
+                                    struct rx_map_packet *packet)
+{
+       struct rx_map_fragment *rmf;
+       unsigned x;
+
+       INIT_LIST_HEAD(&packet->frags);
+       for (x = 0; x < nr_frags; x++) {
+               if (list_empty(&receive_mapper->free_frags))
+                       goto err;
+               rmf = list_entry(receive_mapper->free_frags.next,
+                                struct rx_map_fragment,
+                                list);
+               rmf->packet = packet;
+               rmf->handle = -1;
+               list_move(&rmf->list, &packet->frags);
+       }
+       return 0;
+
+err:
+       list_splice_init(&packet->frags, &receive_mapper->free_frags);
+       return -EBUSY;
+}
+
+static struct rx_map_packet *alloc_rx_packet(struct netchannel2 *nc,
+                                            unsigned nr_frags)
+{
+       struct rx_map_packet *rmp;
+
+       spin_lock(&receive_mapper->rm_lock);
+       if (list_empty(&receive_mapper->free_packets) ||
+           receive_mapper->suspending) {
+               spin_unlock(&receive_mapper->rm_lock);
+               return NULL;
+       }
+       rmp = list_entry(receive_mapper->free_packets.next,
+                        struct rx_map_packet, list);
+
+       if (alloc_rx_frags_for_packet(nr_frags, rmp) < 0) {
+               spin_unlock(&receive_mapper->rm_lock);
+               return NULL;
+       }
+       list_del(&rmp->list);
+       atomic_set(&rmp->refcnt, nr_frags);
+       rmp->nc = nc;
+       receive_mapper->nr_free_packets--;
+
+       spin_unlock(&receive_mapper->rm_lock);
+
+       return rmp;
+}
+
+struct grant_unmapper {
+       unsigned nr_gops;
+       gnttab_unmap_grant_ref_t gop_queue[32];
+};
+
+static void do_unmaps(struct grant_unmapper *unmapper)
+{
+       int ret;
+       unsigned x;
+
+       if (unmapper->nr_gops != 0) {
+               ret = HYPERVISOR_grant_table_op(GNTTABOP_unmap_grant_ref,
+                                               unmapper->gop_queue,
+                                               unmapper->nr_gops);
+               BUG_ON(ret);
+               for (x = 0; x < unmapper->nr_gops; x++) {
+                       set_phys_to_machine(
+                               __pa(unmapper->gop_queue[x].host_addr) >>
+                                       PAGE_SHIFT,
+                               INVALID_P2M_ENTRY);
+               }
+       }
+       unmapper->nr_gops = 0;
+}
+
+static void grant_unmap(struct grant_unmapper *unmapper,
+                       void *va,
+                       int handle)
+{
+       gnttab_unmap_grant_ref_t *gop;
+       if (unmapper->nr_gops == ARRAY_SIZE(unmapper->gop_queue))
+               do_unmaps(unmapper);
+       gop = &unmapper->gop_queue[unmapper->nr_gops];
+       gnttab_set_unmap_op(gop, (unsigned long)va, GNTMAP_host_map, handle);
+       unmapper->nr_gops++;
+}
+
+/* A tasklet which is invoked shortly after a packet is released so
+   that we can send the FINISH_PACKET message. */
+static void gc_tasklet(unsigned long _rm)
+{
+       struct list_head packets;
+       struct rx_map_packet *packet;
+       struct rx_map_fragment *rx_frag;
+       struct list_head released_fragments;
+       unsigned nr_released_packets;
+       unsigned idx;
+       struct grant_unmapper unmapper;
+       struct page *page;
+       struct netchannel2 *locked_nc;
+
+       INIT_LIST_HEAD(&packets);
+
+       spin_lock(&receive_mapper->finished_packets_lock);
+       list_splice_init(&receive_mapper->finished_packets, &packets);
+       spin_unlock(&receive_mapper->finished_packets_lock);
+
+       /* Unmap the fragments. */
+       unmapper.nr_gops = 0;
+       BUG_ON(packets.next == NULL);
+       list_for_each_entry(packet, &packets, list) {
+               BUG_ON(packet->list.next == NULL);
+               BUG_ON(atomic_read(&packet->refcnt) != 0);
+               BUG_ON(packet->frags.next == NULL);
+               list_for_each_entry(rx_frag, &packet->frags, list) {
+                       BUG_ON(rx_frag->list.next == NULL);
+                       if (rx_frag->handle == -1)
+                               continue;
+                       idx = fragment_idx(rx_frag);
+                       page = receive_mapper->pages[idx];
+                       stop_tracking_page(page);
+                       grant_unmap(&unmapper, page_address(page),
+                                   rx_frag->handle);
+               }
+       }
+       do_unmaps(&unmapper);
+
+       /* Tell the other end that the packets are finished, and
+          accumulate the fragments into a local free list. */
+       INIT_LIST_HEAD(&released_fragments);
+       nr_released_packets = 0;
+
+       locked_nc = NULL;
+       list_for_each_entry(packet, &packets, list) {
+               if (locked_nc != packet->nc) {
+                       if (locked_nc) {
+                               spin_unlock(&locked_nc->rings.lock);
+                               nc2_kick(&locked_nc->rings);
+                       }
+                       spin_lock(&packet->nc->rings.lock);
+                       locked_nc = packet->nc;
+               }
+               BUG_ON(packet->frags.next == NULL);
+               list_for_each_entry(rx_frag, &packet->frags, list) {
+                       BUG_ON(rx_frag->list.next == NULL);
+                       idx = fragment_idx(rx_frag);
+                       gnttab_reset_grant_page(receive_mapper->pages[idx]);
+               }
+               nr_released_packets++;
+               list_splice_init(&packet->frags, &released_fragments);
+               queue_finish_packet_message(&locked_nc->rings, packet->id,
+                                           packet->flags);
+       }
+
+       if (locked_nc) {
+               spin_unlock(&locked_nc->rings.lock);
+               nc2_kick(&locked_nc->rings);
+               locked_nc = NULL;
+
+               spin_lock(&receive_mapper->rm_lock);
+               list_splice(&packets, &receive_mapper->free_packets);
+               list_splice(&released_fragments, &receive_mapper->free_frags);
+               receive_mapper->nr_free_packets += nr_released_packets;
+
+               /* Reprogram the expire timer. */
+               if (!list_empty(&receive_mapper->active_packets)) {
+                       mod_timer(&receive_mapper->expire_timer,
+                                 list_entry(receive_mapper->active_packets.next,
+                                            struct rx_map_packet,
+                                            list)->expires);
+               }
+               spin_unlock(&receive_mapper->rm_lock);
+       }
+}
+
+/* Decrement the refcnt on @rmp and, if necessary, move it to the
+   finished packets list and schedule the GC tasklet. */
+static void put_rx_map_packet(struct rx_map_packet *rmp)
+{
+       if (atomic_dec_and_test(&rmp->refcnt)) {
+               /* Remove it from the active list. */
+               spin_lock_bh(&receive_mapper->rm_lock);
+               list_del(&rmp->list);
+               spin_unlock_bh(&receive_mapper->rm_lock);
+
+               /* Add it to the finished list. */
+               spin_lock_bh(&receive_mapper->finished_packets_lock);
+               list_add_tail(&rmp->list, &receive_mapper->finished_packets);
+               spin_unlock_bh(&receive_mapper->finished_packets_lock);
+
+               tasklet_schedule(&receive_mapper->gc_tasklet);
+       }
+}
+
+
+/* The page @page, which was previously part of a receiver-mapped SKB,
+ * has been released.  If it was the last page involved in its SKB,
+ * the packet is finished and we can tell the other end that it's
+ * finished.
+ */
+static void netchan2_page_release(struct page *page, unsigned order)
+{
+       struct rx_map_fragment *frag;
+       struct rx_map_packet *rmp;
+
+       BUG_ON(order != 0);
+
+       frag = (struct rx_map_fragment *)page->mapping;
+       rmp = frag->packet;
+
+       put_rx_map_packet(rmp);
+}
+
+/* Unmap the packet, removing all other references to it.  The caller
+ * should take an additional reference to the packet before calling
+ * this, to stop it disappearing underneath us.         The only way of
+ * checking whether this succeeded is to look at the packet's
+ * reference count after it returns.
+ */
+static void unmap_this_packet(struct rx_map_packet *rmp)
+{
+       struct rx_map_fragment *rx_frag;
+       unsigned idx;
+       int r;
+       int cnt;
+
+       /* Unmap every fragment in the packet. We don't fail the whole
+          function just because gnttab_copy_grant_page() failed,
+          because success or failure will be inferable from the
+          reference count on the packet (this makes it easier to
+          handle the case where some pages have already been copied,
+          for instance). */
+       cnt = 0;
+       list_for_each_entry(rx_frag, &rmp->frags, list) {
+               idx = fragment_idx(rx_frag);
+               if (rx_frag->handle != -1) {
+                       r = gnttab_copy_grant_page(rx_frag->handle,
+                                                  &receive_mapper->pages[idx]);
+                       if (r == 0) {
+                               /* We copied the page, so it's not really
+                                  mapped any more. */
+                               rx_frag->handle = -1;
+                               atomic_dec(&rmp->refcnt);
+                       }
+               }
+               cnt++;
+       }
+
+       /* Caller should hold a reference. */
+       BUG_ON(atomic_read(&rmp->refcnt) == 0);
+}
+
+static void unmap_all_packets(void)
+{
+       struct rx_map_packet *rmp;
+       struct rx_map_packet *next;
+       struct list_head finished_packets;
+       int need_tasklet;
+
+       INIT_LIST_HEAD(&finished_packets);
+
+       spin_lock_bh(&receive_mapper->rm_lock);
+
+       list_for_each_entry_safe(rmp, next, &receive_mapper->active_packets,
+                                list) {
+               atomic_inc(&rmp->refcnt);
+               unmap_this_packet(rmp);
+               if (atomic_dec_and_test(&rmp->refcnt))
+                       list_move(&rmp->list, finished_packets.prev);
+       }
+       spin_unlock_bh(&receive_mapper->rm_lock);
+
+       need_tasklet = !list_empty(&finished_packets);
+
+       spin_lock_bh(&receive_mapper->finished_packets_lock);
+       list_splice(&finished_packets, receive_mapper->finished_packets.prev);
+       spin_unlock_bh(&receive_mapper->finished_packets_lock);
+
+       if (need_tasklet)
+               tasklet_schedule(&receive_mapper->gc_tasklet);
+}
+
+static void free_receive_mapper(struct receive_mapper *rm)
+{
+       unsigned x;
+
+       /* Get rid of any packets which are currently mapped. */
+       suspend_receive_mapper(rm);
+
+       /* Stop the expiry timer.  We know it won't get requeued
+        * because there are no packets outstanding and rm->suspending
+        * is set (because of suspend_receive_mapper()). */
+       del_timer_sync(&rm->expire_timer);
+
+       /* Wait for any last instances of the tasklet to finish. */
+       tasklet_kill(&rm->gc_tasklet);
+
+       if (rm->pages != NULL) {
+               for (x = 0; x < MAX_MAPPED_FRAGS; x++) {
+                       if (PageForeign(rm->pages[x]))
+                               ClearPageForeign(rm->pages[x]);
+                       rm->pages[x]->mapping = NULL;
+               }
+               free_empty_pages_and_pagevec(rm->pages, MAX_MAPPED_FRAGS);
+       }
+       if (rm->tracker != NULL)
+               free_page_foreign_tracker(rm->tracker);
+       kfree(rm);
+}
+
+/* Timer invoked shortly after a packet expires, so that we can copy
+   the data and get it back from Linux.         This is necessary if a packet
+   gets stuck in a socket RX queue somewhere, or you risk a
+   deadlock. */
+static void expire_timer(unsigned long data)
+{
+       struct rx_map_packet *rmp, *next;
+       struct list_head finished_packets;
+       int need_tasklet;
+
+       INIT_LIST_HEAD(&finished_packets);
+
+       spin_lock(&receive_mapper->rm_lock);
+       list_for_each_entry_safe(rmp, next, &receive_mapper->active_packets,
+                                list) {
+               if (time_after(rmp->expires, jiffies)) {
+                       mod_timer(&receive_mapper->expire_timer, rmp->expires);
+                       break;
+               }
+               atomic_inc(&rmp->refcnt);
+               unmap_this_packet(rmp);
+               if (atomic_dec_and_test(&rmp->refcnt)) {
+                       list_move(&rmp->list, finished_packets.prev);
+               } else {
+                       /* Couldn't unmap the packet, either because
+                          it's in use by real hardware or we've run
+                          out of memory.  Send the packet to the end
+                          of the queue and update the expiry time so
+                          that we try again later. */
+                       /* Note that this can make the active packet
+                          list slightly out of order.  Oh well; it
+                          won't be by more than a few jiffies, and it
+                          doesn't really matter that much. */
+                       rmp->expires = jiffies + PACKET_TIMEOUT;
+                       list_move(&rmp->list,
+                                 receive_mapper->active_packets.prev);
+               }
+       }
+       spin_unlock(&receive_mapper->rm_lock);
+
+       need_tasklet = !list_empty(&finished_packets);
+
+       spin_lock(&receive_mapper->finished_packets_lock);
+       list_splice(&finished_packets, receive_mapper->finished_packets.prev);
+       spin_unlock(&receive_mapper->finished_packets_lock);
+
+       if (need_tasklet)
+               tasklet_schedule(&receive_mapper->gc_tasklet);
+}
+
+static struct receive_mapper *new_receive_mapper(void)
+{
+       struct receive_mapper *rm;
+       unsigned x;
+
+       rm = kzalloc(sizeof(*rm), GFP_KERNEL);
+       if (!rm)
+               goto err;
+       INIT_LIST_HEAD(&rm->free_frags);
+       INIT_LIST_HEAD(&rm->free_packets);
+       INIT_LIST_HEAD(&rm->active_packets);
+       INIT_LIST_HEAD(&rm->finished_packets);
+       spin_lock_init(&rm->rm_lock);
+       spin_lock_init(&rm->finished_packets_lock);
+       for (x = 0; x < MAX_MAPPED_FRAGS; x++)
+               list_add_tail(&rm->frags[x].list, &rm->free_frags);
+       for (x = 0; x < MAX_MAPPED_PACKETS; x++)
+               list_add_tail(&rm->packets[x].list, &rm->free_packets);
+       rm->nr_free_packets = MAX_MAPPED_PACKETS;
+
+       setup_timer(&rm->expire_timer, expire_timer, 0);
+       tasklet_init(&rm->gc_tasklet, gc_tasklet, 0);
+
+       rm->tracker = alloc_page_foreign_tracker(MAX_MAPPED_FRAGS);
+       if (!rm->tracker)
+               goto err;
+       rm->pages = alloc_empty_pages_and_pagevec(MAX_MAPPED_FRAGS);
+       if (!rm->pages)
+               goto err;
+       for (x = 0; x < MAX_MAPPED_FRAGS; x++) {
+               SetPageForeign(rm->pages[x], netchan2_page_release);
+               rm->pages[x]->mapping = (void *)&rm->frags[x];
+       }
+
+       return rm;
+
+err:
+       if (rm != NULL)
+               free_receive_mapper(rm);
+       return NULL;
+}
+
+static void attach_frag_to_skb(struct sk_buff *skb,
+                              struct rx_map_fragment *frag)
+{
+       unsigned idx;
+       struct skb_shared_info *shinfo;
+       skb_frag_t *sk_frag;
+
+       shinfo = skb_shinfo(skb);
+       sk_frag = &shinfo->frags[shinfo->nr_frags];
+       idx = fragment_idx(frag);
+       sk_frag->page = receive_mapper->pages[idx];
+       sk_frag->page_offset = frag->nc_frag.off;
+       sk_frag->size = frag->nc_frag.size;
+       shinfo->nr_frags++;
+}
+
+struct rx_plan {
+       int is_failed;
+       unsigned nr_mops;
+       gnttab_map_grant_ref_t mops[8];
+       struct rx_map_fragment *frags[8];
+};
+
+static void flush_grant_operations(struct rx_plan *rp)
+{
+       unsigned x;
+       int ret;
+       gnttab_map_grant_ref_t *mop;
+
+       if (rp->nr_mops == 0)
+               return;
+       if (!rp->is_failed) {
+               ret = HYPERVISOR_grant_table_op(GNTTABOP_map_grant_ref,
+                                               rp->mops,
+                                               rp->nr_mops);
+               BUG_ON(ret);
+               for (x = 0; x < rp->nr_mops; x++) {
+                       mop = &rp->mops[x];
+                       if (mop->status != 0) {
+                               rp->is_failed = 1;
+                       } else {
+                               rp->frags[x]->handle = mop->handle;
+                               set_phys_to_machine(
+                                       __pa(mop->host_addr) >> PAGE_SHIFT,
+                                       FOREIGN_FRAME(mop->dev_bus_addr >>
+                                                     PAGE_SHIFT));
+                       }
+               }
+       }
+       rp->nr_mops = 0;
+}
+
+static void map_fragment(struct rx_plan *rp,
+                        struct rx_map_fragment *rx_frag,
+                        struct netchannel2 *nc)
+{
+       unsigned idx = fragment_idx(rx_frag);
+       gnttab_map_grant_ref_t *mop;
+
+       if (rp->nr_mops == ARRAY_SIZE(rp->mops))
+               flush_grant_operations(rp);
+       mop = &rp->mops[rp->nr_mops];
+       gnttab_set_map_op(mop,
+                         (unsigned long)page_address(receive_mapper->pages[idx]),
+                         GNTMAP_host_map | GNTMAP_readonly,
+                         rx_frag->nc_frag.receiver_map.gref,
+                         nc->rings.otherend_id);
+       rp->frags[rp->nr_mops] = rx_frag;
+       rp->nr_mops++;
+}
+
+/* Unmap a packet which has been half-mapped. */
+static void unmap_partial_packet(struct rx_map_packet *rmp)
+{
+       unsigned idx;
+       struct rx_map_fragment *rx_frag;
+       struct grant_unmapper unmapper;
+
+       unmapper.nr_gops = 0;
+       list_for_each_entry(rx_frag, &rmp->frags, list) {
+               if (rx_frag->handle == -1)
+                       continue;
+               idx = fragment_idx(rx_frag);
+               grant_unmap(&unmapper,
+                           page_address(receive_mapper->pages[idx]),
+                           rx_frag->handle);
+       }
+       do_unmaps(&unmapper);
+}
+
+struct sk_buff *handle_receiver_map_packet(struct netchannel2 *nc,
+                                          struct netchannel2_msg_packet *msg,
+                                          struct netchannel2_msg_hdr *hdr,
+                                          unsigned nr_frags,
+                                          unsigned frags_off)
+{
+       struct sk_buff *skb;
+       struct rx_map_fragment *rx_frag;
+       unsigned x;
+       unsigned len;
+       struct rx_map_packet *rmp;
+       unsigned idx;
+       struct rx_plan plan;
+       unsigned prefix_size;
+
+       memset(&plan, 0, sizeof(plan));
+
+       rmp = alloc_rx_packet(nc, nr_frags);
+       if (rmp == NULL)
+               return NULL;
+
+       if (msg->prefix_size < SKB_MIN_PAYLOAD_SIZE)
+               prefix_size = SKB_MIN_PAYLOAD_SIZE;
+       else
+               prefix_size = msg->prefix_size;
+       /* As in posted_buffers.c, we don't limit the total size of
+          the packet, because we don't need to allocate more memory
+          for very large packets.  The prefix is safe because it's
+          only a 16 bit number.  A 64k allocation won't always
+          succeed, but it's unlikely to trigger the OOM killer or
+          otherwise interfere with the normal operation of the local
+          domain. */
+       skb = dev_alloc_skb(prefix_size + NET_IP_ALIGN);
+       if (skb == NULL) {
+               spin_lock(&receive_mapper->rm_lock);
+               list_splice(&rmp->frags, &receive_mapper->free_frags);
+               list_add(&rmp->list, &receive_mapper->free_packets);
+               receive_mapper->nr_free_packets++;
+               spin_unlock(&receive_mapper->rm_lock);
+               return NULL;
+       }
+       skb_reserve(skb, NET_IP_ALIGN);
+
+       rmp->id = msg->id;
+       rmp->flags = msg->flags;
+
+       rx_frag = list_entry(rmp->frags.next, struct rx_map_fragment, list);
+       for (x = 0; x < nr_frags; x++) {
+               fetch_fragment(&nc->rings, x, &rx_frag->nc_frag, frags_off);
+               if (rx_frag->nc_frag.size > PAGE_SIZE ||
+                   rx_frag->nc_frag.off >= PAGE_SIZE ||
+                   rx_frag->nc_frag.size + rx_frag->nc_frag.off > PAGE_SIZE) {
+                       plan.is_failed = 1;
+                       break;
+               }
+               map_fragment(&plan, rx_frag, nc);
+               rx_frag = list_entry(rx_frag->list.next,
+                                    struct rx_map_fragment,
+                                    list);
+       }
+
+       flush_grant_operations(&plan);
+       if (plan.is_failed)
+               goto fail_and_unmap;
+
+       /* Grab the prefix off of the ring. */
+       nc2_copy_from_ring_off(&nc->rings.cons_ring,
+                              skb_put(skb, msg->prefix_size),
+                              msg->prefix_size,
+                              frags_off +
+                              nr_frags * sizeof(struct netchannel2_fragment));
+
+       /* All fragments mapped, so we know that this is going to
+          work.  Transfer the receive slots into the SKB. */
+       len = 0;
+       list_for_each_entry(rx_frag, &rmp->frags, list) {
+               attach_frag_to_skb(skb, rx_frag);
+               idx = fragment_idx(rx_frag);
+               start_tracking_page(receive_mapper->tracker,
+                                   receive_mapper->pages[idx],
+                                   nc->rings.otherend_id,
+                                   rx_frag->nc_frag.receiver_map.gref,
+                                   idx,
+                                   nc);
+               len += rx_frag->nc_frag.size;
+       }
+
+       skb->len += len;
+       skb->data_len += len;
+       skb->truesize += len;
+
+       spin_lock(&receive_mapper->rm_lock);
+       list_add_tail(&rmp->list, &receive_mapper->active_packets);
+       rmp->expires = jiffies + PACKET_TIMEOUT;
+       if (rmp == list_entry(receive_mapper->active_packets.next,
+                             struct rx_map_packet,
+                             list))
+               mod_timer(&receive_mapper->expire_timer, rmp->expires);
+       spin_unlock(&receive_mapper->rm_lock);
+
+       if (skb_headlen(skb) < SKB_MIN_PAYLOAD_SIZE)
+               pull_through(skb,
+                            SKB_MIN_PAYLOAD_SIZE - skb_headlen(skb));
+
+       return skb;
+
+fail_and_unmap:
+       pr_debug("Failed to map received packet!\n");
+       unmap_partial_packet(rmp);
+
+       spin_lock(&receive_mapper->rm_lock);
+       list_splice(&rmp->frags, &receive_mapper->free_frags);
+       list_add_tail(&rmp->list, &receive_mapper->free_packets);
+       receive_mapper->nr_free_packets++;
+       spin_unlock(&receive_mapper->rm_lock);
+
+       kfree_skb(skb);
+       return NULL;
+}
+
+static void suspend_receive_mapper(struct receive_mapper *rm)
+{
+       spin_lock_bh(&rm->rm_lock);
+       /* Stop any more packets coming in. */
+       rm->suspending = 1;
+
+       /* Wait for Linux to give back all of the SKBs which we've
+          given it. */
+       while (rm->nr_free_packets != MAX_MAPPED_PACKETS) {
+               spin_unlock_bh(&rm->rm_lock);
+               unmap_all_packets();
+               msleep(100);
+               spin_lock_bh(&rm->rm_lock);
+       }
+       spin_unlock_bh(&rm->rm_lock);
+}
+
+static void resume_receive_mapper(void)
+{
+       spin_lock_bh(&receive_mapper->rm_lock);
+       receive_mapper->suspending = 0;
+       spin_unlock_bh(&receive_mapper->rm_lock);
+}
+
+
+int init_receive_map_mode(void)
+{
+       struct receive_mapper *new_rm;
+       spin_lock(&global_map_lock);
+       while (receive_mapper == NULL) {
+               spin_unlock(&global_map_lock);
+               new_rm = new_receive_mapper();
+               if (new_rm == NULL)
+                       return -ENOMEM;
+               spin_lock(&global_map_lock);
+               if (receive_mapper == NULL) {
+                       receive_mapper = new_rm;
+               } else {
+                       spin_unlock(&global_map_lock);
+                       free_receive_mapper(new_rm);
+                       spin_lock(&global_map_lock);
+               }
+       }
+       spin_unlock(&global_map_lock);
+       return 0;
+}
+
+void deinit_receive_map_mode(void)
+{
+       if (!receive_mapper)
+               return;
+       BUG_ON(spin_is_locked(&global_map_lock));
+       free_receive_mapper(receive_mapper);
+       receive_mapper = NULL;
+}
+
+void suspend_receive_map_mode(void)
+{
+       if (!receive_mapper)
+               return;
+       suspend_receive_mapper(receive_mapper);
+}
+
+void resume_receive_map_mode(void)
+{
+       if (!receive_mapper)
+               return;
+       resume_receive_mapper();
+}
+
+struct netchannel2 *nc2_get_interface_for_page(struct page *p)
+{
+       BUG_ON(!page_is_tracked(p));
+       if (!receive_mapper ||
+           tracker_for_page(p) != receive_mapper->tracker)
+               return NULL;
+       return get_page_tracker_ctxt(p);
+}
diff --git a/drivers/xen/netchannel2/recv_packet.c b/drivers/xen/netchannel2/recv_packet.c
index 80c5d5d..8c38788 100644
--- a/drivers/xen/netchannel2/recv_packet.c
+++ b/drivers/xen/netchannel2/recv_packet.c
@@ -112,6 +112,28 @@ void nc2_handle_packet_msg(struct netchannel2 *nc,
                                                  nr_frags, frags_off);
                queue_finish_packet_message(ncrp, msg.id, msg.flags);
                break;
+       case NC2_PACKET_TYPE_receiver_map:
+               if (!nc->local_trusted) {
+                       /* The remote doesn't trust us, so they
+                          shouldn't be sending us receiver-map
+                          packets.  Just treat it as an RSCB
+                          packet. */
+                       skb = NULL;
+               } else {
+                       skb = handle_receiver_map_packet(nc, &msg, hdr,
+                                                        nr_frags,
+                                                        frags_off);
+                       /* Finish message will be sent when we unmap
+                        * the packet. */
+               }
+               if (skb == NULL) {
+                       /* We can't currently map this skb.  Use a
+                          receiver copy instead. */
+                       skb = handle_receiver_copy_packet(nc, ncrp, &msg, hdr,
+                                                         nr_frags, frags_off);
+                       queue_finish_packet_message(ncrp, msg.id, msg.flags);
+               }
+               break;
        default:
                pr_debug("Unknown packet type %d\n", msg.type);
                nc->stats.rx_errors++;
@@ -285,4 +307,5 @@ int __init nc2_init(void)
 
 void __exit nc2_exit(void)
 {
+       deinit_receive_map_mode();
 }
diff --git a/drivers/xen/netchannel2/rscb.c b/drivers/xen/netchannel2/rscb.c
index 8ad5454..cdcb116 100644
--- a/drivers/xen/netchannel2/rscb.c
+++ b/drivers/xen/netchannel2/rscb.c
@@ -209,6 +209,7 @@ struct sk_buff *handle_receiver_copy_packet(struct netchannel2 *nc,
 struct grant_packet_plan {
        volatile struct netchannel2_fragment *out_fragment;
        grant_ref_t gref_pool;
+       int use_subpage_grants;
        unsigned prefix_avail;
 };
 
@@ -223,14 +224,15 @@ static inline int nfrags_skb(struct sk_buff *skb, int prefix_size)
        start_grant = ((unsigned long)skb->data + prefix_size) &
                ~(PAGE_SIZE-1);
        end_grant = ((unsigned long)skb->data +
-                    skb_headlen(skb) +  PAGE_SIZE - 1) &
+                    skb_headlen(skb) + PAGE_SIZE - 1) &
                ~(PAGE_SIZE-1);
        return ((end_grant - start_grant) >> PAGE_SHIFT)
                + skb_shinfo(skb)->nr_frags;
 }
 
 enum prepare_xmit_result prepare_xmit_allocate_grant(struct netchannel2_ring_pair *ncrp,
-                                                    struct sk_buff *skb)
+                                                    struct sk_buff *skb,
+                                                    int use_subpage_grants)
 {
        struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
        unsigned nr_fragments;
@@ -241,13 +243,23 @@ enum prepare_xmit_result prepare_xmit_allocate_grant(struct netchannel2_ring_pai
        if (allocate_txp_slot(ncrp, skb) < 0)
                return PREP_XMIT_BUSY;
 
-       /* We're going to have to get the remote to issue a grant copy
-          hypercall anyway, so there's no real benefit to shoving the
-          headers inline. */
-       /* (very small packets won't go through here, so there's no
-          chance that we could completely eliminate the grant
-          copy.) */
-       inline_prefix_size = sizeof(struct ethhdr);
+       if (use_subpage_grants) {
+               /* We're going to have to get the remote to issue a
+                  grant copy hypercall anyway, so there's no real
+                  benefit to shoving the headers inline. */
+               /* (very small packets won't go through here, so
+                  there's no chance that we could completely
+                  eliminate the grant copy.) */
+               inline_prefix_size = sizeof(struct ethhdr);
+       } else {
+               /* If we're going off-box (and we probably are, if the
+                  remote is trusted), putting the header in the ring
+                  potentially saves a TLB miss in the bridge, which
+                  is worth doing. */
+               inline_prefix_size = PACKET_PREFIX_SIZE;
+               if (skb_headlen(skb) < inline_prefix_size)
+                       inline_prefix_size = skb_headlen(skb);
+       }
 
        if (skb_co->nr_fragments == 0) {
                nr_fragments = nfrags_skb(skb, inline_prefix_size);
@@ -277,10 +289,14 @@ enum prepare_xmit_result prepare_xmit_allocate_grant(struct netchannel2_ring_pai
                   have to recompute it next time around. */
                return PREP_XMIT_BUSY;
        }
+
        skb_co->gref_pool = gref_pool;
        skb_co->inline_prefix_size = inline_prefix_size;
 
-       skb_co->type = NC2_PACKET_TYPE_receiver_copy;
+       if (use_subpage_grants)
+               skb_co->type = NC2_PACKET_TYPE_receiver_copy;
+       else
+               skb_co->type = NC2_PACKET_TYPE_receiver_map;
 
        return PREP_XMIT_OKAY;
 }
@@ -318,15 +334,19 @@ static void prepare_subpage_grant(struct netchannel2_ring_pair *ncrp,
                                                      GTF_readonly,
                                                      trans_domid,
                                                      trans_gref);
-       } else {
+       } else if (plan->use_subpage_grants) {
                gnttab_grant_foreign_access_ref_subpage(gref,
                                                         ncrp->otherend_id,
                                                         virt_to_mfn(page_address(page)),
                                                        GTF_readonly,
                                                        off_in_page,
                                                        size);
+       } else {
+               gnttab_grant_foreign_access_ref(gref,
+                                               ncrp->otherend_id,
+                                               virt_to_mfn(page_address(page)),
+                                               GTF_readonly);
        }
-
        frag->off = off_in_page;
        frag->size = size;
        plan->out_fragment++;
@@ -356,6 +376,7 @@ static int grant_data_area(struct netchannel2_ring_pair *ncrp,
 
 void xmit_grant(struct netchannel2_ring_pair *ncrp,
                struct sk_buff *skb,
+               int use_subpage_grants,
                volatile void *msg_buf)
 {
        volatile struct netchannel2_msg_packet *msg = msg_buf;
@@ -366,6 +387,7 @@ void xmit_grant(struct netchannel2_ring_pair *ncrp,
        skb_frag_t *frag;
 
        memset(&plan, 0, sizeof(plan));
+       plan.use_subpage_grants = use_subpage_grants;
        plan.prefix_avail = skb_co->inline_prefix_size;
        plan.out_fragment = msg->frags;
        plan.gref_pool = skb_co->gref_pool;
diff --git a/drivers/xen/netchannel2/util.c b/drivers/xen/netchannel2/util.c
index 302dfc1..79d9f09 100644
--- a/drivers/xen/netchannel2/util.c
+++ b/drivers/xen/netchannel2/util.c
@@ -94,6 +94,20 @@ void release_tx_packet(struct netchannel2_ring_pair *ncrp,
                        }
                        gnttab_release_grant_reference(&ncrp->gref_pool, gref);
                }
+       } else if (skb_co->type == NC2_PACKET_TYPE_receiver_map) {
+               while (1) {
+                       r = gnttab_claim_grant_reference(&skb_co->gref_pool);
+                       if (r == -ENOSPC)
+                               break;
+                       gref = (grant_ref_t)r;
+                       r = gnttab_end_foreign_access_ref(gref);
+                       if (r == 0) {
+                               printk(KERN_WARNING "Failed to end remote access to packet memory.\n");
+                       } else {
+                               gnttab_release_grant_reference(&ncrp->gref_pool,
+                                                              gref);
+                       }
+               }
        } else if (skb_co->gref_pool != 0) {
                gnttab_subfree_grant_references(skb_co->gref_pool,
                                                &ncrp->gref_pool);
diff --git a/drivers/xen/netchannel2/xmit_packet.c b/drivers/xen/netchannel2/xmit_packet.c
index 7eb845d..d95ad09 100644
--- a/drivers/xen/netchannel2/xmit_packet.c
+++ b/drivers/xen/netchannel2/xmit_packet.c
@@ -13,6 +13,8 @@ static enum transmit_policy transmit_policy(struct netchannel2 *nc,
 {
        if (skb->len <= PACKET_PREFIX_SIZE && !skb_is_nonlinear(skb))
                return transmit_policy_small;
+       else if (nc->remote_trusted)
+               return transmit_policy_map;
        else
                return transmit_policy_grant;
 }
@@ -72,7 +74,10 @@ enum prepare_xmit_result prepare_xmit_allocate_resources(struct netchannel2 *nc,
                        r = prepare_xmit_allocate_small(&nc->rings, skb);
                        break;
                case transmit_policy_grant:
-                       r = prepare_xmit_allocate_grant(&nc->rings, skb);
+                       r = prepare_xmit_allocate_grant(&nc->rings, skb, 1);
+                       break;
+               case transmit_policy_map:
+                       r = prepare_xmit_allocate_grant(&nc->rings, skb, 0);
                        break;
                default:
                        BUG();
@@ -170,7 +175,10 @@ int nc2_really_start_xmit(struct netchannel2_ring_pair *ncrp,
                /* Nothing to do */
                break;
        case transmit_policy_grant:
-               xmit_grant(ncrp, skb, msg);
+               xmit_grant(ncrp, skb, 1, msg);
+               break;
+       case transmit_policy_map:
+               xmit_grant(ncrp, skb, 0, msg);
                break;
        default:
                BUG();
diff --git a/include/xen/interface/io/netchannel2.h b/include/xen/interface/io/netchannel2.h
index 1cca607..f264995 100644
--- a/include/xen/interface/io/netchannel2.h
+++ b/include/xen/interface/io/netchannel2.h
@@ -46,6 +46,9 @@ struct netchannel2_fragment {
                struct {
                        grant_ref_t gref;
                } receiver_copy;
+               struct {
+                       grant_ref_t gref;
+               } receiver_map;
        };
 };
 struct netchannel2_msg_packet {
@@ -98,6 +101,22 @@ struct netchannel2_msg_packet {
 *                 Due to backend bugs, it is not safe to use this
  *                 packet type except on bypass rings.
  *
+ * receiver_map -- The transmitting domain has granted the receiving
+ *                 domain access to the original RX buffers using
+ *                 full (mappable) grant references.  This can be
+ *                 treated the same way as receiver_copy, but the
+ *                 receiving domain also has the option of mapping
+ *                 the fragments, rather than copying them.  If it
+ *                 decides to do so, it should ensure that the fragments
+ *                 will be unmapped in a reasonably timely fashion,
+ *                 and don't e.g. become stuck in a receive buffer
+ *                 somewhere.  In general, anything longer than about
+ *                 a second is likely to cause problems.  Once all
+ *                 grant references have been unmapped, the receiving
+ *                 domain should send a FINISH message.
+ *
+ *                 This packet type may not be used on bypass rings.
+ *
  * small -- The packet does not have any fragment descriptors
  *         (i.e. the entire thing is inline in the ring).  The receiving
 *         domain should simply copy the packet out of the ring
@@ -110,6 +129,7 @@ struct netchannel2_msg_packet {
  * that it is correct to treat receiver_map and small packets as
  * receiver_copy ones. */
 #define NC2_PACKET_TYPE_receiver_copy 1
+#define NC2_PACKET_TYPE_receiver_map 3
 #define NC2_PACKET_TYPE_small 4
 
 #define NC2_PACKET_SEGMENTATION_TYPE_none  0
-- 
1.6.3.1


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel