WARNING - OLD ARCHIVES

This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
   
 
 
Xen 
 
Home Products Support Community News
 
   
 

xen-changelog

[Xen-changelog] Enhanced concurrency support in blockstore.

ChangeSet 1.1372, 2005/03/25 15:39:09+00:00, jrb44@xxxxxxxxxxxxxxxxx

        Enhanced concurrency support in blockstore.
        
        Signed-off-by: James Bulpin <James.Bulpin@xxxxxxxxxxxx>



 Makefile            |   26 +++++-----
 blktaplib.c         |   15 ++++++
 blockstore.c        |  126 ++++++++++++++++++++++++++++++++++++++++++----------
 parallax-threaded.h |    3 -
 4 files changed, 132 insertions(+), 38 deletions(-)


diff -Nru a/tools/blktap/Makefile b/tools/blktap/Makefile
--- a/tools/blktap/Makefile     2005-03-25 18:03:07 -05:00
+++ b/tools/blktap/Makefile     2005-03-25 18:03:07 -05:00
@@ -58,7 +58,7 @@
 
 LIB      = libblktap.so libblktap.so.$(MAJOR) libblktap.so.$(MAJOR).$(MINOR)
 
-all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd 
$(VDI_TOOLS) parallax 
+all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd 
$(VDI_TOOLS) parallax parallax-threaded blockstored
        $(MAKE) $(LIB)
 
 LINUX_ROOT := $(wildcard $(XEN_ROOT)/linux-2.6.*-xen-sparse)
@@ -120,42 +120,42 @@
        $(CC) $(CFLAGS) -o blkaio -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap 
blkaio.c blkaiolib.c -laio -lpthread
 
 parallax: $(LIB) $(PLX_SRCS)
-       $(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. 
-lblktap $(PLX_SRCS) libgnbd/libgnbd.a
+       $(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. 
-lblktap -lpthread $(PLX_SRCS) libgnbd/libgnbd.a
 
 parallax-threaded: $(LIB) $(PLXT_SRCS)
        $(CC) $(CFLAGS) -o parallax-threaded -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) 
-L. -lpthread -lblktap $(PLXT_SRCS) libgnbd/libgnbd.a
 
 vdi_test: $(LIB) $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE -lpthread $(VDI_SRCS)
 
 vdi_list: $(LIB) vdi_list.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c -lpthread $(VDI_SRCS)
 
 vdi_create: $(LIB) vdi_create.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c -lpthread $(VDI_SRCS)
 
 vdi_snap: $(LIB) vdi_snap.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c -lpthread $(VDI_SRCS)
 
 vdi_snap_list: $(LIB) vdi_snap_list.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c -lpthread 
$(VDI_SRCS)
 
 vdi_snap_delete: $(LIB) vdi_snap_delete.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c -lpthread 
$(VDI_SRCS)
 
 vdi_tree: $(LIB) vdi_tree.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c -lpthread $(VDI_SRCS)
 
 vdi_fill: $(LIB) vdi_fill.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c -lpthread $(VDI_SRCS)
 
 vdi_validate: $(LIB) vdi_validate.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c -lpthread $(VDI_SRCS)
 
 blockstored: blockstored.c
-       $(CC) $(CFLAGS) -g3 -o blockstored blockstored.c
+       $(CC) $(CFLAGS) -g3 -o blockstored -lpthread blockstored.c
 bstest: bstest.c blockstore.c
-       $(CC) $(CFLAGS) -g3 -o bstest bstest.c blockstore.c
+       $(CC) $(CFLAGS) -g3 -o bstest bstest.c -lpthread blockstore.c
 
 .PHONY: TAGS clean install mk-symlinks rpm
 TAGS:
diff -Nru a/tools/blktap/blktaplib.c b/tools/blktap/blktaplib.c
--- a/tools/blktap/blktaplib.c  2005-03-25 18:03:07 -05:00
+++ b/tools/blktap/blktaplib.c  2005-03-25 18:03:07 -05:00
@@ -248,12 +248,21 @@
     }
 }
 
+static pthread_mutex_t push_mutex = PTHREAD_MUTEX_INITIALIZER;
+
 void blktap_inject_response(blkif_response_t *rsp)
 {
+    
     apply_rsp_hooks(rsp);
+    
     write_rsp_to_fe_ring(rsp);
+    
+    pthread_mutex_lock(&push_mutex);
+    
     RING_PUSH_RESPONSES(&fe_ring);
     ioctl(fd, BLKTAP_IOCTL_KICK_FE);
+    
+    pthread_mutex_unlock(&push_mutex);
 }
 
 /*-----[ Polling fd listeners ]------------------------------------------*/
@@ -449,7 +458,9 @@
             }
             /* Using this as a unidirectional ring. */
             ctrl_ring.req_cons = ctrl_ring.rsp_prod_pvt = i;
+pthread_mutex_lock(&push_mutex);
             RING_PUSH_RESPONSES(&ctrl_ring);
+pthread_mutex_unlock(&push_mutex);
             
             /* empty the fe_ring */
             notify_fe = 0;
@@ -517,14 +528,18 @@
 
             if (notify_be) {
                 DPRINTF("notifying be\n");
+pthread_mutex_lock(&push_mutex);
                 RING_PUSH_REQUESTS(&be_ring);
                 ioctl(fd, BLKTAP_IOCTL_KICK_BE);
+pthread_mutex_unlock(&push_mutex);
             }
 
             if (notify_fe) {
                 DPRINTF("notifying fe\n");
+pthread_mutex_lock(&push_mutex);
                 RING_PUSH_RESPONSES(&fe_ring);
                 ioctl(fd, BLKTAP_IOCTL_KICK_FE);
+pthread_mutex_unlock(&push_mutex);
             }
         }        
     }
diff -Nru a/tools/blktap/blockstore.c b/tools/blktap/blockstore.c
--- a/tools/blktap/blockstore.c 2005-03-25 18:03:07 -05:00
+++ b/tools/blktap/blockstore.c 2005-03-25 18:03:07 -05:00
@@ -13,13 +13,16 @@
 #include <string.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/time.h>
 #include <stdarg.h>
 #include "blockstore.h"
 #include <pthread.h>
 #include "parallax-threaded.h"
 
 #define BLOCKSTORE_REMOTE
-#define BSDEBUG
+//#define BSDEBUG
+
+#define RETRY_TIMEOUT 1000000 /* microseconds */
 
 /*****************************************************************************
  * Debugging
@@ -63,6 +66,37 @@
 int bssock = 0;
 
 /*****************************************************************************
+ * Notification                                                              *
+ *****************************************************************************/
+
+typedef struct pool_thread_t_struct {
+    pthread_mutex_t ptmutex;
+    pthread_cond_t ptcv;
+    int newdata;
+} pool_thread_t;
+
+pool_thread_t pool_thread[READ_POOL_SIZE+1];
+
+#define RECV_NOTIFY(tid) { \
+    pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \
+    pool_thread[tid].newdata = 1; \
+    DB("CV Waking %u", tid); \
+    pthread_cond_signal(&(pool_thread[tid].ptcv)); \
+    pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); }
+#define RECV_AWAIT(tid) { \
+    pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \
+    if (pool_thread[tid].newdata) { \
+        pool_thread[tid].newdata = 0; \
+        DB("CV Woken %u", tid); \
+    } \
+    else { \
+        DB("CV Waiting %u", tid); \
+        pthread_cond_wait(&(pool_thread[tid].ptcv), \
+                          &(pool_thread[tid].ptmutex)); \
+    } \
+    pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); }
+
+/*****************************************************************************
  * Message queue management                                                  *
  *****************************************************************************/
 
@@ -76,23 +110,6 @@
 #define ENTER_RECV_CR pthread_mutex_lock(&ptmutex_recv)
 #define LEAVE_RECV_CR pthread_mutex_unlock(&ptmutex_recv)
 
-int notify = 0;
-pthread_mutex_t ptmutex_notify;
-pthread_cond_t ptcv_notify;
-#define RECV_NOTIFY { \
-    pthread_mutex_lock(&ptmutex_notify); \
-    notify = 1; \
-    pthread_cond_signal(&ptcv_notify); \
-    pthread_mutex_unlock(&ptmutex_notify); }
-#define RECV_AWAIT { \
-    pthread_mutex_lock(&ptmutex_notify); \
-    if (notify) \
-        notify = 0; \
-    else \
-        pthread_cond_wait(&ptcv_notify, &ptmutex_notify); \
-    pthread_mutex_unlock(&ptmutex_notify); }
-    
-
 /* A message queue entry. We allocate one of these for every request we send.
  * Asynchronous reply reception also used one of these.
  */
@@ -104,6 +121,8 @@
     int length;
     struct msghdr msghdr;
     struct iovec iov[2];
+    int tid;
+    struct timeval tv_sent;
     bshdr_t message;
     void *block;
 } bsq_t;
@@ -267,11 +286,13 @@
     qe->message.luid = new_luid();
 
     qe->status = 0;
+    qe->tid = (int)pthread_getspecific(tid_key);
     if (enqueue(qe) < 0) {
         fprintf(stderr, "Error enqueuing request.\n");
         return -1;
     }
 
+    gettimeofday(&(qe->tv_sent), NULL);
     DB("send_message to %d luid=%016llx\n", qe->server, qe->message.luid);
     rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT);
     //rc = sendto(bssock, (void *)&(qe->message), qe->length, 0,
@@ -407,6 +428,7 @@
 int wait_recv(bsq_t **reqs, int numreqs) {
     bsq_t *q, *m;
     unsigned int x, i;
+    int tid = (int)pthread_getspecific(tid_key);
 
     DB("ENTER wait_recv %u\n", numreqs);
 
@@ -420,7 +442,7 @@
         return numreqs;
     }
 
-    RECV_AWAIT;
+    RECV_AWAIT(tid);
 
     /*
     rxagain:
@@ -442,6 +464,52 @@
 
 }
 
+/* retry
+ */


-------------------------------------------------------
SF email is sponsored by - The IT Product Guide
Read honest & candid reviews on hundreds of IT Products from real users.
Discover which products truly live up to the hype. Start reading now.
http://ads.osdn.com/?ad_id=6595&alloc_id=14396&op=click
_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxxxx
https://lists.sourceforge.net/lists/listinfo/xen-changelog

<Prev in Thread] Current Thread [Next in Thread>
  • [Xen-changelog] Enhanced concurrency support in blockstore., BitKeeper Bot <=