[DRBD-cvs] svn commit by lars - r2553 - trunk/drbd - when Primary and sync target, the worker could "deadlock" in w_make_resync_request

drbd-cvs at lists.linbit.com
Fri Oct 20 20:07:36 CEST 2006


Author: lars
Date: 2006-10-20 20:07:34 +0200 (Fri, 20 Oct 2006)
New Revision: 2553

Modified:
   trunk/drbd/drbd_actlog.c
   trunk/drbd/drbd_int.h
   trunk/drbd/drbd_main.c
   trunk/drbd/drbd_receiver.c
   trunk/drbd/drbd_req.c
   trunk/drbd/drbd_worker.c
   trunk/drbd/lru_cache.c
   trunk/drbd/lru_cache.h
Log:
when Primary and sync target, the worker could "deadlock" in
w_make_resync_request (drbd_rs_begin_io), waiting for application
requests to complete which are still to be sent by that same worker. sic.

this implementation may not yet be optimal, but it seems to work.
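
to make the failure mode concrete for readers of the archive: the one
worker thread both sends queued application writes and issues resync
requests, so a resync path that blocks on those writes blocks on the
worker itself. below is a minimal user-space sketch of the non-blocking
retry pattern this commit introduces; the harness is hypothetical, only
the drbd_try_rs_begin_io()/-EAGAIN convention mirrors the patch:

    /* model of the self-deadlock fixed here: starting a resync request
     * must not block on application writes that this same thread still
     * has to send; instead it fails with -EAGAIN and is requeued. */
    #include <errno.h>
    #include <stdio.h>

    static int pending_app_writes = 2; /* writes the worker still has to send */

    static int try_rs_begin_io_model(void)
    {
        if (pending_app_writes > 0)
            return -EAGAIN; /* back off; the worker stays live */
        return 0;           /* extent locked, resync may proceed */
    }

    int main(void)
    {
        while (try_rs_begin_io_model() == -EAGAIN) {
            /* worker loop: drain one application write, then retry */
            printf("requeue resync request, send app write\n");
            pending_app_writes--;
        }
        printf("resync request proceeds\n");
        return 0;
    }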



Modified: trunk/drbd/drbd_actlog.c
===================================================================
--- trunk/drbd/drbd_actlog.c	2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_actlog.c	2006-10-20 18:07:34 UTC (rev 2553)
@@ -696,7 +696,7 @@
 		}
 	} else {
 		ERR("lc_get() failed! locked=%d/%d flags=%lu\n",
-		    atomic_read(&mdev->resync_locked),
+		    mdev->resync_locked,
 		    mdev->resync->nr_elements,
 		    mdev->resync->flags);
 	}
@@ -841,28 +841,28 @@
 	int wakeup = 0;
 	unsigned long     rs_flags;
 
-	if(atomic_read(&mdev->resync_locked) > mdev->resync->nr_elements-3 ) {
+	spin_lock_irq(&mdev->al_lock);
+	if (mdev->resync_locked > mdev->resync->nr_elements-3) {
 		//WARN("bme_get() does not lock all elements\n");
-		return 0;
+		spin_unlock_irq(&mdev->al_lock);
+		return NULL;
 	}
-
-	spin_lock_irq(&mdev->al_lock);
 	bm_ext = (struct bm_extent*) lc_get(mdev->resync,enr);
 	if (bm_ext) {
-		if(bm_ext->lce.lc_number != enr) {
+		if (bm_ext->lce.lc_number != enr) {
 			bm_ext->rs_left = drbd_bm_e_weight(mdev,enr);
 			bm_ext->rs_failed = 0;
 			lc_changed(mdev->resync,(struct lc_element*)bm_ext);
 			wakeup = 1;
 		}
-		if(bm_ext->lce.refcnt == 1) atomic_inc(&mdev->resync_locked);
-		set_bit(BME_NO_WRITES,&bm_ext->flags); // within the lock
+		if (bm_ext->lce.refcnt == 1) mdev->resync_locked++;
+		set_bit(BME_NO_WRITES,&bm_ext->flags);
 	}
 	rs_flags=mdev->resync->flags;
 	spin_unlock_irq(&mdev->al_lock);
 	if (wakeup) wake_up(&mdev->al_wait);
 
-	if(!bm_ext) {
+	if (!bm_ext) {
 		if (rs_flags & LC_STARVING) {
 			WARN("Have to wait for element"
 			     " (resync LRU too small?)\n");
@@ -903,6 +903,10 @@
  * to BME_LOCKED.
  *
  * @sector: The sector number
+ *
+ * sleeps on al_wait.
+ * returns 1 if successful.
+ * returns 0 if interrupted.
  */
 int drbd_rs_begin_io(drbd_dev* mdev, sector_t sector)
 {
@@ -928,7 +932,7 @@
 			spin_lock_irq(&mdev->al_lock);
 			if( lc_put(mdev->resync,&bm_ext->lce) == 0 ) {
 				clear_bit(BME_NO_WRITES,&bm_ext->flags);
-				atomic_dec(&mdev->resync_locked);
+				mdev->resync_locked--;
 				wake_up(&mdev->al_wait);
 			}
 			spin_unlock_irq(&mdev->al_lock);
@@ -941,6 +945,112 @@
 	return 1;
 }
 
+/**
+ * drbd_try_rs_begin_io: Gets an extent in the resync LRU cache, sets it
+ * to BME_NO_WRITES, then tries to set it to BME_LOCKED.
+ *
+ * @sector: The sector number
+ *
+ * does not sleep.
+ * returns zero if we could set BME_LOCKED and can proceed,
+ * -EAGAIN if we need to try again.
+ */
+int drbd_try_rs_begin_io(drbd_dev* mdev, sector_t sector)
+{
+	unsigned int enr = BM_SECT_TO_EXT(sector);
+	const unsigned int al_enr = enr*AL_EXT_PER_BM_SECT;
+	struct bm_extent* bm_ext;
+	int i;
+
+	spin_lock_irq(&mdev->al_lock);
+	if (mdev->resync_wenr != LC_FREE) {
+	/* with very heavy scattered io, the syncer may stall
+	 * indefinitely if we give up the ref count every time
+	 * we have to try again and requeue.
+	 *
+	 * but if we do not give up the refcount, and the next
+	 * time we are scheduled this extent has been "synced"
+	 * by new application writes, we would miss the lc_put
+	 * on the extent we kept the refcount on.
+	 * so we remember which extent we had to try again, and
+	 * if the next requested one is something else, we do
+	 * the lc_put here...
+	 * we also have to wake_up here.
+	 */
+		bm_ext = (struct bm_extent*)lc_find(mdev->resync,mdev->resync_wenr);
+		if (bm_ext) {
+			D_ASSERT(!test_bit(BME_LOCKED,&bm_ext->flags));
+			D_ASSERT(test_bit(BME_NO_WRITES,&bm_ext->flags));
+			clear_bit(BME_NO_WRITES,&bm_ext->flags);
+			mdev->resync_wenr = LC_FREE;
+			lc_put(mdev->resync,&bm_ext->lce);
+			wake_up(&mdev->al_wait);
+		} else {
+			ALERT("LOGIC BUG\n");
+		}
+	}
+	bm_ext = (struct bm_extent*)lc_try_get(mdev->resync,enr);
+	if (bm_ext) {
+		if (test_bit(BME_LOCKED,&bm_ext->flags)) {
+			goto proceed;
+		}
+		if (!test_and_set_bit(BME_NO_WRITES,&bm_ext->flags)) {
+			mdev->resync_locked++;
+		} else {
+			/* we had already set BME_NO_WRITES
+			 * on an earlier attempt, but could not
+			 * set BME_LOCKED, so this is a retry.
+			 * drop the extra reference. */
+			bm_ext->lce.refcnt--;
+			D_ASSERT(bm_ext->lce.refcnt > 0);
+		}
+		goto check_al;
+	} else {
+		if (mdev->resync_locked > mdev->resync->nr_elements-3)
+			goto try_again;
+		bm_ext = (struct bm_extent*)lc_get(mdev->resync,enr);
+		if (!bm_ext) {
+			const unsigned long rs_flags = mdev->resync->flags;
+			if (rs_flags & LC_STARVING) {
+				WARN("Have to wait for element"
+				     " (resync LRU too small?)\n");
+			}
+			if (rs_flags & LC_DIRTY) {
+				BUG(); // WARN("Ongoing RS update (???)\n");
+			}
+			goto try_again;
+		}
+		if (bm_ext->lce.lc_number != enr) {
+			bm_ext->rs_left = drbd_bm_e_weight(mdev,enr);
+			bm_ext->rs_failed = 0;
+			lc_changed(mdev->resync,(struct lc_element*)bm_ext);
+			wake_up(&mdev->al_wait);
+			D_ASSERT(test_bit(BME_LOCKED,&bm_ext->flags) == 0);
+		}
+		set_bit(BME_NO_WRITES,&bm_ext->flags);
+		D_ASSERT(bm_ext->lce.refcnt == 1);
+		mdev->resync_locked++;
+		goto check_al;
+	}
+  check_al:
+	for (i=0;i<AL_EXT_PER_BM_SECT;i++) {
+		if (unlikely(al_enr+i == mdev->act_log->new_number))
+			goto try_again;
+		if (lc_is_used(mdev->act_log,al_enr+i))
+			goto try_again;
+	}
+	set_bit(BME_LOCKED,&bm_ext->flags);
+  proceed:
+	mdev->resync_wenr = LC_FREE;
+	spin_unlock_irq(&mdev->al_lock);
+	return 0;
+
+  try_again:
+	if (bm_ext) mdev->resync_wenr = enr;
+	spin_unlock_irq(&mdev->al_lock);
+	return -EAGAIN;
+}
+
 void drbd_rs_complete_io(drbd_dev* mdev, sector_t sector)
 {
 	unsigned int enr = BM_SECT_TO_EXT(sector);
@@ -963,7 +1073,7 @@
 	if( lc_put(mdev->resync,(struct lc_element *)bm_ext) == 0 ) {
 		clear_bit(BME_LOCKED,&bm_ext->flags);
 		clear_bit(BME_NO_WRITES,&bm_ext->flags);
-		atomic_dec(&mdev->resync_locked);
+		mdev->resync_locked--;
 		wake_up(&mdev->al_wait);
 	}
 
@@ -998,7 +1108,8 @@
 		mdev->resync->used=0;
 		dec_local(mdev);
 	}
-	atomic_set(&mdev->resync_locked,0);
+	mdev->resync_locked = 0;
+	mdev->resync_wenr = LC_FREE;
 	spin_unlock_irq(&mdev->al_lock);
 	wake_up(&mdev->al_wait);
 }
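
the resync_wenr bookkeeping added above is subtle enough to deserve a
compressed illustration. when drbd_try_rs_begin_io() has to back off, it
keeps the reference but records the extent number in resync_wenr; if the
next attempt targets a different extent, the kept reference is dropped
first, so no lc_put() is ever missed. a hedged user-space model follows;
the struct and helpers are illustrative stand-ins, not the kernel types:

    /* illustrative model of the "remember which extent to retry" logic */
    #include <stdio.h>

    #define LC_FREE    (-1U)
    #define NR_EXTENTS 8

    struct model_dev {
        unsigned int resync_wenr; /* extent we kept a ref on, or LC_FREE */
        unsigned int refcnt[NR_EXTENTS];
    };

    static void try_begin(struct model_dev *m, unsigned int enr, int busy)
    {
        if (m->resync_wenr != LC_FREE && m->resync_wenr != enr) {
            /* retrying a different extent: drop the kept reference
             * now, or its lc_put() would be missed forever */
            m->refcnt[m->resync_wenr]--;
            m->resync_wenr = LC_FREE;
        }
        if (m->refcnt[enr] == 0)
            m->refcnt[enr] = 1;       /* take the reference */
        if (busy) {
            m->resync_wenr = enr;     /* remember, try again later */
            printf("extent %u busy, keeping ref\n", enr);
        } else {
            m->resync_wenr = LC_FREE; /* locked, proceed */
            printf("extent %u locked\n", enr);
        }
    }

    int main(void)
    {
        struct model_dev m = { .resync_wenr = LC_FREE };
        try_begin(&m, 3, 1); /* application writes in the way */
        try_begin(&m, 5, 0); /* different extent: ref on 3 is dropped */
        return 0;
    }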

Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h	2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_int.h	2006-10-20 18:07:34 UTC (rev 2553)
@@ -809,14 +809,12 @@
 	struct Drbd_thread asender;
 	struct drbd_bitmap* bitmap;
 	struct lru_cache* resync; // Used to track operations of resync...
-	atomic_t resync_locked;   // Number of locked elements in resync LRU
+	unsigned int resync_locked; // Number of locked elements in resync LRU
+	unsigned int resync_wenr;   // resync extent number waiting for application requests
 	int open_cnt;
 	u64 *p_uuid;
-	/* no more ee_lock
-	 * we had to grab both req_lock _and_ ee_lock in almost every place we
-	 * needed one of them. so why bother having too spinlocks?
-	 * FIXME clean comments, restructure so it is more obvious which
-	 * members areprotected by what */
+	/* FIXME clean comments, restructure so it is more obvious which
+	 * members are protected by what */
 	unsigned int epoch_size;
 	struct list_head active_ee; // IO in progress
 	struct list_head sync_ee;   // IO in progress
@@ -1319,6 +1317,7 @@
 extern void drbd_al_complete_io(struct Drbd_Conf *mdev, sector_t sector);
 extern void drbd_rs_complete_io(struct Drbd_Conf *mdev, sector_t sector);
 extern int drbd_rs_begin_io(struct Drbd_Conf *mdev, sector_t sector);
+extern int drbd_try_rs_begin_io(struct Drbd_Conf *mdev, sector_t sector);
 extern void drbd_rs_cancel_all(drbd_dev* mdev);
 extern void drbd_rs_del_all(drbd_dev* mdev);
 extern void drbd_rs_failed_io(drbd_dev* mdev, sector_t sector, int size);

Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c	2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_main.c	2006-10-20 18:07:34 UTC (rev 2553)
@@ -1031,7 +1031,12 @@
 		}
 	}
 
-	/* it feels better to have the module_put last ... */
+	/* FIXME what about Primary, Diskless, and then losing
+	 * the connection? since we survive that "somehow",
+	 * maybe we must not stop the worker yet,
+	 * since that would call drbd_mdev_cleanup.
+	 * after which we probably won't survive the next
+	 * request from the upper layers ... BOOM again :( */
 	if ( (os.disk > Diskless || os.conn > StandAlone) &&
 	     ns.disk == Diskless && ns.conn == StandAlone ) {
 		drbd_thread_stop_nowait(&mdev->worker);
@@ -1925,7 +1930,6 @@
 	atomic_set(&mdev->unacked_cnt,0);
 	atomic_set(&mdev->local_cnt,0);
 	atomic_set(&mdev->net_cnt,0);
-	atomic_set(&mdev->resync_locked,0);
 	atomic_set(&mdev->packet_seq,0);
 	atomic_set(&mdev->pp_in_use, 0);
 

Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c	2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_receiver.c	2006-10-20 18:07:34 UTC (rev 2553)
@@ -2477,9 +2477,7 @@
 	drbd_cmd_handler_f handler;
 	Drbd_Header *header = &mdev->data.rbuf.head;
 
-	/* FIXME test for thread state == RUNNING here,
-	 * in case we missed some signal! */
-	for (;;) {
+	while (get_t_state(&mdev->receiver) == Running) {
 		if (!drbd_recv_header(mdev,header))
 			break;
 

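both the receiver loop above and the worker loop below now re-check the
thread state on every iteration, instead of spinning in for(;;) and
relying on a signal that may already have been missed. a small sketch of
that pattern; the atomics and state names stand in for DRBD's
Drbd_thread machinery and are not the kernel API:

    /* re-check thread state before each unit of work, so a stop request
     * that raced with the loop entry cannot be lost */
    #include <stdatomic.h>
    #include <stdio.h>

    enum t_state { Running, Exiting };

    static _Atomic int thread_state = Running;
    static int work_items = 3;

    static int get_t_state_model(void)
    {
        return atomic_load(&thread_state);
    }

    static void thread_loop(void)
    {
        while (get_t_state_model() == Running) {
            if (work_items == 0)
                break;      /* normal end of work */
            work_items--;   /* handle one item */
        }
    }

    int main(void)
    {
        thread_loop();                      /* drains all three items */
        atomic_store(&thread_state, Exiting);
        thread_loop();                      /* exits immediately */
        puts("thread loop exited cleanly");
        return 0;
    }
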
Modified: trunk/drbd/drbd_req.c
===================================================================
--- trunk/drbd/drbd_req.c	2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_req.c	2006-10-20 18:07:34 UTC (rev 2553)
@@ -243,6 +243,16 @@
 				drbd_set_in_sync(mdev,req->sector,req->size);
 			}
 
+			/* one might be tempted to move the drbd_al_complete_io
+			 * to the local io completion callback drbd_endio_pri.
+			 * but, if this was a mirror write, we may only
+			 * drbd_al_complete_io after this is RQ_NET_DONE,
+			 * otherwise the extent could be dropped from the al
+			 * before it has actually been written on the peer.
+			 * if we crash before our peer knows about the request,
+			 * but after the extent has been dropped from the al,
+			 * we would forget to resync the corresponding extent.
+			 */
 			if (s & RQ_LOCAL_MASK) {
 				drbd_al_complete_io(mdev, req->sector);
 			}
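
the ordering argument in the new comment boils down to a tiny state
check: the activity-log reference may only be dropped once BOTH the
local write and the network round trip are done. a sketch under that
assumption; RQ_LOCAL_MASK and RQ_NET_DONE mirror names in the patch,
while the harness and the exact flag values are illustrative:

    /* drop the AL extent only when local AND network completion are in;
     * dropping it earlier could lose, across a crash, the knowledge
     * that this extent may still differ on the peer */
    #include <stdio.h>

    #define RQ_LOCAL_MASK (1u << 0) /* local write completed */
    #define RQ_NET_DONE   (1u << 1) /* peer has seen the write */

    static void maybe_complete_al(unsigned int s)
    {
        if ((s & RQ_LOCAL_MASK) && (s & RQ_NET_DONE))
            printf("0x%x: safe to drop AL extent\n", s);
        else
            printf("0x%x: keep AL extent\n", s);
    }

    int main(void)
    {
        maybe_complete_al(RQ_LOCAL_MASK);               /* too early */
        maybe_complete_al(RQ_LOCAL_MASK | RQ_NET_DONE); /* ok */
        return 0;
    }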

Modified: trunk/drbd/drbd_worker.c
===================================================================
--- trunk/drbd/drbd_worker.c	2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_worker.c	2006-10-20 18:07:34 UTC (rev 2553)
@@ -305,13 +305,12 @@
 
 		sector = BM_BIT_TO_SECT(bit);
 
-		if(!drbd_rs_begin_io(mdev,sector)) {
-			// we have been interrupted, probably connection lost!
-			D_ASSERT(signal_pending(current));
-			return 0;
+		if (drbd_try_rs_begin_io(mdev, sector)) {
+			drbd_bm_set_find(mdev,bit);
+			goto requeue;
 		}
 
-		if(unlikely( drbd_bm_test_bit(mdev,bit) == 0 )) {
+		if (unlikely(drbd_bm_test_bit(mdev,bit) == 0 )) {
 		      //INFO("Block got synced while in drbd_rs_begin_io()\n");
 			drbd_rs_complete_io(mdev,sector);
 			goto next_sector;
@@ -367,7 +366,7 @@
 				       sector,size,ID_SYNCER)) {
 			ERR("drbd_send_drequest() failed, aborting...\n");
 			dec_rs_pending(mdev);
-			return 0; // FAILED. worker will abort!
+			return 0;
 		}
 	}
 
@@ -855,7 +854,7 @@
 
 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
 
-	for (;;) {
+	while (get_t_state(thi) == Running) {
 		intr = down_interruptible(&mdev->data.work.s);
 
 		if (intr) {
@@ -921,6 +920,11 @@
 	 * So don't do that.
 	 */
 	spin_unlock_irq(&mdev->data.work.q_lock);
+	/* FIXME verify that there absolutely cannot be any more work
+	 * on the queue now...
+	 * if so, the comment above is no longer true, but historic
+	 * from the times when the worker did not live as long as
+	 * the device. */
 
 	D_ASSERT( mdev->state.disk == Diskless && mdev->state.conn == StandAlone );
 	drbd_mdev_cleanup(mdev);

Modified: trunk/drbd/lru_cache.c
===================================================================
--- trunk/drbd/lru_cache.c	2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/lru_cache.c	2006-10-20 18:07:34 UTC (rev 2553)
@@ -260,6 +260,32 @@
 	RETURN(e);
 }
 
+/* similar to lc_get,
+ * but only gets a new reference on an existing element.
+ * you either get the requested element, or NULL.
+ */
+struct lc_element* lc_try_get(struct lru_cache* lc, unsigned int enr)
+{
+	struct lc_element *e;
+
+	BUG_ON(!lc);
+	BUG_ON(!lc->nr_elements);
+
+	PARANOIA_ENTRY();
+	if ( lc->flags & LC_STARVING ) {
+		++lc->starving;
+		RETURN(NULL);
+	}
+
+	e = lc_find(lc, enr);
+	if (e) {
+		++lc->hits;
+		if( e->refcnt++ == 0) lc->used++;
+		list_move(&e->list,&lc->in_use); // Not evictable...
+	}
+	RETURN(e);
+}
+
 void lc_changed(struct lru_cache* lc, struct lc_element* e)
 {
 	PARANOIA_ENTRY();
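
the point of the new lc_try_get() is that it can never hand out a
recycled element: lc_get() may reuse a free slot for a new number, which
the caller must then re-initialize via lc_changed(), while lc_try_get()
only ever references an element that is already cached. a hedged model
with a fixed-slot array standing in for struct lru_cache:

    /* contrast lc_get() (may recycle an unused slot) with lc_try_get()
     * (only finds what is already cached) */
    #include <stdio.h>

    #define NSLOTS  4
    #define LC_FREE (-1U)

    struct slot { unsigned int nr; unsigned int refcnt; };
    static struct slot cache[NSLOTS];

    static struct slot *try_get(unsigned int nr)  /* lc_try_get() analogue */
    {
        for (int i = 0; i < NSLOTS; i++)
            if (cache[i].nr == nr) {
                cache[i].refcnt++;
                return &cache[i];
            }
        return NULL;  /* never evicts, never returns a fresh slot */
    }

    static struct slot *get(unsigned int nr)      /* lc_get() analogue */
    {
        struct slot *e = try_get(nr);
        if (e)
            return e;
        for (int i = 0; i < NSLOTS; i++)
            if (cache[i].refcnt == 0) {   /* recycle an unused slot */
                cache[i].nr = nr;
                cache[i].refcnt = 1;
                return &cache[i];         /* caller must re-init it */
            }
        return NULL;                      /* cache is starving */
    }

    int main(void)
    {
        for (int i = 0; i < NSLOTS; i++)
            cache[i].nr = LC_FREE;
        printf("try_get(7): %s\n", try_get(7) ? "hit" : "miss");
        printf("get(7):     %s\n", get(7)     ? "hit" : "miss");
        printf("try_get(7): %s\n", try_get(7) ? "hit" : "miss");
        return 0;
    }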

Modified: trunk/drbd/lru_cache.h
===================================================================
--- trunk/drbd/lru_cache.h	2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/lru_cache.h	2006-10-20 18:07:34 UTC (rev 2553)
@@ -104,6 +104,7 @@
 extern void lc_set (struct lru_cache* lc, unsigned int enr, int index);
 extern void lc_del (struct lru_cache* lc, struct lc_element *element);
 
+extern struct lc_element* lc_try_get(struct lru_cache* lc, unsigned int enr);
 extern struct lc_element* lc_find(struct lru_cache* lc, unsigned int enr);
 extern struct lc_element* lc_get (struct lru_cache* lc, unsigned int enr);
 extern unsigned int       lc_put (struct lru_cache* lc, struct lc_element* e);
@@ -130,8 +131,14 @@
 	smp_mb__after_clear_bit();
 }
 
-#define LC_FREE (-1)
+static inline int lc_is_used(struct lru_cache* lc, unsigned int enr)
+{
+	struct lc_element* e = lc_find(lc,enr);
+	return (e && e->refcnt);
+}
 
+#define LC_FREE (-1U)
+
 #define lc_e_base(lc)  ((char*) ( (lc)->slot + (lc)->nr_elements ) )
 #define lc_entry(lc,i) ((struct lc_element*) \
                        (lc_e_base(lc) + (i)*(lc)->element_size))


