[DRBD-cvs] svn commit by lars - r2460 - trunk/drbd - fix race between drbd_make_request_common and drbd_disc

drbd-cvs at lists.linbit.com drbd-cvs at lists.linbit.com
Mon Sep 25 16:51:31 CEST 2006


Author: lars
Date: 2006-09-25 16:51:30 +0200 (Mon, 25 Sep 2006)
New Revision: 2460

Modified:
   trunk/drbd/drbd_compat_wrappers.h
   trunk/drbd/drbd_int.h
   trunk/drbd/drbd_main.c
   trunk/drbd/drbd_receiver.c
   trunk/drbd/drbd_req.c
   trunk/drbd/drbd_worker.c
Log:
fix race between drbd_make_request_common and drbd_disconnect
fix list corruption on connection loss in receive_DataRequest
fix deadlock of receiver in after_state_ch when we lost connection
 while becoming WFBitMapS

remove some dead code an old compat macros


Modified: trunk/drbd/drbd_compat_wrappers.h
===================================================================
--- trunk/drbd/drbd_compat_wrappers.h	2006-09-25 14:10:01 UTC (rev 2459)
+++ trunk/drbd/drbd_compat_wrappers.h	2006-09-25 14:51:30 UTC (rev 2460)
@@ -102,18 +102,6 @@
 }
 #endif
 
-static inline void
-drbd_ee_prepare_write(drbd_dev *mdev, struct Tl_epoch_entry* e)
-{
-	e->private_bio->bi_end_io = drbd_endio_write_sec;
-}
-
-static inline void
-drbd_ee_prepare_read(drbd_dev *mdev, struct Tl_epoch_entry* e)
-{
-	e->private_bio->bi_end_io = drbd_endio_read_sec;
-}
-
 static inline int drbd_bio_has_active_page(struct bio *bio)
 {
 	struct bio_vec *bvec;

Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h	2006-09-25 14:10:01 UTC (rev 2459)
+++ trunk/drbd/drbd_int.h	2006-09-25 14:51:30 UTC (rev 2460)
@@ -814,7 +814,6 @@
 	drbd_state_t new_state_tmp; // Used after attach while negotiating new disk state.
 	drbd_state_t state;
 	wait_queue_head_t cstate_wait; // TODO Rename into "misc_wait".
-	wait_queue_head_t rq_wait;
 	unsigned int send_cnt;
 	unsigned int recv_cnt;
 	unsigned int read_cnt;
@@ -1681,10 +1680,44 @@
 	return mxb;
 }
 
+static inline int __inc_ap_bio_cond(drbd_dev* mdev) {
+	int mxb = drbd_get_max_buffers(mdev);
+	if (mdev->state.susp) return 0;
+	if (mdev->state.conn == WFBitMapS) return 0;
+	if (mdev->state.conn == WFBitMapT) return 0;
+	/* since some older kernels don't have atomic_add_unless,
+	 * and we are within the spinlock anyways, we have this workaround.  */
+	if (atomic_read(&mdev->ap_bio_cnt) > mxb) return 0;
+	atomic_inc(&mdev->ap_bio_cnt);
+	return 1;
+}
+
+/* I'd like to use wait_event_lock_irq,
+ * but I'm not sure when it got introduced,
+ * and not sure when it has 3 or 4 arguments */
 static inline void inc_ap_bio(drbd_dev* mdev)
 {
-	int mxb = drbd_get_max_buffers(mdev);
-	wait_event( mdev->rq_wait,atomic_add_unless(&mdev->ap_bio_cnt,1,mxb) );
+	/* compare with after_state_ch,
+	 * os.conn != WFBitMapS && ns.conn == WFBitMapS */
+	DEFINE_WAIT(wait);
+
+	/* we wait here
+	 *    as long as the device is suspended
+	 *    until the bitmap is no longer on the fly during connection handshake
+	 *    as long as we would exeed the max_buffer limit.
+	 *
+	 * to avoid races with the reconnect code,
+	 * we need to atomic_inc within the spinlock. */
+
+	spin_lock_irq(&mdev->req_lock);
+	while (!__inc_ap_bio_cond(mdev)) {
+		prepare_to_wait(&mdev->cstate_wait,&wait,TASK_UNINTERRUPTIBLE);
+		spin_unlock_irq(&mdev->req_lock);
+		schedule();
+		finish_wait(&mdev->cstate_wait, &wait);
+		spin_lock_irq(&mdev->req_lock);
+	}
+	spin_unlock_irq(&mdev->req_lock);
 }
 
 static inline void dec_ap_bio(drbd_dev* mdev)
@@ -1693,8 +1726,7 @@
 	int ap_bio = atomic_dec_return(&mdev->ap_bio_cnt);
 
 	D_ASSERT(ap_bio>=0);
-	if (ap_bio == 0) wake_up(&mdev->cstate_wait);
-	if (ap_bio < mxb) wake_up(&mdev->rq_wait);
+	if (ap_bio < mxb) wake_up(&mdev->cstate_wait);
 }
 
 /* FIXME does not handle wrap around yet */

Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c	2006-09-25 14:10:01 UTC (rev 2459)
+++ trunk/drbd/drbd_main.c	2006-09-25 14:51:30 UTC (rev 2460)
@@ -269,10 +269,6 @@
 		struct list_head *le, *tle;
 		struct drbd_request *r;
 
-		ERR_IF(!list_empty(&b->w.list)) {
-			DUMPI(b->br_number);
-		}
-
 		list_for_each_safe(le, tle, &b->requests) {
 			r = list_entry(le, struct drbd_request,tl_requests);
 			_req_mod(r, connection_lost_while_pending);
@@ -873,8 +869,12 @@
 	// Do not change the order of the if above and below...
 	if (os.conn != WFBitMapS && ns.conn == WFBitMapS) {
 		/* compare with drbd_make_request_common,
-		 * wait_event and inc_ap_bio */
-		wait_event(mdev->cstate_wait,!atomic_read(&mdev->ap_bio_cnt));
+		 * wait_event and inc_ap_bio.
+		 * Note: we may lose connection whilst waiting here.
+		 * no worries though, should work out ok... */
+		wait_event(mdev->cstate_wait,
+			mdev->state.conn != WFBitMapS ||
+			!atomic_read(&mdev->ap_bio_cnt));
 		drbd_bm_lock(mdev);   // {
 		drbd_send_bitmap(mdev);
 		drbd_bm_unlock(mdev); // }
@@ -1946,7 +1946,6 @@
 	mdev->md_sync_timer.data = (unsigned long) mdev;
 
 	init_waitqueue_head(&mdev->cstate_wait);
-	init_waitqueue_head(&mdev->rq_wait);
 	init_waitqueue_head(&mdev->ee_wait);
 	init_waitqueue_head(&mdev->al_wait);
 

Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c	2006-09-25 14:10:01 UTC (rev 2459)
+++ trunk/drbd/drbd_receiver.c	2006-09-25 14:51:30 UTC (rev 2460)
@@ -1014,8 +1014,8 @@
 
 	dec_rs_pending(mdev);
 
-	drbd_ee_prepare_write(mdev,e);
-	e->w.cb     = e_end_resync_block;
+	e->private_bio->bi_end_io = drbd_endio_write_sec;
+	e->w.cb = e_end_resync_block;
 
 	inc_unacked(mdev);
 	/* corresponding dec_unacked() in e_end_resync_block()
@@ -1249,8 +1249,8 @@
 		return FALSE;
 	}
 
-	drbd_ee_prepare_write(mdev, e);
-	e->w.cb     = e_end_block;
+	e->private_bio->bi_end_io = drbd_endio_write_sec;
+	e->w.cb = e_end_block;
 
 	/* FIXME drbd_al_begin_io in case we have two primaries... */
 
@@ -1490,12 +1490,8 @@
 		return FALSE;
 	}
 
-	spin_lock_irq(&mdev->req_lock);
-	list_add(&e->w.list,&mdev->read_ee);
-	spin_unlock_irq(&mdev->req_lock);
+	e->private_bio->bi_end_io = drbd_endio_read_sec;
 
-	drbd_ee_prepare_read(mdev,e);
-
 	switch (h->command) {
 	case DataRequest:
 		e->w.cb = w_e_end_data_req;
@@ -1522,6 +1518,10 @@
 		fault_type = DRBD_FAULT_MAX;
 	}
 
+	spin_lock_irq(&mdev->req_lock);
+	list_add(&e->w.list,&mdev->read_ee);
+	spin_unlock_irq(&mdev->req_lock);
+
 	inc_unacked(mdev);
 	/* FIXME actually, it could be a READA originating from the peer ... */
 	drbd_generic_make_request(READ,fault_type,e->private_bio);

Modified: trunk/drbd/drbd_req.c
===================================================================
--- trunk/drbd/drbd_req.c	2006-09-25 14:10:01 UTC (rev 2459)
+++ trunk/drbd/drbd_req.c	2006-09-25 14:51:30 UTC (rev 2460)
@@ -640,6 +640,9 @@
 		/* can even happen for protocol C,
 		 * when local io is stil pending.
 		 * in which case it does nothing. */
+		if (req->rq_state & RQ_NET_PENDING) {
+			print_rq_state(req, "FIXME");
+		}
 		D_ASSERT(req->rq_state & RQ_NET_SENT);
 		req->rq_state |= RQ_NET_DONE;
 		_req_may_be_done(req);
@@ -716,24 +719,9 @@
 	drbd_request_t *req;
 	int local, remote;
 
-	/* we wait here
-	 *    as long as the device is suspended
-	 *    until the bitmap is no longer on the fly during connection handshake
-	 */
-	wait_event( mdev->cstate_wait,
-		    (volatile int)((mdev->state.conn < WFBitMapS ||
-				    mdev->state.conn > WFBitMapT) &&
-				   !mdev->state.susp ) );
-
-	/* FIXME RACE here in case of freeze io.
-	 * maybe put the inc_ap_bio within the condition of the wait_event? */
-
-	/* compare with after_state_ch,
-	 * os.conn != WFBitMapS && ns.conn == WFBitMapS */
-	inc_ap_bio(mdev);
-
 	/* allocate outside of all locks; get a "reference count" (ap_bio_cnt)
 	 * to avoid races with the disconnect/reconnect code.  */
+	inc_ap_bio(mdev);
 	req = drbd_req_new(mdev,bio);
 	if (!req) {
 		dec_ap_bio(mdev);
@@ -881,7 +869,7 @@
 	 * barrier packet.  To get the write ordering right, we only have to
 	 * make sure that, if this is a write request and it triggered a
 	 * barrier packet, this request is queued within the same spinlock. */
-	if (mdev->unused_spare_barrier &&
+	if (remote && mdev->unused_spare_barrier &&
             test_and_clear_bit(ISSUE_BARRIER,&mdev->flags)) {
 		struct drbd_barrier *b = mdev->unused_spare_barrier;
 		b = _tl_add_barrier(mdev,b);

Modified: trunk/drbd/drbd_worker.c
===================================================================
--- trunk/drbd/drbd_worker.c	2006-09-25 14:10:01 UTC (rev 2459)
+++ trunk/drbd/drbd_worker.c	2006-09-25 14:51:30 UTC (rev 2460)
@@ -940,27 +940,7 @@
 		}
 	}
 
-	drbd_wait_ee_list_empty(mdev,&mdev->read_ee);
-
-	/* When we terminate a resync process, either because it finished
-	 * sucessfully, or because (like in this case here) we lost
-	 * communications, we need to "w_resume_next_sg".
-	 * We cannot use del_timer_sync from within _set_cstate, and since the
-	 * resync timer may still be scheduled and would then trigger anyways,
-	 * we set the STOP_SYNC_TIMER bit, and schedule the timer for immediate
-	 * execution from within _set_cstate().
-	 * The timer should then clear that bit and queue w_resume_next_sg.
-	 *
-	 * This is fine for the normal "resync finished" case.
-	 *
-	 * In this case (worker thread beeing stopped), there is a race:
-	 * we cannot be sure that the timer already triggered.
-	 *
-	 * So we del_timer_sync here, and check that "STOP_SYNC_TIMER" bit.
-	 * if it is still set, we queue w_resume_next_sg anyways,
-	 * just to be sure.
-	 */
-
+	/* FIXME this should go into drbd_disconnect */
 	del_timer_sync(&mdev->resync_timer);
 	/* possible paranoia check: the STOP_SYNC_TIMER bit should be set
 	 * if and only if del_timer_sync returns true ... */



More information about the drbd-cvs mailing list