WARNING - OLD ARCHIVES

This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
   
 
 
Xen 
 
Home Products Support Community News
 
   
 

xen-changelog

[Xen-changelog] Added asynchronous support to the blockstore.

ChangeSet 1.1336.1.1, 2005/03/22 15:50:43+00:00, jrb44@xxxxxxxxxxxxxxxxxx

        Added asynchronous support to the blockstore.



 blockstore.c |  531 ++++++++++++++++++++++++++++++++++++++++++++++++++---------
 blockstore.h |    4 
 2 files changed, 455 insertions(+), 80 deletions(-)


diff -Nru a/tools/blktap/blockstore.c b/tools/blktap/blockstore.c
--- a/tools/blktap/blockstore.c 2005-03-22 15:03:49 -05:00
+++ b/tools/blktap/blockstore.c 2005-03-22 15:03:49 -05:00
@@ -13,31 +13,73 @@
 #include <string.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <stdarg.h>
 #include "blockstore.h"
 
 #define BLOCKSTORE_REMOTE
+//#define BSDEBUG
 
-#ifdef BLOCKSTORE_REMOTE
+/*****************************************************************************
+ * Debugging
+ */
+#ifdef BSDEBUG
+/* Debug trace helper: formats its arguments printf-style and writes the
+ * result to stderr. Only compiled when BSDEBUG is defined; otherwise DB()
+ * is a no-op macro.
+ */
+void DB(char *format, ...)
+{
+    va_list ap;
+
+    va_start(ap, format);
+    vfprintf(stderr, format, ap);
+    va_end(ap);
+}
+#else
+#define DB(format, ...) (void)0
+#endif
 
-//#define BSDEBUG
+#ifdef BLOCKSTORE_REMOTE
 
 #include <sys/socket.h>
 #include <sys/ioctl.h>
 #include <netinet/in.h>
 #include <netdb.h>
 
-#define ENTER_QUEUE_CR (void)0
-#define LEAVE_QUEUE_CR (void)0
+/*****************************************************************************
+ *                                                                           *
+ *****************************************************************************/
+
+/*****************************************************************************
+ * Network state                                                             *
+ *****************************************************************************/
 
+/* The individual disk servers we talk to. These will be referenced by
+ * an integer index into bsservers[].
+ */
 bsserver_t bsservers[MAX_SERVERS];
+
+/* The cluster map. This is indexed by an integer cluster number.
+ */
 bscluster_t bsclusters[MAX_CLUSTERS];
 
+/* Local socket.
+ */
 struct sockaddr_in sin_local;
 int bssock = 0;
 
+/*****************************************************************************
+ * Message queue management                                                  *
+ *****************************************************************************/
+
+/* Protects the queue manipulation critical regions.
+ */
+#define ENTER_QUEUE_CR (void)0
+#define LEAVE_QUEUE_CR (void)0
+
+/* A message queue entry. We allocate one of these for every request we send.
+ * Asynchronous reply reception also uses one of these.
+ */
 typedef struct bsq_t_struct {
     struct bsq_t_struct *prev;
     struct bsq_t_struct *next;
+    int status;
     int server;
     int length;
     struct msghdr msghdr;
@@ -46,8 +88,134 @@
     void *block;
 } bsq_t;
 
+#define BSQ_STATUS_MATCHED 1
+
+#define ENTER_LUID_CR (void)0
+#define LEAVE_LUID_CR (void)0
+
+static u64 luid_cnt = 0x1000ULL;
+/* Hand out the next id from luid_cnt: returns the counter's current value
+ * and advances it by one, bracketed by the LUID critical-region markers.
+ */
+u64 new_luid(void) {
+    u64 fresh;
+
+    ENTER_LUID_CR;
+    fresh = luid_cnt;
+    luid_cnt = fresh + 1;
+    LEAVE_LUID_CR;
+
+    return fresh;
+}
+
+/* Queue of outstanding requests.
+ */
 bsq_t *bs_head = NULL;
 bsq_t *bs_tail = NULL;
+int bs_qlen = 0;
+
+/* Dump the outstanding-request queue to stderr for debugging: one header
+ * line with the caller-supplied label and the current queue length, then one
+ * line (luid and server index) for every queued entry.
+ *
+ * msg: short label identifying the call site.
+ */
+void queuedebug(char *msg) {
+    bsq_t *q;
+    ENTER_QUEUE_CR;
+    /* bs_qlen and q->server are plain ints, so they are printed with %d. */
+    fprintf(stderr, "Q: %s len=%d\n", msg, bs_qlen);
+    for (q = bs_head; q; q = q->next) {
+        /* Cast the luid so the argument always matches %llx. */
+        fprintf(stderr, "  luid=%016llx server=%d\n",
+                (unsigned long long)q->message.luid, q->server);
+    }
+    LEAVE_QUEUE_CR;
+}
+
+/* Append qe to the tail of the outstanding-request queue and increment
+ * bs_qlen. Always returns 0.
+ */
+int enqueue(bsq_t *qe) {
+    ENTER_QUEUE_CR;
+    qe->prev = bs_tail;
+    qe->next = NULL;
+    if (bs_head != NULL) {
+        /* Non-empty queue: hook the new entry after the current tail. */
+        bs_tail->next = qe;
+    } else {
+        /* Empty queue: the new entry is also the head. */
+        bs_head = qe;
+    }
+    bs_tail = qe;
+    bs_qlen += 1;
+    LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+    queuedebug("enqueue");
+#endif
+    return 0;
+}
+
+/* Unlink qe from the outstanding-request queue, if it is on it.
+ *
+ * The queue is walked and entries are compared by pointer, so passing an
+ * entry that is not queued leaves the queue untouched.
+ *
+ * Returns 1 if qe was found and removed, 0 otherwise.
+ */
+int dequeue(bsq_t *qe) {
+    bsq_t *q;
+    int found = 0;
+
+    ENTER_QUEUE_CR;
+    for (q = bs_head; q; q = q->next) {
+        if (q != qe)
+            continue;
+
+        /* Splice the entry out, fixing up head/tail at the ends. */
+        if (q->prev)
+            q->prev->next = q->next;
+        else
+            bs_head = q->next;
+        if (q->next)
+            q->next->prev = q->prev;
+        else
+            bs_tail = q->prev;
+        bs_qlen--;
+        found = 1;
+        break;
+    }
+    LEAVE_QUEUE_CR;
+
+#ifdef BSDEBUG
+    /* Report the actual outcome; the success path used to log "not found". */
+    queuedebug(found ? "dequeue found" : "dequeue not found");
+#endif
+    return found;
+}
+
+/* Find the queued entry that corresponds to qe and remove it from the queue.
+ *
+ * An entry q matches when its server, message.operation and message.luid all
+ * equal those of qe. On a match, qe's length, message.flags and message.id
+ * are copied into q, q is marked BSQ_STATUS_MATCHED, unlinked (with its
+ * next/prev cleared) and bs_qlen is decremented. For a BSOP_READBLOCK entry
+ * whose BSOP_FLAG_ERROR bit is clear, ownership of qe->block also moves to q
+ * and qe->block is set to NULL.
+ *
+ * qe: presumably the entry filled in from a received reply -- confirm
+ *     against the caller.
+ *
+ * Returns the matched (now unlinked) entry, or NULL if nothing matched.
+ */
+bsq_t *queuesearch(bsq_t *qe) {
+    bsq_t *q;
+    ENTER_QUEUE_CR;
+    for (q = bs_head; q; q = q->next) {
+        if ((qe->server == q->server) &&
+            (qe->message.operation == q->message.operation) &&
+            (qe->message.luid == q->message.luid)) {
+
+            /* NOTE(review): this tests the queued entry's flags, before
+             * qe's flags are copied in below. If the intent is to skip the
+             * block hand-over when the reply reports an error, the operand
+             * may need to be qe->message.flags -- confirm.
+             */
+            if ((q->message.operation == BSOP_READBLOCK) &&
+                ((q->message.flags & BSOP_FLAG_ERROR) == 0)) {
+                /* Hand the data block over; qe no longer owns it. */
+                q->block = qe->block;
+                qe->block = NULL;
+            }
+            q->length = qe->length;
+            q->message.flags = qe->message.flags;
+            q->message.id = qe->message.id;
+            q->status |= BSQ_STATUS_MATCHED;
+
+            /* Unlink q, fixing up head/tail when it sits at either end. */
+            if (q->prev)
+                q->prev->next = q->next;
+            else 
+                bs_head = q->next;
+            if (q->next)
+                q->next->prev = q->prev;
+            else
+                bs_tail = q->prev;
+            q->next = NULL;
+            q->prev = NULL;
+            bs_qlen--;
+            goto found;
+        }
+    }
+
+    /* Walked the whole queue without a match. */
+    LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+    queuedebug("queuesearch not found");
+#endif
+    return NULL;
+
+    found:
+    LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+    queuedebug("queuesearch found");
+#endif
+    return q;
+}
 
 int send_message(bsq_t *qe) {
     int rc;
@@ -71,16 +239,21 @@
         qe->iov[1].iov_len = BLOCK_SIZE;
     }
 
-    rc = sendmsg(bssock, &(qe->msghdr), 0);
+    qe->message.luid = new_luid();
+
+    qe->status = 0;
+    if (enqueue(qe) < 0) {
+        fprintf(stderr, "Error enqueuing request.\n");
+        return -1;
+    }
+
+    DB("send_message to %d luid=%016llx\n", qe->server, qe->message.luid);
+    rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT);
     //rc = sendto(bssock, (void *)&(qe->message), qe->length, 0,
     //           (struct sockaddr *)&(bsservers[qe->server].sin),
     //           sizeof(struct sockaddr_in));
     if (rc < 0)
         return rc;
-    
-    ENTER_QUEUE_CR;
-    
-    LEAVE_QUEUE_CR;
 
     return rc;
 }
@@ -115,22 +288,148 @@
     return rc;
 }
 
+int get_server_number(struct sockaddr_in *sin) {
+    int i;
+


-------------------------------------------------------
This SF.net email is sponsored by: 2005 Windows Mobile Application Contest
Submit applications for Windows Mobile(tm)-based Pocket PCs or Smartphones
for the chance to win $25,000 and application distribution. Enter today at
http://ads.osdn.com/?ad_id=6882&alloc_id=15148&op=click
_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxxxx
https://lists.sourceforge.net/lists/listinfo/xen-changelog

<Prev in Thread] Current Thread [Next in Thread>
  • [Xen-changelog] Added asynchronous support to the blockstore., BitKeeper Bot <=