[DRBD-cvs] drbd by phil; Reenabled zero copy send via network. Pr...

drbd-user@lists.linbit.com drbd-user@lists.linbit.com
Thu, 24 Jun 2004 17:17:38 +0200 (CEST)


DRBD CVS committal

Author  : phil
Module  : drbd

Dir     : drbd/drbd


Modified Files:
      Tag: rel-0_7-branch
	drbd_main.c drbd_receiver.c 


Log Message:
Reenabled zero copy send via network.
  Protocol A is still broken, I will disable zero_copy for protocol A...
Needs testing and verification.

===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.189
retrieving revision 1.73.2.190
diff -u -3 -r1.73.2.189 -r1.73.2.190
--- drbd_main.c	24 Jun 2004 11:43:39 -0000	1.73.2.189
+++ drbd_main.c	24 Jun 2004 15:17:33 -0000	1.73.2.190
@@ -864,7 +864,7 @@
 	return drop_it; /* && (mdev->state == Primary) */;
 }
 
-#if 0
+#if 1
 /* We have the following problem with zero copy network IO:
    
    The idea of sendpage seems to be to put some kind of reference 
@@ -874,23 +874,16 @@
    As soon as the page was really sent over the network put_page()
    gets called by some part of the network layer. [ NIC driver? ]
 
-   [ get_page() / put_page() are functions of the buffer cache, they
-     increment/decrement the count. If count reaches 0 it goes deeply
-     into the page cache... ]
+   [ get_page() / put_page() increment/decrement the count. If count 
+     reaches 0 the page will be freed. ]
 
-   This works nicely as long as the FSs only use pages that are 
-   unter the control of the page cache. [ XFS is one of the
-   exceptions, it also uses pages allocated by other means ]
-
-   The other problematic case are our own private buffer pages (EEs). 
-   We initialize the count to 1, so they do not get handed back to the 
-   page cache, this is good. But we do not wait until the data is really 
-   sent, so somethimes we reuse our EE pages before the data was actually 
-   sent. (happens during resync.)
-
-   => I think it is possible to fix the EE case, but what should be done 
-      to the XFS issue ? 
+   This works nicely with pages from FSs. 
+   But this means that in protocol A we might signal IO completion too early !
 
+   In order not to corrupt data during a full sync we must make sure
+   that we do not reuse our own buffer pages (EEs) to early. 
+   Have a look at drbd_get_ee() where we check if the count of the page
+   has already dropped to 1 .
 */
 int _drbd_send_page(drbd_dev *mdev, struct page *page,
 		    int offset, size_t size)
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.173
retrieving revision 1.97.2.174
diff -u -3 -r1.97.2.173 -r1.97.2.174
--- drbd_receiver.c	18 Jun 2004 12:45:05 -0000	1.97.2.173
+++ drbd_receiver.c	24 Jun 2004 15:17:33 -0000	1.97.2.174
@@ -280,6 +280,7 @@
 	struct list_head *le;
 	struct Tl_epoch_entry* e;
 	DEFINE_WAIT(wait);
+	LIST_HEAD(active);
 
 	MUST_HOLD(&mdev->ee_lock);
 
@@ -288,7 +289,7 @@
 		drbd_kick_lo(mdev);
 		spin_lock_irq(&mdev->ee_lock);
 	}
-
+ retry:
 	if(list_empty(&mdev->free_ee)) _drbd_process_ee(mdev,&mdev->done_ee);
 
 	if(list_empty(&mdev->free_ee)) {
@@ -310,20 +311,39 @@
 			finish_wait(&mdev->ee_wait, &wait);
 			if (signal_pending(current)) {
 				WARN("drbd_get_ee interrupted!\n");
+				list_splice(&active,mdev->free_ee.prev);
 				return 0;
 			}
 			// finish wait is inside, so that we are TASK_RUNNING 
 			// in _drbd_process_ee (which might sleep by itself.)
 			_drbd_process_ee(mdev,&mdev->done_ee);
+
+			list_for_each(le,&active) {
+				e=list_entry(le, struct Tl_epoch_entry,w.list);
+				if( page_count(drbd_bio_get_page(&e->private_bio)) == 1 ) {
+					list_move(le,&mdev->free_ee);
+					break;
+				}
+			}
 		}
 		finish_wait(&mdev->ee_wait, &wait); 
 	}
 
 	le=mdev->free_ee.next;
 	list_del(le);
+
+	e=list_entry(le, struct Tl_epoch_entry, w.list);
+	if( page_count(drbd_bio_get_page(&e->private_bio)) > 1 ) {
+		/* This might happen if the sendpage() has not finished */
+		list_add(le,&active);
+		goto retry;
+	}
+
+	list_splice(&active,mdev->free_ee.prev);
+
 	mdev->ee_vacant--;
 	mdev->ee_in_use++;
-	e=list_entry(le, struct Tl_epoch_entry, w.list);
+
 ONLY_IN_26(
 	D_ASSERT(e->private_bio.bi_idx == 0);
 	drbd_ee_init(e,e->ee_bvec.bv_page); // reinitialize
@@ -343,7 +363,7 @@
 	mdev->ee_vacant++;
 	e->block_id = ID_VACANT;
 	INVALIDATE_MAGIC(e);
-	list_add(&e->w.list,&mdev->free_ee);
+	list_add_tail(&e->w.list,&mdev->free_ee);
 
 	if((mdev->ee_vacant * 2 > mdev->ee_in_use ) &&
 	   ( mdev->ee_vacant + mdev->ee_in_use > EE_MININUM) ) {