[DRBD-cvs] drbd by phil; Drbd_thread got a spinlock for all the d...

drbd-user@lists.linbit.com drbd-user@lists.linbit.com
Wed, 5 May 2004 15:25:20 +0200 (CEST)


DRBD CVS committal

Author  : phil
Module  : drbd

Dir     : drbd/drbd


Modified Files:
      Tag: rel-0_7-branch
	drbd_dsender.c drbd_fs.c drbd_int.h drbd_main.c 
	drbd_receiver.c drbd_req-2.4.c 


Log Message:
Drbd_thread got a spinlock protecting all the drbd_thread operations, so
that they are SMP safe.

On loss of connection, set cstate to something < Connected
as early as possible. Later, drbd_disconnect() sets cstate to
whatever state follows (also < Connected).

[Patch mainly by Lars]

===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.102
retrieving revision 1.1.2.103
diff -u -3 -r1.1.2.102 -r1.1.2.103
--- drbd_dsender.c	5 May 2004 08:40:27 -0000	1.1.2.102
+++ drbd_dsender.c	5 May 2004 13:25:15 -0000	1.1.2.103
@@ -527,7 +527,7 @@
 
 	// assert that all bit-map parts are cleared.
 	D_ASSERT(list_empty(&mdev->resync->lru));
-	
+
 	set_cstate(mdev,Connected); // w_resume_next_sg() gets called here.
 	return 1;
 }
@@ -852,26 +852,27 @@
 		if (intr) {
 			D_ASSERT(intr == -EINTR);
 			drbd_flush_signals(current);
-			if (thi->t_state != Running )
-				break;
-			continue;
+			ERR_IF (get_t_state(thi) == Running)
+				continue;
+			break;
 		}
 
-		if (thi->t_state != Running )
+		ERR_IF (get_t_state(thi) != Running)
 			break;
-		if (need_resched())
-			schedule();
+		
+		// if (need_resched()) schedule();
 
 		w = 0;
 		spin_lock_irq(&mdev->req_lock);
-		if (!list_empty(&mdev->data.work.q)) {
-			w = list_entry(mdev->data.work.q.next,struct drbd_work,list);
-			list_del_init(&w->list);
-		}
+		D_ASSERT(!list_empty(&mdev->data.work.q));
+		w = list_entry(mdev->data.work.q.next,struct drbd_work,list);
+		list_del_init(&w->list);
 		spin_unlock_irq(&mdev->req_lock);
 
 		if(!w->cb(mdev,w, mdev->cstate < Connected )) {
 			ERR("worker: a callback failed! \n");
+			if (mdev->cstate >= Connected)
+				set_cstate(mdev,NetworkFailure);
 			drbd_thread_restart_nowait(&mdev->receiver);
 		}
 	}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_fs.c,v
retrieving revision 1.28.2.84
retrieving revision 1.28.2.85
diff -u -3 -r1.28.2.84 -r1.28.2.85
--- drbd_fs.c	4 May 2004 15:03:02 -0000	1.28.2.84
+++ drbd_fs.c	5 May 2004 13:25:15 -0000	1.28.2.85
@@ -468,7 +468,6 @@
 	*/
 
 	drbd_sync_me(mdev);
-	drbd_thread_stop(&mdev->asender);
 	drbd_thread_stop(&mdev->receiver);
 	drbd_free_sock(mdev);
 
@@ -781,7 +780,7 @@
 		/* FIXME what if fsync returns error */
 		drbd_sync_me(mdev);
 		set_bit(DO_NOT_INC_CONCNT,&mdev->flags);
-		drbd_thread_stop(&mdev->asender);
+		set_cstate(mdev,Unconnected);
 		drbd_thread_stop(&mdev->receiver);
 
 		if (test_bit(DISKLESS,&mdev->flags)) {
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.156
retrieving revision 1.58.2.157
diff -u -3 -r1.58.2.156 -r1.58.2.157
--- drbd_int.h	5 May 2004 08:40:27 -0000	1.58.2.156
+++ drbd_int.h	5 May 2004 13:25:15 -0000	1.58.2.157
@@ -472,19 +472,31 @@
 /**********************************************************************/
 
 typedef enum {
+	None,
 	Running,
 	Exiting,
 	Restarting
 } Drbd_thread_state;
 
 struct Drbd_thread {
+	spinlock_t t_lock;
 	struct task_struct *task;
-	struct semaphore mutex;
-	volatile Drbd_thread_state t_state;
+	struct completion startstop;
+	Drbd_thread_state t_state;
 	int (*function) (struct Drbd_thread *);
 	drbd_dev *mdev;
 };
 
+static inline Drbd_thread_state get_t_state(struct Drbd_thread *thi)
+{
+	/* THINK testing the t_state seems to be uncritical in all cases
+	 * (but thread_{start,stop}), so we can read it *without* the lock.
+	 * 	--lge */
+
+	smp_rmb();
+	return (volatile int)thi->t_state;
+}
+
 
 /*
  * Having this as the first member of a struct provides sort of "inheritance".
@@ -665,7 +677,7 @@
 	unsigned long lo_usize;   /* user provided size */
 	unsigned long p_size;     /* partner's disk size */
 	Drbd_State state;
-	Drbd_CState cstate;
+	volatile Drbd_CState cstate;
 	wait_queue_head_t cstate_wait; // TODO Rename into "misc_wait". 
 	Drbd_State o_state;
 	unsigned long int la_size; // last agreed disk size
@@ -1099,7 +1111,7 @@
 static inline void dec_ap_pending(drbd_dev* mdev, const char* where)
 {
 	if(atomic_dec_and_test(&mdev->ap_pending_cnt))
-		wake_up_interruptible(&mdev->cstate_wait);
+		wake_up(&mdev->cstate_wait);
 
 	if(atomic_read(&mdev->ap_pending_cnt)<0)
 		ERR("in %s: pending_cnt = %d < 0 !\n",
@@ -1171,7 +1183,7 @@
 	if(atomic_dec_and_test(&mdev->local_cnt) && 
 	   test_bit(DISKLESS,&mdev->flags) &&
 	   mdev->lo_file) {
-		wake_up_interruptible(&mdev->cstate_wait);
+		wake_up(&mdev->cstate_wait);
 	}
 
 	D_ASSERT(atomic_read(&mdev->local_cnt)>=0);
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.160
retrieving revision 1.73.2.161
diff -u -3 -r1.73.2.160 -r1.73.2.161
--- drbd_main.c	4 May 2004 10:26:12 -0000	1.73.2.160
+++ drbd_main.c	5 May 2004 13:25:15 -0000	1.73.2.161
@@ -390,7 +390,7 @@
 	os = mdev->cstate;
 	mdev->cstate = ns;
 	smp_mb();
-	wake_up_interruptible(&mdev->cstate_wait);
+	wake_up(&mdev->cstate_wait);
 
 	if ( ( os==SyncSource || os==SyncTarget ) && ns <= Connected ) {
 		set_bit(STOP_SYNC_TIMER,&mdev->flags);
@@ -405,21 +405,25 @@
 STATIC int drbd_thread_setup(void* arg)
 {
 	struct Drbd_thread *thi = (struct Drbd_thread *) arg;
+	drbd_dev *mdev = thi->mdev;
 	int retval;
 
 	drbd_daemonize();
-	down(&thi->mutex); //ensures that thi->task is set.
+	D_ASSERT(get_t_state(thi) == Running);
+	D_ASSERT(thi->task == NULL);
+	thi->task = current;
+	smp_mb();
+	complete(&thi->startstop); // notify: thi->task is set.
 
 	retval = thi->function(thi);
 
+	spin_lock(&thi->t_lock);
 	thi->task = 0;
+	D_ASSERT(thi->t_state == Exiting);
+	spin_unlock(&thi->t_lock);
 
-	/* propagate task == NULL to other CPUs */
-	smp_mb();         // necessary?
-	set_current_state(TASK_UNINTERRUPTIBLE);
-	schedule_timeout(1);
-
-	up(&thi->mutex); //allow thread_stop to proceed
+	// THINK maybe two different completions?
+	complete(&thi->startstop); // notify: thi->task unset.
 
 	return retval;
 }
@@ -427,8 +431,11 @@
 STATIC void drbd_thread_init(drbd_dev *mdev, struct Drbd_thread *thi,
 		      int (*func) (struct Drbd_thread *))
 {
-	thi->task = NULL;
-	init_MUTEX(&thi->mutex);
+	thi->t_lock  = SPIN_LOCK_UNLOCKED;
+	thi->task    = NULL;
+	thi->t_state = None;
+	init_completion(&thi->startstop);
+
 	thi->function = func;
 	thi->mdev = mdev;
 }
@@ -438,53 +445,65 @@
 	int pid;
 	drbd_dev *mdev = thi->mdev;
 
-	if (thi->task == NULL) {
+	spin_lock(&thi->t_lock);
+	if (thi->t_state == None) {
+		D_ASSERT(thi->task == NULL);
+		// XXX D_ASSERT( thi->startstop something ? )
 		thi->t_state = Running;
+		spin_unlock(&thi->t_lock);
 
-		down(&thi->mutex);
 		pid = kernel_thread(drbd_thread_setup, (void *) thi, CLONE_FS);
-
 		if (pid < 0) {
 			ERR("Couldn't start thread (%d)\n", pid);
 			return;
 		}
-		/* printk(KERN_DEBUG DEVICE_NAME ": pid = %d\n", pid); */
-		read_lock(&tasklist_lock);
-		thi->task = find_task_by_pid(pid);
-		read_unlock(&tasklist_lock);
-		up(&thi->mutex);
+		wait_for_completion(&thi->startstop); // waits until thi->task is set
+	} else {
+		spin_unlock(&thi->t_lock);
 	}
 }
 
 
 void _drbd_thread_stop(struct Drbd_thread *thi, int restart,int wait)
 {
-	if (!thi->task) return;
-	/* test on ->state useless now since we use reparent_to_init */
-	if (thi->task->state == -1
-	    || thi->task->state == TASK_ZOMBIE
-	    || thi->task->flags & PF_EXITING   ) {
-		// unexpected death... clean up.
-		init_MUTEX(&thi->mutex);
-		thi->task = NULL;
+	drbd_dev *mdev = thi->mdev;
+
+	Drbd_thread_state ns = restart ? Restarting : Exiting;
+	spin_lock(&thi->t_lock);
+	if (thi->t_state == None) {
+		spin_unlock(&thi->t_lock);
 		return;
 	}
 
-	if (restart)
-		thi->t_state = Restarting;
-	else
-		thi->t_state = Exiting;
-
-	smp_mb(); /* should not be necessary, since the next
-		     instruction is spinlock, but anyways */
-	force_sig(DRBD_SIGKILL,thi->task);
-
-	if(wait) {
-		down(&thi->mutex); // wait until thread has exited
-		up(&thi->mutex);
+	if (thi->t_state != ns) {
+		ERR_IF (thi->task == NULL) {
+			spin_unlock(&thi->t_lock);
+			return;
+		}
+
+		if (ns == Restarting && thi->t_state == Exiting) {
+			// Already Exiting. Cannot restart!
+			spin_unlock(&thi->t_lock);
+			return;
+		}
 
-		current->state = TASK_INTERRUPTIBLE;
-		schedule_timeout(HZ / 10);
+		thi->t_state = ns;
+		smp_mb();
+		if (thi->task != current)
+			force_sig(DRBD_SIGKILL,thi->task);
+		else
+			D_ASSERT(!wait);
+
+	}
+	spin_unlock(&thi->t_lock);
+
+	if (wait) {
+		D_ASSERT(thi->t_state == Exiting);
+		wait_for_completion(&thi->startstop);
+		spin_lock(&thi->t_lock);
+		thi->t_state = None;
+		D_ASSERT(thi->task == NULL);
+		spin_unlock(&thi->t_lock);
 	}
 }
 
@@ -1289,9 +1308,8 @@
 				up(&mdev->device_mutex);
 				drbd_sync_me(mdev);
 				set_bit(DO_NOT_INC_CONCNT,&mdev->flags);
-				drbd_thread_stop(&mdev->worker);
 				drbd_thread_stop(&mdev->receiver);
-				drbd_thread_stop(&mdev->asender);
+				drbd_thread_stop(&mdev->worker);
 			}
 		}
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.141
retrieving revision 1.97.2.142
diff -u -3 -r1.97.2.141 -r1.97.2.142
--- drbd_receiver.c	5 May 2004 08:40:27 -0000	1.97.2.141
+++ drbd_receiver.c	5 May 2004 13:25:15 -0000	1.97.2.142
@@ -518,6 +518,11 @@
 
 	set_fs(oldfs);
 
+	if(rv != size) {
+		set_cstate(mdev,BrokenPipe);
+		drbd_thread_restart_nowait(&mdev->receiver);
+	}
+
 	return rv;
 }
 
@@ -612,7 +617,7 @@
 		if(signal_pending(current)) {
 			drbd_flush_signals(current);
 			smp_rmb();
-			if ((volatile int)mdev->receiver.t_state == Exiting)
+			if (get_t_state(&mdev->receiver) == Exiting)
 				return 0;
 		}
 	}
@@ -1100,21 +1105,21 @@
 	if(be32_to_cpu(p->state) == Primary && mdev->state == Primary ) {
 		ERR("incompatible states\n");
 		set_cstate(mdev,StandAlone);
-		mdev->receiver.t_state = Exiting;
+		drbd_thread_stop_nowait(&mdev->receiver);
 		return FALSE;
 	}
 
 	if(be32_to_cpu(p->version)!=PRO_VERSION) {
-		ERR("incompatible releases \n");
+		ERR("incompatible releases\n");
 		set_cstate(mdev,StandAlone);
-		mdev->receiver.t_state = Exiting;
+		drbd_thread_stop_nowait(&mdev->receiver);
 		return FALSE;
 	}
 
 	if(be32_to_cpu(p->protocol)!=mdev->conf.wire_protocol) {
-		ERR("incompatible protocols \n");
+		ERR("incompatible protocols\n");
 		set_cstate(mdev,StandAlone);
-		mdev->receiver.t_state = Exiting;
+		drbd_thread_stop_nowait(&mdev->receiver);
 		return FALSE;
 	}
 
@@ -1122,9 +1127,7 @@
 
 	if(p_size == 0 && test_bit(DISKLESS,&mdev->flags)) {
 		ERR("some backing storage is needed\n");
-		set_cstate(mdev,Unconfigured);
-		drbd_mdev_cleanup(mdev); // FIXME. Is this valid here ?
-		mdev->receiver.t_state = Exiting;
+		drbd_thread_stop_nowait(&mdev->receiver);
 		return FALSE;
 	}
 	mdev->p_size=p_size;
@@ -1199,12 +1202,9 @@
 				set_cstate(mdev,WFBitMapS);
 			} else { // have_good == -1
 				if (mdev->state == Primary) {
-/*
-	FIXME
-*/
 					ERR("Current Primary shall become sync TARGET! Aborting to prevent data corruption.\n");
 					set_cstate(mdev,StandAlone);
-					mdev->receiver.t_state = Exiting;
+					drbd_thread_stop_nowait(&mdev->receiver);
 					return FALSE;
 				}
 				mdev->gen_cnt[Flags] &= ~MDF_Consistent;
@@ -1449,17 +1449,7 @@
 
 STATIC void drbd_disconnect(drbd_dev *mdev)
 {
-/*
- * FIXME what if
- * (state == Primary) && !(gen_cnt[Flags] & MDF_Consistent) ??
- * or I am DISKLESS ?
- * we need to *at least* block all IO
- *
- * maybe get a write mutex on mdev ?
- * sort of "suspend" the device, untill either operator, or monitoring
- * software, or load, or whatever, kills the box, OR connection to the
- * good data copy is reestablished.
- */
+	D_ASSERT(mdev->cstate < Connected);
 	mdev->o_state = Unknown;
 	drbd_thread_stop_nowait(&mdev->worker);
 	drbd_thread_stop(&mdev->asender);
@@ -1484,10 +1474,6 @@
 	up(&mdev->data.mutex);
 
 	drbd_thread_stop(&mdev->worker);
-	drbd_thread_start(&mdev->worker);
-
-	if(mdev->cstate != StandAlone)
-		set_cstate(mdev,Unconnected);
 
 	drbd_fail_pending_reads(mdev);
 	drbd_rs_cancel_all(mdev);
@@ -1506,12 +1492,6 @@
 
 	mdev->epoch_size=0;
 
-	if (mdev->state == Primary) {
-		if(!test_bit(DO_NOT_INC_CONCNT,&mdev->flags))
-			drbd_md_inc(mdev,ConnectedCnt);
-		drbd_md_write(mdev);
-	}
-
 	if(atomic_read(&mdev->unacked_cnt)) {
 		ERR("unacked_cnt!=0\n");
 		atomic_set(&mdev->unacked_cnt,0);
@@ -1519,10 +1499,38 @@
 
 	/* Since syncer's blocks are also counted, there is no hope that
 	   pending_cnt is zero. */
-	atomic_set(&mdev->ap_pending_cnt,0);
-	atomic_set(&mdev->rs_pending_cnt,0);
+	ERR_IF(atomic_read(&mdev->ap_pending_cnt))
+		atomic_set(&mdev->ap_pending_cnt,0);
+	ERR_IF(atomic_read(&mdev->rs_pending_cnt))
+		atomic_set(&mdev->rs_pending_cnt,0);
+
 	wake_up_interruptible(&mdev->cstate_wait);
 
+	if ( mdev->state == Primary &&
+	    ( test_bit(DISKLESS,&mdev->flags)
+	    || !(mdev->gen_cnt[Flags] & MDF_Consistent) ) ) {
+		drbd_panic("Sorry, I have no access to good data anymore.\n");
+	}
+
+	if (get_t_state(&mdev->receiver) == Exiting) {
+		if (test_bit(DISKLESS,&mdev->flags)) {
+			// Secondary
+			set_cstate(mdev,Unconfigured);
+			drbd_mdev_cleanup(mdev);
+		} else {
+			set_cstate(mdev,StandAlone);
+			drbd_thread_start(&mdev->worker);
+		}
+	} else {
+		set_cstate(mdev,Unconnected);
+		drbd_thread_start(&mdev->worker);
+	}
+
+	if (mdev->state == Primary) {
+		if(!test_bit(DO_NOT_INC_CONCNT,&mdev->flags))
+			drbd_md_inc(mdev,ConnectedCnt);
+		drbd_md_write(mdev);
+	}
 	clear_bit(DO_NOT_INC_CONCNT,&mdev->flags);
 
 	INFO("Connection lost.\n");
@@ -1542,28 +1550,23 @@
 			WARN("Discarding network configuration.\n");
 			break;
 		}
-		if (thi->t_state == Exiting) break;
+		if (get_t_state(thi) == Exiting) break;
 		drbdd(mdev);
 		drbd_disconnect(mdev);
-		if (thi->t_state == Exiting) break;
+		if (get_t_state(thi) == Exiting) break;
 		else {
 			if (signal_pending(current)) {
 				drbd_flush_signals(current);
-				if (thi->t_state != Restarting)
-					ERR("unexpected thread state: %d\n",
-					    thi->t_state);
 			}
+			spin_lock(&thi->t_lock);
+			D_ASSERT(thi->t_state == Restarting);
 			thi->t_state = Running;
+			spin_unlock(&thi->t_lock);
 		}
 	}
 
 	INFO("receiver exiting\n");
 
-	if(test_bit(DISKLESS,&mdev->flags)) {
-	  	set_cstate(mdev, Unconfigured);
-		drbd_mdev_cleanup(mdev);
-	} else set_cstate(mdev, StandAlone);
-
 	return 0;
 }
 
@@ -1726,7 +1729,7 @@
 	current->policy = SCHED_RR;  /* Make this a realtime task! */
 	current->rt_priority = 2;    /* more important than all other tasks */
 
-	while (thi->t_state == Running) {
+	while (get_t_state(thi) == Running) {
 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
 			ERR_IF(!drbd_send_ping(mdev)) goto err;
 			// half ack timeout only,
@@ -1801,6 +1804,8 @@
 
 	if(0) {
 	err:
+		if (mdev->cstate >= Connected)
+			set_cstate(mdev,NetworkFailure);
 		drbd_thread_restart_nowait(&mdev->receiver);
 	}
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_req-2.4.c,v
retrieving revision 1.33.2.68
retrieving revision 1.33.2.69
diff -u -3 -r1.33.2.68 -r1.33.2.69
--- drbd_req-2.4.c	4 May 2004 15:03:02 -0000	1.33.2.68
+++ drbd_req-2.4.c	5 May 2004 13:25:15 -0000	1.33.2.69
@@ -298,7 +298,11 @@
 			 * but we ignore it here. Is it actually void,
 			 * because error handling takes place elsewhere?
 			 */
-			drbd_send_dblock(mdev,req);
+			if (!drbd_send_dblock(mdev,req)) {
+				if (mdev->cstate >= Connected)
+					set_cstate(mdev,NetworkFailure);
+				drbd_thread_restart_nowait(&mdev->receiver);
+			}
 		} else if (target_area_out_of_sync) {
 			drbd_read_remote(mdev,req);
 		} else {