[DRBD-cvs] drbd by phil; The last "fix" for the zero-copy-IO issu...

drbd-user@lists.linbit.com
Sat, 26 Jun 2004 10:05:46 +0200 (CEST)


DRBD CVS committal

Author  : phil
Module  : drbd

Dir     : drbd/drbd


Modified Files:
      Tag: rel-0_7-branch
	drbd_dsender.c drbd_int.h drbd_main.c drbd_receiver.c 


Log Message:
The last "fix" for the zero-copy-IO issue had an other bug. :)
It could free a page with ongoing sendpage() operation on it.
-> Therefor added a new list for pages with onpoing sendpage() 
   activity on them. And a very nice ASSERTION in drbd_put_ee(),
   which makes sure that we do not put any active page onto the
   free list.
-> Looks very good. Completely untested!
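
To make the idea concrete, here is a rough userspace model of the new
put path. All names are invented for illustration; this is not the DRBD
code itself, and the real lists are kernel struct list_head members
protected by mdev->ee_lock:

	struct ee {
		struct ee *next;
		int page_refs;		/* stand-in for page_count() on the data page */
	};

	static struct ee *free_ee;	/* pages that are safe to hand out again */
	static struct ee *net_ee;	/* pages a sendpage() may still reference */

	/* Completion path: recycle the entry only if we hold the last
	   reference to its page; otherwise park it on net_ee until the
	   network stack drops its reference.  (The real code appends to
	   the tail of net_ee so that the oldest send stays at the head.) */
	static void put_ee(struct ee *e)
	{
		if (e->page_refs > 1) {
			e->next = net_ee;	/* send still in flight: defer */
			net_ee = e;
		} else {
			e->next = free_ee;	/* last user: safe to reuse */
			free_ee = e;
		}
	}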

===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.122
retrieving revision 1.1.2.123
diff -u -3 -r1.1.2.122 -r1.1.2.123
--- drbd_dsender.c	17 Jun 2004 01:44:36 -0000	1.1.2.122
+++ drbd_dsender.c	26 Jun 2004 08:05:40 -0000	1.1.2.123
@@ -618,7 +618,12 @@
 	dec_unacked(mdev,HERE);
 
 	spin_lock_irq(&mdev->ee_lock);
-	drbd_put_ee(mdev,e);
+	if( page_count(drbd_bio_get_page(&e->private_bio)) > 1 ) {
+		/* This might happen if sendpage() has not finished */
+		list_add_tail(&e->w.list,&mdev->net_ee);
+	} else {
+		drbd_put_ee(mdev,e);
+	}
 	spin_unlock_irq(&mdev->ee_lock);
 
 	if(unlikely(!ok)) ERR("drbd_send_block() failed\n");
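
The page_count() test above works because zero-copy transmission pins
the page: sendpage() hands the page to the network stack, which takes
its own reference and drops it only once the socket is done with the
data. A toy model of that reference counting (invented names, not the
kernel API):

	struct page_model {
		int refs;		/* 1 == only DRBD itself holds the page */
	};

	static void zero_copy_send(struct page_model *p)
	{
		p->refs++;		/* the skb now references the page */
		/* ... data leaves the socket asynchronously ... */
	}

	static void send_complete(struct page_model *p)
	{
		p->refs--;		/* skb freed, the page may be recycled */
	}

	static int safe_to_recycle(const struct page_model *p)
	{
		return p->refs == 1;	/* mirrors the page_count(...) > 1 test */
	}

While refs is above one the page contents must not be touched, which is
exactly why handing such a page back to free_ee (as the previous fix
could do) risked sending corrupted data over the wire.
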
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.177
retrieving revision 1.58.2.178
diff -u -3 -r1.58.2.177 -r1.58.2.178
--- drbd_int.h	25 Jun 2004 13:19:50 -0000	1.58.2.177
+++ drbd_int.h	26 Jun 2004 08:05:40 -0000	1.58.2.178
@@ -728,6 +728,7 @@
 	struct list_head sync_ee;   // IO in progress
 	struct list_head done_ee;   // send ack
 	struct list_head read_ee;   // IO in progress
+	struct list_head net_ee;    // zero-copy network send in progress
 	spinlock_t pr_lock;
 	struct list_head app_reads;
 	struct list_head resync_reads;
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.191
retrieving revision 1.73.2.192
diff -u -3 -r1.73.2.191 -r1.73.2.192
--- drbd_main.c	25 Jun 2004 13:19:50 -0000	1.73.2.191
+++ drbd_main.c	26 Jun 2004 08:05:40 -0000	1.73.2.192
@@ -1286,6 +1286,7 @@
 	INIT_LIST_HEAD(&mdev->sync_ee);
 	INIT_LIST_HEAD(&mdev->done_ee);
 	INIT_LIST_HEAD(&mdev->read_ee);
+	INIT_LIST_HEAD(&mdev->net_ee);
 	INIT_LIST_HEAD(&mdev->busy_blocks);
 	INIT_LIST_HEAD(&mdev->app_reads);
 	INIT_LIST_HEAD(&mdev->resync_reads);
@@ -1403,6 +1404,7 @@
 	D_ASSERT(list_empty(&mdev->sync_ee));
 	D_ASSERT(list_empty(&mdev->done_ee));
 	D_ASSERT(list_empty(&mdev->read_ee));
+	D_ASSERT(list_empty(&mdev->net_ee));
 	D_ASSERT(list_empty(&mdev->busy_blocks));
 	D_ASSERT(list_empty(&mdev->app_reads));
 	D_ASSERT(list_empty(&mdev->resync_reads));
@@ -1536,6 +1538,9 @@
 
 			rr = drbd_release_ee(mdev,&mdev->done_ee);
 			if(rr) ERR("%d EEs in done list found!\n",rr);
+
+			rr = drbd_release_ee(mdev,&mdev->net_ee);
+			if(rr) ERR("%d EEs in net list found!\n",rr);
 
 			ERR_IF (!list_empty(&mdev->data.work.q)) {
 				struct list_head *lp;
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.174
retrieving revision 1.97.2.175
diff -u -3 -r1.97.2.174 -r1.97.2.175
--- drbd_receiver.c	24 Jun 2004 15:17:33 -0000	1.97.2.174
+++ drbd_receiver.c	26 Jun 2004 08:05:40 -0000	1.97.2.175
@@ -280,7 +280,6 @@
 	struct list_head *le;
 	struct Tl_epoch_entry* e;
 	DEFINE_WAIT(wait);
-	LIST_HEAD(active);
 
 	MUST_HOLD(&mdev->ee_lock);
 
@@ -289,7 +288,7 @@
 		drbd_kick_lo(mdev);
 		spin_lock_irq(&mdev->ee_lock);
 	}
- retry:
+
 	if(list_empty(&mdev->free_ee)) _drbd_process_ee(mdev,&mdev->done_ee);
 
 	if(list_empty(&mdev->free_ee)) {
@@ -311,39 +310,20 @@
 			finish_wait(&mdev->ee_wait, &wait);
 			if (signal_pending(current)) {
 				WARN("drbd_get_ee interrupted!\n");
-				list_splice(&active,mdev->free_ee.prev);
 				return 0;
 			}
 			// finish wait is inside, so that we are TASK_RUNNING 
 			// in _drbd_process_ee (which might sleep by itself.)
 			_drbd_process_ee(mdev,&mdev->done_ee);
-
-			list_for_each(le,&active) {
-				e=list_entry(le, struct Tl_epoch_entry,w.list);
-				if( page_count(drbd_bio_get_page(&e->private_bio)) == 1 ) {
-					list_move(le,&mdev->free_ee);
-					break;
-				}
-			}
 		}
 		finish_wait(&mdev->ee_wait, &wait); 
 	}
 
 	le=mdev->free_ee.next;
 	list_del(le);
-
-	e=list_entry(le, struct Tl_epoch_entry, w.list);
-	if( page_count(drbd_bio_get_page(&e->private_bio)) > 1 ) {
-		/* This might happen if the sendpage() has not finished */
-		list_add(le,&active);
-		goto retry;
-	}
-
-	list_splice(&active,mdev->free_ee.prev);
-
 	mdev->ee_vacant--;
 	mdev->ee_in_use++;
-
+	e=list_entry(le, struct Tl_epoch_entry, w.list);
 ONLY_IN_26(
 	D_ASSERT(e->private_bio.bi_idx == 0);
 	drbd_ee_init(e,e->ee_bvec.bv_page); // reinitialize
@@ -359,6 +339,8 @@
 
 	MUST_HOLD(&mdev->ee_lock);
 
+	D_ASSERT(page_count(drbd_bio_get_page(&e->private_bio)) == 1);
+
 	mdev->ee_in_use--;
 	mdev->ee_vacant++;
 	e->block_id = ID_VACANT;
@@ -380,6 +362,25 @@
 	wake_up(&mdev->ee_wait);
 }
 
+STATIC void reclaim_net_ee(drbd_dev *mdev)
+{
+	struct Tl_epoch_entry *e;
+	struct list_head *le,*tle;
+
+	/* The EEs are always appended to the end of the list. Since
+	   they are sent in order over the wire, they have to finish
+	   in order as well. As soon as we see the first unfinished
+	   one, we can stop examining the list... */
+
+	list_for_each_safe(le, tle, &mdev->net_ee) {
+		e = list_entry(le, struct Tl_epoch_entry, w.list);
+		if( page_count(drbd_bio_get_page(&e->private_bio)) > 1 ) break;
+		list_del(le);
+		drbd_put_ee(mdev,e);
+	}
+}
+
+
 /* It is important that the head list is really empty when returning,
    from this function. Note, this function is called from all three
    threads (receiver, worker and asender). To ensure this I only allow
@@ -393,6 +394,8 @@
 
 	MUST_HOLD(&mdev->ee_lock);
 
+	reclaim_net_ee(mdev);
+
 	if( test_and_set_bit(PROCESS_EE_RUNNING,&mdev->flags) ) {
 		spin_unlock_irq(&mdev->ee_lock);
 		got_sig = wait_event_interruptible(mdev->ee_wait,
@@ -432,6 +435,8 @@
 	struct Tl_epoch_entry *e;
 
 	spin_lock_irq(&mdev->ee_lock);
+
+	reclaim_net_ee(mdev);
 
 	while(!list_empty(&mdev->done_ee)) {
 		le = mdev->done_ee.next;
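
A note on the reclaim order: entries are appended to net_ee in the
order they are sent, and a TCP socket completes sends in order, so
reclaim_net_ee() can stop scanning at the first entry whose page is
still in flight. A compact userspace model of that early-exit scan
(again with invented names, not the kernel list API):

	#include <stddef.h>

	struct ee {
		struct ee *next;
		int page_refs;		/* stand-in for page_count() */
	};

	static struct ee *net_head;	/* FIFO: oldest send at the head */
	static struct ee *net_tail;

	/* Sends complete in order, so the head is always the oldest
	   in-flight send.  Everything up to the first still-referenced
	   page can go back to the free list; stop at the first entry
	   that is still in flight. */
	static void reclaim_net_ee(void (*put_ee)(struct ee *))
	{
		while (net_head && net_head->page_refs == 1) {
			struct ee *done = net_head;
			net_head = done->next;
			if (net_head == NULL)
				net_tail = NULL;
			put_ee(done);	/* page no longer referenced: recycle */
		}
	}

Since the real reclaim runs at the start of the ee-processing paths
under ee_lock, parked entries drift back to the free list as a side
effect of normal receiver activity, with no extra wakeups needed.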