[DRBD-cvs] svn commit by lars - r2449 - trunk/drbd - rearranged cleanup upon connection loss. again. :) fin

drbd-cvs at lists.linbit.com drbd-cvs at lists.linbit.com
Fri Sep 22 21:54:02 CEST 2006


Author: lars
Date: 2006-09-22 21:54:01 +0200 (Fri, 22 Sep 2006)
New Revision: 2449

Modified:
   trunk/drbd/drbd_int.h
   trunk/drbd/drbd_main.c
   trunk/drbd/drbd_nl.c
   trunk/drbd/drbd_receiver.c
   trunk/drbd/drbd_req.c
   trunk/drbd/drbd_worker.c
Log:

rearranged cleanup upon connection loss. again. :)

finally got the ap_bio_cnt and ap_pending_cnt correctly balanced. (?)

even fixed a deadlock.



Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h	2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_int.h	2006-09-22 19:54:01 UTC (rev 2449)
@@ -1614,7 +1614,13 @@
 	ERR_IF_CNT_IS_NEGATIVE(unacked_cnt)
 
 
-// warning LGE "FIXME inherently racy. this is buggy by design :("
+static inline void dec_net(drbd_dev* mdev)
+{
+	if(atomic_dec_and_test(&mdev->net_cnt)) {
+		wake_up(&mdev->cstate_wait);
+	}
+}
+
 /**
  * inc_net: Returns TRUE when it is ok to access mdev->net_conf. You
  * should call dec_net() when finished looking at mdev->net_conf.
@@ -1625,17 +1631,10 @@
 
 	atomic_inc(&mdev->net_cnt);
 	have_net_conf = mdev->state.conn >= Unconnected;
-	if(!have_net_conf) atomic_dec(&mdev->net_cnt);
+	if(!have_net_conf) dec_net(mdev);
 	return have_net_conf;
 }
 
-static inline void dec_net(drbd_dev* mdev)
-{
-	if(atomic_dec_and_test(&mdev->net_cnt)) {
-		wake_up(&mdev->cstate_wait);
-	}
-}
-
 /* strictly speaking,
  * these would have to hold the req_lock while looking at
  * the disk state. But since we cannot submit within a spinlock,
@@ -1676,35 +1675,29 @@
 /* this throttles on-the-fly application requests
  * according to max_buffers settings;
  * maybe re-implement using semaphores? */
-static inline void inc_ap_bio(drbd_dev* mdev)
+static inline int drbd_get_max_buffers(drbd_dev* mdev)
 {
 	int mxb = 1000000; /* arbitrary limit on open requests */
-
 	if(inc_net(mdev)) {
 		mxb = mdev->net_conf->max_buffers;
 		dec_net(mdev);
-		/* decrease here already, so you can reconfigure
-		 * the max buffer setting even under load.
-		 * alternative: use rw_semaphore. I'd like that.
-		 */
 	}
+	return mxb;
+}
 
+static inline void inc_ap_bio(drbd_dev* mdev)
+{
+	int mxb = drbd_get_max_buffers(mdev);
 	wait_event( mdev->rq_wait,atomic_add_unless(&mdev->ap_bio_cnt,1,mxb) );
 }
 
 static inline void dec_ap_bio(drbd_dev* mdev)
 {
+	int mxb = drbd_get_max_buffers(mdev);
 	int ap_bio = atomic_dec_return(&mdev->ap_bio_cnt);
 
-	if(ap_bio == 0) wake_up(&mdev->cstate_wait);
-
-	if(inc_net(mdev)) {
-		if(ap_bio < mdev->net_conf->max_buffers) 
-			wake_up(&mdev->rq_wait);
-		dec_net(mdev);
-	}
-
-	D_ASSERT(atomic_read(&mdev->ap_bio_cnt)>=0);
+	D_ASSERT(ap_bio>=0);
+	if (ap_bio < mxb) wake_up(&mdev->rq_wait);
 }
 
 /* FIXME does not handle wrap around yet */

Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c	2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_main.c	2006-09-22 19:54:01 UTC (rev 2449)
@@ -91,16 +91,6 @@
 
 #include <linux/moduleparam.h>
 MODULE_PARM_DESC(disable_bd_claim, "DONT USE! disables block device claiming" );
-/*
- * please somebody explain to me what the "perm" of the module_param
- * macro is good for (yes, permission for it in the "driverfs", but what
- * do we need to do for them to show up, to begin with?)
- * once I understand this, and the rest of the sysfs stuff, I probably
- * be able to understand how we can move from our ioctl interface to a
- * proper sysfs based one.
- *	-- lge
- */
-
 /* thanks to these macros, if compiled into the kernel (not-module),
  * this becomes the boot parameter drbd.minor_count
  */
@@ -173,6 +163,7 @@
 	b=kmalloc(sizeof(struct drbd_barrier),GFP_KERNEL);
 	if(!b) return 0;
 	INIT_LIST_HEAD(&b->requests);
+	INIT_LIST_HEAD(&b->w.list);
 	b->next=0;
 	b->br_number=4711;
 	b->n_req=0;
@@ -206,10 +197,10 @@
 	struct drbd_barrier *newest_before;
 
 	INIT_LIST_HEAD(&new->requests);
+	INIT_LIST_HEAD(&new->w.list);
 	new->next=0;
 	new->n_req=0;
 
-	/* mdev->newest_barrier == NULL "cannot happen". but anyways... */
 	newest_before = mdev->newest_barrier;
 	/* never send a barrier number == 0, because that is special-cased
 	 * when using TCQ for our write ordering code */
@@ -265,7 +256,7 @@
 	kfree(b);
 }
 
-/* FIXME called by whom? worker only? */
+/* called by drbd_disconnect (exiting receiver only) */
 void tl_clear(drbd_dev *mdev)
 {
 	struct drbd_barrier *b, *tmp;
@@ -277,17 +268,21 @@
 	while ( b ) {
 		struct list_head *le, *tle;
 		struct drbd_request *r;
+
+		ERR_IF(!list_empty(&b->w.list)) {
+			DUMPI(b->br_number);
+		}
+
 		list_for_each_safe(le, tle, &b->requests) {
 			r = list_entry(le, struct drbd_request,tl_requests);
 			_req_mod(r, connection_lost_while_pending);
 		}
 		tmp = b->next;
-		/* FIXME can there still be requests on that ring list now?
-		 * funny race conditions ... */
-		if (!list_empty(&b->requests)) {
-			WARN("FIXME explain this race...");
-			list_del(&b->requests);
-		}
+
+		/* there could still be requests on that ring list,
+		 * in case local io is still pending */
+		list_del(&b->requests);
+
 		if (b == mdev->newest_barrier) {
 			D_ASSERT(tmp == NULL);
 			b->br_number=4711;
@@ -876,6 +871,8 @@
 	}
 	// Do not change the order of the if above and below...
 	if (os.conn != WFBitMapS && ns.conn == WFBitMapS) {
+		/* compare with drbd_make_request_common,
+		 * wait_event and inc_ap_bio */
 		wait_event(mdev->cstate_wait,!atomic_read(&mdev->ap_bio_cnt));
 		drbd_bm_lock(mdev);   // {
 		drbd_send_bitmap(mdev);

Modified: trunk/drbd/drbd_nl.c
===================================================================
--- trunk/drbd/drbd_nl.c	2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_nl.c	2006-09-22 19:54:01 UTC (rev 2449)
@@ -1031,11 +1031,6 @@
 		memset(new_ee_hash, 0, ns*sizeof(void*));
 	}
 
-	/* IMPROVE:
-	   We should warn the user if the LL_DEV is
-	   used already. E.g. some FS mounted on it.
-	*/
-
 	((char*)new_conf->shared_secret)[SHARED_SECRET_MAX-1]=0;
 
 #if 0

Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c	2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_receiver.c	2006-09-22 19:54:01 UTC (rev 2449)
@@ -2445,14 +2445,49 @@
 	}
 }
 
+/* FIXME how should freeze-io be handled? */
+STATIC void drbd_fail_pending_reads(drbd_dev *mdev)
+{
+	struct hlist_head *slot;
+	struct hlist_node *n;
+	drbd_request_t * req;
+	struct list_head *le;
+	LIST_HEAD(workset);
+	int i;
+
+	/*
+	 * Application READ requests
+	 */
+	spin_lock_irq(&mdev->req_lock);
+	for(i=0;i<APP_R_HSIZE;i++) {
+		slot = mdev->app_reads_hash+i;
+		hlist_for_each_entry(req, n, slot, colision) {
+			list_add(&req->w.list, &workset);
+		}
+	}
+	memset(mdev->app_reads_hash,0,APP_R_HSIZE*sizeof(void*));
+
+	while(!list_empty(&workset)) {
+		le = workset.next;
+		req = list_entry(le, drbd_request_t, w.list);
+		list_del(le);
+
+		_req_mod(req, connection_lost_while_pending);
+	}
+	spin_unlock_irq(&mdev->req_lock);
+}
+
 STATIC void drbd_disconnect(drbd_dev *mdev)
 {
 	enum fencing_policy fp;
 
-	struct drbd_work *disconnect_work;
-
 	D_ASSERT(mdev->state.conn < Connected);
+	/* FIXME verify that:
+	 * the state change magic prevents us from becoming >= Connected again
+	 * while we are still cleaning up.
+	 */
 
+	/* asender cleans up active_ee, sync_ee and done_ee */
 	drbd_thread_stop(&mdev->asender);
 
 	fp = DontCare;
@@ -2461,29 +2496,74 @@
 		dec_local(mdev);
 	}
 
-	// Receiving side (may be primary, in case we had two primaries)
 	spin_lock_irq(&mdev->req_lock);
 	_drbd_wait_ee_list_empty(mdev,&mdev->read_ee);
-	_drbd_wait_ee_list_empty(mdev,&mdev->active_ee);
-	_drbd_wait_ee_list_empty(mdev,&mdev->sync_ee);
-	_drbd_clear_done_ee(mdev);
-	mdev->epoch_size = 0;
+	reclaim_net_ee(mdev);
 	spin_unlock_irq(&mdev->req_lock);
-	// Needs to happen before we schedule the disconnect work callback,
-	// Since they might have something for the worker's queue as well.
 
-	disconnect_work = kmalloc(sizeof(struct drbd_work),GFP_KERNEL);
-	if(disconnect_work) {
-		disconnect_work->cb = w_disconnect;
-		drbd_queue_work(&mdev->data.work,disconnect_work);
-	} else {
-		WARN("kmalloc failed, taking messy shortcut.\n");
-		w_disconnect(mdev,NULL,1);
+	/* would be nice to be able to wait for all ee and req objects
+	 * currently on the worker queue to be "canceled",
+	 * but that's not really necessary. */
+
+	down(&mdev->data.mutex);
+	drbd_free_sock(mdev);
+	up(&mdev->data.mutex);
+
+	/* FIXME were is the correct place for these? tl_clear? */
+	clear_bit(ISSUE_BARRIER,&mdev->flags);
+	mdev->epoch_size = 0;
+
+	/* FIXME: fail pending reads?
+	 * when we are configured for freeze io,
+	 * we could retry them once we un-freeze. */
+	drbd_fail_pending_reads(mdev);
+
+	/* FIXME maybe the cleanup of the resync should go
+	 * into w_make_resync_request or w_resume_next_sg
+	 * or something like that. */
+
+	/* We do not have data structures that would allow us to
+	   get the rs_pending_cnt down to 0 again.
+	   * On SyncTarget we do not have any data structures describing
+	     the pending RSDataRequest's we have sent.
+	   * On SyncSource there is no data structure that tracks
+	     the RSDataReply blocks that we sent to the SyncTarget.
+	   And no, it is not the sum of the reference counts in the
+	   resync_LRU. The resync_LRU tracks the whole operation including
+           the disk-IO, while the rs_pending_cnt only tracks the blocks
+	   on the fly. */
+	drbd_rs_cancel_all(mdev);
+	mdev->rs_total=0;
+	atomic_set(&mdev->rs_pending_cnt,0);
+	wake_up(&mdev->cstate_wait);
+
+	if(!mdev->state.susp) {
+		/* FIXME should go into some "after_state_change" handler */
+		tl_clear(mdev);
 	}
 
+	D_ASSERT(atomic_read(&mdev->pp_in_use) == 0);
+	D_ASSERT(list_empty(&mdev->read_ee));
+	D_ASSERT(list_empty(&mdev->active_ee));
+	D_ASSERT(list_empty(&mdev->sync_ee));
+	D_ASSERT(list_empty(&mdev->done_ee));
+
+	if ( mdev->p_uuid ) {
+		kfree(mdev->p_uuid);
+		mdev->p_uuid = NULL;
+	}
+
+	INFO("Connection closed\n");
+
+	/* FIXME I'm not sure, there may still be a race? */
+	if(mdev->state.conn == StandAlone && mdev->net_conf ) {
+		kfree(mdev->net_conf);
+		mdev->net_conf = NULL;
+	}
+
 	drbd_md_sync(mdev);
 
-	if ( mdev->state.role == Primary ) {		
+	if ( mdev->state.role == Primary ) {
 		if( fp >= Resource &&
 		    mdev->state.pdsk >= DUnknown ) {
 			drbd_disks_t nps = drbd_try_outdate_peer(mdev);
@@ -3100,6 +3180,14 @@
 		drbd_thread_restart_nowait(&mdev->receiver);
 	}
 
+	INFO("asender starting cleanup\n");
+
+	spin_lock_irq(&mdev->req_lock);
+	_drbd_wait_ee_list_empty(mdev,&mdev->active_ee);
+	_drbd_wait_ee_list_empty(mdev,&mdev->sync_ee);
+	_drbd_clear_done_ee(mdev);
+	spin_unlock_irq(&mdev->req_lock);
+
 	INFO("asender terminated\n");
 
 	return 0;

Modified: trunk/drbd/drbd_req.c
===================================================================
--- trunk/drbd/drbd_req.c	2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_req.c	2006-09-22 19:54:01 UTC (rev 2449)
@@ -716,9 +716,24 @@
 	drbd_request_t *req;
 	int local, remote;
 
+	/* we wait here
+	 *    as long as the device is suspended
+	 *    until the bitmap is no longer on the fly during connection handshake
+	 */
+	wait_event( mdev->cstate_wait,
+		    (volatile int)((mdev->state.conn < WFBitMapS ||
+				    mdev->state.conn > WFBitMapT) &&
+				   !mdev->state.susp ) );
+
+	/* FIXME RACE here in case of freeze io.
+	 * maybe put the inc_ap_bio within the condition of the wait_event? */
+
+	/* compare with after_state_ch,
+	 * os.conn != WFBitMapS && ns.conn == WFBitMapS */
+	inc_ap_bio(mdev);
+
 	/* allocate outside of all locks; get a "reference count" (ap_bio_cnt)
 	 * to avoid races with the disconnect/reconnect code.  */
-	inc_ap_bio(mdev);
 	req = drbd_req_new(mdev,bio);
 	if (!req) {
 		dec_ap_bio(mdev);
@@ -729,15 +744,6 @@
 		return 0;
 	}
 
-	/* we wait here
-	 *    as long as the device is suspended
-	 *    until the bitmap is no longer on the fly during connection handshake
-	 */
-	wait_event( mdev->cstate_wait,
-		    (volatile int)((mdev->state.conn < WFBitMapS ||
-				    mdev->state.conn > WFBitMapT) &&
-				   !mdev->state.susp ) );
-
 	local = inc_local(mdev);
 	if (!local) {
 		bio_put(req->private_bio); /* or we get a bio leak */
@@ -836,6 +842,21 @@
 	/* GOOD, everything prepared, grab the spin_lock */
 	spin_lock_irq(&mdev->req_lock);
 
+	/* FIXME race with drbd_disconnect and tl_clear? */
+	if (remote) {
+		remote = (mdev->state.pdsk == UpToDate ||
+			    ( mdev->state.pdsk == Inconsistent &&
+			      mdev->state.conn >= Connected ) );
+		if (!remote) {
+			WARN("lost connection while grabbing the req_lock!\n");
+		}
+		if (!(local || remote)) {
+			ERR("IO ERROR: neither local nor remote disk\n");
+			spin_unlock_irq(&mdev->req_lock);
+			goto fail_and_free_req;
+		}
+	}
+
 	if (b && mdev->unused_spare_barrier == NULL) {
 		mdev->unused_spare_barrier = b;
 		b = NULL;
@@ -849,6 +870,7 @@
 		goto allocate_barrier;
 	}
 
+
 	/* _maybe_start_new_epoch(mdev);
 	 * If we need to generate a write barrier packet, we have to add the
 	 * new epoch (barrier) object, and queue the barrier packet for sending,
@@ -865,6 +887,11 @@
 		b = _tl_add_barrier(mdev,b);
 		mdev->unused_spare_barrier = NULL;
 		b->w.cb =  w_send_barrier;
+		/* inc_ap_pending done here, so we won't
+		 * get imbalanced on connection loss.
+		 * dec_ap_pending will be done in got_BarrierAck
+		 * or (on connection loss) in tl_clear.  */
+		inc_ap_pending(mdev);
 		drbd_queue_work(&mdev->data.work, &b->w);
 	} else {
 		D_ASSERT(!(remote && rw == WRITE &&

Modified: trunk/drbd/drbd_worker.c
===================================================================
--- trunk/drbd/drbd_worker.c	2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_worker.c	2006-09-22 19:54:01 UTC (rev 2449)
@@ -389,8 +389,6 @@
 	INFO("Resync done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
 
-	// assert that all bit-map parts are cleared.
-	D_ASSERT(list_empty(&mdev->resync->lru));
 	D_ASSERT(drbd_bm_total_weight(mdev) == 0);
 	mdev->rs_total  = 0;
 	mdev->rs_paused = 0;
@@ -533,13 +531,12 @@
 	if (!drbd_get_data_sock(mdev))
 		return 0;
 	p->barrier = b->br_number;
-	inc_ap_pending(mdev);
+	/* inc_ap_pending was done where this was queued.
+	 * dec_ap_pending will be done in got_BarrierAck
+	 * or (on connection loss) in tl_clear.  */
 	ok = _drbd_send_cmd(mdev,mdev->data.socket,Barrier,(Drbd_Header*)p,sizeof(*p),0);
 	drbd_put_data_sock(mdev);
 
-	/* pairing dec_ap_pending() happens in got_BarrierAck,
-	 * or (on connection loss) in tl_clear.  */
-
 	return ok;
 }
 
@@ -600,111 +597,6 @@
 	return ok;
 }
 
-/* FIXME how should freeze-io be handled? */
-STATIC void drbd_fail_pending_reads(drbd_dev *mdev)
-{
-	struct hlist_head *slot;
-	struct hlist_node *n;
-	drbd_request_t * req;
-	struct list_head *le;
-	LIST_HEAD(workset);
-	int i;
-
-	/*
-	 * Application READ requests
-	 */
-	spin_lock_irq(&mdev->req_lock);
-	for(i=0;i<APP_R_HSIZE;i++) {
-		slot = mdev->app_reads_hash+i;
-		hlist_for_each_entry(req, n, slot, colision) {
-			list_add(&req->w.list, &workset);
-		}
-	}
-	memset(mdev->app_reads_hash,0,APP_R_HSIZE*sizeof(void*));
-
-	while(!list_empty(&workset)) {
-		le = workset.next;
-		req = list_entry(le, drbd_request_t, w.list);
-		list_del(le);
-
-		_req_mod(req, connection_lost_while_pending);
-	}
-	spin_unlock_irq(&mdev->req_lock);
-}
-
-/**
- * w_disconnect clean up everything, and restart the receiver.
- */
-int w_disconnect(drbd_dev *mdev, struct drbd_work *w, int cancel)
-{
-	D_ASSERT(cancel);
-
-	down(&mdev->data.mutex);
-	/* By grabbing the sock_mutex we make sure that no one
-	   uses the socket right now. */
-	drbd_free_sock(mdev);
-	up(&mdev->data.mutex);
-
-	drbd_fail_pending_reads(mdev);
-	drbd_rs_cancel_all(mdev);
-
-	// primary
-	clear_bit(ISSUE_BARRIER,&mdev->flags);
-
-	if(!mdev->state.susp) {
-		tl_clear(mdev);
-		D_ASSERT(mdev->oldest_barrier->n_req == 0);
-
-		if(atomic_read(&mdev->ap_pending_cnt)) {
-			ERR("ap_pending_cnt = %d\n",
-			    atomic_read(&mdev->ap_pending_cnt));
-			atomic_set(&mdev->ap_pending_cnt,0);
-		}
-	}
-
-	D_ASSERT(atomic_read(&mdev->pp_in_use) == 0);
-	D_ASSERT(list_empty(&mdev->read_ee));
-	D_ASSERT(list_empty(&mdev->active_ee)); // done here
-	D_ASSERT(list_empty(&mdev->sync_ee)); // done here
-	D_ASSERT(list_empty(&mdev->done_ee)); // done here
-
-	mdev->rs_total=0;
-
-	if(atomic_read(&mdev->unacked_cnt)) {
-		ERR("unacked_cnt = %d\n",atomic_read(&mdev->unacked_cnt));
-		atomic_set(&mdev->unacked_cnt,0);
-	}
-
-	/* We do not have data structures that would allow us to
-	   get the rs_pending_cnt down to 0 again.
-	   * On SyncTarget we do not have any data structures describing
-	     the pending RSDataRequest's we have sent.
-	   * On SyncSource there is no data structure that tracks
-	     the RSDataReply blocks that we sent to the SyncTarget.
-	   And no, it is not the sum of the reference counts in the
-	   resync_LRU. The resync_LRU tracks the whole operation including
-           the disk-IO, while the rs_pending_cnt only tracks the blocks
-	   on the fly. */
-	atomic_set(&mdev->rs_pending_cnt,0);
-	wake_up(&mdev->cstate_wait);
-
-	if ( mdev->p_uuid ) {
-		kfree(mdev->p_uuid);
-		mdev->p_uuid = NULL;
-	}
-
-	INFO("Connection closed\n");
-
-	if(w) kfree(w);
-
-	if(mdev->state.conn == StandAlone && mdev->net_conf ) {
-		kfree(mdev->net_conf);
-		mdev->net_conf = NULL;
-	}
-
-	return 1;
-}
-
 STATIC void drbd_global_lock(void)
 {
 	drbd_dev *mdev;



More information about the drbd-cvs mailing list