[DRBD-cvs] svn commit by phil - r2315 - trunk/drbd - Removed that senseless down()/up() from the drbd_make_request() function

drbd-cvs at lists.linbit.com
Mon Jul 31 15:45:44 CEST 2006


Author: phil
Date: 2006-07-31 15:45:42 +0200 (Mon, 31 Jul 2006)
New Revision: 2315

Modified:
   trunk/drbd/drbd_actlog.c
   trunk/drbd/drbd_int.h
   trunk/drbd/drbd_main.c
   trunk/drbd/drbd_receiver.c
   trunk/drbd/drbd_req.c
   trunk/drbd/drbd_worker.c
Log:
Removed that senseless down()/up() from the drbd_make_request() function.
This should make DRBD fly... 
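
The gist of the change: writers no longer funnel through mdev->data.mutex (nor through the device-wide req_lock) just to queue work; each drbd_work_queue now carries its own q_lock spinlock guarding the list, while the existing semaphore keeps counting queued entries. For orientation, here is a minimal user-space analogue of that producer/consumer shape, a sketch with illustrative names, not DRBD code:

#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

struct work {
	struct work *next;
	int (*cb)(struct work *w);
};

struct work_queue {
	struct work *head, *tail;	/* the list, guarded by the per-queue lock */
	pthread_mutex_t lock;		/* plays the role of the new q_lock */
	sem_t s;			/* producers post it, the worker waits on it */
};

/* Producer side, as in drbd_queue_work(): the semaphore is posted while
 * the lock is held, so its count never gets ahead of the list length. */
static void queue_work(struct work_queue *q, struct work *w)
{
	w->next = NULL;
	pthread_mutex_lock(&q->lock);
	if (q->tail)
		q->tail->next = w;
	else
		q->head = w;
	q->tail = w;
	sem_post(&q->s);
	pthread_mutex_unlock(&q->lock);
}

/* Consumer side, as in the drbd_worker() loop: one wait per entry, then
 * a short critical section only to unlink the head. */
static struct work *dequeue_work(struct work_queue *q)
{
	struct work *w;

	sem_wait(&q->s);
	pthread_mutex_lock(&q->lock);
	w = q->head;
	q->head = w->next;
	if (!q->head)
		q->tail = NULL;
	pthread_mutex_unlock(&q->lock);
	return w;
}

Each sem_wait() is matched by exactly one queued entry, so the head is never NULL after a successful wait; the critical sections shrink to a couple of pointer updates instead of a sleeping mutex per request.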


Modified: trunk/drbd/drbd_actlog.c
===================================================================
--- trunk/drbd/drbd_actlog.c	2006-07-31 13:05:01 UTC (rev 2314)
+++ trunk/drbd/drbd_actlog.c	2006-07-31 13:45:42 UTC (rev 2315)
@@ -231,7 +231,7 @@
 		al_work.al_ext = al_ext;
 		al_work.enr = enr;
 		al_work.w.cb = w_al_write_transaction;
-		drbd_queue_work_front(mdev,&mdev->data.work,&al_work.w);
+		drbd_queue_work_front(&mdev->data.work,&al_work.w);
 		wait_for_completion(&al_work.event);
 		
 		mdev->al_writ_cnt++;
@@ -673,7 +673,7 @@
 			}
 			udw->enr = ext1->lce.lc_number;
 			udw->w.cb = w_update_odbm;
-			drbd_queue_work_front(mdev,&mdev->data.work,&udw->w);
+			drbd_queue_work_front(&mdev->data.work,&udw->w);
 			if (ext1->flags != 0) {
 				WARN("deleting resync lce: %d[%u;%02lx]\n",
 				     ext1->lce.lc_number, ext1->rs_left,

Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h	2006-07-31 13:05:01 UTC (rev 2314)
+++ trunk/drbd/drbd_int.h	2006-07-31 13:45:42 UTC (rev 2315)
@@ -672,6 +672,7 @@
 struct drbd_work_queue {
 	struct list_head q;
 	struct semaphore s; // producers up it, worker down()s it
+	spinlock_t q_lock;  // to protect the list.
 };
 
 /* If Philipp agrees, we remove the "mutex", and make_request will only
@@ -851,8 +852,10 @@
 		       unsigned int set_size);
 extern void tl_clear(drbd_dev *mdev);
 extern void tl_add(drbd_dev *mdev, drbd_request_t *req);
+extern void _tl_add(drbd_dev *mdev, drbd_request_t *req);
 extern struct drbd_barrier *tl_add_barrier(drbd_dev *mdev);
-extern struct Tl_epoch_entry * ee_have_write(drbd_dev *mdev,drbd_request_t * req);
+extern struct drbd_barrier *_tl_add_barrier(drbd_dev *,struct drbd_barrier *);
+extern struct Tl_epoch_entry * _ee_have_write(drbd_dev *mdev,drbd_request_t * req);
 extern int tl_dependence(drbd_dev *mdev, drbd_request_t * item);
 extern int tl_verify(drbd_dev *mdev, drbd_request_t * item, sector_t sector);
 extern drbd_request_t * req_have_write(drbd_dev *, struct Tl_epoch_entry *);
@@ -1341,7 +1344,6 @@
 	}
 }
 
-
 static inline void
 _drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
 {
@@ -1350,34 +1352,25 @@
 }
 
 static inline void
-_drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w)
+drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w)
 {
-	list_add(&w->list,&q->q);
-	up(&q->s);
-}
-
-static inline void
-drbd_queue_work_front(drbd_dev *mdev, struct drbd_work_queue *q,
-			struct drbd_work *w)
-{
 	unsigned long flags;
-	spin_lock_irqsave(&mdev->req_lock,flags);
+	spin_lock_irqsave(&q->q_lock,flags);
 	list_add(&w->list,&q->q);
 	up(&q->s); /* within the spinlock,
 		      see comment near end of drbd_worker() */
-	spin_unlock_irqrestore(&mdev->req_lock,flags);
+	spin_unlock_irqrestore(&q->q_lock,flags);
 }
 
 static inline void
-drbd_queue_work(drbd_dev *mdev, struct drbd_work_queue *q,
-		  struct drbd_work *w)
+drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
 {
 	unsigned long flags;
-	spin_lock_irqsave(&mdev->req_lock,flags);
+	spin_lock_irqsave(&q->q_lock,flags);
 	list_add_tail(&w->list,&q->q);
 	up(&q->s); /* within the spinlock,
 		      see comment near end of drbd_worker() */
-	spin_unlock_irqrestore(&mdev->req_lock,flags);
+	spin_unlock_irqrestore(&q->q_lock,flags);
 }
 
 static inline void wake_asender(drbd_dev *mdev) {

Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c	2006-07-31 13:05:01 UTC (rev 2314)
+++ trunk/drbd/drbd_main.c	2006-07-31 13:45:42 UTC (rev 2315)
@@ -204,7 +204,7 @@
 }
 
 
-STATIC void _tl_add(drbd_dev *mdev, drbd_request_t *req)
+void _tl_add(drbd_dev *mdev, drbd_request_t *req)
 {
 	struct drbd_barrier *b;
 
@@ -260,6 +260,24 @@
 	return newest_before;
 }
 
+struct drbd_barrier *_tl_add_barrier(drbd_dev *mdev,struct drbd_barrier *new)
+{
+	struct drbd_barrier *newest_before;
+
+	INIT_LIST_HEAD(&new->requests);
+	new->next=0;
+	new->n_req=0;
+
+	/* mdev->newest_barrier == NULL "cannot happen". but anyways... */
+	newest_before = mdev->newest_barrier;
+	/* never send a barrier number == 0 */
+	new->br_number = (newest_before->br_number+1) ?: 1;
+	mdev->newest_barrier->next = new;
+	mdev->newest_barrier = new;
+
+	return newest_before;
+}
+
 void tl_release(drbd_dev *mdev,unsigned int barrier_nr,
 		       unsigned int set_size)
 {
@@ -452,8 +470,7 @@
 	return req;
 }
 
-struct Tl_epoch_entry * ee_have_write(drbd_dev *mdev,
-					     drbd_request_t * req)
+struct Tl_epoch_entry * _ee_have_write(drbd_dev *mdev, drbd_request_t * req)
 {
 	struct hlist_head *slot;
 	struct hlist_node *n;
@@ -464,8 +481,6 @@
 
 	D_ASSERT(size <= 1<<(HT_SHIFT+9) );
 
-	spin_lock_irq(&mdev->tl_lock);
-
 	for(i=-1;i<=1;i++ ) {
 		slot = mdev->ee_hash + ee_hash_fn(mdev,
 						  sector + i*(1<<(HT_SHIFT)));
@@ -480,7 +495,6 @@
 	// Good, no conflict found
 	_tl_add(mdev,req);
  out:
-	spin_unlock_irq(&mdev->tl_lock);
 
 	return ee;
 }
@@ -915,7 +929,7 @@
 			ascw->ns = ns;
 			ascw->flags = flags;
 			ascw->w.cb = w_after_state_ch;
-			_drbd_queue_work(&mdev->data.work,&ascw->w);
+			drbd_queue_work(&mdev->data.work,&ascw->w);
 		} else {
 			WARN("Could not kmalloc an ascw\n");
 		}
@@ -1935,7 +1949,7 @@
 			 * XXX this might be a good addition to drbd_queue_work
 			 * anyways, to detect "double queuing" ... */
 			if (list_empty(&mdev->unplug_work.list))
-				_drbd_queue_work(&mdev->data.work,&mdev->unplug_work);
+				drbd_queue_work(&mdev->data.work,&mdev->unplug_work);
 		}
 	}
 	spin_unlock_irq(&mdev->req_lock);
@@ -1983,6 +1997,9 @@
 	sema_init(&mdev->data.work.s,0);
 	sema_init(&mdev->meta.work.s,0);
 
+	spin_lock_init(&mdev->data.work.q_lock);
+	spin_lock_init(&mdev->meta.work.q_lock);
+
 	spin_lock_init(&mdev->al_lock);
 	spin_lock_init(&mdev->tl_lock);
 	spin_lock_init(&mdev->ee_lock);
@@ -2850,7 +2867,7 @@
 {
 	drbd_dev* mdev = (drbd_dev*) data;
 
-	drbd_queue_work_front(mdev,&mdev->data.work,&mdev->md_sync_work);
+	drbd_queue_work_front(&mdev->data.work,&mdev->md_sync_work);
 }
 
 STATIC int w_md_sync(drbd_dev *mdev, struct drbd_work *w, int unused)
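
Aside on the new _tl_add_barrier() above: "(newest_before->br_number+1) ?: 1" uses GNU C's conditional with omitted middle operand, which evaluates the first operand once and yields it unless it is zero. In other words, barrier numbers wrap around but never come out as 0. A portable sketch of the same rule (the helper name is illustrative):

#include <assert.h>

/* Equivalent of "new->br_number = (newest_before->br_number+1) ?: 1;"
 * without the GNU ?: extension. */
static unsigned int next_barrier_number(unsigned int prev)
{
	unsigned int n = prev + 1;
	return n ? n : 1;	/* never hand out barrier number 0 */
}

int main(void)
{
	assert(next_barrier_number(41) == 42);
	assert(next_barrier_number(0xFFFFFFFFu) == 1);	/* wraps past 0 */
	return 0;
}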

Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c	2006-07-31 13:05:01 UTC (rev 2314)
+++ trunk/drbd/drbd_receiver.c	2006-07-31 13:45:42 UTC (rev 2315)
@@ -2410,7 +2410,7 @@
 	disconnect_work = kmalloc(sizeof(struct drbd_work),GFP_KERNEL);
 	if(disconnect_work) {
 		disconnect_work->cb = w_disconnect;
-		drbd_queue_work(mdev,&mdev->data.work,disconnect_work);
+		drbd_queue_work(&mdev->data.work,disconnect_work);
 	} else {
 		WARN("kmalloc failed, taking messy shortcut.\n");
 		w_disconnect(mdev,NULL,1);

Modified: trunk/drbd/drbd_req.c
===================================================================
--- trunk/drbd/drbd_req.c	2006-07-31 13:05:01 UTC (rev 2314)
+++ trunk/drbd/drbd_req.c	2006-07-31 13:45:42 UTC (rev 2315)
@@ -112,7 +112,7 @@
 // FIXME proto A and diskless :)
 
 		req->w.cb = w_io_error;
-		drbd_queue_work(mdev,&mdev->data.work,&req->w);
+		drbd_queue_work(&mdev->data.work,&req->w);
 
 		goto out;
 
@@ -125,11 +125,11 @@
 
  out:
 	if (test_bit(ISSUE_BARRIER,&mdev->flags)) {
-		spin_lock_irqsave(&mdev->req_lock,flags);
+		spin_lock_irqsave(&mdev->data.work.q_lock,flags);
 		if(list_empty(&mdev->barrier_work.list)) {
 			_drbd_queue_work(&mdev->data.work,&mdev->barrier_work);
 		}
-		spin_unlock_irqrestore(&mdev->req_lock,flags);
+		spin_unlock_irqrestore(&mdev->data.work.q_lock,flags);
 	}
 }
 
@@ -148,7 +148,7 @@
 	spin_unlock(&mdev->pr_lock);
 	set_bit(UNPLUG_REMOTE,&mdev->flags);
 
-	drbd_queue_work(mdev, &mdev->data.work, &req->w);
+	drbd_queue_work(&mdev->data.work, &req->w);
 
 	return 1;
 }
@@ -237,6 +237,7 @@
 drbd_make_request_common(drbd_dev *mdev, int rw, int size,
 			 sector_t sector, struct bio *bio)
 {
+	struct drbd_barrier *b;
 	drbd_request_t *req;
 	int local, remote;
 	int mxb;
@@ -352,43 +353,43 @@
 		 */
 		if (rw == WRITE) {
 
-	/* About tl_add():
-	1. This must be within the semaphor,
-	   to ensure right order in tl_ data structure and to
-	   ensure right order of packets on the write
-	2. This must happen before sending, otherwise we might
-	   get in the BlockAck packet before we have it on the
-	   tl_ datastructure (=> We would want to remove it before it
-	   is there!)
-	3. Q: Why can we add it to tl_ even when drbd_send() might fail ?
-	      There could be a tl_cancel() to remove it within the semaphore!
-	   A: If drbd_send fails, we will lose the connection. Then
-	      tl_cear() will simulate a RQ_DRBD_SEND and set it out of sync
-	      for everything in the data structure.
-	*/
-			down(&mdev->data.mutex);
+			b = kmalloc(sizeof(struct drbd_barrier),GFP_NOIO);
+			if(!b) {
+				ERR("Failed to alloc barrier.");
+				goto fail_and_free_req;
+			}
+
+			spin_lock_irq(&mdev->tl_lock);
+
 			if(test_and_clear_bit(ISSUE_BARRIER,&mdev->flags)) {
-				struct drbd_barrier *b = tl_add_barrier(mdev);
+				b = _tl_add_barrier(mdev,b);
 				b->w.cb =  w_send_barrier;
-				drbd_queue_work(mdev,&mdev->data.work, &b->w);
+				drbd_queue_work(&mdev->data.work, &b->w);
+				b = NULL;
 			}
 
 			if (mdev->net_conf->two_primaries) {
-				if(ee_have_write(mdev,req)) { // tl_add() here
+				if(_ee_have_write(mdev,req)) { // tl_add() here
+					spin_unlock_irq(&mdev->tl_lock);
+
 					WARN("Concurrent write! [DISCARD L] sec=%lu\n",
 					     (unsigned long)sector);
 					dec_local(mdev);
 					dec_ap_pending(mdev);
 					local=0;
+
 					drbd_end_req(req, RQ_DRBD_DONE, 1, sector);
+					return 0;
 				}
 			} else {
-				tl_add(mdev,req);
+				_tl_add(mdev,req);
 			}
 			req->w.cb =  w_send_dblock;
-			drbd_queue_work(mdev,&mdev->data.work, &req->w);
+			drbd_queue_work(&mdev->data.work, &req->w);
 
-			up(&mdev->data.mutex);
+			spin_unlock_irq(&mdev->tl_lock);
+
+			if(b) kfree(b);
 		} else {
 			// this node is diskless ...
 			drbd_read_remote(mdev,req);
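
The structural point of the drbd_req.c hunk: the barrier is now allocated with kmalloc(GFP_NOIO) before tl_lock is taken, since the allocation may sleep (which is not allowed under a spinlock) and GFP_NOIO keeps it from recursing into the I/O path. Inside the lock the preallocated object is either consumed by _tl_add_barrier() or left alone and freed afterwards. A user-space sketch of that allocate-outside-the-lock shape (names are illustrative, not DRBD code):

#include <pthread.h>
#include <stdlib.h>

struct barrier {
	unsigned int number;
	struct barrier *next;
};

static pthread_mutex_t tl_lock = PTHREAD_MUTEX_INITIALIZER;
static int issue_barrier;		/* stands in for the ISSUE_BARRIER flag */
static struct barrier *newest;

static int submit_write(void)
{
	struct barrier *b = malloc(sizeof(*b));	/* allocate outside the lock */
	if (!b)
		return -1;

	pthread_mutex_lock(&tl_lock);
	if (issue_barrier) {
		issue_barrier = 0;
		b->number = newest ? newest->number + 1 : 1;
		b->next = NULL;
		if (newest)
			newest->next = b;
		newest = b;
		b = NULL;		/* consumed: must not be freed below */
	}
	/* ... add the request to the transfer log, queue the work ... */
	pthread_mutex_unlock(&tl_lock);

	free(b);			/* no-op when the barrier was consumed */
	return 0;
}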

Modified: trunk/drbd/drbd_worker.c
===================================================================
--- trunk/drbd/drbd_worker.c	2006-07-31 13:05:01 UTC (rev 2314)
+++ trunk/drbd/drbd_worker.c	2006-07-31 13:45:42 UTC (rev 2315)
@@ -88,7 +88,7 @@
 	spin_unlock_irqrestore(&mdev->ee_lock,flags);
 
 	drbd_chk_io_error(mdev,error);
-	drbd_queue_work(mdev,&mdev->data.work,&e->w);
+	drbd_queue_work(&mdev->data.work,&e->w);
 	dec_local(mdev);
 	return 0;
 }
@@ -181,7 +181,7 @@
 		if (DRBD_ratelimit(5*HZ,5))
 			ERR("local read failed, retrying remotely\n");
 		req->w.cb = w_read_retry_remote;
-		drbd_queue_work(mdev,&mdev->data.work,&req->w);
+		drbd_queue_work(&mdev->data.work,&req->w);
 	} else {
 	pass_on:
 		bio_endio(req->master_bio,req->master_bio->bi_size,error);
@@ -282,11 +282,11 @@
 		mdev->resync_work.cb = w_resume_next_sg;
 	}
 
+	spin_unlock_irqrestore(&mdev->req_lock,flags);
+
 	if(list_empty(&mdev->resync_work.list)) {
-		_drbd_queue_work(&mdev->data.work,&mdev->resync_work);
+		drbd_queue_work(&mdev->data.work,&mdev->resync_work);
 	} else INFO("Avoided requeue of resync_work\n");
-
-	spin_unlock_irqrestore(&mdev->req_lock,flags);
 }
 
 #define SLEEP_TIME (HZ/10)
@@ -1047,7 +1047,7 @@
 		   this...   */
 
 		w = 0;
-		spin_lock_irq(&mdev->req_lock);
+		spin_lock_irq(&mdev->data.work.q_lock);
 		ERR_IF(list_empty(&mdev->data.work.q)) {
 			/* something terribly wrong in our logic.
 			 * we were able to down() the semaphore,
@@ -1060,12 +1060,12 @@
 			 *
 			 * I'll try to get away just starting over this loop.
 			 */
-			spin_unlock_irq(&mdev->req_lock);
+			spin_unlock_irq(&mdev->data.work.q_lock);
 			continue;
 		}
 		w = list_entry(mdev->data.work.q.next,struct drbd_work,list);
 		list_del_init(&w->list);
-		spin_unlock_irq(&mdev->req_lock);
+		spin_unlock_irq(&mdev->data.work.q_lock);
 
 		if(!w->cb(mdev,w, mdev->state.conn < Connected )) {
 			//WARN("worker: a callback failed! \n");
@@ -1100,11 +1100,11 @@
 	/* possible paranoia check: the STOP_SYNC_TIMER bit should be set
 	 * if and only if del_timer_sync returns true ... */
 
-	spin_lock_irq(&mdev->req_lock);
+	spin_lock_irq(&mdev->data.work.q_lock);
 	if (test_and_clear_bit(STOP_SYNC_TIMER,&mdev->flags)) {
 		mdev->resync_work.cb = w_resume_next_sg;
 		if (list_empty(&mdev->resync_work.list))
-			_drbd_queue_work(&mdev->data.work,&mdev->resync_work);
+			drbd_queue_work(&mdev->data.work,&mdev->resync_work);
 		// else: already queued
 	} else {
 		/* timer already consumed that bit, or it was never set */
@@ -1122,7 +1122,7 @@
 	i = 0;
   again:
 	list_splice_init(&mdev->data.work.q,&work_list);
-	spin_unlock_irq(&mdev->req_lock);
+	spin_unlock_irq(&mdev->data.work.q_lock);
 
 	while(!list_empty(&work_list)) {
 		w = list_entry(work_list.next, struct drbd_work,list);
@@ -1131,7 +1131,7 @@
 		i++; /* dead debugging code */
 	}
 
-	spin_lock_irq(&mdev->req_lock);
+	spin_lock_irq(&mdev->data.work.q_lock);
 	ERR_IF(!list_empty(&mdev->data.work.q))
 		goto again;
 	sema_init(&mdev->data.work.s,0);
@@ -1140,7 +1140,7 @@
 	 * semaphore without corresponding list entry.
 	 * So don't do that.
 	 */
-	spin_unlock_irq(&mdev->req_lock);
+	spin_unlock_irq(&mdev->data.work.q_lock);
 
 	INFO("worker terminated\n");
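
This last hunk also shows why up() is done inside the spinlock (the "comment near end of drbd_worker()" referenced from drbd_int.h): at termination the worker drains the list under q_lock and then re-initializes the semaphore while still holding the lock. Because producers post the semaphore under the same lock, an empty list observed under the lock implies the counter is fully consumed, so no posted count is ever stranded without a list entry. A compact user-space sketch of that drain loop (illustrative only, not DRBD code):

#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

struct node {
	struct node *next;
};

struct wq {
	struct node *head;
	pthread_mutex_t lock;
	sem_t s;
};

/* Modelled on the shutdown path above: splice the list empty, process
 * the entries unlocked, and only reset the semaphore once the list has
 * been seen empty while the lock is held. */
static void drain(struct wq *q, void (*process)(struct node *))
{
	struct node *list, *n;

	pthread_mutex_lock(&q->lock);
again:
	list = q->head;
	q->head = NULL;
	pthread_mutex_unlock(&q->lock);

	while (list) {
		n = list;
		list = list->next;
		process(n);
	}

	pthread_mutex_lock(&q->lock);
	if (q->head)		/* a late producer queued more: go again */
		goto again;
	sem_destroy(&q->s);	/* list empty, so the count must be 0 */
	sem_init(&q->s, 0, 0);
	pthread_mutex_unlock(&q->lock);
}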
 


