[DRBD-cvs] drbd by phil; Drbd_thread got a spinlock for all the d...
drbd-user@lists.linbit.com
drbd-user@lists.linbit.com
Wed, 5 May 2004 15:25:20 +0200 (CEST)
DRBD CVS committal
Author : phil
Module : drbd
Dir : drbd/drbd
Modified Files:
Tag: rel-0_7-branch
drbd_dsender.c drbd_fs.c drbd_int.h drbd_main.c
drbd_receiver.c drbd_req-2.4.c
Log Message:
Drbd_thread got a spinlock for all the drbd_thread operations to be
SMP save
In case of loss of connection set cstate to something < Connected
as early as possible. Later drbd_disconnect() sets cstate to
the whatever follows (also < Connected).
[Patch mainly by Lars]
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.102
retrieving revision 1.1.2.103
diff -u -3 -r1.1.2.102 -r1.1.2.103
--- drbd_dsender.c 5 May 2004 08:40:27 -0000 1.1.2.102
+++ drbd_dsender.c 5 May 2004 13:25:15 -0000 1.1.2.103
@@ -527,7 +527,7 @@
// assert that all bit-map parts are cleared.
D_ASSERT(list_empty(&mdev->resync->lru));
-
+
set_cstate(mdev,Connected); // w_resume_next_sg() gets called here.
return 1;
}
@@ -852,26 +852,27 @@
if (intr) {
D_ASSERT(intr == -EINTR);
drbd_flush_signals(current);
- if (thi->t_state != Running )
- break;
- continue;
+ ERR_IF (get_t_state(thi) == Running)
+ continue;
+ break;
}
- if (thi->t_state != Running )
+ ERR_IF (get_t_state(thi) != Running)
break;
- if (need_resched())
- schedule();
+
+ // if (need_resched()) schedule();
w = 0;
spin_lock_irq(&mdev->req_lock);
- if (!list_empty(&mdev->data.work.q)) {
- w = list_entry(mdev->data.work.q.next,struct drbd_work,list);
- list_del_init(&w->list);
- }
+ D_ASSERT(!list_empty(&mdev->data.work.q));
+ w = list_entry(mdev->data.work.q.next,struct drbd_work,list);
+ list_del_init(&w->list);
spin_unlock_irq(&mdev->req_lock);
if(!w->cb(mdev,w, mdev->cstate < Connected )) {
ERR("worker: a callback failed! \n");
+ if (mdev->cstate >= Connected)
+ set_cstate(mdev,NetworkFailure);
drbd_thread_restart_nowait(&mdev->receiver);
}
}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_fs.c,v
retrieving revision 1.28.2.84
retrieving revision 1.28.2.85
diff -u -3 -r1.28.2.84 -r1.28.2.85
--- drbd_fs.c 4 May 2004 15:03:02 -0000 1.28.2.84
+++ drbd_fs.c 5 May 2004 13:25:15 -0000 1.28.2.85
@@ -468,7 +468,6 @@
*/
drbd_sync_me(mdev);
- drbd_thread_stop(&mdev->asender);
drbd_thread_stop(&mdev->receiver);
drbd_free_sock(mdev);
@@ -781,7 +780,7 @@
/* FIXME what if fsync returns error */
drbd_sync_me(mdev);
set_bit(DO_NOT_INC_CONCNT,&mdev->flags);
- drbd_thread_stop(&mdev->asender);
+ set_cstate(mdev,Unconnected);
drbd_thread_stop(&mdev->receiver);
if (test_bit(DISKLESS,&mdev->flags)) {
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.156
retrieving revision 1.58.2.157
diff -u -3 -r1.58.2.156 -r1.58.2.157
--- drbd_int.h 5 May 2004 08:40:27 -0000 1.58.2.156
+++ drbd_int.h 5 May 2004 13:25:15 -0000 1.58.2.157
@@ -472,19 +472,31 @@
/**********************************************************************/
typedef enum {
+ None,
Running,
Exiting,
Restarting
} Drbd_thread_state;
struct Drbd_thread {
+ spinlock_t t_lock;
struct task_struct *task;
- struct semaphore mutex;
- volatile Drbd_thread_state t_state;
+ struct completion startstop;
+ Drbd_thread_state t_state;
int (*function) (struct Drbd_thread *);
drbd_dev *mdev;
};
+static inline Drbd_thread_state get_t_state(struct Drbd_thread *thi)
+{
+ /* THINK testing the t_state seems to be uncritical in all cases
+ * (but thread_{start,stop}), so we can read it *without* the lock.
+ * --lge */
+
+ smp_rmb();
+ return (volatile int)thi->t_state;
+}
+
/*
* Having this as the first member of a struct provides sort of "inheritance".
@@ -665,7 +677,7 @@
unsigned long lo_usize; /* user provided size */
unsigned long p_size; /* partner's disk size */
Drbd_State state;
- Drbd_CState cstate;
+ volatile Drbd_CState cstate;
wait_queue_head_t cstate_wait; // TODO Rename into "misc_wait".
Drbd_State o_state;
unsigned long int la_size; // last agreed disk size
@@ -1099,7 +1111,7 @@
static inline void dec_ap_pending(drbd_dev* mdev, const char* where)
{
if(atomic_dec_and_test(&mdev->ap_pending_cnt))
- wake_up_interruptible(&mdev->cstate_wait);
+ wake_up(&mdev->cstate_wait);
if(atomic_read(&mdev->ap_pending_cnt)<0)
ERR("in %s: pending_cnt = %d < 0 !\n",
@@ -1171,7 +1183,7 @@
if(atomic_dec_and_test(&mdev->local_cnt) &&
test_bit(DISKLESS,&mdev->flags) &&
mdev->lo_file) {
- wake_up_interruptible(&mdev->cstate_wait);
+ wake_up(&mdev->cstate_wait);
}
D_ASSERT(atomic_read(&mdev->local_cnt)>=0);
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.160
retrieving revision 1.73.2.161
diff -u -3 -r1.73.2.160 -r1.73.2.161
--- drbd_main.c 4 May 2004 10:26:12 -0000 1.73.2.160
+++ drbd_main.c 5 May 2004 13:25:15 -0000 1.73.2.161
@@ -390,7 +390,7 @@
os = mdev->cstate;
mdev->cstate = ns;
smp_mb();
- wake_up_interruptible(&mdev->cstate_wait);
+ wake_up(&mdev->cstate_wait);
if ( ( os==SyncSource || os==SyncTarget ) && ns <= Connected ) {
set_bit(STOP_SYNC_TIMER,&mdev->flags);
@@ -405,21 +405,25 @@
STATIC int drbd_thread_setup(void* arg)
{
struct Drbd_thread *thi = (struct Drbd_thread *) arg;
+ drbd_dev *mdev = thi->mdev;
int retval;
drbd_daemonize();
- down(&thi->mutex); //ensures that thi->task is set.
+ D_ASSERT(get_t_state(thi) == Running);
+ D_ASSERT(thi->task == NULL);
+ thi->task = current;
+ smp_mb();
+ complete(&thi->startstop); // notify: thi->task is set.
retval = thi->function(thi);
+ spin_lock(&thi->t_lock);
thi->task = 0;
+ D_ASSERT(thi->t_state == Exiting);
+ spin_unlock(&thi->t_lock);
- /* propagate task == NULL to other CPUs */
- smp_mb(); // necessary?
- set_current_state(TASK_UNINTERRUPTIBLE);
- schedule_timeout(1);
-
- up(&thi->mutex); //allow thread_stop to proceed
+ // THINK maybe two different completions?
+ complete(&thi->startstop); // notify: thi->task unset.
return retval;
}
@@ -427,8 +431,11 @@
STATIC void drbd_thread_init(drbd_dev *mdev, struct Drbd_thread *thi,
int (*func) (struct Drbd_thread *))
{
- thi->task = NULL;
- init_MUTEX(&thi->mutex);
+ thi->t_lock = SPIN_LOCK_UNLOCKED;
+ thi->task = NULL;
+ thi->t_state = None;
+ init_completion(&thi->startstop);
+
thi->function = func;
thi->mdev = mdev;
}
@@ -438,53 +445,65 @@
int pid;
drbd_dev *mdev = thi->mdev;
- if (thi->task == NULL) {
+ spin_lock(&thi->t_lock);
+ if (thi->t_state == None) {
+ D_ASSERT(thi->task == NULL);
+ // XXX D_ASSERT( thi->startstop something ? )
thi->t_state = Running;
+ spin_unlock(&thi->t_lock);
- down(&thi->mutex);
pid = kernel_thread(drbd_thread_setup, (void *) thi, CLONE_FS);
-
if (pid < 0) {
ERR("Couldn't start thread (%d)\n", pid);
return;
}
- /* printk(KERN_DEBUG DEVICE_NAME ": pid = %d\n", pid); */
- read_lock(&tasklist_lock);
- thi->task = find_task_by_pid(pid);
- read_unlock(&tasklist_lock);
- up(&thi->mutex);
+ wait_for_completion(&thi->startstop); // waits until thi->task is set
+ } else {
+ spin_unlock(&thi->t_lock);
}
}
void _drbd_thread_stop(struct Drbd_thread *thi, int restart,int wait)
{
- if (!thi->task) return;
- /* test on ->state useless now since we use reparent_to_init */
- if (thi->task->state == -1
- || thi->task->state == TASK_ZOMBIE
- || thi->task->flags & PF_EXITING ) {
- // unexpected death... clean up.
- init_MUTEX(&thi->mutex);
- thi->task = NULL;
+ drbd_dev *mdev = thi->mdev;
+
+ Drbd_thread_state ns = restart ? Restarting : Exiting;
+ spin_lock(&thi->t_lock);
+ if (thi->t_state == None) {
+ spin_unlock(&thi->t_lock);
return;
}
- if (restart)
- thi->t_state = Restarting;
- else
- thi->t_state = Exiting;
-
- smp_mb(); /* should not be necessary, since the next
- instruction is spinlock, but anyways */
- force_sig(DRBD_SIGKILL,thi->task);
-
- if(wait) {
- down(&thi->mutex); // wait until thread has exited
- up(&thi->mutex);
+ if (thi->t_state != ns) {
+ ERR_IF (thi->task == NULL) {
+ spin_unlock(&thi->t_lock);
+ return;
+ }
+
+ if (ns == Restarting && thi->t_state == Exiting) {
+ // Already Exiting. Cannot restart!
+ spin_unlock(&thi->t_lock);
+ return;
+ }
- current->state = TASK_INTERRUPTIBLE;
- schedule_timeout(HZ / 10);
+ thi->t_state = ns;
+ smp_mb();
+ if (thi->task != current)
+ force_sig(DRBD_SIGKILL,thi->task);
+ else
+ D_ASSERT(!wait);
+
+ }
+ spin_unlock(&thi->t_lock);
+
+ if (wait) {
+ D_ASSERT(thi->t_state == Exiting);
+ wait_for_completion(&thi->startstop);
+ spin_lock(&thi->t_lock);
+ thi->t_state = None;
+ D_ASSERT(thi->task == NULL);
+ spin_unlock(&thi->t_lock);
}
}
@@ -1289,9 +1308,8 @@
up(&mdev->device_mutex);
drbd_sync_me(mdev);
set_bit(DO_NOT_INC_CONCNT,&mdev->flags);
- drbd_thread_stop(&mdev->worker);
drbd_thread_stop(&mdev->receiver);
- drbd_thread_stop(&mdev->asender);
+ drbd_thread_stop(&mdev->worker);
}
}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.141
retrieving revision 1.97.2.142
diff -u -3 -r1.97.2.141 -r1.97.2.142
--- drbd_receiver.c 5 May 2004 08:40:27 -0000 1.97.2.141
+++ drbd_receiver.c 5 May 2004 13:25:15 -0000 1.97.2.142
@@ -518,6 +518,11 @@
set_fs(oldfs);
+ if(rv != size) {
+ set_cstate(mdev,BrokenPipe);
+ drbd_thread_restart_nowait(&mdev->receiver);
+ }
+
return rv;
}
@@ -612,7 +617,7 @@
if(signal_pending(current)) {
drbd_flush_signals(current);
smp_rmb();
- if ((volatile int)mdev->receiver.t_state == Exiting)
+ if (get_t_state(&mdev->receiver) == Exiting)
return 0;
}
}
@@ -1100,21 +1105,21 @@
if(be32_to_cpu(p->state) == Primary && mdev->state == Primary ) {
ERR("incompatible states\n");
set_cstate(mdev,StandAlone);
- mdev->receiver.t_state = Exiting;
+ drbd_thread_stop_nowait(&mdev->receiver);
return FALSE;
}
if(be32_to_cpu(p->version)!=PRO_VERSION) {
- ERR("incompatible releases \n");
+ ERR("incompatible releases\n");
set_cstate(mdev,StandAlone);
- mdev->receiver.t_state = Exiting;
+ drbd_thread_stop_nowait(&mdev->receiver);
return FALSE;
}
if(be32_to_cpu(p->protocol)!=mdev->conf.wire_protocol) {
- ERR("incompatible protocols \n");
+ ERR("incompatible protocols\n");
set_cstate(mdev,StandAlone);
- mdev->receiver.t_state = Exiting;
+ drbd_thread_stop_nowait(&mdev->receiver);
return FALSE;
}
@@ -1122,9 +1127,7 @@
if(p_size == 0 && test_bit(DISKLESS,&mdev->flags)) {
ERR("some backing storage is needed\n");
- set_cstate(mdev,Unconfigured);
- drbd_mdev_cleanup(mdev); // FIXME. Is this valid here ?
- mdev->receiver.t_state = Exiting;
+ drbd_thread_stop_nowait(&mdev->receiver);
return FALSE;
}
mdev->p_size=p_size;
@@ -1199,12 +1202,9 @@
set_cstate(mdev,WFBitMapS);
} else { // have_good == -1
if (mdev->state == Primary) {
-/*
- FIXME
-*/
ERR("Current Primary shall become sync TARGET! Aborting to prevent data corruption.\n");
set_cstate(mdev,StandAlone);
- mdev->receiver.t_state = Exiting;
+ drbd_thread_stop_nowait(&mdev->receiver);
return FALSE;
}
mdev->gen_cnt[Flags] &= ~MDF_Consistent;
@@ -1449,17 +1449,7 @@
STATIC void drbd_disconnect(drbd_dev *mdev)
{
-/*
- * FIXME what if
- * (state == Primary) && !(gen_cnt[Flags] & MDF_Consistent) ??
- * or I am DISKLESS ?
- * we need to *at least* block all IO
- *
- * maybe get a write mutex on mdev ?
- * sort of "suspend" the device, untill either operator, or monitoring
- * software, or load, or whatever, kills the box, OR connection to the
- * good data copy is reestablished.
- */
+ D_ASSERT(mdev->cstate < Connected);
mdev->o_state = Unknown;
drbd_thread_stop_nowait(&mdev->worker);
drbd_thread_stop(&mdev->asender);
@@ -1484,10 +1474,6 @@
up(&mdev->data.mutex);
drbd_thread_stop(&mdev->worker);
- drbd_thread_start(&mdev->worker);
-
- if(mdev->cstate != StandAlone)
- set_cstate(mdev,Unconnected);
drbd_fail_pending_reads(mdev);
drbd_rs_cancel_all(mdev);
@@ -1506,12 +1492,6 @@
mdev->epoch_size=0;
- if (mdev->state == Primary) {
- if(!test_bit(DO_NOT_INC_CONCNT,&mdev->flags))
- drbd_md_inc(mdev,ConnectedCnt);
- drbd_md_write(mdev);
- }
-
if(atomic_read(&mdev->unacked_cnt)) {
ERR("unacked_cnt!=0\n");
atomic_set(&mdev->unacked_cnt,0);
@@ -1519,10 +1499,38 @@
/* Since syncer's blocks are also counted, there is no hope that
pending_cnt is zero. */
- atomic_set(&mdev->ap_pending_cnt,0);
- atomic_set(&mdev->rs_pending_cnt,0);
+ ERR_IF(atomic_read(&mdev->ap_pending_cnt))
+ atomic_set(&mdev->ap_pending_cnt,0);
+ ERR_IF(atomic_read(&mdev->rs_pending_cnt))
+ atomic_set(&mdev->rs_pending_cnt,0);
+
wake_up_interruptible(&mdev->cstate_wait);
+ if ( mdev->state == Primary &&
+ ( test_bit(DISKLESS,&mdev->flags)
+ || !(mdev->gen_cnt[Flags] & MDF_Consistent) ) ) {
+ drbd_panic("Sorry, I have no access to good data anymore.\n");
+ }
+
+ if (get_t_state(&mdev->receiver) == Exiting) {
+ if (test_bit(DISKLESS,&mdev->flags)) {
+ // Secondary
+ set_cstate(mdev,Unconfigured);
+ drbd_mdev_cleanup(mdev);
+ } else {
+ set_cstate(mdev,StandAlone);
+ drbd_thread_start(&mdev->worker);
+ }
+ } else {
+ set_cstate(mdev,Unconnected);
+ drbd_thread_start(&mdev->worker);
+ }
+
+ if (mdev->state == Primary) {
+ if(!test_bit(DO_NOT_INC_CONCNT,&mdev->flags))
+ drbd_md_inc(mdev,ConnectedCnt);
+ drbd_md_write(mdev);
+ }
clear_bit(DO_NOT_INC_CONCNT,&mdev->flags);
INFO("Connection lost.\n");
@@ -1542,28 +1550,23 @@
WARN("Discarding network configuration.\n");
break;
}
- if (thi->t_state == Exiting) break;
+ if (get_t_state(thi) == Exiting) break;
drbdd(mdev);
drbd_disconnect(mdev);
- if (thi->t_state == Exiting) break;
+ if (get_t_state(thi) == Exiting) break;
else {
if (signal_pending(current)) {
drbd_flush_signals(current);
- if (thi->t_state != Restarting)
- ERR("unexpected thread state: %d\n",
- thi->t_state);
}
+ spin_lock(&thi->t_lock);
+ D_ASSERT(thi->t_state == Restarting);
thi->t_state = Running;
+ spin_unlock(&thi->t_lock);
}
}
INFO("receiver exiting\n");
- if(test_bit(DISKLESS,&mdev->flags)) {
- set_cstate(mdev, Unconfigured);
- drbd_mdev_cleanup(mdev);
- } else set_cstate(mdev, StandAlone);
-
return 0;
}
@@ -1726,7 +1729,7 @@
current->policy = SCHED_RR; /* Make this a realtime task! */
current->rt_priority = 2; /* more important than all other tasks */
- while (thi->t_state == Running) {
+ while (get_t_state(thi) == Running) {
if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
ERR_IF(!drbd_send_ping(mdev)) goto err;
// half ack timeout only,
@@ -1801,6 +1804,8 @@
if(0) {
err:
+ if (mdev->cstate >= Connected)
+ set_cstate(mdev,NetworkFailure);
drbd_thread_restart_nowait(&mdev->receiver);
}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_req-2.4.c,v
retrieving revision 1.33.2.68
retrieving revision 1.33.2.69
diff -u -3 -r1.33.2.68 -r1.33.2.69
--- drbd_req-2.4.c 4 May 2004 15:03:02 -0000 1.33.2.68
+++ drbd_req-2.4.c 5 May 2004 13:25:15 -0000 1.33.2.69
@@ -298,7 +298,11 @@
* but we ignore it here. Is it actually void,
* because error handling takes place elsewhere?
*/
- drbd_send_dblock(mdev,req);
+ if (!drbd_send_dblock(mdev,req)) {
+ if (mdev->cstate >= Connected)
+ set_cstate(mdev,NetworkFailure);
+ drbd_thread_restart_nowait(&mdev->receiver);
+ }
} else if (target_area_out_of_sync) {
drbd_read_remote(mdev,req);
} else {