[DRBD-cvs] svn commit by lars - r2226 - in branches/drbd-0.7: drbd scripts user - fix the drbd_get_ee thing.

drbd-cvs at lists.linbit.com
Tue May 30 23:47:37 CEST 2006



Author: lars
Date: 2006-05-30 23:47:36 +0200 (Tue, 30 May 2006)
New Revision: 2226

Modified:
   branches/drbd-0.7/drbd/drbd_receiver.c
   branches/drbd-0.7/scripts/drbd.conf
   branches/drbd-0.7/user/drbdadm_parser.y
   branches/drbd-0.7/user/drbdadm_scanner.fl
Log:

fix the drbd_get_ee thing: allocate epoch entries outside of the ee_lock
and hand them straight back to the caller.
this is basically a copy from the drbd-plus branch.

this also fixes some whitespace from the previous t(h)reshold patch...
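
Before the diff itself, the shape of the fix in brief: allocation is now a
two-stage affair done without holding ee_lock -- data page first (HIGHMEM
allowed), slab object second (lowmem only, and never recursing into IO) --
with the page rolled back if the slab allocation fails. A condensed sketch
of that pattern, using the drbd 0.7 names from the patch below (not a
drop-in replacement for it):

	#include <linux/gfp.h>
	#include <linux/mm.h>
	#include <linux/slab.h>

	struct Tl_epoch_entry *
	drbd_try_alloc_ee(drbd_dev *mdev, int gfp_mask)
	{
		struct Tl_epoch_entry *e;
		struct page *page = alloc_page(gfp_mask); /* HIGHMEM ok here */

		if (!page)
			return NULL;
		/* slab caches must not mix memory types, so mask HIGHMEM off;
		 * force GFP_NOIO since we may sit in our peer's IO path */
		e = kmem_cache_alloc(drbd_ee_cache,
				     (gfp_mask | GFP_NOIO) & ~__GFP_HIGHMEM);
		if (!e) {
			__free_page(page);	/* roll back stage one */
			return NULL;
		}
		drbd_ee_init(e, page);
		return e;
	}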



Modified: branches/drbd-0.7/drbd/drbd_receiver.c
===================================================================
--- branches/drbd-0.7/drbd/drbd_receiver.c	2006-05-30 21:44:18 UTC (rev 2225)
+++ branches/drbd-0.7/drbd/drbd_receiver.c	2006-05-30 21:47:36 UTC (rev 2226)
@@ -145,7 +145,6 @@
  _drbd_process_ee()
 
 You must not have the ee_lock:
- _drbd_alloc_ee()
  drbd_alloc_ee()
  drbd_init_ee()
  drbd_release_ee()
@@ -155,89 +154,90 @@
  drbd_wait_ee()
 */
 
-STATIC int _drbd_alloc_ee(drbd_dev *mdev,struct page* page,int mask)
+/*
+ * gfp_mask is only used for the page allocation,
+ * not for the kmem_cache_alloc, which is hardcoded to
+ * basically GFP_NOIO, see below.
+ *
+ * THIS MUST NOT HOLD A LOCK
+ */
+struct Tl_epoch_entry*
+drbd_try_alloc_ee(drbd_dev *mdev, int gfp_mask)
 {
 	struct Tl_epoch_entry* e;
+	struct page *page = alloc_page(gfp_mask);
 
-	/* kmem_cache does not like to mix different memory types.
-	 * so even if we alloc'ed the page from HIGHMEM,
-	 * the ee comes from normal memory.
-	 */
-	e = kmem_cache_alloc(drbd_ee_cache, mask & ~(__GFP_HIGHMEM));
-	if( e == NULL ) return FALSE;
-
-	drbd_ee_init(e,page);
-	spin_lock_irq(&mdev->ee_lock);
-	list_add(&e->w.list,&mdev->free_ee);
-	mdev->ee_vacant++;
-	spin_unlock_irq(&mdev->ee_lock);
-
-	return TRUE;
-}
-
-/* bool */
-STATIC int drbd_alloc_ee(drbd_dev *mdev,int mask)
-{
-	struct page *page;
-
-	page=alloc_page(mask);
-	if(!page) return FALSE;
-
+	if (!page) return NULL;
 	/* if we got the page, we really want the ee, too,
 	 * even for "GFP_TRY".
 	 * we may wait, but better not cause IO,
 	 * we might be in the IO path (of our peer).
+	 *
+	 * kmem_cache does not like to mix different memory types.
+	 * so even if we alloc'ed the page from HIGHMEM,
+	 * the ee comes from normal memory.
 	 */
-	if(!_drbd_alloc_ee(mdev,page,mask | GFP_NOIO)) {
-		__free_page(page);
-		return FALSE;
-	}
+	e = kmem_cache_alloc(drbd_ee_cache, (gfp_mask|GFP_NOIO) & ~(__GFP_HIGHMEM));
 
-	return TRUE;
+	if (e) drbd_ee_init(e,page);
+	else  __free_page(page);
+
+	return e;
 }
 
-STATIC struct page* drbd_free_ee(drbd_dev *mdev, struct list_head *list)
+/*
+ * just free the ee and the associated page.
+ * no accounting here, no list movement, nothing.
+ */
+void drbd_free_ee(drbd_dev *mdev, struct Tl_epoch_entry *e)
 {
-	struct list_head *le;
-	struct Tl_epoch_entry* e;
-	struct page* page;
-
-	MUST_HOLD(&mdev->ee_lock);
-
-	D_ASSERT(!list_empty(list));
-	le = list->next;
-	e = list_entry(le, struct Tl_epoch_entry, w.list);
-	list_del(le);
-
-	page = drbd_bio_get_page(&e->private_bio);
+	struct page *page = drbd_bio_get_page(&e->private_bio);
 ONLY_IN_26(
 	D_ASSERT(page == e->ee_bvec.bv_page);
-	page = e->ee_bvec.bv_page;
+	page = e->ee_bvec.bv_page;		/* just in case */
 )
+	__free_page(page);
 	kmem_cache_free(drbd_ee_cache, e);
-	mdev->ee_vacant--;
-
-	return page;
 }
 
+/* XXX currently only called on module init.
+ * actually we do not need to care about spinlocks.
+ */
 int drbd_init_ee(drbd_dev *mdev)
 {
-	while(mdev->ee_vacant < EE_MININUM ) {
-		if(!drbd_alloc_ee(mdev,GFP_USER)) {
+	LIST_HEAD(tmp);
+	struct Tl_epoch_entry *e;
+	int i;
+
+	for (i = 0; i < EE_MININUM; i++) {
+		e = drbd_try_alloc_ee(mdev,GFP_HIGHUSER);
+		if (!e) {
 			ERR("Failed to allocate %d EEs !\n",EE_MININUM);
-			return 0;
+			break;
 		}
+		list_add(&e->w.list,&tmp);
 	}
-	return 1;
+
+	spin_lock_irq(&mdev->ee_lock);
+	list_splice(&tmp,&mdev->free_ee);
+	mdev->ee_vacant += i;
+	spin_unlock_irq(&mdev->ee_lock);
+
+	return (i == EE_MININUM);
 }
 
 int drbd_release_ee(drbd_dev *mdev,struct list_head* list)
 {
 	int count=0;
+	struct Tl_epoch_entry *e;
+	struct list_head *le, *tmp;
 
 	spin_lock_irq(&mdev->ee_lock);
-	while(!list_empty(list)) {
-		__free_page(drbd_free_ee(mdev,list));
+	list_for_each_safe(le,tmp,list) {
+		list_del(le);
+		e = list_entry(le, struct Tl_epoch_entry, w.list);
+		drbd_free_ee(mdev,e);
+		mdev->ee_vacant--;
 		count++;
 	}
 	spin_unlock_irq(&mdev->ee_lock);
@@ -288,13 +288,16 @@
 #endif
 
 /**
- * drbd_get_ee: Returns an Tl_epoch_entry; might sleep. Fails only if
- * a signal comes in.
+ * drbd_get_ee:
+ * Returns a Tl_epoch_entry; might sleep. Fails only if a signal comes in.
+ *
+ * We do not do this the straightforward way, because we don't want to
+ * consume too much memory: the upper bound is mdev->conf.max_buffers.
  */
 struct Tl_epoch_entry* drbd_get_ee(drbd_dev *mdev)
 {
 	struct list_head *le;
-	struct Tl_epoch_entry* e;
+	struct Tl_epoch_entry* e = NULL;
 	DEFINE_WAIT(wait);
 
 	MUST_HOLD(&mdev->ee_lock);
@@ -308,49 +311,50 @@
 
 	if(list_empty(&mdev->free_ee)) _drbd_process_ee(mdev,1);
 
-	if(list_empty(&mdev->free_ee)) {
-		for (;;) {
-			prepare_to_wait(&mdev->ee_wait, &wait, 
-					TASK_INTERRUPTIBLE);
-			if(!list_empty(&mdev->free_ee)) break;
+	while (list_empty(&mdev->free_ee)) {
+		if( mdev->conf.max_buffers >
+		    mdev->ee_vacant + mdev->ee_in_use )
+		{
 			spin_unlock_irq(&mdev->ee_lock);
-			if( ( mdev->ee_vacant+mdev->ee_in_use) < 
-			      mdev->conf.max_buffers ) {
-				if(drbd_alloc_ee(mdev,GFP_TRY)) {
-	/* race race race
-	 * (currently harmless for drbd07, since drbd_get_ee is called by
-	 * receiver_thread only. solved with different implementation in
-	 * drbd-plus already.)
-	 */
-					spin_lock_irq(&mdev->ee_lock);
-					break;
-				}
-			}
-			drbd_kick_lo(mdev);
-			schedule();
+			e = drbd_try_alloc_ee(mdev,GFP_TRY);
 			spin_lock_irq(&mdev->ee_lock);
-			finish_wait(&mdev->ee_wait, &wait);
-			if (signal_pending(current)) {
-				WARN("drbd_get_ee interrupted!\n");
-				return 0;
-			}
-			// finish wait is inside, so that we are TASK_RUNNING 
-			// in _drbd_process_ee (which might sleep by itself.)
-			_drbd_process_ee(mdev,1);
+			if (e) goto got_new_ee;
 		}
-		finish_wait(&mdev->ee_wait, &wait); 
+
+		prepare_to_wait(&mdev->ee_wait, &wait, TASK_INTERRUPTIBLE);
+		if (!list_empty(&mdev->free_ee)) break;
+
+		spin_unlock_irq(&mdev->ee_lock);
+
+		drbd_kick_lo(mdev);
+		schedule();
+
+		// finish wait is inside, so that we are TASK_RUNNING
+		// in _drbd_process_ee (which might sleep by itself.)
+		finish_wait(&mdev->ee_wait, &wait);
+
+		spin_lock_irq(&mdev->ee_lock);
+
+		if (signal_pending(current)) {
+			WARN("drbd_get_ee interrupted!\n");
+			return NULL;
+		}
+
+		_drbd_process_ee(mdev,1);
 	}
+	/* no-op when &wait has not been touched */
+	finish_wait(&mdev->ee_wait, &wait);
 
-	/* race race race */
-	le=mdev->free_ee.next;
+	le = mdev->free_ee.next;
 	list_del(le);
 	mdev->ee_vacant--;
-	mdev->ee_in_use++;
 	e=list_entry(le, struct Tl_epoch_entry, w.list);
 ONLY_IN_26(
 	D_ASSERT(e->private_bio.bi_idx == 0);
 	drbd_ee_init(e,e->ee_bvec.bv_page); // reinitialize
 )
+  got_new_ee:
+	mdev->ee_in_use++;
 	e->block_id = !ID_VACANT;
 	SET_MAGIC(e);
 	return e;
@@ -358,27 +362,30 @@
 
 void drbd_put_ee(drbd_dev *mdev,struct Tl_epoch_entry *e)
 {
-	struct page* page;
-
 	MUST_HOLD(&mdev->ee_lock);
 
 	D_ASSERT(page_count(drbd_bio_get_page(&e->private_bio)) == 1);
 
 	mdev->ee_in_use--;
-	mdev->ee_vacant++;
 	e->block_id = ID_VACANT;
 	INVALIDATE_MAGIC(e);
-	list_add_tail(&e->w.list,&mdev->free_ee);
 
-	if((mdev->ee_vacant * 2 > mdev->ee_in_use ) &&
-	   ( mdev->ee_vacant + mdev->ee_in_use > EE_MININUM) ) {
-		// FIXME cleanup: never returns NULL anymore
-		page=drbd_free_ee(mdev,&mdev->free_ee);
-		if( page ) __free_page(page);
+	if ( (mdev->ee_vacant * 2 > mdev->ee_in_use) &&
+	     (mdev->ee_vacant + mdev->ee_in_use > EE_MININUM) ) {
+		drbd_free_ee(mdev,e);
+	} else {
+		list_add_tail(&e->w.list,&mdev->free_ee);
+		mdev->ee_vacant++;
 	}
-	if(mdev->ee_in_use == 0) {
-		while( mdev->ee_vacant > EE_MININUM ) {
-			__free_page(drbd_free_ee(mdev,&mdev->free_ee));
+	if (mdev->ee_in_use == 0) {
+		struct Tl_epoch_entry *e;
+		struct list_head *le, *tmp;
+		list_for_each_safe(le,tmp,&mdev->free_ee) {
+			if (mdev->ee_vacant <= EE_MININUM) break;
+			list_del(le);
+			e = list_entry(le, struct Tl_epoch_entry, w.list);
+			drbd_free_ee(mdev,e);
+			mdev->ee_vacant--;
 		}
 	}
 
@@ -1225,7 +1232,9 @@
 		if (!drbd_rs_begin_io(mdev,sector)) {
 			// we have been interrupted, probably connection lost!
 			D_ASSERT(signal_pending(current));
+			spin_lock_irq(&mdev->ee_lock);
 			drbd_put_ee(mdev,e);
+			spin_unlock_irq(&mdev->ee_lock);
 			return 0;
 		}
 		break;
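
The rewritten drbd_get_ee() above is an instance of the classic
prepare_to_wait()/schedule()/finish_wait() idiom, with the spinlock dropped
around anything that may sleep. Stripped of drbd specifics, the loop has
roughly this shape -- a sketch under assumed names: struct dev, struct item,
dev->lock, dev->wq, dev->free_list and try_alloc() are placeholders here,
not drbd symbols:

	#include <linux/list.h>
	#include <linux/sched.h>
	#include <linux/spinlock.h>
	#include <linux/wait.h>

	/* called and returns with dev->lock held (as drbd_get_ee holds
	 * ee_lock); fails only if a signal comes in */
	struct item *get_item(struct dev *dev)
	{
		struct item *e = NULL;
		struct list_head *le;
		DEFINE_WAIT(wait);

		while (list_empty(&dev->free_list)) {
			if (dev->vacant + dev->in_use < dev->max) {
				spin_unlock_irq(&dev->lock); /* alloc may sleep */
				e = try_alloc(dev);
				spin_lock_irq(&dev->lock);
				if (e)
					goto got_new;	/* skip free-list pop */
			}
			prepare_to_wait(&dev->wq, &wait, TASK_INTERRUPTIBLE);
			if (!list_empty(&dev->free_list))
				break;		/* a wakeup raced us: entry available */
			spin_unlock_irq(&dev->lock);
			schedule();		/* the producer's wake_up() ends this */
			finish_wait(&dev->wq, &wait); /* be TASK_RUNNING again */
			spin_lock_irq(&dev->lock);
			if (signal_pending(current))
				return NULL;	/* interrupted */
		}
		finish_wait(&dev->wq, &wait);	/* no-op if never armed */

		le = dev->free_list.next;	/* pop the head entry */
		list_del(le);
		dev->vacant--;
		e = list_entry(le, struct item, list);
	got_new:
		dev->in_use++;
		return e;			/* still holding dev->lock */
	}

The re-check after prepare_to_wait() closes the standard lost-wakeup window:
once the task is queued on the waitqueue, a wake_up() arriving between the
check and schedule() simply leaves it runnable.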

Modified: branches/drbd-0.7/scripts/drbd.conf
===================================================================
--- branches/drbd-0.7/scripts/drbd.conf	2006-05-30 21:44:18 UTC (rev 2225)
+++ branches/drbd-0.7/scripts/drbd.conf	2006-05-30 21:47:36 UTC (rev 2226)
@@ -97,9 +97,9 @@
   # C: write IO is reported as completed, if we know it has
   #    reached _both_ local and remote DISK.
   #    * for critical transactional data.
+  #    * for most cases.
   # B: write IO is reported as completed, if it has reached
   #    local DISK and remote buffer cache.
-  #    * for most cases.
   # A: write IO is reported as completed, if it has reached
   #    local DISK and local tcp send buffer. (see also sndbuf-size)
   #    * for high latency networks
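
The drbd.conf hunk above only moves the "for most cases" recommendation from
protocol B up to protocol C. In config terms that amounts to something like
the following (resource name and layout are illustrative, not part of the
patch):

  resource r0 {
    protocol C;   # completion reported once data is on both disks
    # ... disk, net and host sections as usual ...
  }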

Modified: branches/drbd-0.7/user/drbdadm_parser.y
===================================================================
--- branches/drbd-0.7/user/drbdadm_parser.y	2006-05-30 21:44:18 UTC (rev 2225)
+++ branches/drbd-0.7/user/drbdadm_parser.y	2006-05-30 21:47:36 UTC (rev 2226)
@@ -457,7 +457,7 @@
 		{ range_check(R_PING_INT,$1,$2);	$$=new_opt($1,$2); }
 		| TK_MAX_BUFFERS    TK_INTEGER
 		{ range_check(R_MAX_BUFFERS,$1,$2);	$$=new_opt($1,$2); }
-		| TK_BDEV_THRESHOLD    TK_INTEGER
+		| TK_BDEV_THRESHOLD TK_INTEGER
 		{ range_check(R_BDEV_THRESHOLD,$1,$2);	$$=new_opt($1,$2); }
 		| TK_MAX_EPOCH_SIZE TK_INTEGER
 		{ range_check(R_MAX_EPOCH_SIZE,$1,$2);	$$=new_opt($1,$2); }

Modified: branches/drbd-0.7/user/drbdadm_scanner.fl
===================================================================
--- branches/drbd-0.7/user/drbdadm_scanner.fl	2006-05-30 21:44:18 UTC (rev 2225)
+++ branches/drbd-0.7/user/drbdadm_scanner.fl	2006-05-30 21:47:36 UTC (rev 2226)
@@ -258,7 +258,7 @@
   ping-int		do_assign(NUM);   CP; return TK_PING_INT;
   connect-int		do_assign(NUM);   CP; return TK_CONNECT_INT;
   max-buffers		do_assign(NUM);   CP; return TK_MAX_BUFFERS;
-  bdev-threshold		do_assign(NUM);   CP; return TK_BDEV_THRESHOLD;
+  bdev-threshold	do_assign(NUM);   CP; return TK_BDEV_THRESHOLD;
   max-epoch-size	do_assign(NUM);   CP; return TK_MAX_EPOCH_SIZE;
   ko-count		do_assign(NUM);   CP; return TK_KO_COUNT;
   on-disconnect 	do_assign(ON_DISCONNECT); CP; return TK_ON_DISCONNECT;


