[DRBD-cvs] svn commit by lars - r2486 - branches/drbd-0.7/drbd - in protocol A and B, on connection loss, we "forgot" to

drbd-cvs at lists.linbit.com drbd-cvs at lists.linbit.com
Mon Oct 2 17:51:34 CEST 2006


Author: lars
Date: 2006-10-02 17:51:32 +0200 (Mon, 02 Oct 2006)
New Revision: 2486

Modified:
   branches/drbd-0.7/drbd/drbd_compat_wrappers.h
   branches/drbd-0.7/drbd/drbd_int.h
   branches/drbd-0.7/drbd/drbd_main.c
   branches/drbd-0.7/drbd/drbd_receiver.c
   branches/drbd-0.7/drbd/drbd_req.c
Log:

in protocol A and B, on connection loss, we "forgot" to set certain areas out of sync.



Modified: branches/drbd-0.7/drbd/drbd_compat_wrappers.h
===================================================================
--- branches/drbd-0.7/drbd/drbd_compat_wrappers.h	2006-10-02 12:57:05 UTC (rev 2485)
+++ branches/drbd-0.7/drbd/drbd_compat_wrappers.h	2006-10-02 15:51:32 UTC (rev 2486)
@@ -83,16 +83,6 @@
 	return (drbd_dev*) req->private_bio.b_private;
 }
 
-static inline sector_t drbd_req_get_sector(struct drbd_request *req)
-{
-	return req->private_bio.b_blocknr;
-}
-
-static inline unsigned short drbd_req_get_size(struct drbd_request *req)
-{
-	return req->private_bio.b_size;
-}
-
 static inline drbd_bio_t* drbd_req_private_bio(struct drbd_request *req)
 {
 	return &req->private_bio;
@@ -233,6 +223,8 @@
 		     |(1 << BH_Mapped) ;
 
 	req->rq_status = RQ_DRBD_NOTHING;
+	req->sector = req->private_bio.b_blocknr;
+	req->size = req->private_bio.b_size;
 }
 
 static inline void
@@ -258,6 +250,8 @@
 		     |(1 << BH_Mapped) ;
 
 	req->rq_status = RQ_DRBD_NOTHING;
+	req->sector = req->private_bio.b_blocknr;
+	req->size = req->private_bio.b_size;
 }
 
 static inline struct page* drbd_bio_get_page(struct buffer_head *bh)
@@ -397,18 +391,6 @@
 	return (drbd_dev*) req->mdev;
 }
 
-static inline sector_t drbd_req_get_sector(struct drbd_request *req)
-{
-	return req->master_bio->bi_sector;
-}
-
-static inline unsigned short drbd_req_get_size(struct drbd_request *req)
-{
-	drbd_dev* mdev = req->mdev;
-	D_ASSERT(req->master_bio->bi_size);
-	return req->master_bio->bi_size;
-}
-
 static inline drbd_bio_t* drbd_req_private_bio(struct drbd_request *req)
 {
 	return req->private_bio;
@@ -553,6 +535,8 @@
 
 	req->rq_status = RQ_DRBD_NOTHING;
 	req->mdev      = mdev;
+	req->sector = req->master_bio->bi_sector;
+	req->size   = req->master_bio->bi_size;	  
 }
 
 static inline void
@@ -568,6 +552,8 @@
 
 	req->rq_status = RQ_DRBD_NOTHING;
 	req->mdev      = mdev;
+	req->sector = req->master_bio->bi_sector;
+	req->size   = req->master_bio->bi_size;
 }
 
 static inline struct page* drbd_bio_get_page(struct bio *bio)

Modified: branches/drbd-0.7/drbd/drbd_int.h
===================================================================
--- branches/drbd-0.7/drbd/drbd_int.h	2006-10-02 12:57:05 UTC (rev 2485)
+++ branches/drbd-0.7/drbd/drbd_int.h	2006-10-02 15:51:32 UTC (rev 2486)
@@ -57,6 +57,11 @@
 	last->next = at;
 	at->prev = last;
 }
+static inline void list_splice(struct list_head *list, struct list_head *head)
+{
+	if (!list_empty(list))
+		__list_splice(list, head);
+}
 static inline void list_splice_init(struct list_head *list,
 				    struct list_head *head)
 {
@@ -613,6 +618,8 @@
 	int rq_status;
 	struct drbd_barrier *barrier; // The next barrier.
 	drbd_bio_t *master_bio;       // master bio pointer
+	unsigned int size;
+	sector_t sector;
 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)
 	drbd_bio_t private_bio;       // private bio struct
 #else
@@ -766,7 +773,6 @@
 	atomic_t unacked_cnt;    // Need to send replys for
 	atomic_t local_cnt;      // Waiting for local disk to signal completion
 	spinlock_t req_lock;
-	spinlock_t tl_lock;
 	struct drbd_barrier* newest_barrier;
 	struct drbd_barrier* oldest_barrier;
 	unsigned long flags;
@@ -826,7 +832,7 @@
 extern void tl_release(drbd_dev *mdev,unsigned int barrier_nr,
 		       unsigned int set_size);
 extern void tl_clear(drbd_dev *mdev);
-extern int tl_dependence(drbd_dev *mdev, drbd_request_t * item);
+extern int tl_dependence(drbd_dev *mdev, drbd_request_t * item, int free_it);
 extern void drbd_free_sock(drbd_dev *mdev);
 extern int drbd_send(drbd_dev *mdev, struct socket *sock,
 		     void* buf, size_t size, unsigned msg_flags);
@@ -1020,7 +1026,7 @@
 extern mempool_t *drbd_request_mempool;
 
 // drbd_req
-#define ERF_NOTLD    2   /* do not call tl_dependence */
+extern void _drbd_end_req(drbd_request_t *, int, int, sector_t);
 extern void drbd_end_req(drbd_request_t *, int, int, sector_t);
 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)
 extern int drbd_make_request_24(request_queue_t *q, int rw, struct buffer_head *bio);
@@ -1225,6 +1231,16 @@
 	}
 }
 
+static inline sector_t drbd_req_get_sector(struct drbd_request *req)
+{
+	return req->sector;
+}
+
+static inline unsigned short drbd_req_get_size(struct drbd_request *req)
+{
+	return req->size;
+}
+
 static inline void
 _drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
 {

Modified: branches/drbd-0.7/drbd/drbd_main.c
===================================================================
--- branches/drbd-0.7/drbd/drbd_main.c	2006-10-02 12:57:05 UTC (rev 2485)
+++ branches/drbd-0.7/drbd/drbd_main.c	2006-10-02 15:51:32 UTC (rev 2486)
@@ -208,7 +208,7 @@
 {
 	struct drbd_barrier *b;
 
-	spin_lock_irq(&mdev->tl_lock);
+	spin_lock_irq(&mdev->req_lock);
 
 	b=mdev->newest_barrier;
 
@@ -220,24 +220,9 @@
 		set_bit(ISSUE_BARRIER,&mdev->flags);
 	}
 
-	spin_unlock_irq(&mdev->tl_lock);
+	spin_unlock_irq(&mdev->req_lock);
 }
 
-STATIC void tl_cancel(drbd_dev *mdev, drbd_request_t * item)
-{
-	struct drbd_barrier *b;
-
-	spin_lock_irq(&mdev->tl_lock);
-
-	b=item->barrier;
-	b->n_req--;
-
-	list_del(&item->w.list);
-	item->rq_status &= ~RQ_DRBD_IN_TL;
-
-	spin_unlock_irq(&mdev->tl_lock);
-}
-
 STATIC unsigned int tl_add_barrier(drbd_dev *mdev)
 {
 	unsigned int bnr;
@@ -256,13 +241,13 @@
 	b->br_number=barrier_nr_issue;
 	b->n_req=0;
 
-	spin_lock_irq(&mdev->tl_lock);
+	spin_lock_irq(&mdev->req_lock);
 
 	bnr = mdev->newest_barrier->br_number;
 	mdev->newest_barrier->next = b;
 	mdev->newest_barrier = b;
 
-	spin_unlock_irq(&mdev->tl_lock);
+	spin_unlock_irq(&mdev->req_lock);
 
 	return bnr;
 }
@@ -271,48 +256,36 @@
 		       unsigned int set_size)
 {
 	struct drbd_barrier *b;
+	struct list_head *le,*tle;
+	drbd_request_t *req;
 
-	spin_lock_irq(&mdev->tl_lock);
+	spin_lock_irq(&mdev->req_lock);
 
 	b = mdev->oldest_barrier;
 	mdev->oldest_barrier = b->next;
 
-	list_del(&b->requests);
-	/* There could be requests on the list waiting for completion
-	   of the write to the local disk, to avoid corruptions of
-	   slab's data structures we have to remove the lists head */
+	list_for_each_safe(le,tle,&b->requests) {
+		req = list_entry(le, struct drbd_request,w.list);
+		/* master_bio already completed, but protocol != C,
+		 * so we had it still in the tl. need to list_del
+		 * anyways, there may be local io pending! */
+		list_del(&req->w.list);
+		req->rq_status &= ~RQ_DRBD_IN_TL;
+		if( (req->rq_status & RQ_DRBD_DONE) == RQ_DRBD_DONE ) {
+			D_ASSERT(req->master_bio == NULL);
+			INVALIDATE_MAGIC(req);
+			mempool_free(req,drbd_request_mempool);
+		}
+	}
+	spin_unlock_irq(&mdev->req_lock);
 
-	spin_unlock_irq(&mdev->tl_lock);
-
 	D_ASSERT(b->br_number == barrier_nr);
 	D_ASSERT(b->n_req == set_size);
 
+	list_del(&b->requests);
 	kfree(b);
 }
 
-/* tl_dependence reports if this sector was present in the current
-   epoch.
-   As side effect it clears also the pointer to the request if it
-   was present in the transfert log. (Since tl_dependence indicates
-   that IO is complete and that drbd_end_req() should not be called
-   in case tl_clear has to be called due to interruption of the
-   communication)
-*/
-/* bool */
-int tl_dependence(drbd_dev *mdev, drbd_request_t * item)
-{
-	unsigned long flags;
-	int r=TRUE;
-
-	spin_lock_irqsave(&mdev->tl_lock,flags);
-
-	r = ( item->barrier == mdev->newest_barrier );
-	list_del(&item->w.list);
-
-	spin_unlock_irqrestore(&mdev->tl_lock,flags);
-	return r;
-}
-
 void tl_clear(drbd_dev *mdev)
 {
 	struct list_head *le,*tle;
@@ -334,32 +307,34 @@
 	new_first->br_number=4711;
 	new_first->n_req=0;
 
-	spin_lock_irq(&mdev->tl_lock);
+	spin_lock_irq(&mdev->req_lock);
 
 	b=mdev->oldest_barrier;
 	mdev->oldest_barrier = new_first;
 	mdev->newest_barrier = new_first;
 
-	spin_unlock_irq(&mdev->tl_lock);
-
 	inc_ap_pending(mdev); // Since we count the old first as well...
-
 	while ( b ) {
 		list_for_each_safe(le, tle, &b->requests) {
 			r = list_entry(le, struct drbd_request,w.list);
 			// bi_size and bi_sector are modified in bio_endio!
 			sector = drbd_req_get_sector(r);
 			size   = drbd_req_get_size(r);
+
+			r->rq_status &= ~RQ_DRBD_IN_TL;
+			list_del(&r->w.list);
+
 			if( !(r->rq_status & RQ_DRBD_SENT) ) {
 				if(mdev->conf.wire_protocol != DRBD_PROT_A )
 					dec_ap_pending(mdev);
-				drbd_end_req(r,RQ_DRBD_SENT,ERF_NOTLD|1, sector);
-				goto mark;
-			}
-			if(mdev->conf.wire_protocol != DRBD_PROT_C ) {
-			mark:
-				drbd_set_out_of_sync(mdev, sector, size);
-			}
+
+				_drbd_end_req(r,RQ_DRBD_SENT,1, sector);
+			} else if ((r->rq_status & RQ_DRBD_DONE) == RQ_DRBD_DONE) {
+				D_ASSERT(r->master_bio == NULL);
+				INVALIDATE_MAGIC(r);
+				mempool_free(r,drbd_request_mempool);
+			} /* else: local io still pending */
+			drbd_set_out_of_sync(mdev, sector, size);
 		}
 		f=b;
 		b=b->next;
@@ -367,6 +342,8 @@
 		kfree(f);
 		dec_ap_pending(mdev); // for the barrier
 	}
+	spin_unlock_irq(&mdev->req_lock);
+
 }
 
 /**
@@ -1057,8 +1034,7 @@
 	   tl_ datastructure (=> We would want to remove it before it
 	   is there!)
 	3. Q: Why can we add it to tl_ even when drbd_send() might fail ?
-	      There could be a tl_cancel() to remove it within the semaphore!
-	   A: If drbd_send fails, we will loose the connection. Then
+	   A: If drbd_send fails, we will lose the connection. Then
 	      tl_cear() will simulate a RQ_DRBD_SEND and set it out of sync
 	      for everything in the data structure.
 	*/
@@ -1090,15 +1066,8 @@
 				ok = _drbd_send_zc_bio(mdev,drbd_req_private_bio(req));
 			}
 		}
-		if(!ok) tl_cancel(mdev,req);
 	}
-	if (!ok) {
-		drbd_set_out_of_sync(mdev,
-				     drbd_req_get_sector(req),
-				     drbd_req_get_size(req));
-		drbd_end_req(req,RQ_DRBD_SENT,ERF_NOTLD|1,
-			     drbd_req_get_sector(req));
-	}
+	/* if (!ok) ... cleanup done in tl_clear */
 	spin_lock(&mdev->send_task_lock);
 	mdev->send_task=NULL;
 	spin_unlock(&mdev->send_task_lock);
@@ -1378,7 +1347,6 @@
 	sema_init(&mdev->meta.work.s,0);
 
 	mdev->al_lock        = SPIN_LOCK_UNLOCKED;
-	mdev->tl_lock        = SPIN_LOCK_UNLOCKED;
 	mdev->ee_lock        = SPIN_LOCK_UNLOCKED;
 	mdev->req_lock       = SPIN_LOCK_UNLOCKED;
 	mdev->pr_lock        = SPIN_LOCK_UNLOCKED;

Modified: branches/drbd-0.7/drbd/drbd_receiver.c
===================================================================
--- branches/drbd-0.7/drbd/drbd_receiver.c	2006-10-02 12:57:05 UTC (rev 2485)
+++ branches/drbd-0.7/drbd/drbd_receiver.c	2006-10-02 15:51:32 UTC (rev 2486)
@@ -2152,7 +2152,13 @@
 
 			ERR_IF (!VALID_POINTER(req)) return FALSE;
 
-			drbd_end_req(req, RQ_DRBD_SENT, 1, sector);
+			spin_lock_irq(&mdev->req_lock);
+			if (mdev->conf.wire_protocol == DRBD_PROT_C) {
+				req->rq_status &= ~RQ_DRBD_IN_TL;
+				list_del(&req->w.list);
+			}
+			_drbd_end_req(req, RQ_DRBD_SENT, 1, sector);
+			spin_unlock_irq(&mdev->req_lock);
 
 			if (test_bit(SYNC_STARTED,&mdev->flags) &&
 			    mdev->conf.wire_protocol == DRBD_PROT_C)

Modified: branches/drbd-0.7/drbd/drbd_req.c
===================================================================
--- branches/drbd-0.7/drbd/drbd_req.c	2006-10-02 12:57:05 UTC (rev 2485)
+++ branches/drbd-0.7/drbd/drbd_req.c	2006-10-02 15:51:32 UTC (rev 2486)
@@ -32,53 +32,29 @@
 #include <linux/drbd.h>
 #include "drbd_int.h"
 
-void drbd_end_req(drbd_request_t *req, int nextstate, int er_flags,
+void _drbd_end_req(drbd_request_t *req, int nextstate, int er_flags,
 		  sector_t rsector)
 {
-	/* This callback will be called in irq context by the IDE drivers,
-	   and in Softirqs/Tasklets/BH context by the SCSI drivers.
-	   This function is called by the receiver in kernel-thread context.
-	   Try to get the locking right :) */
-
 	struct Drbd_Conf* mdev = drbd_req_get_mdev(req);
-	unsigned long flags=0;
 	int uptodate;
 
-	PARANOIA_BUG_ON(!IS_VALID_MDEV(mdev));
-	PARANOIA_BUG_ON(drbd_req_get_sector(req) != rsector);
-	spin_lock_irqsave(&mdev->req_lock,flags);
-
 	if(req->rq_status & nextstate) {
 		ERR("request state error(%d)\n", req->rq_status);
 	}
 
 	req->rq_status |= nextstate;
 	req->rq_status &= er_flags | ~0x0001;
-	if( (req->rq_status & RQ_DRBD_DONE) == RQ_DRBD_DONE ) goto end_it;
+	if( (req->rq_status & RQ_DRBD_DONE) != RQ_DRBD_DONE )
+		return;
 
-	spin_unlock_irqrestore(&mdev->req_lock,flags);
+	/* complete the master bio now. */
 
-	return;
+	/* if this sector was present in the current epoch, close it.
+	 * FIXME compares reusable pointer addresses,
+	 * possibly artificially reducing epoch size */
+	if (req->barrier == mdev->newest_barrier)
+		set_bit(ISSUE_BARRIER,&mdev->flags);
 
-/* We only report uptodate == TRUE if both operations (WRITE && SEND)
-   reported uptodate == TRUE
- */
-
-	end_it:
-	spin_unlock_irqrestore(&mdev->req_lock,flags);
-
-	if( req->rq_status & RQ_DRBD_IN_TL ) {
-		if( ! ( er_flags & ERF_NOTLD ) ) {
-			/*If this call is from tl_clear() we may not call 
-			  tl_dependene, otherwhise we have a homegrown 
-			  spinlock deadlock.   */
-			if(tl_dependence(mdev,req))
-				set_bit(ISSUE_BARRIER,&mdev->flags);
-		} else {
-			list_del(&req->w.list); // we have the tl_lock...
-		}
-	}
-
 	uptodate = req->rq_status & 0x0001;
 	if( !uptodate && mdev->on_io_error == Detach) {
 		drbd_set_out_of_sync(mdev,rsector, drbd_req_get_size(req));
@@ -92,28 +68,47 @@
 // FIXME proto A and diskless :)
 
 		req->w.cb = w_io_error;
-		drbd_queue_work(mdev,&mdev->data.work,&req->w);
+		_drbd_queue_work(&mdev->data.work,&req->w);
 
 		goto out;
 
 	}
 
 	drbd_bio_endio(req->master_bio,uptodate);
+	req->master_bio = NULL;
 	dec_ap_bio(mdev);
 
-	INVALIDATE_MAGIC(req);
-	mempool_free(req,drbd_request_mempool);
+	/* free the request,
+	 * if it is not/no longer in the transfer log, because
+	 *  it was local only (not connected), or
+	 *  this is protocol C, or
+	 *  the corresponding barrier ack has been received already, or
+	 *  it has been cleared from the transfer log (after connection loss)
+	 */
+	if (!(req->rq_status & RQ_DRBD_IN_TL)) {
+		INVALIDATE_MAGIC(req);
+		mempool_free(req,drbd_request_mempool);
+	}
 
  out:
 	if (test_bit(ISSUE_BARRIER,&mdev->flags)) {
-		spin_lock_irqsave(&mdev->req_lock,flags);
 		if(list_empty(&mdev->barrier_work.list)) {
 			_drbd_queue_work(&mdev->data.work,&mdev->barrier_work);
 		}
-		spin_unlock_irqrestore(&mdev->req_lock,flags);
 	}
 }
 
+void drbd_end_req(drbd_request_t *req, int nextstate, int er_flags,
+		  sector_t rsector)
+{
+	struct Drbd_Conf* mdev = drbd_req_get_mdev(req);
+	unsigned long flags=0;
+	spin_lock_irqsave(&mdev->req_lock,flags);
+	_drbd_end_req(req,nextstate,er_flags,rsector);
+	spin_unlock_irqrestore(&mdev->req_lock,flags);
+}
+
+
 int drbd_read_remote(drbd_dev *mdev, drbd_request_t *req)
 {
 	int rv;



More information about the drbd-cvs mailing list