[DRBD-cvs] DRBD CVS: drbd by phil

drbd-user@lists.linbit.com drbd-user@lists.linbit.com
Thu, 15 Jan 2004 15:26:30 +0100 (CET)


DRBD CVS committal

Author  : phil
Host    : 
Module  : drbd

Dir     : drbd/drbd


Modified Files:
      Tag: rel-0_7-branch
	drbd_actlog.c drbd_dsender.c drbd_fs.c drbd_int.h drbd_main.c 
	drbd_receiver.c 


Log Message:
[Patch by Lars]
dsender becomes worker.

===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_actlog.c,v
retrieving revision 1.1.2.52
retrieving revision 1.1.2.53
diff -u -3 -r1.1.2.52 -r1.1.2.53
--- drbd_actlog.c	14 Jan 2004 13:36:01 -0000	1.1.2.52
+++ drbd_actlog.c	15 Jan 2004 14:26:29 -0000	1.1.2.53
@@ -560,30 +560,21 @@
 	   from other places in non IRQ */
 	unsigned long flags=0;
 	int cleared;
-	int wake_dsender=0;
+	/* notify of SYNC_FINISHED is now done by other means */
 
 	cleared = bm_set_bit(mdev, sector, blk_size, SS_IN_SYNC);
 
-	spin_lock_irqsave(&mdev->rs_lock,flags);
+	spin_lock_irqsave(&mdev->al_lock,flags);
 	mdev->rs_left -= cleared;
 	D_ASSERT((long)mdev->rs_left >= 0);
-	if( cleared && mdev->rs_left == 0 ) {
-		spin_lock(&mdev->ee_lock); // IRQ lock already taken by rs_lock
-		set_bit(SYNC_FINISHED,&mdev->flags);
-		spin_unlock(&mdev->ee_lock);
-		wake_dsender=1;
-	}
 
 	if(jiffies - mdev->rs_mark_time > HZ*10) {
 		mdev->rs_mark_time=jiffies;
 		mdev->rs_mark_left=mdev->rs_left;
 	}
-	spin_unlock_irqrestore(&mdev->rs_lock,flags);
+	spin_unlock_irqrestore(&mdev->al_lock,flags);
 
 	drbd_try_clear_on_disk_bm(mdev,sector,cleared,may_sleep);
-	if(wake_dsender) {// must happen after drbd_try_clear_on_disk_bm();
-		wake_up_interruptible(&mdev->dsender_wait);
-	}
 }
 
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.46
retrieving revision 1.1.2.47
diff -u -3 -r1.1.2.46 -r1.1.2.47
--- drbd_dsender.c	14 Jan 2004 16:05:36 -0000	1.1.2.46
+++ drbd_dsender.c	15 Jan 2004 14:26:29 -0000	1.1.2.47
@@ -44,7 +44,7 @@
 #include "drbd.h"
 #include "drbd_int.h"
 
-void drbd_dio_end_read(struct buffer_head *bh, int uptodate)
+void enslaved_read_bh_end_io(struct buffer_head *bh, int uptodate)
 {
 	/* This callback will be called in irq context by the IDE drivers,
 	   and in Softirqs/Tasklets/BH context by the SCSI drivers.
@@ -67,36 +67,21 @@
 	smp_mb__after_clear_bit();
 
 	list_del(&e->w.list);
-	list_add(&e->w.list,&mdev->rdone_ee);
-
 	spin_unlock_irqrestore(&mdev->ee_lock,flags);
 
-	wake_up_interruptible(&mdev->dsender_wait);
+	__drbd_queue_work(mdev,&mdev->data.work,&e->w);
 }
 
-int drbd_process_rdone_ee(struct Drbd_Conf* mdev)
+int w_resync_source(drbd_dev *mdev, struct drbd_work *w)
 {
-	struct Tl_epoch_entry *e;
-	struct list_head *le;
-	int ok=1;
-
-	MUST_HOLD(&mdev->ee_lock);
-
-	while(!list_empty(&mdev->rdone_ee)) {
-		le = mdev->rdone_ee.next;
-		e = list_entry(le, struct Tl_epoch_entry,w.list);
-		spin_unlock_irq(&mdev->ee_lock);
-		ok = ok && e->w.cb(mdev,&e->w);
-
-		spin_lock_irq(&mdev->ee_lock);
-		list_del(le);         // remove from list first.
-
-		drbd_put_ee(mdev,e);
-	}
-
-	wake_up_interruptible(&mdev->ee_wait);
+	ERR("I seem to be resync source, but callback triggered??\n");
+	return 0;
+}
 
-	return ok;
+int w_resync_inactive(drbd_dev *mdev, struct drbd_work *w)
+{
+	ERR("resync inactive, but callback triggered??\n");
+	return 0;
 }
 
 STATIC drbd_dev *ds_find_osg(drbd_dev *mdev)
@@ -115,107 +100,64 @@
 	return 0;
 }
 
-STATIC int _ds_wait_osg(drbd_dev* odev, struct drbd_hook* dh)
+int w_make_resync_request(drbd_dev* mdev, struct drbd_work* w)
 {
-	// This is a callback, I better not assume that this 
-	// is a context which allows to send something from.
-	unsigned long flags;
-	drbd_dev *mdev = (drbd_dev*) dh->data;
-	int added=0;
-
-	if(odev->cstate <= Connected) {
-	retry:
-		if( (odev = ds_find_osg(mdev)) ) {
-			spin_lock_irqsave(&odev->req_lock,flags);
-			if(odev->cstate > Connected) {
-				list_add_tail(&dh->list,&odev->cstate_hook);
-				added=1;
-			}
-			spin_unlock_irqrestore(&odev->req_lock,flags);
-			if(!added) goto retry;
-		} else {
-			set_bit(SYNC_CONTINUE,&mdev->flags);
-			wake_up_interruptible(&mdev->dsender_wait);
-			kfree(dh);
-		}
-		return 0; // do not add to this hook again.
-	}
-	return 1; // run again. 
-}
+	struct Pending_read *pr;
+	struct Drbd_Conf    *odev;
+	sector_t sector;
 
-STATIC int drbd_wait_for_other_sync_groups(drbd_dev *mdev)
-{
-	drbd_dev *odev;
-	struct drbd_hook *dh=NULL;
-	int added=0;
+	PARANOIA_BUG_ON(w != &mdev->resync_work);
 
- retry:
+	// wait_for_other_sync_groups
 	odev = ds_find_osg(mdev);
-	if(!odev) return FALSE;
-
-	while( dh == NULL ) {
-		dh=kmalloc(sizeof(struct drbd_hook),GFP_KERNEL);
-		if(dh) break;
-		ERR("could not kmalloc drbd_hook\n");
-		current->state = TASK_INTERRUPTIBLE;
-		schedule_timeout(HZ);
+	if (odev) {
+		spin_lock_irq(&odev->req_lock);
+		if (odev->cstate > Connected) {
+			list_add_tail(&w->list,&odev->cstate_hook);
+			spin_unlock_irq(&odev->req_lock);
+			INFO("Syncer waits for sync group %i\n",
+			     odev->sync_conf.group);
+			drbd_send_short_cmd(mdev,SyncStop);
+			set_cstate(mdev,PausedSyncT);
+			return 1;
+		}
+		// state change while we were looking at it. never mind ...
+		spin_unlock_irq(&odev->req_lock);
 	}
 
-	dh->data = mdev;
-	dh->callback = _ds_wait_osg;
-
-	spin_lock_irq(&odev->req_lock);
-	if(odev->cstate > Connected) {
-		list_add_tail(&dh->list,&odev->cstate_hook);
-		added=1;
+	if (mdev->cstate == PausedSyncT) {
+		INFO("resumed synchronisation.\n");
+		drbd_send_short_cmd(mdev,SyncCont);
+		set_cstate(mdev,SyncTarget);
 	}
-	spin_unlock_irq(&odev->req_lock);
-	if(!added) goto retry;
-
-	INFO("Syncer waits for sync group %i\n",
-	     odev->sync_conf.group);
-	drbd_send_short_cmd(mdev,SyncStop);
-	set_cstate(mdev,PausedSyncT);
-
-	return TRUE;
-}
-
-/* bool */
-STATIC int ds_issue_requests(struct Drbd_Conf* mdev)
-{
-	int number,i;
-	sector_t sector;
-
-#define SLEEP_TIME (HZ/10)
-
-	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
+	D_ASSERT(mdev->cstate == SyncTarget);
 
-	// Remove later
-	if(number > 1000) number=1000;
-	if(atomic_read(&mdev->pending_cnt)>1200) {
+	if (atomic_read(&mdev->pending_cnt)>1200) {
 		ERR("pending cnt high -- throttling resync.\n");
-		return TRUE;
-	}
-	// /Remove later
+		// schedule_timeout(HZ/10); ??
+		// FIXME do we send a write_hint here ?
 
-	if(drbd_wait_for_other_sync_groups(mdev)) return FALSE;
+		/* FIXME if (current_sync_throughput > mdev->sync_conf.rate)
+		 * and (we have other pending writes [//reads ?? ])
+		 * throttle, too...
+		 * If drbd_make_request would not send itself, but just
+		 * queue the work to the worker, the latter (condition)
+		 * simplifies to (!list_empty(work.q))		-lge
+		 */
+		goto requeue;
+	}
 
-	for(i=0;i<number;i++) {
-		struct Pending_read *pr;
+	pr = mempool_alloc(drbd_pr_mempool, GFP_USER);
+	if (likely(pr!= NULL)) {
 		int size=BM_BLOCK_SIZE;
 
-		pr = mempool_alloc(drbd_pr_mempool, GFP_USER);
-		if (!pr) return TRUE;
-		SET_MAGIC(pr);
-
 		sector = bm_get_sector(mdev->mbds_id,&size);
-
-		if(sector == MBDS_DONE) {
-			Drbd_Header h;
+		if (sector == MBDS_DONE) {
 			INVALIDATE_MAGIC(pr);
 			mempool_free(pr,drbd_pr_mempool);
-			drbd_send_cmd(mdev,mdev->data.socket,WriteHint,&h,sizeof(h));
-			return FALSE;
+			// __queue_work ... shortcut.
+			w_resync_finished(mdev,w);
+			return 1;
 		}
 
 		pr->d.sector = sector;
@@ -226,111 +168,162 @@
 
 		inc_pending(mdev);
 		ERR_IF(!drbd_send_drequest(mdev,RSDataRequest,
-					  sector,size,(unsigned long)pr))
+					  sector,size,(unsigned long)pr)) {
 			dec_pending(mdev,HERE);
+			return 0; // FAILED. worker will abort!
+		}
 	}
 
-	return TRUE;
+   requeue:
+	__drbd_queue_work(mdev,&mdev->data.work,w);
+	return 1;
 }
 
-void drbd_start_resync(struct Drbd_Conf *mdev, Drbd_CState side)
+int w_start_resync(drbd_dev *mdev, struct drbd_work *w)
 {
-	set_cstate(mdev,side);
-	mdev->rs_left=mdev->rs_total;
-	mdev->rs_start=jiffies;
-	mdev->rs_mark_left=mdev->rs_left;
-	mdev->rs_mark_time=mdev->rs_start;
-
-	INFO("Resync started as %s (need to sync %lu KB).\n",
-	     side == SyncTarget ? "target" : "source", mdev->rs_left/2);
+	PARANOIA_BUG_ON(w != &mdev->resync_work);
 
-	if(side == SyncTarget) {
-		set_bit(START_SYNC,&mdev->flags);
-		// FIXME do this more elegant ...
-		// for now, this ensures that meta data is "consistent"
-		if ( mdev->rs_total == 0 ) set_bit(SYNC_FINISHED,&mdev->flags);
-		wake_up_interruptible(&mdev->dsender_wait);
+	if(mdev->cstate == SyncTarget) {
+		mdev->gen_cnt[Flags] &= ~MDF_Consistent;
+		bm_reset(mdev->mbds_id);
+		w->cb = w_make_resync_request;
+		__drbd_queue_work(mdev,&mdev->data.work,w);
 	} else {
 		// If we are SyncSource we must be consistent :)
 		mdev->gen_cnt[Flags] |= MDF_Consistent;
-		if ( mdev->rs_total == 0 ) set_cstate(mdev,Connected);
+		w->cb = w_resync_source;
+		if ( mdev->rs_total == 0 ) {
+			w->cb = w_resync_finished;
+			__drbd_queue_work(mdev,&mdev->data.work,w);
+		}
+	}
+	drbd_md_write(mdev);
+
+	return 1;
+}
+
+int w_resync_finished(drbd_dev* mdev, struct drbd_work* w)
+{
+	unsigned long dt;
+
+	PARANOIA_BUG_ON(w != &mdev->resync_work);
+	D_ASSERT(mdev->rs_left == 0);
+
+	dt = (jiffies - mdev->rs_start) / HZ + 1;
+	INFO("Resync done (total %lu sec; %lu K/sec)\n",
+	     dt,(mdev->rs_total/2)/dt);
+
+	if (mdev->cstate == SyncTarget) {
+		mdev->gen_cnt[Flags] |= MDF_Consistent;
+		drbd_md_write(mdev);
+		drbd_send_short_cmd(mdev,SyncDone);
 	}
+	mdev->rs_total = 0;
+	set_cstate(mdev,Connected);
+
+	// assert that all bit-map parts are cleared.
+	D_ASSERT(list_empty(&mdev->resync->lru));
+	w->cb = w_resync_inactive;
+	INIT_LIST_HEAD(&w->list);
+	return 1;
 }
 
-static inline int _dsender_cond(struct Drbd_Conf *mdev)
+int w_e_end_data_req(drbd_dev *mdev, struct drbd_work *w)
 {
-	int rv;
-	rv = test_bit(START_SYNC,&mdev->flags) // TODO Use Lars' style _FLAG 
-		|| test_bit(SYNC_FINISHED,&mdev->flags)
-		|| test_bit(SYNC_CONTINUE,&mdev->flags);
+	struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
+	int ok;
+
+	ok=drbd_send_block(mdev, DataReply, e);
+	dec_unacked(mdev,HERE); // THINK unconditional?
 
 	spin_lock_irq(&mdev->ee_lock);
-	rv |= !list_empty(&mdev->rdone_ee);
+	drbd_put_ee(mdev,e);
 	spin_unlock_irq(&mdev->ee_lock);
 
-	return rv;
+	return ok;
 }
 
-int drbd_dsender(struct Drbd_thread *thi)
+int w_e_end_rsdata_req(drbd_dev *mdev, struct drbd_work *w)
 {
-	long time=MAX_SCHEDULE_TIMEOUT;
-	drbd_dev *mdev = thi->mdev;
+	struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
+	int ok;
 
-	sprintf(current->comm, "drbd%d_dsender", (int)(mdev-drbd_conf));
+	drbd_rs_complete_io(mdev,e->pbh.b_blocknr);
+	inc_pending(mdev);
+	ok=drbd_send_block(mdev, DataReply, e);
+	dec_unacked(mdev,HERE); // THINK unconditional?
 
-	while( thi->t_state == Running ) {
+	spin_lock_irq(&mdev->ee_lock);
+	drbd_put_ee(mdev,e);
+	spin_unlock_irq(&mdev->ee_lock);
 
-		wait_event_interruptible_timeout(
-			mdev->dsender_wait,_dsender_cond(mdev),time);
+	return ok;
+}
 
-		spin_lock_irq(&mdev->ee_lock);
-		drbd_process_rdone_ee(mdev);
-		spin_unlock_irq(&mdev->ee_lock);
-
-		if(test_and_clear_bit(START_SYNC,&mdev->flags)) {
-			time=SLEEP_TIME;
-			mdev->gen_cnt[Flags] &= ~MDF_Consistent;
-			drbd_md_write(mdev);
-			bm_reset(mdev->mbds_id);
-		}
+void drbd_start_resync(struct Drbd_Conf *mdev, Drbd_CState side)
+{
+	set_cstate(mdev,side);
+	mdev->rs_left=mdev->rs_total;
+	mdev->rs_start=jiffies;
+	mdev->rs_mark_left=mdev->rs_left;
+	mdev->rs_mark_time=mdev->rs_start;
 
-		if(test_and_clear_bit(SYNC_FINISHED,&mdev->flags)) {
-			unsigned long dt;
-			dt = (jiffies - mdev->rs_start) / HZ + 1;
-			INFO("Resync done (total %lu sec; %lu K/sec)\n",
-			     dt,(mdev->rs_total/2)/dt);
-
-			if(mdev->cstate == SyncTarget) {
-				mdev->gen_cnt[Flags] |= MDF_Consistent;
-				drbd_md_write(mdev);
-			}
-			mdev->rs_total = 0;
-			set_cstate(mdev,Connected);
+	INFO("Resync started as %s (need to sync %lu KB).\n",
+	     side == SyncTarget ? "target" : "source", mdev->rs_left/2);
 
-			// assert that all bit-map parts are cleared.
-			D_ASSERT(list_empty(&mdev->resync->lru));
-		}
+	PARANOIA_BUG_ON(!list_empty(&mdev->resync_work.list));
+	PARANOIA_BUG_ON(mdev->resync_work.cb != w_resync_inactive);
 
-		if(test_and_clear_bit(SYNC_CONTINUE,&mdev->flags) && 
-		   (mdev->cstate == PausedSyncT) ) {
-			time=SLEEP_TIME;
-			INFO("resumed synchronisation.\n");
-			drbd_send_short_cmd(mdev,SyncCont);
-			set_cstate(mdev,SyncTarget);
-		}
+	mdev->resync_work.cb = w_start_resync;
+	__drbd_queue_work(mdev,&mdev->data.work,&mdev->resync_work);
+}
 
-		if(time == SLEEP_TIME) {
-			if (!ds_issue_requests(mdev)) {
-				time=MAX_SCHEDULE_TIMEOUT;
-			}
-			if (!disable_io_hints) {
-				Drbd_Header h;
-				drbd_send_cmd(mdev,mdev->data.socket,WriteHint,&h,
-					      sizeof(h));
+int drbd_worker(struct Drbd_thread *thi)
+{
+	drbd_dev *mdev = thi->mdev;
+	struct drbd_work *w;
+	unsigned long flags;
+	int intr;
+
+	sprintf(current->comm, "drbd%d_worker", (int)(mdev-drbd_conf));
+
+	for (;;) {
+		intr = down_interruptible(&mdev->data.work.s);
+
+		if (intr) {
+			D_ASSERT(intr == -EINTR);
+			LOCK_SIGMASK(current,flags);
+			if (sigismember(&current->pending.signal, SIGTERM)) {
+				sigdelset(&current->pending.signal, SIGTERM);
+				RECALC_SIGPENDING(current);
 			}
+			UNLOCK_SIGMASK(current,flags);
+			if (thi->t_state != Running )
+				break;
+			continue;
 		}
+
+		if (thi->t_state != Running )
+			break;
+		if (need_resched())
+			schedule();
+
+		w = NULL;
+		spin_lock_irq(&mdev->req_lock);
+		if (!list_empty(&mdev->data.work.q)) {
+			w = list_entry(mdev->data.work.q.next,struct drbd_work,list);
+			list_del(&w->list);
+		}
+		spin_unlock_irq(&mdev->req_lock);
+
+		ERR_IF (!w)
+			continue; // BUG()... racy up() somewhere ??
+
+		ERR_IF ( !w->cb(mdev,w) )
+			break;
 	}
 
+	INFO("worker terminated\n");
+
 	return 0;
 }
-
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_fs.c,v
retrieving revision 1.28.2.43
retrieving revision 1.28.2.44
diff -u -3 -r1.28.2.43 -r1.28.2.44
--- drbd_fs.c	12 Jan 2004 09:31:36 -0000	1.28.2.43
+++ drbd_fs.c	15 Jan 2004 14:26:29 -0000	1.28.2.44
@@ -215,7 +215,7 @@
 	}
 
 	fsync_dev(MKDEV(MAJOR_NR, minor));
-	drbd_thread_stop(&mdev->dsender);
+	drbd_thread_stop(&mdev->worker);
 	drbd_thread_stop(&mdev->asender);
 	drbd_thread_stop(&mdev->receiver);
 	drbd_free_resources(mdev);
@@ -337,7 +337,7 @@
 	*/
 
 	fsync_dev(MKDEV(MAJOR_NR, minor));
-	drbd_thread_stop(&mdev->dsender);
+	drbd_thread_stop(&mdev->worker);
 	drbd_thread_stop(&mdev->asender);
 	drbd_thread_stop(&mdev->receiver);
 	drbd_free_sock(mdev);
@@ -591,7 +591,7 @@
 		/* FIXME what if fsync returns error */
 		fsync_dev(MKDEV(MAJOR_NR, minor));
 		set_bit(DO_NOT_INC_CONCNT,&mdev->flags);
-		drbd_thread_stop(&mdev->dsender);
+		drbd_thread_stop(&mdev->worker);
 		drbd_thread_stop(&mdev->asender);
 		drbd_thread_stop(&mdev->receiver);
 
@@ -608,7 +608,7 @@
 
 		fsync_dev(MKDEV(MAJOR_NR, minor));
 		set_bit(DO_NOT_INC_CONCNT,&mdev->flags);
-		drbd_thread_stop(&mdev->dsender);
+		drbd_thread_stop(&mdev->worker);
 		drbd_thread_stop(&mdev->asender);
 		drbd_thread_stop(&mdev->receiver);
 		drbd_free_resources(mdev);
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.89
retrieving revision 1.58.2.90
diff -u -3 -r1.58.2.89 -r1.58.2.90
--- drbd_int.h	15 Jan 2004 10:55:26 -0000	1.58.2.89
+++ drbd_int.h	15 Jan 2004 14:26:29 -0000	1.58.2.90
@@ -344,6 +344,7 @@
 	SyncParam,
 	SyncStop,
 	SyncCont,
+	SyncDone,
 	MAX_CMD,
 	MayIgnore = 0x100, // Flag only to test if (cmd > MayIgnore) ...
 	MAX_OPT_CMD,
@@ -374,6 +375,7 @@
 	case SyncParam       : return "SyncParam";
 	case SyncStop        : return "SyncStop";
 	case SyncCont        : return "SyncCont";
+	case SyncDone        : return "SyncDone";
 	case MAX_CMD         : return "MAX_CMD";
 	case MayIgnore       : return "MayIgnore";
 	case MAX_OPT_CMD     : return "MAX_OPT_CMD";
@@ -405,6 +407,7 @@
  *   WriteHint
  *   SyncStop
  *   SyncCont
+ *   SyncDone
  */
 
 /*
@@ -598,13 +601,13 @@
 #define COLLECT_ZOMBIES    1
 #define SEND_PING          2
 #define WRITER_PRESENT     3
-#define START_SYNC         4
+//#define START_SYNC         4
 #define DO_NOT_INC_CONCNT  5
 #define WRITE_HINT_QUEUED  6
 #define PARTNER_DISKLESS   7
-#define SYNC_FINISHED      8
+//#define SYNC_FINISHED      8
 #define PROCESS_EE_RUNNING 9
-#define SYNC_CONTINUE     10
+//#define SYNC_CONTINUE     10
 
 struct BitMap {
 	unsigned long dev_size;
@@ -627,28 +630,29 @@
 #define BME_NO_WRITES    0
 #define BME_LOCKED       1
 
-struct drbd_hook {
-	struct list_head list;
-	void* data;
-	int (*callback) (drbd_dev*, struct drbd_hook* );
-};
-
-
 // TODO sort members for performance
 // MAYBE group them further
 
+/* THINK maybe we actually want to use the default "event/%s" worker threads
+ * or similar in linux 2.6, which uses per cpu data and threads.
+ *
+ * To be general, this might need a spin_lock member.
+ * For now, please use the mdev->req_lock to protect list_head,
+ * see __drbd_queue_work below.
+ */
+struct drbd_work_queue {
+	struct list_head q;
+	struct semaphore s; // producers up it, worker down()s it
+};
+
 /* If Philipp agrees, we remove the "mutex", and make_request will only
  * (throttle on "queue full" condition and) queue it to the worker thread...
  * which then is free to do whatever is needed, and has exclusive send access
  * to the data socket ...
- * 
- * I want to see how this works first. I have the feeling in my guts that
- * it will lead to OOM deadlock.
  */
 struct drbd_socket {
+	struct drbd_work_queue work;
 	struct semaphore  mutex;
-	struct semaphore  work;      // producers up it, worker down()s it
-	struct list_head  work_q;
 	struct socket    *socket;
 	Drbd_Polymorph_Packet sbuf;  // this way we get our
 	Drbd_Polymorph_Packet rbuf;  // send/receive buffers off the stack
@@ -700,11 +704,9 @@
 	unsigned long rs_start;    // Syncer's start time [unit jiffies]
 	sector_t rs_mark_left;// block not up-to-date at mark [unit sect.]
 	unsigned long rs_mark_time;// marks's time [unit jiffies]
-	spinlock_t rs_lock; // used to protect the rs_variables.
 	struct Drbd_thread receiver;
-	struct Drbd_thread dsender;
+	struct Drbd_thread worker;
 	struct Drbd_thread asender;
-	wait_queue_head_t dsender_wait;
 	struct BitMap* mbds_id;
 	struct lru_cache* resync; // Used to track operations of resync...
 	int open_cnt;
@@ -716,7 +718,7 @@
 	struct list_head sync_ee;   // IO in progress
 	struct list_head done_ee;   // send ack
 	struct list_head read_ee;   // IO in progress
-	struct list_head rdone_ee;  // send result or CondRequest
+	// struct list_head rdone_ee;  // send result or CondRequest
 	spinlock_t pr_lock;
 	struct list_head app_reads;
 	struct list_head resync_reads;
@@ -868,10 +870,19 @@
 		      unsigned int cmd, unsigned long arg);
 
 // drbd_dsender.c
-extern int drbd_dsender(struct Drbd_thread *thi);
-extern void drbd_dio_end_read(struct buffer_head *bh, int uptodate);
+//extern int drbd_dsender(struct Drbd_thread *thi);
+extern int drbd_worker(struct Drbd_thread *thi);
+extern void enslaved_read_bh_end_io(struct buffer_head *bh, int uptodate);
 extern void drbd_start_resync(drbd_dev *mdev, Drbd_CState side);
 extern unsigned long drbd_hash(struct buffer_head *bh);
+// worker callbacks
+extern int w_e_end_data_req      (drbd_dev *mdev, struct drbd_work*w);
+extern int w_e_end_rsdata_req    (drbd_dev *mdev, struct drbd_work*w);
+extern int w_make_resync_request (drbd_dev *mdev, struct drbd_work*w);
+extern int w_resync_finished     (drbd_dev *mdev, struct drbd_work*w);
+extern int w_resync_inactive     (drbd_dev *mdev, struct drbd_work*w);
+extern int w_resync_source       (drbd_dev *mdev, struct drbd_work*w);
+extern int w_start_resync        (drbd_dev *mdev, struct drbd_work*w);
 
 // drbd_receiver.c
 extern int drbd_release_ee(drbd_dev* mdev,struct list_head* list);
@@ -988,6 +999,17 @@
 /*
  * inline helper functions
  *************************/
+
+static inline void
+__drbd_queue_work(drbd_dev *mdev, struct drbd_work_queue *q,
+		  struct drbd_work *w)
+{
+	unsigned long flags;
+	spin_lock_irqsave(&mdev->req_lock,flags);
+	list_add_tail(&w->list,&q->q);
+	spin_unlock_irqrestore(&mdev->req_lock,flags);
+	up(&q->s);
+}
 
 static inline void wake_asender(drbd_dev *mdev) {
 	drbd_queue_signal(DRBD_SIG, mdev->asender.task);
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.96
retrieving revision 1.73.2.97
diff -u -3 -r1.73.2.96 -r1.73.2.97
--- drbd_main.c	15 Jan 2004 10:55:26 -0000	1.73.2.96
+++ drbd_main.c	15 Jan 2004 14:26:29 -0000	1.73.2.97
@@ -87,7 +87,8 @@
 #endif
 
 int drbdd_init(struct Drbd_thread*);
-int drbd_dsender(struct Drbd_thread*);
+//int drbd_dsender(struct Drbd_thread*);
+int drbd_worker(struct Drbd_thread*);
 int drbd_asender(struct Drbd_thread*);
 
 int drbd_init(void);
@@ -314,28 +315,21 @@
 
 void set_cstate(drbd_dev* mdev,Drbd_CState cs)
 {
-	struct list_head workset;
 	struct list_head *le;
-	struct drbd_hook *dh;
+	struct drbd_work *w;
+	struct Drbd_Conf *that_dev;
 	unsigned long flags;
-	int run_again;
-
-	INIT_LIST_HEAD(&workset);
 
 	spin_lock_irqsave(&mdev->req_lock,flags);
 	mdev->cstate = cs;
 
-	list_add(&workset,&mdev->cstate_hook);
-	list_del_init(&mdev->cstate_hook);
-
-	while(!list_empty(&workset)) {
-		le = workset.next;
-		dh = list_entry(le, struct drbd_hook,list);
+	while(!list_empty(&mdev->cstate_hook)) {
+		le = mdev->cstate_hook.next;
+		w  = list_entry(le,struct drbd_work,list);
 		list_del(le);
-		spin_unlock_irqrestore(&mdev->req_lock,flags);
-		run_again = dh->callback(mdev,dh);
-		spin_lock_irqsave(&mdev->req_lock,flags);
-		if(run_again) list_add(le,&mdev->cstate_hook);
+		that_dev = container_of(w,struct Drbd_Conf,resync_work);
+		PARANOIA_BUG_ON(!IS_VALID_MDEV(mdev));
+		list_add_tail(&w->list,&that_dev->data.work.q);
 	}
 	spin_unlock_irqrestore(&mdev->req_lock,flags);
 
@@ -873,10 +867,6 @@
 	msg.msg_controllen = 0;
 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
 
-	/* FIXME remove. since nbd does not do this either,
-	 * it seems to be safe ... well, or *they* have a bug there :-)
-	 * lock_kernel();  //  check if this is still necessary
-	 */
 	oldfs = get_fs();
 	set_fs(KERNEL_DS);
 
@@ -1024,10 +1014,9 @@
 	init_MUTEX(&mdev->md_io_mutex);
 	init_MUTEX(&mdev->data.mutex);
 	init_MUTEX(&mdev->meta.mutex);
-	sema_init(&mdev->data.work,0);
-	sema_init(&mdev->meta.work,0);
+	sema_init(&mdev->data.work.s,0);
+	sema_init(&mdev->meta.work.s,0);
 
-	mdev->rs_lock        = SPIN_LOCK_UNLOCKED;
 	mdev->al_lock        = SPIN_LOCK_UNLOCKED;
 	mdev->tl_lock        = SPIN_LOCK_UNLOCKED;
 	mdev->ee_lock        = SPIN_LOCK_UNLOCKED;
@@ -1041,21 +1030,22 @@
 	INIT_LIST_HEAD(&mdev->sync_ee);
 	INIT_LIST_HEAD(&mdev->done_ee);
 	INIT_LIST_HEAD(&mdev->read_ee);
-	INIT_LIST_HEAD(&mdev->rdone_ee);
+	// INIT_LIST_HEAD(&mdev->rdone_ee);
 	INIT_LIST_HEAD(&mdev->busy_blocks);
 	INIT_LIST_HEAD(&mdev->app_reads);
 	INIT_LIST_HEAD(&mdev->resync_reads);
-	INIT_LIST_HEAD(&mdev->data.work_q);
-	INIT_LIST_HEAD(&mdev->meta.work_q);
+	INIT_LIST_HEAD(&mdev->data.work.q);
+	INIT_LIST_HEAD(&mdev->meta.work.q);
+	INIT_LIST_HEAD(&mdev->resync_work.list);
+	mdev->resync_work.cb = w_resync_inactive;
 
 	init_waitqueue_head(&mdev->state_wait);
 	init_waitqueue_head(&mdev->cstate_wait);
-	init_waitqueue_head(&mdev->dsender_wait);
 	init_waitqueue_head(&mdev->ee_wait);
 	init_waitqueue_head(&mdev->al_wait);
 
 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
-	drbd_thread_init(mdev, &mdev->dsender, drbd_dsender);
+	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
 
 	mdev->write_hint_tq.routine = &drbd_send_write_hint;
@@ -1166,10 +1156,6 @@
 			if(rr) printk(KERN_ERR DEVICE_NAME
 				       "%d: %d EEs in done list found!\n",i,rr);
 
-			rr = drbd_release_ee(mdev,&mdev->rdone_ee);
-			if(rr) printk(KERN_ERR DEVICE_NAME
-				       "%d: %d EEs in rdone list found!\n",i,rr);
-
 			rr = drbd_release_ee(mdev,&mdev->read_ee);
 			if(rr) printk(KERN_ERR DEVICE_NAME
 				       "%d: %d EEs in read list found!\n",i,rr);
@@ -1349,7 +1335,7 @@
 		drbd_set_state(drbd_conf+i,Secondary);
 		fsync_dev(MKDEV(MAJOR_NR, i));
 		set_bit(DO_NOT_INC_CONCNT,&drbd_conf[i].flags);
-		drbd_thread_stop(&drbd_conf[i].dsender);
+		drbd_thread_stop(&drbd_conf[i].worker);
 		drbd_thread_stop(&drbd_conf[i].receiver);
 		drbd_thread_stop(&drbd_conf[i].asender);
 	}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.80
retrieving revision 1.97.2.81
diff -u -3 -r1.97.2.80 -r1.97.2.81
--- drbd_receiver.c	15 Jan 2004 10:55:26 -0000	1.97.2.80
+++ drbd_receiver.c	15 Jan 2004 14:26:29 -0000	1.97.2.81
@@ -353,7 +353,7 @@
 
 /* It is important that the head list is really empty when returning,
    from this function. Note, this function is called from all three
-   threads (receiver, dsender and asender). To ensure this I only allow
+   threads (receiver, worker and asender). To ensure this I only allow
    one thread at a time in the body of the function */
 STATIC int _drbd_process_ee(drbd_dev *mdev,struct list_head *head)
 {
@@ -705,7 +705,7 @@
 	set_cstate(mdev,WFReportParams);
 
 	drbd_thread_start(&mdev->asender);
-	drbd_thread_start(&mdev->dsender);
+	drbd_thread_start(&mdev->worker);
 
 	drbd_send_param(mdev);
 
@@ -1161,26 +1161,6 @@
 	return TRUE;
 }
 
-STATIC int e_end_data_req(drbd_dev *mdev, struct drbd_work *w)
-{
-	struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
-	int ok;
-	ok=drbd_send_block(mdev, DataReply, e);
-	dec_unacked(mdev,HERE); // THINK unconditional?
-	return ok;
-}
-
-STATIC int e_end_rsdata_req(drbd_dev *mdev, struct drbd_work *w)
-{
-	struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
-	int ok;
-	drbd_rs_complete_io(mdev,e->pbh.b_blocknr);
-	inc_pending(mdev);
-	ok=drbd_send_block(mdev, DataReply, e);
-	dec_unacked(mdev,HERE); // THINK unconditional?
-	return ok;
-}
-
 STATIC int receive_DataRequest(drbd_dev *mdev,Drbd_Header *h)
 {
 	sector_t sector;
@@ -1205,13 +1185,15 @@
 
 	switch (h->command) {
 	case DataRequest:
-		e->w.cb = e_end_data_req;
+		e->w.cb = w_e_end_data_req;
 		break;
 	case RSDataRequest:
-		e->w.cb = e_end_rsdata_req;
+		e->w.cb = w_e_end_rsdata_req;
 		/* Eventually this should become asynchrously. Currently it
 		 * blocks the whole receiver just to delay the reading of a
-		 * resync data block. */
+		 * resync data block.
+		 * the drbd_work_queue mechanism is made for this...
+		 */
 		drbd_rs_begin_io(mdev,sector);
 		break;
 	default:
@@ -1220,7 +1202,7 @@
 
 	clear_bit(BH_Uptodate, &e->pbh.b_state);
 	set_bit(BH_Lock, &e->pbh.b_state);
-	e->pbh.b_end_io = drbd_dio_end_read;
+	e->pbh.b_end_io = enslaved_read_bh_end_io;
 
 	mdev->read_cnt += e->pbh.b_size >> 9;
 	inc_unacked(mdev);
@@ -1550,6 +1532,27 @@
 	return TRUE; // cannot fail ?
 }
 
+STATIC int receive_SyncDone(drbd_dev *mdev, Drbd_Header *h)
+{
+	unsigned long dt;
+	D_ASSERT(mdev->cstate == SyncSource);
+	D_ASSERT(mdev->resync_work.cb == w_resync_source);
+
+	INIT_LIST_HEAD(&mdev->resync_work.list);
+	mdev->resync_work.cb = w_resync_inactive;
+
+	dt = (jiffies - mdev->rs_start) / HZ + 1;
+	INFO("Resync done (total %lu sec; %lu K/sec)\n",
+	     dt,(mdev->rs_total/2)/dt);
+
+	mdev->rs_total = 0;
+	set_cstate(mdev,Connected);
+
+	// assert that all bit-map parts are cleared.
+	D_ASSERT(list_empty(&mdev->resync->lru));
+	return TRUE; // cannot fail ?
+}
+
 typedef int (*drbd_cmd_handler_f)(drbd_dev*,Drbd_Header*);
 
 static drbd_cmd_handler_f drbd_default_handler[] = {
@@ -1573,6 +1576,7 @@
 	[SyncParam]        = receive_SyncParam,
 	[SyncStop]         = receive_SyncStop,
 	[SyncCont]         = receive_SyncCont,
+	[SyncDone]         = receive_SyncDone,
 };
 
 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
@@ -1613,7 +1617,7 @@
 void drbd_disconnect(drbd_dev *mdev)
 {
 	mdev->o_state = Unknown;
-	drbd_thread_stop_nowait(&mdev->dsender);
+	drbd_thread_stop_nowait(&mdev->worker);
 	drbd_thread_stop(&mdev->asender);
 
 	while(down_trylock(&mdev->data.mutex))
@@ -1635,7 +1639,7 @@
 	drbd_free_sock(mdev);
 	up(&mdev->data.mutex);
 
-	drbd_thread_stop(&mdev->dsender);
+	drbd_thread_stop(&mdev->worker);
 	drbd_collect_zombies(mdev);
 
 	if(mdev->cstate != StandAlone)