[DRBD-cvs] DRBD CVS: drbd by phil from
drbd-user@lists.linbit.com
drbd-user@lists.linbit.com
Thu, 15 Jan 2004 15:26:30 +0100 (CET)
DRBD CVS committal
Author : phil
Host :
Module : drbd
Dir : drbd/drbd
Modified Files:
Tag: rel-0_7-branch
drbd_actlog.c drbd_dsender.c drbd_fs.c drbd_int.h drbd_main.c
drbd_receiver.c
Log Message:
[Patch by Lars]
dsender becomes worker.
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_actlog.c,v
retrieving revision 1.1.2.52
retrieving revision 1.1.2.53
diff -u -3 -r1.1.2.52 -r1.1.2.53
--- drbd_actlog.c 14 Jan 2004 13:36:01 -0000 1.1.2.52
+++ drbd_actlog.c 15 Jan 2004 14:26:29 -0000 1.1.2.53
@@ -560,30 +560,21 @@
from other places in non IRQ */
unsigned long flags=0;
int cleared;
- int wake_dsender=0;
+ /* notify of SYNC_FINISHED is now done by other means */
cleared = bm_set_bit(mdev, sector, blk_size, SS_IN_SYNC);
- spin_lock_irqsave(&mdev->rs_lock,flags);
+ spin_lock_irqsave(&mdev->al_lock,flags);
mdev->rs_left -= cleared;
D_ASSERT((long)mdev->rs_left >= 0);
- if( cleared && mdev->rs_left == 0 ) {
- spin_lock(&mdev->ee_lock); // IRQ lock already taken by rs_lock
- set_bit(SYNC_FINISHED,&mdev->flags);
- spin_unlock(&mdev->ee_lock);
- wake_dsender=1;
- }
if(jiffies - mdev->rs_mark_time > HZ*10) {
mdev->rs_mark_time=jiffies;
mdev->rs_mark_left=mdev->rs_left;
}
- spin_unlock_irqrestore(&mdev->rs_lock,flags);
+ spin_unlock_irqrestore(&mdev->al_lock,flags);
drbd_try_clear_on_disk_bm(mdev,sector,cleared,may_sleep);
- if(wake_dsender) {// must happen after drbd_try_clear_on_disk_bm();
- wake_up_interruptible(&mdev->dsender_wait);
- }
}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.46
retrieving revision 1.1.2.47
diff -u -3 -r1.1.2.46 -r1.1.2.47
--- drbd_dsender.c 14 Jan 2004 16:05:36 -0000 1.1.2.46
+++ drbd_dsender.c 15 Jan 2004 14:26:29 -0000 1.1.2.47
@@ -44,7 +44,7 @@
#include "drbd.h"
#include "drbd_int.h"
-void drbd_dio_end_read(struct buffer_head *bh, int uptodate)
+void enslaved_read_bh_end_io(struct buffer_head *bh, int uptodate)
{
/* This callback will be called in irq context by the IDE drivers,
and in Softirqs/Tasklets/BH context by the SCSI drivers.
@@ -67,36 +67,21 @@
smp_mb__after_clear_bit();
list_del(&e->w.list);
- list_add(&e->w.list,&mdev->rdone_ee);
-
spin_unlock_irqrestore(&mdev->ee_lock,flags);
- wake_up_interruptible(&mdev->dsender_wait);
+ __drbd_queue_work(mdev,&mdev->data.work,&e->w);
}
-int drbd_process_rdone_ee(struct Drbd_Conf* mdev)
+int w_resync_source(drbd_dev *mdev, struct drbd_work *w)
{
- struct Tl_epoch_entry *e;
- struct list_head *le;
- int ok=1;
-
- MUST_HOLD(&mdev->ee_lock);
-
- while(!list_empty(&mdev->rdone_ee)) {
- le = mdev->rdone_ee.next;
- e = list_entry(le, struct Tl_epoch_entry,w.list);
- spin_unlock_irq(&mdev->ee_lock);
- ok = ok && e->w.cb(mdev,&e->w);
-
- spin_lock_irq(&mdev->ee_lock);
- list_del(le); // remove from list first.
-
- drbd_put_ee(mdev,e);
- }
-
- wake_up_interruptible(&mdev->ee_wait);
+ ERR("I seem to be resync source, but callback triggered??\n");
+ return 0;
+}
- return ok;
+int w_resync_inactive(drbd_dev *mdev, struct drbd_work *w)
+{
+ ERR("resync inactive, but callback triggered??\n");
+ return 0;
}
STATIC drbd_dev *ds_find_osg(drbd_dev *mdev)
@@ -115,107 +100,64 @@
return 0;
}
-STATIC int _ds_wait_osg(drbd_dev* odev, struct drbd_hook* dh)
+int w_make_resync_request(drbd_dev* mdev, struct drbd_work* w)
{
- // This is a callback, I better not assume that this
- // is a context which allows to send something from.
- unsigned long flags;
- drbd_dev *mdev = (drbd_dev*) dh->data;
- int added=0;
-
- if(odev->cstate <= Connected) {
- retry:
- if( (odev = ds_find_osg(mdev)) ) {
- spin_lock_irqsave(&odev->req_lock,flags);
- if(odev->cstate > Connected) {
- list_add_tail(&dh->list,&odev->cstate_hook);
- added=1;
- }
- spin_unlock_irqrestore(&odev->req_lock,flags);
- if(!added) goto retry;
- } else {
- set_bit(SYNC_CONTINUE,&mdev->flags);
- wake_up_interruptible(&mdev->dsender_wait);
- kfree(dh);
- }
- return 0; // do not add to this hook again.
- }
- return 1; // run again.
-}
+ struct Pending_read *pr;
+ struct Drbd_Conf *odev;
+ sector_t sector;
-STATIC int drbd_wait_for_other_sync_groups(drbd_dev *mdev)
-{
- drbd_dev *odev;
- struct drbd_hook *dh=NULL;
- int added=0;
+ PARANOIA_BUG_ON(w != &mdev->resync_work);
- retry:
+ // wait_for_other_sync_groups
odev = ds_find_osg(mdev);
- if(!odev) return FALSE;
-
- while( dh == NULL ) {
- dh=kmalloc(sizeof(struct drbd_hook),GFP_KERNEL);
- if(dh) break;
- ERR("could not kmalloc drbd_hook\n");
- current->state = TASK_INTERRUPTIBLE;
- schedule_timeout(HZ);
+ if (odev) {
+ spin_lock_irq(&odev->req_lock);
+ if (odev->cstate > Connected) {
+ list_add_tail(&w->list,&odev->cstate_hook);
+ spin_unlock_irq(&odev->req_lock);
+ INFO("Syncer waits for sync group %i\n",
+ odev->sync_conf.group);
+ drbd_send_short_cmd(mdev,SyncStop);
+ set_cstate(mdev,PausedSyncT);
+ return 1;
+ }
+ // state change while we were looking at it. never mind ...
+ spin_unlock_irq(&odev->req_lock);
}
- dh->data = mdev;
- dh->callback = _ds_wait_osg;
-
- spin_lock_irq(&odev->req_lock);
- if(odev->cstate > Connected) {
- list_add_tail(&dh->list,&odev->cstate_hook);
- added=1;
+ if (mdev->cstate == PausedSyncT) {
+ INFO("resumed synchronisation.\n");
+ drbd_send_short_cmd(mdev,SyncCont);
+ set_cstate(mdev,SyncTarget);
}
- spin_unlock_irq(&odev->req_lock);
- if(!added) goto retry;
-
- INFO("Syncer waits for sync group %i\n",
- odev->sync_conf.group);
- drbd_send_short_cmd(mdev,SyncStop);
- set_cstate(mdev,PausedSyncT);
-
- return TRUE;
-}
-
-/* bool */
-STATIC int ds_issue_requests(struct Drbd_Conf* mdev)
-{
- int number,i;
- sector_t sector;
-
-#define SLEEP_TIME (HZ/10)
-
- number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
+ D_ASSERT(mdev->cstate == SyncTarget);
- // Remove later
- if(number > 1000) number=1000;
- if(atomic_read(&mdev->pending_cnt)>1200) {
+ if (atomic_read(&mdev->pending_cnt)>1200) {
ERR("pending cnt high -- throttling resync.\n");
- return TRUE;
- }
- // /Remove later
+ // schedule_timeout(HZ/10); ??
+ // FIXME do we send a write_hint here ?
- if(drbd_wait_for_other_sync_groups(mdev)) return FALSE;
+ /* FIXME if (current_sync_throughput > mdev->sync_conf.rate)
+ * and (we have other pending writes [//reads ?? ])
+ * throttle, too...
+ * If drbd_make_request would not send itself, but just
+ * queue the work to the worker, the latter (condition)
+ * simplifies to (!list_empty(work.q)) -lge
+ */
+ goto requeue;
+ }
- for(i=0;i<number;i++) {
- struct Pending_read *pr;
+ pr = mempool_alloc(drbd_pr_mempool, GFP_USER);
+ if (likely(pr!= NULL)) {
int size=BM_BLOCK_SIZE;
- pr = mempool_alloc(drbd_pr_mempool, GFP_USER);
- if (!pr) return TRUE;
- SET_MAGIC(pr);
-
sector = bm_get_sector(mdev->mbds_id,&size);
-
- if(sector == MBDS_DONE) {
- Drbd_Header h;
+ if (sector == MBDS_DONE) {
INVALIDATE_MAGIC(pr);
mempool_free(pr,drbd_pr_mempool);
- drbd_send_cmd(mdev,mdev->data.socket,WriteHint,&h,sizeof(h));
- return FALSE;
+ // __queue_work ... shortcut.
+ w_resync_finished(mdev,w);
+ return 1;
}
pr->d.sector = sector;
@@ -226,111 +168,162 @@
inc_pending(mdev);
ERR_IF(!drbd_send_drequest(mdev,RSDataRequest,
- sector,size,(unsigned long)pr))
+ sector,size,(unsigned long)pr)) {
dec_pending(mdev,HERE);
+ return 0; // FAILED. worker will abort!
+ }
}
- return TRUE;
+ requeue:
+ __drbd_queue_work(mdev,&mdev->data.work,w);
+ return 1;
}
-void drbd_start_resync(struct Drbd_Conf *mdev, Drbd_CState side)
+int w_start_resync(drbd_dev *mdev, struct drbd_work *w)
{
- set_cstate(mdev,side);
- mdev->rs_left=mdev->rs_total;
- mdev->rs_start=jiffies;
- mdev->rs_mark_left=mdev->rs_left;
- mdev->rs_mark_time=mdev->rs_start;
-
- INFO("Resync started as %s (need to sync %lu KB).\n",
- side == SyncTarget ? "target" : "source", mdev->rs_left/2);
+ PARANOIA_BUG_ON(w != &mdev->resync_work);
- if(side == SyncTarget) {
- set_bit(START_SYNC,&mdev->flags);
- // FIXME do this more elegant ...
- // for now, this ensures that meta data is "consistent"
- if ( mdev->rs_total == 0 ) set_bit(SYNC_FINISHED,&mdev->flags);
- wake_up_interruptible(&mdev->dsender_wait);
+ if(mdev->cstate == SyncTarget) {
+ mdev->gen_cnt[Flags] &= ~MDF_Consistent;
+ bm_reset(mdev->mbds_id);
+ w->cb = w_make_resync_request;
+ __drbd_queue_work(mdev,&mdev->data.work,w);
} else {
// If we are SyncSource we must be consistent :)
mdev->gen_cnt[Flags] |= MDF_Consistent;
- if ( mdev->rs_total == 0 ) set_cstate(mdev,Connected);
+ w->cb = w_resync_source;
+ if ( mdev->rs_total == 0 ) {
+ w->cb = w_resync_finished;
+ __drbd_queue_work(mdev,&mdev->data.work,w);
+ }
+ }
+ drbd_md_write(mdev);
+
+ return 1;
+}
+
+int w_resync_finished(drbd_dev* mdev, struct drbd_work* w)
+{
+ unsigned long dt;
+
+ PARANOIA_BUG_ON(w != &mdev->resync_work);
+ D_ASSERT(mdev->rs_left == 0);
+
+ dt = (jiffies - mdev->rs_start) / HZ + 1;
+ INFO("Resync done (total %lu sec; %lu K/sec)\n",
+ dt,(mdev->rs_total/2)/dt);
+
+ if (mdev->cstate == SyncTarget) {
+ mdev->gen_cnt[Flags] |= MDF_Consistent;
+ drbd_md_write(mdev);
+ drbd_send_short_cmd(mdev,SyncDone);
}
+ mdev->rs_total = 0;
+ set_cstate(mdev,Connected);
+
+ // assert that all bit-map parts are cleared.
+ D_ASSERT(list_empty(&mdev->resync->lru));
+ w->cb = w_resync_inactive;
+ INIT_LIST_HEAD(&w->list);
+ return 1;
}
-static inline int _dsender_cond(struct Drbd_Conf *mdev)
+int w_e_end_data_req(drbd_dev *mdev, struct drbd_work *w)
{
- int rv;
- rv = test_bit(START_SYNC,&mdev->flags) // TODO Use Lars' style _FLAG
- || test_bit(SYNC_FINISHED,&mdev->flags)
- || test_bit(SYNC_CONTINUE,&mdev->flags);
+ struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
+ int ok;
+
+ ok=drbd_send_block(mdev, DataReply, e);
+ dec_unacked(mdev,HERE); // THINK unconditional?
spin_lock_irq(&mdev->ee_lock);
- rv |= !list_empty(&mdev->rdone_ee);
+ drbd_put_ee(mdev,e);
spin_unlock_irq(&mdev->ee_lock);
- return rv;
+ return ok;
}
-int drbd_dsender(struct Drbd_thread *thi)
+int w_e_end_rsdata_req(drbd_dev *mdev, struct drbd_work *w)
{
- long time=MAX_SCHEDULE_TIMEOUT;
- drbd_dev *mdev = thi->mdev;
+ struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
+ int ok;
- sprintf(current->comm, "drbd%d_dsender", (int)(mdev-drbd_conf));
+ drbd_rs_complete_io(mdev,e->pbh.b_blocknr);
+ inc_pending(mdev);
+ ok=drbd_send_block(mdev, DataReply, e);
+ dec_unacked(mdev,HERE); // THINK unconditional?
- while( thi->t_state == Running ) {
+ spin_lock_irq(&mdev->ee_lock);
+ drbd_put_ee(mdev,e);
+ spin_unlock_irq(&mdev->ee_lock);
- wait_event_interruptible_timeout(
- mdev->dsender_wait,_dsender_cond(mdev),time);
+ return ok;
+}
- spin_lock_irq(&mdev->ee_lock);
- drbd_process_rdone_ee(mdev);
- spin_unlock_irq(&mdev->ee_lock);
-
- if(test_and_clear_bit(START_SYNC,&mdev->flags)) {
- time=SLEEP_TIME;
- mdev->gen_cnt[Flags] &= ~MDF_Consistent;
- drbd_md_write(mdev);
- bm_reset(mdev->mbds_id);
- }
+void drbd_start_resync(struct Drbd_Conf *mdev, Drbd_CState side)
+{
+ set_cstate(mdev,side);
+ mdev->rs_left=mdev->rs_total;
+ mdev->rs_start=jiffies;
+ mdev->rs_mark_left=mdev->rs_left;
+ mdev->rs_mark_time=mdev->rs_start;
- if(test_and_clear_bit(SYNC_FINISHED,&mdev->flags)) {
- unsigned long dt;
- dt = (jiffies - mdev->rs_start) / HZ + 1;
- INFO("Resync done (total %lu sec; %lu K/sec)\n",
- dt,(mdev->rs_total/2)/dt);
-
- if(mdev->cstate == SyncTarget) {
- mdev->gen_cnt[Flags] |= MDF_Consistent;
- drbd_md_write(mdev);
- }
- mdev->rs_total = 0;
- set_cstate(mdev,Connected);
+ INFO("Resync started as %s (need to sync %lu KB).\n",
+ side == SyncTarget ? "target" : "source", mdev->rs_left/2);
- // assert that all bit-map parts are cleared.
- D_ASSERT(list_empty(&mdev->resync->lru));
- }
+ PARANOIA_BUG_ON(!list_empty(&mdev->resync_work.list));
+ PARANOIA_BUG_ON(mdev->resync_work.cb != w_resync_inactive);
- if(test_and_clear_bit(SYNC_CONTINUE,&mdev->flags) &&
- (mdev->cstate == PausedSyncT) ) {
- time=SLEEP_TIME;
- INFO("resumed synchronisation.\n");
- drbd_send_short_cmd(mdev,SyncCont);
- set_cstate(mdev,SyncTarget);
- }
+ mdev->resync_work.cb = w_start_resync;
+ __drbd_queue_work(mdev,&mdev->data.work,&mdev->resync_work);
+}
- if(time == SLEEP_TIME) {
- if (!ds_issue_requests(mdev)) {
- time=MAX_SCHEDULE_TIMEOUT;
- }
- if (!disable_io_hints) {
- Drbd_Header h;
- drbd_send_cmd(mdev,mdev->data.socket,WriteHint,&h,
- sizeof(h));
+int drbd_worker(struct Drbd_thread *thi)
+{
+ drbd_dev *mdev = thi->mdev;
+ struct drbd_work *w;
+ unsigned long flags;
+ int intr;
+
+ sprintf(current->comm, "drbd%d_worker", (int)(mdev-drbd_conf));
+
+ for (;;) {
+ intr = down_interruptible(&mdev->data.work.s);
+
+ if (intr) {
+ D_ASSERT(intr == -EINTR);
+ LOCK_SIGMASK(current,flags);
+ if (sigismember(&current->pending.signal, SIGTERM)) {
+ sigdelset(&current->pending.signal, SIGTERM);
+ RECALC_SIGPENDING(current);
}
+ UNLOCK_SIGMASK(current,flags);
+ if (thi->t_state != Running )
+ break;
+ continue;
}
+
+ if (thi->t_state != Running )
+ break;
+ if (need_resched())
+ schedule();
+
+ w = NULL;
+ spin_lock_irq(&mdev->req_lock);
+ if (!list_empty(&mdev->data.work.q)) {
+ w = list_entry(mdev->data.work.q.next,struct drbd_work,list);
+ list_del(&w->list);
+ }
+ spin_unlock_irq(&mdev->req_lock);
+
+ ERR_IF (!w)
+ continue; // BUG()... racy up() somewhere ??
+
+ ERR_IF ( !w->cb(mdev,w) )
+ break;
}
+ INFO("worker terminated\n");
+
return 0;
}
-
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_fs.c,v
retrieving revision 1.28.2.43
retrieving revision 1.28.2.44
diff -u -3 -r1.28.2.43 -r1.28.2.44
--- drbd_fs.c 12 Jan 2004 09:31:36 -0000 1.28.2.43
+++ drbd_fs.c 15 Jan 2004 14:26:29 -0000 1.28.2.44
@@ -215,7 +215,7 @@
}
fsync_dev(MKDEV(MAJOR_NR, minor));
- drbd_thread_stop(&mdev->dsender);
+ drbd_thread_stop(&mdev->worker);
drbd_thread_stop(&mdev->asender);
drbd_thread_stop(&mdev->receiver);
drbd_free_resources(mdev);
@@ -337,7 +337,7 @@
*/
fsync_dev(MKDEV(MAJOR_NR, minor));
- drbd_thread_stop(&mdev->dsender);
+ drbd_thread_stop(&mdev->worker);
drbd_thread_stop(&mdev->asender);
drbd_thread_stop(&mdev->receiver);
drbd_free_sock(mdev);
@@ -591,7 +591,7 @@
/* FIXME what if fsync returns error */
fsync_dev(MKDEV(MAJOR_NR, minor));
set_bit(DO_NOT_INC_CONCNT,&mdev->flags);
- drbd_thread_stop(&mdev->dsender);
+ drbd_thread_stop(&mdev->worker);
drbd_thread_stop(&mdev->asender);
drbd_thread_stop(&mdev->receiver);
@@ -608,7 +608,7 @@
fsync_dev(MKDEV(MAJOR_NR, minor));
set_bit(DO_NOT_INC_CONCNT,&mdev->flags);
- drbd_thread_stop(&mdev->dsender);
+ drbd_thread_stop(&mdev->worker);
drbd_thread_stop(&mdev->asender);
drbd_thread_stop(&mdev->receiver);
drbd_free_resources(mdev);
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.89
retrieving revision 1.58.2.90
diff -u -3 -r1.58.2.89 -r1.58.2.90
--- drbd_int.h 15 Jan 2004 10:55:26 -0000 1.58.2.89
+++ drbd_int.h 15 Jan 2004 14:26:29 -0000 1.58.2.90
@@ -344,6 +344,7 @@
SyncParam,
SyncStop,
SyncCont,
+ SyncDone,
MAX_CMD,
MayIgnore = 0x100, // Flag only to test if (cmd > MayIgnore) ...
MAX_OPT_CMD,
@@ -374,6 +375,7 @@
case SyncParam : return "SyncParam";
case SyncStop : return "SyncStop";
case SyncCont : return "SyncCont";
+ case SyncDone : return "SyncDone";
case MAX_CMD : return "MAX_CMD";
case MayIgnore : return "MayIgnore";
case MAX_OPT_CMD : return "MAX_OPT_CMD";
@@ -405,6 +407,7 @@
* WriteHint
* SyncStop
* SyncCont
+ * SyncDone
*/
/*
@@ -598,13 +601,13 @@
#define COLLECT_ZOMBIES 1
#define SEND_PING 2
#define WRITER_PRESENT 3
-#define START_SYNC 4
+//#define START_SYNC 4
#define DO_NOT_INC_CONCNT 5
#define WRITE_HINT_QUEUED 6
#define PARTNER_DISKLESS 7
-#define SYNC_FINISHED 8
+//#define SYNC_FINISHED 8
#define PROCESS_EE_RUNNING 9
-#define SYNC_CONTINUE 10
+//#define SYNC_CONTINUE 10
struct BitMap {
unsigned long dev_size;
@@ -627,28 +630,29 @@
#define BME_NO_WRITES 0
#define BME_LOCKED 1
-struct drbd_hook {
- struct list_head list;
- void* data;
- int (*callback) (drbd_dev*, struct drbd_hook* );
-};
-
-
// TODO sort members for performance
// MAYBE group them further
+/* THINK maybe we actually want to use the default "event/%s" worker threads
+ * or similar in linux 2.6, which uses per cpu data and threads.
+ *
+ * To be general, this might need a spin_lock member.
+ * For now, please use the mdev->req_lock to protect list_head,
+ * see __drbd_queue_work below.
+ */
+struct drbd_work_queue {
+ struct list_head q;
+ struct semaphore s; // producers up it, worker down()s it
+};
+
/* If Philipp agrees, we remove the "mutex", and make_request will only
* (throttle on "queue full" condition and) queue it to the worker thread...
* which then is free to do whatever is needed, and has exclusive send access
* to the data socket ...
- *
- * I want to see how this works first. I have the feeling in my guts that
- * it will lead to OOM deadlock.
*/
struct drbd_socket {
+ struct drbd_work_queue work;
struct semaphore mutex;
- struct semaphore work; // producers up it, worker down()s it
- struct list_head work_q;
struct socket *socket;
Drbd_Polymorph_Packet sbuf; // this way we get our
Drbd_Polymorph_Packet rbuf; // send/receive buffers off the stack
@@ -700,11 +704,9 @@
unsigned long rs_start; // Syncer's start time [unit jiffies]
sector_t rs_mark_left;// block not up-to-date at mark [unit sect.]
unsigned long rs_mark_time;// marks's time [unit jiffies]
- spinlock_t rs_lock; // used to protect the rs_variables.
struct Drbd_thread receiver;
- struct Drbd_thread dsender;
+ struct Drbd_thread worker;
struct Drbd_thread asender;
- wait_queue_head_t dsender_wait;
struct BitMap* mbds_id;
struct lru_cache* resync; // Used to track operations of resync...
int open_cnt;
@@ -716,7 +718,7 @@
struct list_head sync_ee; // IO in progress
struct list_head done_ee; // send ack
struct list_head read_ee; // IO in progress
- struct list_head rdone_ee; // send result or CondRequest
+ // struct list_head rdone_ee; // send result or CondRequest
spinlock_t pr_lock;
struct list_head app_reads;
struct list_head resync_reads;
@@ -868,10 +870,19 @@
unsigned int cmd, unsigned long arg);
// drbd_dsender.c
-extern int drbd_dsender(struct Drbd_thread *thi);
-extern void drbd_dio_end_read(struct buffer_head *bh, int uptodate);
+//extern int drbd_dsender(struct Drbd_thread *thi);
+extern int drbd_worker(struct Drbd_thread *thi);
+extern void enslaved_read_bh_end_io(struct buffer_head *bh, int uptodate);
extern void drbd_start_resync(drbd_dev *mdev, Drbd_CState side);
extern unsigned long drbd_hash(struct buffer_head *bh);
+// worker callbacks
+extern int w_e_end_data_req (drbd_dev *mdev, struct drbd_work*w);
+extern int w_e_end_rsdata_req (drbd_dev *mdev, struct drbd_work*w);
+extern int w_make_resync_request (drbd_dev *mdev, struct drbd_work*w);
+extern int w_resync_finished (drbd_dev *mdev, struct drbd_work*w);
+extern int w_resync_inactive (drbd_dev *mdev, struct drbd_work*w);
+extern int w_resync_source (drbd_dev *mdev, struct drbd_work*w);
+extern int w_start_resync (drbd_dev *mdev, struct drbd_work*w);
// drbd_receiver.c
extern int drbd_release_ee(drbd_dev* mdev,struct list_head* list);
@@ -988,6 +999,17 @@
/*
* inline helper functions
*************************/
+
+static inline void
+__drbd_queue_work(drbd_dev *mdev, struct drbd_work_queue *q,
+ struct drbd_work *w)
+{
+ unsigned long flags;
+ spin_lock_irqsave(&mdev->req_lock,flags);
+ list_add_tail(&w->list,&q->q);
+ spin_unlock_irqrestore(&mdev->req_lock,flags);
+ up(&q->s);
+}
static inline void wake_asender(drbd_dev *mdev) {
drbd_queue_signal(DRBD_SIG, mdev->asender.task);
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.96
retrieving revision 1.73.2.97
diff -u -3 -r1.73.2.96 -r1.73.2.97
--- drbd_main.c 15 Jan 2004 10:55:26 -0000 1.73.2.96
+++ drbd_main.c 15 Jan 2004 14:26:29 -0000 1.73.2.97
@@ -87,7 +87,8 @@
#endif
int drbdd_init(struct Drbd_thread*);
-int drbd_dsender(struct Drbd_thread*);
+//int drbd_dsender(struct Drbd_thread*);
+int drbd_worker(struct Drbd_thread*);
int drbd_asender(struct Drbd_thread*);
int drbd_init(void);
@@ -314,28 +315,21 @@
void set_cstate(drbd_dev* mdev,Drbd_CState cs)
{
- struct list_head workset;
struct list_head *le;
- struct drbd_hook *dh;
+ struct drbd_work *w;
+ struct Drbd_Conf *that_dev;
unsigned long flags;
- int run_again;
-
- INIT_LIST_HEAD(&workset);
spin_lock_irqsave(&mdev->req_lock,flags);
mdev->cstate = cs;
- list_add(&workset,&mdev->cstate_hook);
- list_del_init(&mdev->cstate_hook);
-
- while(!list_empty(&workset)) {
- le = workset.next;
- dh = list_entry(le, struct drbd_hook,list);
+ while(!list_empty(&mdev->cstate_hook)) {
+ le = mdev->cstate_hook.next;
+ w = list_entry(le,struct drbd_work,list);
list_del(le);
- spin_unlock_irqrestore(&mdev->req_lock,flags);
- run_again = dh->callback(mdev,dh);
- spin_lock_irqsave(&mdev->req_lock,flags);
- if(run_again) list_add(le,&mdev->cstate_hook);
+ that_dev = container_of(w,struct Drbd_Conf,resync_work);
+ PARANOIA_BUG_ON(!IS_VALID_MDEV(mdev));
+ list_add_tail(&w->list,&that_dev->data.work.q);
}
spin_unlock_irqrestore(&mdev->req_lock,flags);
@@ -873,10 +867,6 @@
msg.msg_controllen = 0;
msg.msg_flags = msg_flags | MSG_NOSIGNAL;
- /* FIXME remove. since nbd does not do this either,
- * it seems to be safe ... well, or *they* have a bug there :-)
- * lock_kernel(); // check if this is still necessary
- */
oldfs = get_fs();
set_fs(KERNEL_DS);
@@ -1024,10 +1014,9 @@
init_MUTEX(&mdev->md_io_mutex);
init_MUTEX(&mdev->data.mutex);
init_MUTEX(&mdev->meta.mutex);
- sema_init(&mdev->data.work,0);
- sema_init(&mdev->meta.work,0);
+ sema_init(&mdev->data.work.s,0);
+ sema_init(&mdev->meta.work.s,0);
- mdev->rs_lock = SPIN_LOCK_UNLOCKED;
mdev->al_lock = SPIN_LOCK_UNLOCKED;
mdev->tl_lock = SPIN_LOCK_UNLOCKED;
mdev->ee_lock = SPIN_LOCK_UNLOCKED;
@@ -1041,21 +1030,22 @@
INIT_LIST_HEAD(&mdev->sync_ee);
INIT_LIST_HEAD(&mdev->done_ee);
INIT_LIST_HEAD(&mdev->read_ee);
- INIT_LIST_HEAD(&mdev->rdone_ee);
+ // INIT_LIST_HEAD(&mdev->rdone_ee);
INIT_LIST_HEAD(&mdev->busy_blocks);
INIT_LIST_HEAD(&mdev->app_reads);
INIT_LIST_HEAD(&mdev->resync_reads);
- INIT_LIST_HEAD(&mdev->data.work_q);
- INIT_LIST_HEAD(&mdev->meta.work_q);
+ INIT_LIST_HEAD(&mdev->data.work.q);
+ INIT_LIST_HEAD(&mdev->meta.work.q);
+ INIT_LIST_HEAD(&mdev->resync_work.list);
+ mdev->resync_work.cb = w_resync_inactive;
init_waitqueue_head(&mdev->state_wait);
init_waitqueue_head(&mdev->cstate_wait);
- init_waitqueue_head(&mdev->dsender_wait);
init_waitqueue_head(&mdev->ee_wait);
init_waitqueue_head(&mdev->al_wait);
drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
- drbd_thread_init(mdev, &mdev->dsender, drbd_dsender);
+ drbd_thread_init(mdev, &mdev->worker, drbd_worker);
drbd_thread_init(mdev, &mdev->asender, drbd_asender);
mdev->write_hint_tq.routine = &drbd_send_write_hint;
@@ -1166,10 +1156,6 @@
if(rr) printk(KERN_ERR DEVICE_NAME
"%d: %d EEs in done list found!\n",i,rr);
- rr = drbd_release_ee(mdev,&mdev->rdone_ee);
- if(rr) printk(KERN_ERR DEVICE_NAME
- "%d: %d EEs in rdone list found!\n",i,rr);
-
rr = drbd_release_ee(mdev,&mdev->read_ee);
if(rr) printk(KERN_ERR DEVICE_NAME
"%d: %d EEs in read list found!\n",i,rr);
@@ -1349,7 +1335,7 @@
drbd_set_state(drbd_conf+i,Secondary);
fsync_dev(MKDEV(MAJOR_NR, i));
set_bit(DO_NOT_INC_CONCNT,&drbd_conf[i].flags);
- drbd_thread_stop(&drbd_conf[i].dsender);
+ drbd_thread_stop(&drbd_conf[i].worker);
drbd_thread_stop(&drbd_conf[i].receiver);
drbd_thread_stop(&drbd_conf[i].asender);
}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.80
retrieving revision 1.97.2.81
diff -u -3 -r1.97.2.80 -r1.97.2.81
--- drbd_receiver.c 15 Jan 2004 10:55:26 -0000 1.97.2.80
+++ drbd_receiver.c 15 Jan 2004 14:26:29 -0000 1.97.2.81
@@ -353,7 +353,7 @@
/* It is important that the head list is really empty when returning,
from this function. Note, this function is called from all three
- threads (receiver, dsender and asender). To ensure this I only allow
+ threads (receiver, worker and asender). To ensure this I only allow
one thread at a time in the body of the function */
STATIC int _drbd_process_ee(drbd_dev *mdev,struct list_head *head)
{
@@ -705,7 +705,7 @@
set_cstate(mdev,WFReportParams);
drbd_thread_start(&mdev->asender);
- drbd_thread_start(&mdev->dsender);
+ drbd_thread_start(&mdev->worker);
drbd_send_param(mdev);
@@ -1161,26 +1161,6 @@
return TRUE;
}
-STATIC int e_end_data_req(drbd_dev *mdev, struct drbd_work *w)
-{
- struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
- int ok;
- ok=drbd_send_block(mdev, DataReply, e);
- dec_unacked(mdev,HERE); // THINK unconditional?
- return ok;
-}
-
-STATIC int e_end_rsdata_req(drbd_dev *mdev, struct drbd_work *w)
-{
- struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
- int ok;
- drbd_rs_complete_io(mdev,e->pbh.b_blocknr);
- inc_pending(mdev);
- ok=drbd_send_block(mdev, DataReply, e);
- dec_unacked(mdev,HERE); // THINK unconditional?
- return ok;
-}
-
STATIC int receive_DataRequest(drbd_dev *mdev,Drbd_Header *h)
{
sector_t sector;
@@ -1205,13 +1185,15 @@
switch (h->command) {
case DataRequest:
- e->w.cb = e_end_data_req;
+ e->w.cb = w_e_end_data_req;
break;
case RSDataRequest:
- e->w.cb = e_end_rsdata_req;
+ e->w.cb = w_e_end_rsdata_req;
/* Eventually this should become asynchrously. Currently it
* blocks the whole receiver just to delay the reading of a
- * resync data block. */
+ * resync data block.
+ * the drbd_work_queue mechanism is made for this...
+ */
drbd_rs_begin_io(mdev,sector);
break;
default:
@@ -1220,7 +1202,7 @@
clear_bit(BH_Uptodate, &e->pbh.b_state);
set_bit(BH_Lock, &e->pbh.b_state);
- e->pbh.b_end_io = drbd_dio_end_read;
+ e->pbh.b_end_io = enslaved_read_bh_end_io;
mdev->read_cnt += e->pbh.b_size >> 9;
inc_unacked(mdev);
@@ -1550,6 +1532,27 @@
return TRUE; // cannot fail ?
}
+STATIC int receive_SyncDone(drbd_dev *mdev, Drbd_Header *h)
+{
+ unsigned long dt;
+ D_ASSERT(mdev->cstate == SyncSource);
+ D_ASSERT(mdev->resync_work.cb == w_resync_source);
+
+ INIT_LIST_HEAD(&mdev->resync_work.list);
+ mdev->resync_work.cb = w_resync_inactive;
+
+ dt = (jiffies - mdev->rs_start) / HZ + 1;
+ INFO("Resync done (total %lu sec; %lu K/sec)\n",
+ dt,(mdev->rs_total/2)/dt);
+
+ mdev->rs_total = 0;
+ set_cstate(mdev,Connected);
+
+ // assert that all bit-map parts are cleared.
+ D_ASSERT(list_empty(&mdev->resync->lru));
+ return TRUE; // cannot fail ?
+}
+
typedef int (*drbd_cmd_handler_f)(drbd_dev*,Drbd_Header*);
static drbd_cmd_handler_f drbd_default_handler[] = {
@@ -1573,6 +1576,7 @@
[SyncParam] = receive_SyncParam,
[SyncStop] = receive_SyncStop,
[SyncCont] = receive_SyncCont,
+ [SyncDone] = receive_SyncDone,
};
static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
@@ -1613,7 +1617,7 @@
void drbd_disconnect(drbd_dev *mdev)
{
mdev->o_state = Unknown;
- drbd_thread_stop_nowait(&mdev->dsender);
+ drbd_thread_stop_nowait(&mdev->worker);
drbd_thread_stop(&mdev->asender);
while(down_trylock(&mdev->data.mutex))
@@ -1635,7 +1639,7 @@
drbd_free_sock(mdev);
up(&mdev->data.mutex);
- drbd_thread_stop(&mdev->dsender);
+ drbd_thread_stop(&mdev->worker);
drbd_collect_zombies(mdev);
if(mdev->cstate != StandAlone)