[DRBD-cvs] drbd by phil; * drbd_threads are thread safe now :) * ...

drbd-user@lists.linbit.com drbd-user@lists.linbit.com
Wed, 5 May 2004 19:07:07 +0200 (CEST)


DRBD CVS committal

Author  : phil
Module  : drbd

Dir     : drbd/drbd


Modified Files:
      Tag: rel-0_7-branch
	drbd_actlog.c drbd_dsender.c drbd_int.h drbd_main.c 
	drbd_proc.c drbd_receiver.c 


Log Message:
* drbd_threads are thread safe now :)
* drbd_rs_begin_io is now INTERRUPTIBLE, to avoid deadlock in drbd_disconnect()
* worker's cleanup improved
* Fixed a painful bug in drbd_cleanup_ee()

===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_actlog.c,v
retrieving revision 1.1.2.97
retrieving revision 1.1.2.98
diff -u -3 -r1.1.2.97 -r1.1.2.98
--- drbd_actlog.c	4 May 2004 08:44:15 -0000	1.1.2.97
+++ drbd_actlog.c	5 May 2004 17:07:02 -0000	1.1.2.98
@@ -773,21 +773,34 @@
  *
  * @sector: The sector number
  */
-void drbd_rs_begin_io(drbd_dev* mdev, sector_t sector)
+int drbd_rs_begin_io(drbd_dev* mdev, sector_t sector)
 {
 	unsigned int enr = (sector >> (BM_EXTENT_SIZE_B-9));
 	struct bm_extent* bm_ext;
 	int i;
 
-	wait_event(mdev->al_wait, (bm_ext = _bme_get(mdev,enr)) );
+	if( wait_event_interruptible(mdev->al_wait, 
+				     (bm_ext = _bme_get(mdev,enr)) ) ) {
+		return 0;
+	}
 
-	if(test_bit(BME_LOCKED,&bm_ext->flags)) return;
+	if(test_bit(BME_LOCKED,&bm_ext->flags)) return 1;
 
 	for(i=0;i<SM;i++) {
-		wait_event(mdev->al_wait, !_is_in_al(mdev,enr*SM+i) );
+		if( wait_event_interruptible(mdev->al_wait, 
+					     !_is_in_al(mdev,enr*SM+i) ) ) {
+			if( lc_put(mdev->resync,&bm_ext->lce) == 0 ) {
+				clear_bit(BME_NO_WRITES,&bm_ext->flags);
+				atomic_dec(&mdev->resync_locked);
+				wake_up(&mdev->al_wait);
+				return 0;
+			}
+		}
 	}
 
 	set_bit(BME_LOCKED,&bm_ext->flags);
+
+	return 1;
 }
 
 void drbd_rs_complete_io(drbd_dev* mdev, sector_t sector)
@@ -828,13 +841,13 @@
 	for(i=0;i<mdev->resync->nr_elements;i++) {
 		bm_ext = (struct bm_extent*) lc_entry(mdev->resync,i);
 		if(bm_ext->lce.lc_number == LC_FREE) continue;
+		atomic_sub(bm_ext->lce.refcnt,&mdev->rs_pending_cnt);
 		bm_ext->lce.refcnt = 0; // Rude but ok.
 		bm_ext->rs_left = 0;
 		clear_bit(BME_LOCKED,&bm_ext->flags);
 		clear_bit(BME_NO_WRITES,&bm_ext->flags);
 		lc_del(mdev->resync,&bm_ext->lce);
 	}
-
-	wake_up(&mdev->al_wait);
 	spin_unlock_irq(&mdev->al_lock);
+	wake_up(&mdev->al_wait);
 }
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.103
retrieving revision 1.1.2.104
diff -u -3 -r1.1.2.103 -r1.1.2.104
--- drbd_dsender.c	5 May 2004 13:25:15 -0000	1.1.2.103
+++ drbd_dsender.c	5 May 2004 17:07:02 -0000	1.1.2.104
@@ -479,7 +479,8 @@
 			return 1;
 		}
 
-		drbd_rs_begin_io(mdev,sector);
+		if(!drbd_rs_begin_io(mdev,sector)) return 0;
+
 		if(unlikely(!bm_get_bit(mdev->mbds_id,sector,BM_BLOCK_SIZE))) {
 		      //INFO("Block got synced while in drbd_rs_begin_io()\n");
 			drbd_rs_complete_io(mdev,sector);
@@ -541,6 +542,7 @@
 		spin_lock_irq(&mdev->ee_lock);
 		drbd_put_ee(mdev,e);
 		spin_unlock_irq(&mdev->ee_lock);
+		dec_unacked(mdev,HERE);
 		return 1;
 	}
 
@@ -571,6 +573,7 @@
 		spin_lock_irq(&mdev->ee_lock);
 		drbd_put_ee(mdev,e);
 		spin_unlock_irq(&mdev->ee_lock);
+		dec_unacked(mdev,HERE);
 		return 1;
 	}
 
@@ -834,7 +837,8 @@
 {
 	drbd_dev *mdev = thi->mdev;
 	struct drbd_work *w = 0;
-	int intr;
+	LIST_HEAD(work_list);
+	int intr,i;
 
 	sprintf(current->comm, "drbd%d_worker", (int)(mdev-drbd_conf));
 
@@ -859,7 +863,7 @@
 
 		ERR_IF (get_t_state(thi) != Running)
 			break;
-		
+
 		// if (need_resched()) schedule();
 
 		w = 0;
@@ -881,17 +885,22 @@
 
 	drbd_wait_ee(mdev,&mdev->read_ee);
 
-
-	while(!down_trylock(&mdev->data.work.s)) {
-		spin_lock_irq(&mdev->req_lock);
-		if (!list_empty(&mdev->data.work.q)) {
-			w = list_entry(mdev->data.work.q.next,
-				       struct drbd_work,list);
-			list_del_init(&w->list);
-		}
-		spin_unlock_irq(&mdev->req_lock);
+	spin_lock_irq(&mdev->req_lock);
+	list_splice_init(&mdev->data.work.q,&work_list);
+	spin_unlock_irq(&mdev->req_lock);
+
+	i = 0;
+	while(!list_empty(&work_list)) {
+		w = list_entry(work_list.next, struct drbd_work,list);
+		list_del_init(&w->list);
 		w->cb(mdev,w,1);
+		i++;
 	}
+
+	spin_lock_irq(&mdev->req_lock);
+	D_ASSERT(list_empty(&mdev->data.work.q));
+	sema_init(&mdev->data.work.s,0);
+	spin_unlock_irq(&mdev->req_lock);
 
 	INFO("worker terminated\n");
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.157
retrieving revision 1.58.2.158
diff -u -3 -r1.58.2.157 -r1.58.2.158
--- drbd_int.h	5 May 2004 13:25:15 -0000	1.58.2.157
+++ drbd_int.h	5 May 2004 17:07:02 -0000	1.58.2.158
@@ -894,12 +894,14 @@
 // drbd_proc.c
 extern struct proc_dir_entry *drbd_proc;
 extern int drbd_proc_get_info(char *, char **, off_t, int, int *, void *);
+extern const char* cstate_to_name(Drbd_CState s);
+extern const char* nodestate_to_name(Drbd_State s);
 
 // drbd_actlog.c
 extern void drbd_al_begin_io(struct Drbd_Conf *mdev, sector_t sector);
 extern void drbd_al_complete_io(struct Drbd_Conf *mdev, sector_t sector);
 extern void drbd_rs_complete_io(struct Drbd_Conf *mdev, sector_t sector);
-extern void drbd_rs_begin_io(struct Drbd_Conf *mdev, sector_t sector);
+extern int drbd_rs_begin_io(struct Drbd_Conf *mdev, sector_t sector);
 extern void drbd_rs_cancel_all(drbd_dev* mdev);
 extern void drbd_al_read_log(struct Drbd_Conf *mdev);
 extern void drbd_set_in_sync(drbd_dev* mdev, sector_t sector,int blk_size);
@@ -1196,16 +1198,21 @@
 		bm_set_bit(mdev, sector, blk_size, SS_OUT_OF_SYNC);
 }
 
-#if 0
+#ifdef DUMP_ALL_PACKETS
 /*
  * enable to dump information about every packet exchange.
  */
+#define INFOP(fmt, args...) \
+	INFO("%s:%d: %s [%d] %s %s " fmt , \
+	     file, line, current->comm, current->pid, \
+	     sockname, recv?"<<<":">>>" \
+	     , ## args )
 static inline void
 dump_packet(drbd_dev *mdev, struct socket *sock,
-	    int recv, Drbd_Polymorph_Packet *p)
+	    int recv, Drbd_Polymorph_Packet *p, char* file, int line)
 {
 	char *sockname = sock == mdev->meta.socket ? "meta" : "data";
-	int cmd = be16_to_cpu(p->head.command);
+	int cmd = (recv == 2) ? p->head.command : be16_to_cpu(p->head.command);
 	switch (cmd) {
 	case Ping:
 	case PingAck:
@@ -1215,8 +1222,10 @@
 
 	case SyncParam:
 	case ReportParams:
-		INFO(" %s [%d] %s %s %s\n", current->comm, current->pid,
-		     sockname, recv?"<<<":">>>", cmdname(cmd));
+		INFOP("%s\n", cmdname(cmd));
+		break;
+
+	case ReportBitMap: /* don't report this */
 		break;
 
 	case Data:
@@ -1229,31 +1238,23 @@
 
 	case DataRequest:
 	case RSDataRequest:
-		INFO(" %s [%d] %s %s %s (%lu,%lx)\n", current->comm, current->pid,
-		     sockname, recv?"<<<":">>>", cmdname(cmd),
-		     (long)be64_to_cpu(p->Data.sector), (long)p->Data.block_id
+		INFOP("%s (%lu,%llx)\n", cmdname(cmd),
+		     (long)be64_to_cpu(p->Data.sector), (long long)p->Data.block_id
 		);
 		break;
 
 	case Barrier:
 	case BarrierAck:
-		INFO(" %s [%d] %s %s %s (%u)\n", current->comm, current->pid,
-		     sockname, recv?"<<<":">>>", cmdname(cmd),
-		     p->Barrier.barrier
-		);
+		INFOP("%s (%u)\n", cmdname(cmd), p->Barrier.barrier);
 		break;
 
 	default:
-		INFO(" %s [%d] %s %s %s (%u)\n", current->comm, current->pid,
-		     sockname, recv?"<<<":">>>", cmdname(cmd), cmd
-		);
+		INFOP("%s (%u)\n",cmdname(cmd), cmd);
 		break;
 	}
 }
 #else
-static inline void
-dump_packet(drbd_dev *mdev, struct socket *sock,
-	    int recv, Drbd_Polymorph_Packet *p)   { /* DO NOTHING */ }
+#define dump_packet(ignored...) ((void)0)
 #endif
 
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.161
retrieving revision 1.73.2.162
diff -u -3 -r1.73.2.161 -r1.73.2.162
--- drbd_main.c	5 May 2004 13:25:15 -0000	1.73.2.161
+++ drbd_main.c	5 May 2004 17:07:02 -0000	1.73.2.162
@@ -388,6 +388,10 @@
 	Drbd_CState os;
 
 	os = mdev->cstate;
+
+	INFO("%s [%d]: cstate %s --> %s\n", current->comm, current->pid,
+	     cstate_to_name(os), cstate_to_name(ns) );
+
 	mdev->cstate = ns;
 	smp_mb();
 	wake_up(&mdev->cstate_wait);
@@ -409,9 +413,10 @@
 	int retval;
 
 	drbd_daemonize();
-	D_ASSERT(get_t_state(thi) == Running);
+	D_ASSERT(get_t_state(thi) == None);
 	D_ASSERT(thi->task == NULL);
 	thi->task = current;
+	thi->t_state = Running;
 	smp_mb();
 	complete(&thi->startstop); // notify: thi->task is set.
 
@@ -419,7 +424,8 @@
 
 	spin_lock(&thi->t_lock);
 	thi->task = 0;
-	D_ASSERT(thi->t_state == Exiting);
+	thi->t_state = Exiting;
+	smp_mb();
 	spin_unlock(&thi->t_lock);
 
 	// THINK maybe two different completions?
@@ -446,10 +452,17 @@
 	drbd_dev *mdev = thi->mdev;
 
 	spin_lock(&thi->t_lock);
+
+	/* INFO("%s [%d]: %s %d -> Running\n",
+	     current->comm, current->pid,
+	     thi == &mdev->receiver ? "receiver" :
+             thi == &mdev->asender  ? "asender"  :
+             thi == &mdev->worker   ? "worker"   : "NONSENSE",
+	     thi->t_state); */
+
 	if (thi->t_state == None) {
 		D_ASSERT(thi->task == NULL);
 		// XXX D_ASSERT( thi->startstop something ? )
-		thi->t_state = Running;
 		spin_unlock(&thi->t_lock);
 
 		pid = kernel_thread(drbd_thread_setup, (void *) thi, CLONE_FS);
@@ -458,18 +471,27 @@
 			return;
 		}
 		wait_for_completion(&thi->startstop); // waits until thi->task is set
+		D_ASSERT(thi->task);
+		D_ASSERT(get_t_state(thi) == Running);
 	} else {
 		spin_unlock(&thi->t_lock);
 	}
+
 }
 
 
 void _drbd_thread_stop(struct Drbd_thread *thi, int restart,int wait)
 {
 	drbd_dev *mdev = thi->mdev;
-
 	Drbd_thread_state ns = restart ? Restarting : Exiting;
+
 	spin_lock(&thi->t_lock);
+
+	/* INFO("%s [%d]: %s %d -> %d; %d\n",
+	     current->comm, current->pid,
+	     thi->task ? thi->task->comm : "NULL", thi->t_state, ns, wait); */
+
+
 	if (thi->t_state == None) {
 		spin_unlock(&thi->t_lock);
 		return;
@@ -502,6 +524,7 @@
 		wait_for_completion(&thi->startstop);
 		spin_lock(&thi->t_lock);
 		thi->t_state = None;
+		smp_mb();
 		D_ASSERT(thi->task == NULL);
 		spin_unlock(&thi->t_lock);
 	}
@@ -543,6 +566,7 @@
 	h->command = cpu_to_be16(cmd);
 	h->length  = cpu_to_be16(size-sizeof(Drbd_Header));
 
+	dump_packet(mdev,sock,0,(void*)h, __FILE__, __LINE__);
 	sent = drbd_send(mdev,sock,h,size,msg_flags);
 
 	ok = ( sent == size );
@@ -550,7 +574,6 @@
 		ERR("short sent %s size=%d sent=%d\n",
 		    cmdname(cmd), (int)size, sent);
 	}
-	dump_packet(mdev,sock,0,(void*)h);
 	return ok;
 }
 
@@ -882,6 +905,7 @@
 	mdev->send_task=current;
 	spin_unlock(&mdev->send_task_lock);
 
+	dump_packet(mdev,mdev->data.socket,0,(void*)&p, __FILE__, __LINE__);
 	ok =  (drbd_send(mdev,mdev->data.socket,&p,sizeof(p),MSG_MORE) == sizeof(p))
 	   && _drbd_send_zc_bio(mdev,&e->private_bio);
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_proc.c,v
retrieving revision 1.8.2.23
retrieving revision 1.8.2.24
diff -u -3 -r1.8.2.23 -r1.8.2.24
--- drbd_proc.c	1 Apr 2004 06:47:38 -0000	1.8.2.23
+++ drbd_proc.c	5 May 2004 17:07:02 -0000	1.8.2.24
@@ -124,18 +124,14 @@
 	return sz;
 }
 
-int drbd_proc_get_info(char *buf, char **start, off_t offset,
-		       int len, int *unused, void *data)
-{
-	int rlen, i;
-	const char *sn;
-
+const char* cstate_to_name(Drbd_CState s) {
 	static const char *cstate_names[] = {
 		[Unconfigured]   = "Unconfigured",
 		[StandAlone]     = "StandAlone",
 		[Unconnected]    = "Unconnected",
 		[Timeout]        = "Timeout",
 		[BrokenPipe]     = "BrokenPipe",
+		[NetworkFailure] = "NetworkFailure",
 		[WFConnection]   = "WFConnection",
 		[WFReportParams] = "WFReportParams",
 		[Connected]      = "Connected",
@@ -148,12 +144,29 @@
 		[PausedSyncS]    = "PausedSyncS",
 		[PausedSyncT]    = "PausedSyncT",
 	};
+
+	return s < Unconfigured ? "TO_SMALL" :
+	       s > PausedSyncS  ? "TO_LARGE"
+		                : cstate_names[s];
+}
+
+const char* nodestate_to_name(Drbd_State s) {
 	static const char *state_names[] = {
 		[Primary]   = "Primary",
 		[Secondary] = "Secondary",
 		[Unknown]   = "Unknown"
 	};
 
+	return s < Unknown    ? "TO_SMALL" :
+	       s > Secondary  ? "TO_LARGE"
+		              : state_names[s];
+}
+
+int drbd_proc_get_info(char *buf, char **start, off_t offset,
+		       int len, int *unused, void *data)
+{
+	int rlen, i;
+	const char *sn;
 
 	rlen = sprintf(buf, "version: " REL_VERSION " (api:%d/proto:%d)\n\n",
 		       API_VERSION,PRO_VERSION);
@@ -172,7 +185,7 @@
 	*/
 
 	for (i = 0; i < minor_count; i++) {
-		sn = cstate_names[drbd_conf[i].cstate];
+		sn = cstate_to_name(drbd_conf[i].cstate);
 		if(drbd_conf[i].cstate == Connected) {
 			if(test_bit(DISKLESS,&drbd_conf[i].flags)) 
 				sn = "DiskLessClient";
@@ -188,8 +201,8 @@
 			   "    ns:%u nr:%u dw:%u dr:%u al:%u bm:%u "
 			   "lo:%d pe:%d ua:%d\n",
 			   i, sn,
-			   state_names[drbd_conf[i].state],
-			   state_names[drbd_conf[i].o_state],
+			   nodestate_to_name(drbd_conf[i].state),
+			   nodestate_to_name(drbd_conf[i].o_state),
 			   (drbd_conf[i].gen_cnt[Flags]
 			    & MDF_Consistent) ? "Consistent" : "Inconsistent",
 			   drbd_conf[i].send_cnt/2,
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.142
retrieving revision 1.97.2.143
diff -u -3 -r1.97.2.142 -r1.97.2.143
--- drbd_receiver.c	5 May 2004 13:25:15 -0000	1.97.2.142
+++ drbd_receiver.c	5 May 2004 17:07:02 -0000	1.97.2.143
@@ -372,12 +372,11 @@
 		le = mdev->done_ee.next;
 		list_del(le);
 		e = list_entry(le, struct Tl_epoch_entry, w.list);
-		drbd_put_ee(mdev,e);
 		if(mdev->conf.wire_protocol == DRBD_PROT_C ||
 		   is_syncer_blk(mdev,e->block_id)) {
 			dec_unacked(mdev,HERE);
 		}
-
+		drbd_put_ee(mdev,e);
 	}
 
 	spin_unlock_irq(&mdev->ee_lock);
@@ -509,9 +508,9 @@
 		} else if (rv == 0) {
 			INFO("sock was shut down by peer\n");
 			break;
-		} else {
-			// if data comes in bytewise, this might trigger ...
-			ERR("logic error: sock_recvmsg returned %d\n",rv);
+		} else  {
+			/* signal came in after we read a partial message */
+			D_ASSERT(signal_pending(current));
 			break;
 		}
 	};
@@ -683,7 +682,6 @@
 		ERR("short read expecting header on sock: r=%d\n",r);
 		return FALSE;
 	};
-	dump_packet(mdev,mdev->data.socket,1,(void*)h);
 	h->command = be16_to_cpu(h->command);
 	h->length  = be16_to_cpu(h->length);
 	if (unlikely( h->magic != BE_DRBD_MAGIC )) {
@@ -1317,17 +1315,15 @@
 
 STATIC void drbd_fail_pending_reads(drbd_dev *mdev)
 {
-	struct list_head workset,*le;
+	struct list_head *le;
 	drbd_bio_t *bio;
+	LIST_HEAD(workset);
 
 	/*
 	 * Application READ requests
 	 */
 	spin_lock(&mdev->pr_lock);
-	// FIXME use list_splice_init
-	list_add(&workset,&mdev->app_reads);
-	list_del(&mdev->app_reads);
-	INIT_LIST_HEAD(&mdev->app_reads);
+	list_splice_init(&mdev->app_reads,&workset);
 	spin_unlock(&mdev->pr_lock);
 
 	while(!list_empty(&workset)) {
@@ -1444,6 +1440,7 @@
 			    cmdname(header->command), header->length);
 			break;
 		}
+		dump_packet(mdev,mdev->data.socket,2,&mdev->data.rbuf, __FILE__, __LINE__);
 	}
 }
 
@@ -1493,16 +1490,17 @@
 	mdev->epoch_size=0;
 
 	if(atomic_read(&mdev->unacked_cnt)) {
-		ERR("unacked_cnt!=0\n");
+		ERR("unacked_cnt = %d\n",atomic_read(&mdev->unacked_cnt));
 		atomic_set(&mdev->unacked_cnt,0);
 	}
 
-	/* Since syncer's blocks are also counted, there is no hope that
-	   pending_cnt is zero. */
+	if(atomic_read(&mdev->rs_pending_cnt)) {
+		ERR("rs_pending_cnt = %d\n",atomic_read(&mdev->rs_pending_cnt));
+		atomic_set(&mdev->rs_pending_cnt,0);
+	}
+
 	ERR_IF(atomic_read(&mdev->ap_pending_cnt))
 		atomic_set(&mdev->ap_pending_cnt,0);
-	ERR_IF(atomic_read(&mdev->rs_pending_cnt))
-		atomic_set(&mdev->rs_pending_cnt,0);
 
 	wake_up_interruptible(&mdev->cstate_wait);
 
@@ -1786,13 +1784,13 @@
 			}
 			expect = asender_tbl[cmd].pkt_size;
 			ERR_IF(len != expect-sizeof(Drbd_Header)) {
-				dump_packet(mdev,mdev->meta.socket,1,(void*)h);
+				dump_packet(mdev,mdev->meta.socket,1,(void*)h, __FILE__, __LINE__);
 				DUMPI(expect);
 			}
 		}
 		if(received == expect) {
 			D_ASSERT(cmd != -1);
-			dump_packet(mdev,mdev->meta.socket,1,(void*)h);
+			dump_packet(mdev,mdev->meta.socket,1,(void*)h, __FILE__, __LINE__);
 			if(!asender_tbl[cmd].process(mdev,h)) goto err;
 
 			buf      = h;