[DRBD-cvs] drbd by lars; * _drbd_send_page does no longer use tc...

drbd-user@lists.linbit.com drbd-user@lists.linbit.com
Thu, 17 Jun 2004 03:44:42 +0200 (CEST)


DRBD CVS committal

Author  : lars
Module  : drbd

Dir     : drbd/drbd


Modified Files:
      Tag: rel-0_7-branch
	drbd_actlog.c drbd_bitmap.c drbd_dsender.c drbd_fs.c 
	drbd_int.h drbd_main.c drbd_proc.c drbd_receiver.c 
	drbd_req-2.4.c lru_cache.c 


Log Message:

* _drbd_send_page no longer uses tcp_sendpage.
  THIS has been our show stopper!
  Though I don't understand where we use it wrong,
  as soon as we use sendmsg instead of sendpage, it works.

other goodies:

* new metadata flag MDF_FullSync
  to indicate that we need a full sync next time.
  typically followed by drbd_bm_set_all(); drbd_bm_write();
  and then cleared again.
* PARTNER_CONSISTENT flag, so we won't sync against or read from
  some inconsistent peer.
* sync handshake improved. detects split brain,
  detects inconsistent local or peer data,
  detects whether full sync is necessary.
* moved syncer handshake and detach ioctl into their own functions
* access gen_cnt[Flags] through access functions.
  TODO:  maybe these should be inlines.
  maybe these need to be protected by some lock.
* meta data is initialised as inconsistent, needing a full sync.
* assert that md_io_mutex is locked in drbd_md_sync_page_io

If this survives the SuSE test cluster iterations, it shall become -rc1.

	:)


===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_actlog.c,v
retrieving revision 1.1.2.108
retrieving revision 1.1.2.109
diff -u -3 -r1.1.2.108 -r1.1.2.109
--- drbd_actlog.c	16 Jun 2004 07:50:37 -0000	1.1.2.108
+++ drbd_actlog.c	17 Jun 2004 01:44:36 -0000	1.1.2.109
@@ -40,6 +40,8 @@
 	struct completion event;
 	int ok = 0;
 
+	D_ASSERT(semaphore_is_locked(&mdev->md_io_mutex));
+
 	if (!mdev->md_bdev) {
 		if (DRBD_ratelimit(5*HZ,5)) {
 			ERR("mdev->md_bdev==NULL\n");
@@ -84,6 +86,8 @@
 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 	int ok = 0;
 
+	D_ASSERT(semaphore_is_locked(&mdev->md_io_mutex));
+
 	if (!mdev->md_bdev) {
 		if (DRBD_ratelimit(5*HZ,5)) {
 			ERR("mdev->md_bdev==NULL\n");
@@ -475,7 +479,7 @@
 	for(i=0;i<mdev->act_log->nr_elements;i++) {
 		enr = lc_entry(mdev->act_log,i)->lc_number;
 		if(enr == LC_FREE) continue;
-		add += drbd_bm_e_set_all(mdev, enr);
+		add += drbd_bm_ALe_set_all(mdev, enr);
 	}
 
 	lc_unlock(mdev->act_log);
@@ -584,7 +588,15 @@
 			//WARN("Recounting sectors in %d (resync LRU too small?)\n", enr);
 			// This element should be in the cache
 			// since drbd_rs_begin_io() pulled it already in.
-			ext->rs_left = drbd_bm_e_weight(mdev,enr);
+			int rs_left = drbd_bm_e_weight(mdev,enr);
+			if (ext->flags != 0) {
+				WARN("changing resync lce: %d[%u;%02lx]"
+				     " -> %d[%u;00]\n",
+				     ext->lce.lc_number, ext->rs_left,
+				     ext->flags, enr, rs_left);
+				ext->flags = 0;
+			}
+			ext->rs_left = rs_left;
 			lc_changed(mdev->resync,&ext->lce);
 		}
 		lc_put(mdev->resync,&ext->lce);
@@ -607,6 +619,12 @@
 			udw->enr = ext->lce.lc_number;
 			udw->w.cb = w_update_odbm;
 			drbd_queue_work_front(mdev,&mdev->data.work,&udw->w);
+			if (ext->flags != 0) {
+				WARN("deleting resync lce: %d[%u;%02lx]\n",
+				     ext->lce.lc_number, ext->rs_left,
+				     ext->flags);
+				ext->flags = 0;
+			}
 			lc_del(mdev->resync,&ext->lce);
 		}
 	}
@@ -813,12 +831,14 @@
 		sig = wait_event_interruptible( mdev->al_wait,
 				!_is_in_al(mdev,enr*AL_EXT_PER_BM_SECT+i) );
 		if (sig) {
+			spin_lock_irq(&mdev->al_lock);
 			if( lc_put(mdev->resync,&bm_ext->lce) == 0 ) {
 				clear_bit(BME_NO_WRITES,&bm_ext->flags);
 				atomic_dec(&mdev->resync_locked);
 				wake_up(&mdev->al_wait);
-				return 0;
 			}
+			spin_unlock_irq(&mdev->al_lock);
+			return 0;
 		}
 	}
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_bitmap.c,v
retrieving revision 1.1.2.3
retrieving revision 1.1.2.4
diff -u -3 -r1.1.2.3 -r1.1.2.4
--- drbd_bitmap.c	16 Jun 2004 10:00:43 -0000	1.1.2.3
+++ drbd_bitmap.c	17 Jun 2004 01:44:36 -0000	1.1.2.4
@@ -446,12 +446,12 @@
 
 /* read one sector of the on disk bitmap into memory.
  * on disk bitmap is little endian.
- * @sector is _sector_ offset from start of on disk bitmap (aka bm-extent nr).
+ * @enr is _sector_ offset from start of on disk bitmap (aka bm-extent nr).
  * returns 0 on success, -EIO on failure
  */
-int drbd_bm_read_sect(drbd_dev *mdev,sector_t sector)
+int drbd_bm_read_sect(drbd_dev *mdev,unsigned long enr)
 {
-	sector_t on_disk_sector = sector + drbd_md_ss(mdev) + MD_BM_OFFSET;
+	sector_t on_disk_sector = enr + drbd_md_ss(mdev) + MD_BM_OFFSET;
 	int bm_words, num_words, offset, err  = 0;
 
 	// MUST_BE_LOCKED(); not neccessarily global ...
@@ -459,11 +459,11 @@
 	down(&mdev->md_io_mutex);
 	if(drbd_md_sync_page_io(mdev,on_disk_sector,READ)) {
 		bm_words  = drbd_bm_words(mdev);
-		offset    = S2W(sector);	// word offset into bitmap
+		offset    = S2W(enr);	// word offset into bitmap
 		num_words = min(S2W(1), bm_words - offset);
 #if DUMP_MD >= 3
 	INFO("write_sect: sector=%lu offset=%u num_words=%u\n",
-			(unsigned long) sector, offset, num_words);
+			enr, offset, num_words);
 #endif
 		drbd_bm_set_lel( mdev, offset, num_words,
 				 page_address(mdev->md_io_page) );
@@ -472,11 +472,11 @@
 		err = -EIO;
 		ERR( "IO ERROR reading bitmap sector %lu "
 		     "(meta-disk sector %lu)\n",
-		     (unsigned long)sector, (unsigned long)on_disk_sector );
+		     enr, (unsigned long)on_disk_sector );
 		drbd_chk_io_error(mdev, 1);
 		drbd_io_error(mdev);
 		for (i = 0; i < AL_EXT_PER_BM_SECT; i++)
-			drbd_bm_e_set_all(mdev,sector*AL_EXT_PER_BM_SECT+i);
+			drbd_bm_ALe_set_all(mdev,enr*AL_EXT_PER_BM_SECT+i);
 	}
 	up(&mdev->md_io_mutex);
 	return err;
@@ -509,23 +509,23 @@
  * drbd_bm_write_sect: Writes a 512 byte piece of the bitmap to its
  * on disk location. On disk bitmap is little endian.
  *
- * @sector: The _sector_ offset from the start of the bitmap.
+ * @enr: The _sector_ offset from the start of the bitmap.
  *
  */
-int drbd_bm_write_sect(struct Drbd_Conf *mdev,sector_t sector)
+int drbd_bm_write_sect(struct Drbd_Conf *mdev,unsigned long enr)
 {
-	sector_t on_disk_sector = sector + drbd_md_ss(mdev) + MD_BM_OFFSET;
+	sector_t on_disk_sector = enr + drbd_md_ss(mdev) + MD_BM_OFFSET;
 	int bm_words, num_words, offset, err  = 0;
 
 	// MUST_BE_LOCKED(); not neccessarily global...
 
 	down(&mdev->md_io_mutex);
 	bm_words  = drbd_bm_words(mdev);
-	offset    = S2W(sector);	// word offset into bitmap
+	offset    = S2W(enr);	// word offset into bitmap
 	num_words = min(S2W(1), bm_words - offset);
 #if DUMP_MD >= 3
 	INFO("write_sect: sector=%lu offset=%u num_words=%u\n",
-			(unsigned long) sector, offset, num_words);
+			enr, offset, num_words);
 #endif
 	drbd_bm_get_lel( mdev, offset, num_words,
 			 page_address(mdev->md_io_page) );
@@ -534,11 +534,11 @@
 		err = -EIO;
 		ERR( "IO ERROR reading bitmap sector %lu "
 		     "(meta-disk sector %lu)\n",
-		     (unsigned long)sector, (unsigned long)on_disk_sector );
+		     enr, (unsigned long)on_disk_sector );
 		drbd_chk_io_error(mdev, 1);
 		drbd_io_error(mdev);
 		for (i = 0; i < AL_EXT_PER_BM_SECT; i++)
-			drbd_bm_e_set_all(mdev,sector*AL_EXT_PER_BM_SECT+i);
+			drbd_bm_ALe_set_all(mdev,enr*AL_EXT_PER_BM_SECT+i);
 	}
 	mdev->bm_writ_cnt++;
 	up(&mdev->md_io_mutex);
@@ -723,7 +723,7 @@
  * reference count of some bitmap extent element from some lru instead...
  *
  */
-int drbd_bm_e_weight(drbd_dev *mdev, unsigned int enr)
+int drbd_bm_e_weight(drbd_dev *mdev, unsigned long enr)
 {
 	struct drbd_bitmap *b = mdev->bitmap;
 	int count, s, e;
@@ -750,8 +750,8 @@
 	return count;
 }
 
-/* set all bits covered by the bm-extent enr */
-unsigned long drbd_bm_e_set_all(drbd_dev *mdev, unsigned int enr)
+/* set all bits covered by the AL-extent al_enr */
+unsigned long drbd_bm_ALe_set_all(drbd_dev *mdev, unsigned long al_enr)
 {
 	struct drbd_bitmap *b = mdev->bitmap;
 	unsigned long weight;
@@ -764,8 +764,8 @@
 	BM_PARANOIA_CHECK();
 	weight = b->bm_set;
 
-	s = S2W(enr);
-	e = min((size_t)S2W(enr+1),b->bm_words);
+	s = al_enr * BM_WORDS_PER_AL_EXT;
+	e = min_t(size_t, s + BM_WORDS_PER_AL_EXT, b->bm_words);
 	count = 0;
 	if (s < b->bm_words) {
 		const unsigned long* w = b->bm+s;
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.121
retrieving revision 1.1.2.122
diff -u -3 -r1.1.2.121 -r1.1.2.122
--- drbd_dsender.c	15 Jun 2004 13:42:29 -0000	1.1.2.121
+++ drbd_dsender.c	17 Jun 2004 01:44:36 -0000	1.1.2.122
@@ -349,7 +349,13 @@
 	drbd_request_t *req = (drbd_request_t*)w;
 	int ok;
 
-	// TODO send a "set_out_of_sync" packet to the peer
+	/* FIXME send a "set_out_of_sync" packet to the peer
+	 * in the PassOn case...
+	 * in the Detach (or Panic) case, we (try to) send
+	 * a "we are diskless" param packet anyways, and the peer
+	 * will then set the FullSync bit in the meta data ...
+	 */
+	D_ASSERT(mdev->on_io_error != PassOn);
 
 	INVALIDATE_MAGIC(req);
 	mempool_free(req,drbd_request_mempool);
@@ -369,7 +375,7 @@
 	smp_rmb();
 	if ( cancel ||
 	     mdev->cstate < Connected ||
-	     test_bit(PARTNER_DISKLESS,&mdev->flags) ) {
+	     !test_bit(PARTNER_CONSISTENT,&mdev->flags) ) {
 		drbd_panic("WE ARE LOST. Local IO failure, no peer.\n");
 
 		// does not make much sense, but anyways...
@@ -455,7 +461,9 @@
 		return 0;
 	}
 
-	D_ASSERT(mdev->cstate == SyncTarget);
+	if (mdev->cstate != SyncTarget) {
+		ERR("%s in w_make_resync_request\n", cstate_to_name(mdev->cstate));
+	}
 
         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
 
@@ -525,8 +533,14 @@
 	     dt,(unsigned long)n);
 
 	if (mdev->cstate == SyncTarget) {
-		mdev->gen_cnt[Flags] |= MDF_Consistent;
+		drbd_md_set_flag(mdev,MDF_Consistent);
+		ERR_IF(drbd_md_test_flag(mdev,MDF_FullSync))
+			drbd_md_clear_flag(mdev,MDF_FullSync);
 		drbd_md_write(mdev);
+	} else if (mdev->cstate == SyncSource) {
+		set_bit(PARTNER_CONSISTENT, &mdev->flags);
+	} else {
+		D_ASSERT(0);
 	}
 
 	// assert that all bit-map parts are cleared.
@@ -665,9 +679,16 @@
 
 	if(mdev->cstate == SyncTarget) {
 		ERR_IF(test_bit(STOP_SYNC_TIMER,&mdev->flags)) {
+			unsigned long rs_left = drbd_bm_total_weight(mdev);
 			clear_bit(STOP_SYNC_TIMER,&mdev->flags);
+			if (rs_left == 0) {
+				INFO("rs_left==0 in _drbd_rs_resume\n");
+			} else {
+				ERR("STOP_SYNC_TIMER was set in "
+				    "_drbd_rs_resume, but rs_left still %lu\n",
+				    rs_left);
+			}
 		}
-		D_ASSERT(drbd_bm_total_weight(mdev) > 0);
 		mod_timer(&mdev->resync_timer,jiffies);
 	}
 }
@@ -809,14 +830,21 @@
 void drbd_start_resync(drbd_dev *mdev, Drbd_CState side)
 {
 	if(side == SyncTarget) {
-		mdev->gen_cnt[Flags] &= ~MDF_Consistent;
+		drbd_md_clear_flag(mdev,MDF_Consistent);
 		drbd_bm_reset_find(mdev);
-	} else {
+	} else if (side == SyncSource) {
+		clear_bit(PARTNER_CONSISTENT, &mdev->flags);
 		/* If we are SyncSource we must be consistent.
 		 * FIXME this should be an assertion only,
 		 * otherwise it masks a logic bug somewhere else...
 		 */
-		mdev->gen_cnt[Flags] |= MDF_Consistent;
+		ERR_IF (!drbd_md_test_flag(mdev,MDF_Consistent)) {
+			// FIXME this is actually a BUG()!
+			drbd_md_set_flag(mdev,MDF_Consistent);
+		}
+	} else {
+		D_ASSERT(0);
+		return;
 	}
 	drbd_md_write(mdev);
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_fs.c,v
retrieving revision 1.28.2.105
retrieving revision 1.28.2.106
diff -u -3 -r1.28.2.105 -r1.28.2.106
--- drbd_fs.c	15 Jun 2004 10:07:32 -0000	1.28.2.105
+++ drbd_fs.c	17 Jun 2004 01:44:36 -0000	1.28.2.106
@@ -163,6 +163,57 @@
 	return rv;
 }
 
+/* checks that the al lru is of requested size, and if neccessary tries to
+ * allocate a new one. returns -EBUSY if current al lru is still used,
+ * -ENOMEM when allocation failed, and 0 on success.
+ */  
+STATIC int drbd_check_al_size(drbd_dev *mdev)
+{
+	struct lru_cache *n,*t;
+	struct lc_element *e;
+	unsigned int in_use;
+	int i;
+
+	ERR_IF(mdev->sync_conf.al_extents < 7)
+		mdev->sync_conf.al_extents = 127;
+
+	if ( mdev->act_log &&
+	     mdev->act_log->nr_elements == mdev->sync_conf.al_extents )
+		return 0;
+
+	in_use = 0;
+	t = mdev->act_log;
+	n = lc_alloc(mdev->sync_conf.al_extents,
+		     sizeof(struct lc_element), mdev);
+
+	if (n==NULL) {
+		ERR("Cannot allocate act_log lru!\n");
+		return -ENOMEM;
+	}
+	spin_lock_irq(&mdev->al_lock);
+	if (t) {
+		for (i=0; i < t->nr_elements; i++) {
+			e = lc_entry(t,i);
+			if (e->refcnt)
+				ERR("refcnt(%d)==%d\n",
+				    e->lc_number, e->refcnt);
+			in_use += e->refcnt;
+		}
+	}
+	if (!in_use) {
+		mdev->act_log = n;
+	}
+	spin_unlock_irq(&mdev->al_lock);
+	if (in_use) {
+		ERR("Activity log still in use!\n");
+		lc_free(n);
+		return -EBUSY;
+	} else {
+		if (t) lc_free(t);
+	}
+	return 0;
+}
+
 STATIC
 int drbd_ioctl_set_disk(struct Drbd_Conf *mdev,
 			struct ioctl_disk_config * arg)
@@ -198,10 +249,33 @@
 	if (copy_from_user(&new_conf, &arg->config,sizeof(struct disk_config)))
 		return -EFAULT;
 
+	/* FIXME
+	 * I'd like to do it here, so I can just fail this ioctl with ENOMEM.
+	 * but drbd_md_read below might change the al_nr_extens again, so need
+	 * to do it there again anyways...
+	 * but then I already changed it all and cannot easily undo it..
+	 * for now, do it there, but then if it fails, rather panic than later
+	 * have a NULL pointer dereference.
+	 *
+	i = drbd_check_al_size(mdev);
+	if (i) return i;
+	 *
+	 */
+
 	if (mdev->cstate == Unconfigured) {
 		// ioctl already has a refcnt
 		__module_get(THIS_MODULE);
 		mput = 1;
+	} else {
+		/* FIXME allow reattach while connected,
+		 * and allow it in Primary/Diskless state...
+		 * currently there are strange races leading to a distributed
+		 * deadlock in that case...
+		 */
+		if ( mdev->cstate != StandAlone /* &&
+		    mdev->cstate != Connected */) {
+			return -EBUSY;
+		}
 	}
 
 	if ( new_conf.meta_index < -1) {
@@ -348,41 +422,44 @@
 
 	drbd_bm_lock(mdev); // racy...
 	drbd_determin_dev_size(mdev);
+	/* FIXME
+	 * what if we now have la_size == 0 ?? eh?
+	 */
 
-	if(md_gc_valid > 0) drbd_bm_read(mdev);
-	else {
+	if (md_gc_valid <= 0) {
 		INFO("Assuming that all blocks are out of sync (aka FullSync)\n");
 		drbd_bm_set_all(mdev);
 		drbd_bm_write(mdev);
-	}
-
-	D_ASSERT(mdev->sync_conf.al_extents >= 7);
-
-	if ( !mdev->act_log ||
-	     mdev->act_log->nr_elements != mdev->sync_conf.al_extents )
-	{
-		struct lru_cache *n,*t;
-		n = lc_alloc(mdev->sync_conf.al_extents,
-			     sizeof(struct lc_element), mdev);
-		ERR_IF (n==NULL) {
-			/* FIXME
-			 * allocation failed.
-			 * how do we cleanup this mess now?
-			 */
+		drbd_md_clear_flag(mdev,MDF_FullSync);
+		drbd_md_write(mdev);
+	} else { // md_gc_valid > 0
+		/* FIXME this still does not propagate io errors! */
+		drbd_bm_read(mdev);
+	}
+
+	i = drbd_check_al_size(mdev);
+	if (i) {
+// FATAL!
+		/* FIXME see the comment above.
+		 * if this fails I need to undo all changes,
+		 * go back into Unconfigured,
+		 * and fail the ioctl with ENOMEM...
+		 */
+		// return i;
+		drbd_panic("Cannot allocate act_log\n");
+		set_current_state(TASK_ZOMBIE);
+		schedule(); // drbdsetup suicide...
+	}
+
+	if (md_gc_valid > 0) {
+		drbd_al_read_log(mdev);
+		if (drbd_md_test_flag(mdev,MDF_PrimaryInd)) {
+			drbd_al_apply_to_bm(mdev);
+			drbd_al_to_on_disk_bm(mdev);
 		}
-		// FIXME if (still_in_use) BUG();
-		spin_lock_irq(&mdev->al_lock);
-		t = mdev->act_log;
-		mdev->act_log = n;
-		spin_unlock_irq(&mdev->al_lock);
-		if (t) lc_free(t);
-	}
-
-	drbd_al_read_log(mdev);
-	if(mdev->gen_cnt[Flags] & MDF_PrimaryInd) {
-		drbd_al_apply_to_bm(mdev);
-		drbd_al_to_on_disk_bm(mdev);
-	}
+	} /* else {
+	     FIXME wipe out on disk al!
+	} */
 
 	drbd_set_blocksize(mdev,INITIAL_BLOCK_SIZE);
 
@@ -397,7 +474,11 @@
 // FIXME EXPLAIN:
 	clear_bit(MD_IO_ALLOWED,&mdev->flags);
 
-	if(mdev->cstate >= Connected ) {
+	/* FIXME currently only StandAlone here...
+	 * Connected is not possible, since
+	 * above we return -EBUSY in that case  */
+	D_ASSERT(mdev->cstate <= Connected);
+	if(mdev->cstate == Connected ) {
 		drbd_send_param(mdev,1);
 	}
 	drbd_bm_unlock(mdev);
@@ -561,6 +642,8 @@
 
 int drbd_set_state(drbd_dev *mdev,Drbd_State newstate)
 {
+	int forced = 0;
+	int dont_have_good_data;
 	NOT_IN_26(int minor = mdev-drbd_conf;)
 
 	D_ASSERT(semaphore_is_locked(&mdev->device_mutex));
@@ -578,12 +661,13 @@
 
 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)
 	smp_rmb();
-	if(newstate == Secondary &&
+	if ( (newstate & Secondary) &&
 	   (test_bit(WRITER_PRESENT,&mdev->flags) ||
 	    drbd_is_mounted(minor) == MountedRW))
 		return -EBUSY;
 #else
-	if(mdev->this_bdev->bd_contains == 0) {
+	ERR_IF (mdev->this_bdev->bd_contains == 0) {
+		// FIXME this masks a bug somewhere else!
 		mdev->this_bdev->bd_contains = mdev->this_bdev;
 	}
 
@@ -596,13 +680,50 @@
 	}
 #endif
 
-	if( (newstate & Primary) &&
-	    !(mdev->gen_cnt[Flags] & MDF_Consistent) &&
-	    (mdev->cstate < Connected) &&
-	    !(newstate & DontBlameDrbd) )
-		return -EIO;
 
-// FIXME if necessary set MDF_Consistent
+	/* I dont have access to good data anywhere, if:
+	 *  ( I am diskless OR inconsistent )
+	 *  AND
+	 *  ( not connected, or partner has no consistent data either )
+	 */
+	dont_have_good_data =
+		(    test_bit(DISKLESS, &mdev->flags)
+		  || !drbd_md_test_flag(mdev,MDF_Consistent) )
+		&&
+		( mdev->cstate < Connected
+		  || test_bit(PARTNER_DISKLESS, &mdev->flags)
+		  || !test_bit(PARTNER_CONSISTENT, &mdev->flags) );
+
+	if (newstate & Primary) {
+		if ( test_bit(DISKLESS,&mdev->flags)
+		    && mdev->cstate < Connected ) {
+			/* not even brute force can find data without disk.
+			 * FIXME choose a usefull Error,
+			 * and update drbsetup accordingly */
+			return -EIO;
+		} else if (dont_have_good_data) {
+			/* ok, either we have a disk (which may be inconsistent)
+			 * or we have a connection */
+			if (newstate & DontBlameDrbd) {
+				forced = 1;
+				/* make sure the Human count is increased if
+				 * we got here only because it was forced.
+				 * maybe we want to force a FullSync? */
+				newstate |= Human;
+			} else {
+				return -EIO;
+			}
+		}
+#if 0
+		else if (mdev->cstate >= Connected) {
+			/* do NOT increase the Human count if we are connected,
+			 * and there is no reason for it.  I'm not yet sure
+			 * wether this is what I mean, though...
+			 */
+			newstate &= ~(Human|DontBlameDrbd);
+		}
+#endif
+	}
 
 	drbd_sync_me(mdev);
 
@@ -624,9 +745,17 @@
 	 * but that means someone is misusing DRBD...
 	 * */
 
+	if (forced) {
+		/* this was --do-what-I-say ... */
+		drbd_md_set_flag(mdev,MDF_Consistent);
+	}
+	set_bit(MD_DIRTY,&mdev->flags); // we are changing state!
+	INFO( "%s/%s --> %s/%s\n",
+	      nodestate_to_name(mdev->state),
+	      nodestate_to_name(mdev->o_state),
+	      nodestate_to_name(newstate & 0x03),
+	      nodestate_to_name(mdev->o_state)   );
 	mdev->state = (Drbd_State) newstate & 0x03;
-	INFO( "switched to %s/%s state\n", nodestate_to_name(mdev->state),
-			nodestate_to_name(mdev->o_state) );
 	if(newstate & Primary) {
 		NOT_IN_26( set_device_ro(MKDEV(MAJOR_NR, minor), FALSE ); )
 
@@ -651,16 +780,16 @@
 		ONLY_IN_26( set_disk_ro(mdev->vdisk, TRUE ); )
 	}
 
-	if(!test_bit(DISKLESS,&mdev->flags)) {
-		if(newstate & Secondary) {
-			drbd_al_to_on_disk_bm(mdev);
-		}
-		/* Primary indicator has changed in any case. */
-		drbd_md_write(mdev);
+	if(!test_bit(DISKLESS,&mdev->flags) && (newstate & Secondary)) {
+		drbd_al_to_on_disk_bm(mdev);
 	}
+	/* Primary indicator has changed in any case. */
+	drbd_md_write(mdev);
 
-	if (mdev->cstate >= WFReportParams)
-		drbd_send_param(mdev,0);
+	if (mdev->cstate >= WFReportParams) {
+		/* if this was forced, we should consider sync */
+		drbd_send_param(mdev,forced);
+	}
 
 	return 0;
 }
@@ -675,7 +804,7 @@
 		return -EFAULT;
 	}
 
-	if( mdev->gen_cnt[Flags] & MDF_ConnectedInd) {
+	if( drbd_md_test_flag(mdev,MDF_ConnectedInd) ) {
 		time=p.wfc_timeout;
 		//ERR("using wfc_timeout.\n");
 	} else {
@@ -695,6 +824,7 @@
 				 struct ioctl_syncer_config* arg)
 {
 	struct syncer_config sc;
+	int err;
 
 	if(copy_from_user(&sc,&arg->config,sizeof(sc))) return -EFAULT;
 
@@ -714,28 +844,8 @@
 	mdev->sync_conf.skip       = sc.skip;
 	mdev->sync_conf.al_extents = sc.al_extents;
 
-	if ( !mdev->act_log ||
-	     mdev->act_log->nr_elements != mdev->sync_conf.al_extents )	{
-		struct lru_cache *n,*t;
-		struct lc_element *e;
-		unsigned int in_use=0;
-		int i;
-		n = lc_alloc(mdev->sync_conf.al_extents,
-			     sizeof(struct lc_element), mdev);
-		D_ASSERT(n); // FIXME if (n==NULL) scream out loud ...
-		spin_lock_irq(&mdev->al_lock);
-		t = mdev->act_log;
-		mdev->act_log = n;
-		spin_unlock_irq(&mdev->al_lock);
-		for (i=0; i < t->nr_elements; i++) {
-			e = lc_entry(t,i);
-			if (e->refcnt)
-				ERR("refcnt(%d)==%d\n", e->lc_number, e->refcnt);
-			in_use += e->refcnt;
-		}
-		BUG_ON(in_use);
-		if (t) lc_free(t);
-	}
+	err = drbd_check_al_size(mdev);
+	if (err) return err;
 
 	if (mdev->cstate > WFConnection)
 		drbd_send_sync_param(mdev,&sc);
@@ -745,6 +855,67 @@
 	return 0;
 }
 
+STATIC int drbd_detach_ioctl(drbd_dev *mdev)
+{
+	int would_discard_last_good_data;
+	int interrupted;
+
+	// not during resync. no.
+	if (mdev->cstate > Connected) return -EBUSY;
+
+	/* this was the last good data copy, if:
+	 *  (I am Primary, and not connected ),
+	 *  OR
+	 *  (we are connected, and Peer has no good data himself)
+	 */
+	would_discard_last_good_data =
+		( mdev->state == Primary && mdev->cstate < Connected )
+		||
+		( mdev->cstate >= Connected
+		  && (    test_bit(PARTNER_DISKLESS, &mdev->flags)
+		      || !test_bit(PARTNER_CONSISTENT, &mdev->flags) ) );
+
+	if ( would_discard_last_good_data ) {
+		return -ENETRESET;
+	}
+	if (test_bit(DISKLESS,&mdev->flags) ||
+	    test_bit(PARTNER_DISKLESS,&mdev->flags) ) {
+		return -ENXIO;
+	}
+
+	drbd_sync_me(mdev);
+
+	set_bit(DISKLESS,&mdev->flags);
+	smp_wmb();
+
+	interrupted = wait_event_interruptible(mdev->cstate_wait,
+				      atomic_read(&mdev->local_cnt)==0);
+	if ( interrupted ) {
+		clear_bit(DISKLESS,&mdev->flags);
+		return -EINTR;
+	}
+
+	drbd_free_ll_dev(mdev);
+
+/* FIXME race with sync start
+*/
+	if (mdev->cstate == Connected) drbd_send_param(mdev,0);
+/* FIXME
+* if you detach while connected, you are *at least* inconsistent now,
+* and should clear MDF_Consistent in metadata, and maybe even set the bitmap
+* out of sync.
+* since if you reattach, this might be a different lo dev, and then it needs
+* to receive a sync!
+*/
+	if (mdev->cstate == StandAlone) {
+		// maybe  < Connected is better?
+		set_cstate(mdev,Unconfigured);
+		drbd_mdev_cleanup(mdev);
+		module_put(THIS_MODULE);
+	}
+	return 0;
+}
+
 int drbd_ioctl(struct inode *inode, struct file *file,
 			   unsigned int cmd, unsigned long arg)
 {
@@ -810,10 +981,11 @@
 
 	case DRBD_IOCTL_SET_STATE:
 		if (arg & ~(Primary|Secondary|Human|TimeoutExpired|
-			    DontBlameDrbd) )
-			return -EINVAL;
-
-		err = drbd_set_state(mdev,arg);
+			    DontBlameDrbd) ) {
+			err = -EINVAL;
+		} else {
+			err = drbd_set_state(mdev,arg);
+		}
 		break;
 
 	case DRBD_IOCTL_SET_DISK_CONFIG:
@@ -852,7 +1024,7 @@
 		if (  (   mdev->state  == Primary
 		       && test_bit(DISKLESS,&mdev->flags) )
 		   || (   mdev->o_state == Primary
-		       && test_bit(PARTNER_DISKLESS,&mdev->flags) ) )
+		       && !test_bit(PARTNER_CONSISTENT,&mdev->flags) ) )
 		{
 			err=-ENODATA;
 			break;
@@ -873,55 +1045,7 @@
 
 	case DRBD_IOCTL_UNCONFIG_DISK:
 		if (mdev->cstate == Unconfigured) break;
-
-		if ( mdev->state == Primary && mdev->cstate < Connected) {
-			err=-ENETRESET;
-			break;
-		}
-		/*
-		if (mdev->open_cnt > 1) {
-			err=-EBUSY;
-			break;
-		}
-		*/
-		if (mdev->cstate > Connected) {
-			err=-EBUSY;
-			break;
-		}
-		if (test_bit(DISKLESS,&mdev->flags) ||
-		    test_bit(PARTNER_DISKLESS,&mdev->flags) ) {
-			err=-ENXIO;
-			break;
-		}
-		drbd_sync_me(mdev);
-
-		set_bit(DISKLESS,&mdev->flags);
-		smp_wmb();
-		if ( wait_event_interruptible(mdev->cstate_wait,
-					      atomic_read(&mdev->local_cnt)==0) ) {
-			clear_bit(DISKLESS,&mdev->flags);
-			err=-EINTR;
-			break;
-		}
-
-		drbd_free_ll_dev(mdev);
-
-/* FIXME race with sync start
- */
-		if (mdev->cstate == Connected) drbd_send_param(mdev,0);
-/* FIXME
- * if you detach while connected, you are *at least* inconsistent now,
- * and should clear MDF_Consistent in metadata, and maybe even set the bitmap
- * out of sync.
- * since if you reattach, this might be a different lo dev, and then it needs
- * to receive a sync!
- */
-		if (mdev->cstate == StandAlone) {
-			set_cstate(mdev,Unconfigured);
-			drbd_mdev_cleanup(mdev);
-			module_put(THIS_MODULE);
-		}
-
+		err = drbd_detach_ioctl(mdev);
 		break;
 
 	case DRBD_IOCTL_WAIT_CONNECT:
@@ -980,11 +1104,25 @@
 			break;
 		}
 
+		/* avoid races with set_in_sync
+		 * for successfull mirrored writes
+		 */
+		set_cstate(mdev,WFBitMapT);
+		wait_event(mdev->cstate_wait,
+		     atomic_read(&mdev->ap_bio_cnt)==0);
+
 		drbd_bm_lock(mdev); // racy...
 
+		drbd_md_set_flag(mdev,MDF_FullSync);
+		drbd_md_clear_flag(mdev,MDF_Consistent);
+		drbd_md_write(mdev);
+
 		drbd_bm_set_all(mdev);
 		drbd_bm_write(mdev);
 
+		drbd_md_clear_flag(mdev,MDF_FullSync);
+		drbd_md_write(mdev);
+
 		drbd_send_short_cmd(mdev,BecomeSyncSource);
 		drbd_start_resync(mdev,SyncTarget);
 
@@ -999,11 +1137,29 @@
 			err = -EINPROGRESS;
 			break;
 		}
+		if ( !drbd_md_test_flag(mdev,MDF_Consistent) ) {
+			// FIXME use a more descriptive error number
+			err = -EINVAL;
+			break;
+		}
+
+		drbd_md_set_flag(mdev,MDF_FullSync);
+		drbd_md_write(mdev);
+
+		/* avoid races with set_in_sync
+		 * for successfull mirrored writes
+		 */
+		set_cstate(mdev,WFBitMapS);
+		wait_event(mdev->cstate_wait,
+		     atomic_read(&mdev->ap_bio_cnt)==0);
 
 		drbd_bm_lock(mdev); // racy...
 
 		drbd_bm_set_all(mdev);
 		drbd_bm_write(mdev);
+
+		drbd_md_clear_flag(mdev,MDF_FullSync);
+		drbd_md_write(mdev);
 
 		drbd_send_short_cmd(mdev,BecomeSyncTarget);
 		drbd_start_resync(mdev,SyncSource);
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.174
retrieving revision 1.58.2.175
diff -u -3 -r1.58.2.174 -r1.58.2.175
--- drbd_int.h	16 Jun 2004 10:00:43 -0000	1.58.2.174
+++ drbd_int.h	17 Jun 2004 01:44:36 -0000	1.58.2.175
@@ -280,20 +280,26 @@
 #define RQ_DRBD_IN_TL     0x0040
 
 enum MetaDataFlags {
-	MDF_Consistent   = 1,
-	MDF_PrimaryInd   = 2,
-	MDF_ConnectedInd = 4,
-};
+	__MDF_Consistent,
+	__MDF_PrimaryInd,
+	__MDF_ConnectedInd,
+	__MDF_FullSync,
+};
+#define MDF_Consistent      (1<<__MDF_Consistent)
+#define MDF_PrimaryInd      (1<<__MDF_PrimaryInd)
+#define MDF_ConnectedInd    (1<<__MDF_ConnectedInd)
+#define MDF_FullSync        (1<<__MDF_FullSync)
+
 /* drbd_meta-data.c (still in drbd_main.c) */
 enum MetaDataIndex {
 	Flags,          /* Consistency flag,connected-ind,primary-ind */
 	HumanCnt,       /* human-intervention-count */
 	TimeoutCnt,     /* timout-count */
 	ConnectedCnt,   /* connected-count */
-	ArbitraryCnt    /* arbitrary-count */
+	ArbitraryCnt,   /* arbitrary-count */
+	GEN_CNT_SIZE	// MUST BE LAST! (and Flags must stay first...)
 };
 
-#define GEN_CNT_SIZE 5
 #define DRBD_MD_MAGIC (DRBD_MAGIC+3) // 3nd incarnation of the file format.
 
 #define DRBD_PANIC 2
@@ -606,9 +612,11 @@
 	UNPLUG_REMOTE,		// whether sending a "WriteHint" makes sense
 	DISKLESS,		// no local disk
 	PARTNER_DISKLESS,	// partner has no storage
+	PARTNER_CONSISTENT,	// partner has consistent data
 	PROCESS_EE_RUNNING,	// eek!
 	MD_IO_ALLOWED,		// EXPLAIN
 	SENT_DISK_FAILURE,	// sending it once is enough
+	MD_DIRTY,		// current gen counts and flags not yet on disk
 };
 
 struct drbd_bitmap; // opaque for Drbd_Conf
@@ -783,11 +791,9 @@
 extern void drbd_dump_md(drbd_dev *, Drbd_Parameter_Packet *, int );
 // maybe define them below as inline?
 extern void drbd_md_inc(drbd_dev *mdev, enum MetaDataIndex order);
-/* comming soon {
 extern void drbd_md_set_flag(drbd_dev *mdev, int flags);
 extern void drbd_md_clear_flag(drbd_dev *mdev, int flags);
 extern int drbd_md_test_flag(drbd_dev *mdev, int flag);
-} */
 
 /* Meta data layout
    We reserve a 128MB Block (4k aligned)
@@ -855,6 +861,7 @@
 
 /* in one sector of the bitmap, we have this many activity_log extents. */
 #define AL_EXT_PER_BM_SECT  (1 << (BM_EXT_SIZE_B - AL_EXTENT_SIZE_B) )
+#define BM_WORDS_PER_AL_EXT (1 << (AL_EXTENT_SIZE_B-BM_BLOCK_SIZE_B-LN2_BPL))
 
 
 /* I want the packet to fit within one page
@@ -886,12 +893,12 @@
 extern int  drbd_bm_set_bit   (drbd_dev *mdev, unsigned long bitnr);
 extern int  drbd_bm_test_bit  (drbd_dev *mdev, unsigned long bitnr);
 extern int  drbd_bm_clear_bit (drbd_dev *mdev, unsigned long bitnr);
-extern int  drbd_bm_e_weight  (drbd_dev *mdev, unsigned int enr);
-extern int  drbd_bm_read_sect (drbd_dev *mdev, sector_t offset);
-extern int  drbd_bm_write_sect(drbd_dev *mdev, sector_t offset);
+extern int  drbd_bm_e_weight  (drbd_dev *mdev, unsigned long enr);
+extern int  drbd_bm_read_sect (drbd_dev *mdev, unsigned long enr);
+extern int  drbd_bm_write_sect(drbd_dev *mdev, unsigned long enr);
 extern void drbd_bm_read      (drbd_dev *mdev);
 extern void drbd_bm_write     (drbd_dev *mdev);
-extern unsigned long drbd_bm_e_set_all   (drbd_dev *mdev, unsigned int enr);
+extern unsigned long drbd_bm_ALe_set_all (drbd_dev *mdev, unsigned long al_enr);
 extern size_t        drbd_bm_words       (drbd_dev *mdev);
 extern unsigned long drbd_bm_find_next   (drbd_dev *mdev);
 extern unsigned long drbd_bm_total_weight(drbd_dev *mdev);
@@ -1077,6 +1084,14 @@
 			drbd_panic("IO error on backing device!\n");
 			break;
 		case Detach:
+			/*lge:
+			 *  I still do not fully grasp when to set or clear
+			 *  this flag... but I want to be able to at least
+			 *  still _try_ and write the "I am inconsistent, and
+			 *  need full sync" information to the MD. */
+			set_bit(MD_IO_ALLOWED,&mdev->flags);
+			drbd_md_set_flag(mdev,MDF_FullSync);
+			drbd_md_clear_flag(mdev,MDF_Consistent);
 			if (!test_and_set_bit(DISKLESS,&mdev->flags)) {
 				smp_mb(); // Nack is sent in w_e handlers.
 				ERR("Local IO failed. Detaching...\n");
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.187
retrieving revision 1.73.2.188
diff -u -3 -r1.73.2.187 -r1.73.2.188
--- drbd_main.c	15 Jun 2004 10:07:32 -0000	1.73.2.187
+++ drbd_main.c	17 Jun 2004 01:44:37 -0000	1.73.2.188
@@ -348,6 +348,11 @@
  * drbd_io_error: Handles the on_io_error setting, should be called in the
  * unlikely(!drbd_bio_uptodate(e->bio)) case from kernel thread context.
  * See also drbd_chk_io_error
+ *
+ * NOTE: we set ourselves DISKLESS here.
+ * But we try to write the "need full sync bit" here anyways.  This is to make sure
+ * that you get a resynchronisation of the full device the next time you
+ * connect.
  */
 int drbd_io_error(drbd_dev* mdev)
 {
@@ -359,19 +364,29 @@
 	D_ASSERT(test_bit(DISKLESS,&mdev->flags));
 	ok = drbd_send_param(mdev,0);
 	WARN("Notified peer that my disk is broken.\n");
+
+	D_ASSERT(drbd_md_test_flag(mdev,MDF_FullSync));
+	D_ASSERT(!drbd_md_test_flag(mdev,MDF_Consistent));
+	if (test_bit(MD_DIRTY,&mdev->flags)) {
+		// try to get "inconsistent, need full sync" to MD
+		drbd_md_write(mdev);
+	}
+
 	if(mdev->cstate > Connected ) {
 		WARN("Resync aborted.\n");
-		if(mdev->cstate == SyncTarget)
-			set_bit(STOP_SYNC_TIMER,&mdev->flags);
 		set_cstate(mdev,Connected);
 	}
 	if ( wait_event_interruptible_timeout(mdev->cstate_wait,
 		     atomic_read(&mdev->local_cnt) == 0 , HZ ) <= 0) {
 		WARN("Not releasing backing storage device.\n");
+		/* FIXME if there *are* still references,
+		 * we should be here again soon enough.
+		 * but what if not?
+		 * we still should free our ll and md devices */
 	} else {
-		/* FIXME I see a race here, with local_cnt... no?
-		 * it it is harmless, please EXPLAIN why.
-		 */
+		/* no race. since the DISKLESS bit is set first,
+		 * further references to local_cnt are shortlived,
+		 * and no real references on the device. */
 		WARN("Releasing backing storage device.\n");
 		drbd_free_ll_dev(mdev);
 		mdev->la_size=0;
@@ -431,7 +446,11 @@
 	smp_mb();
 	wake_up(&mdev->cstate_wait);
 
-	if ( ( os==SyncSource || os==SyncTarget ) && ns <= Connected ) {
+	/* THINK.
+	 * was:
+	 * if ( ( os==SyncSource || os==SyncTarget ) && ns <= Connected ) {
+	 */
+	if ( ( os >= SyncSource ) && ns <= Connected ) {
 		set_bit(STOP_SYNC_TIMER,&mdev->flags);
 		mod_timer(&mdev->resync_timer,jiffies);
 	}
@@ -654,6 +673,7 @@
 	    && (mdev->cstate == SkippedSyncS || mdev->cstate == SkippedSyncT)
 	    && !sc->skip )
 	{
+		/* FIXME EXPLAIN. I think this cannot work properly! -lge */
 		set_cstate(mdev,WFReportParams);
 		ok = drbd_send_param(mdev,0);
 	}
@@ -663,10 +683,11 @@
 int drbd_send_param(drbd_dev *mdev, int flags)
 {
 	Drbd_Parameter_Packet p;
-	int ok,i;
+	int i, ok, have_disk;
 	unsigned long m_size; // sector_t ??
 
-	if(!test_bit(DISKLESS,&mdev->flags) || test_bit(MD_IO_ALLOWED,&mdev->flags)) {
+	have_disk=inc_local_md_only(mdev);
+	if(have_disk) {
 		D_ASSERT(mdev->backing_bdev);
 		if (mdev->md_index == -1 ) m_size = drbd_md_ss(mdev)>>1;
 		else m_size = drbd_get_capacity(mdev->backing_bdev)>>1;
@@ -679,8 +700,8 @@
 	p.protocol = cpu_to_be32(mdev->conf.wire_protocol);
 	p.version  = cpu_to_be32(PRO_VERSION);
 
-	for(i=Flags;i<=ArbitraryCnt;i++) {
-		p.gen_cnt[i]     = cpu_to_be32(mdev->gen_cnt[i]);
+	for (i = Flags; i < GEN_CNT_SIZE; i++) {
+		p.gen_cnt[i] = cpu_to_be32(mdev->gen_cnt[i]);
 	}
 	p.sync_rate      = cpu_to_be32(mdev->sync_conf.rate);
 	p.sync_use_csums = cpu_to_be32(mdev->sync_conf.use_csums);
@@ -689,6 +710,7 @@
 	p.flags          = cpu_to_be32(flags);
 
 	ok = drbd_send_cmd(mdev,mdev->data.socket,ReportParams,(Drbd_Header*)&p,sizeof(p));
+	if (have_disk) dec_local(mdev);
 	return ok;
 }
 
@@ -707,6 +729,21 @@
 	p  = vmalloc(PAGE_SIZE); // sleeps. cannot fail.
 	buffer = (unsigned long*)p->payload;
 
+	if (drbd_md_test_flag(mdev,MDF_FullSync)) {
+		/* full sync pending: get the all-set bitmap to disk before dropping the flag */
+		drbd_bm_set_all(mdev);
+		drbd_bm_write(mdev);
+		if (unlikely(test_bit(DISKLESS,&mdev->flags))) {
+			/* write_bm did fail! panic.
+			 * FIXME can we do something better than panic? */
+			drbd_panic("Failed to write bitmap to disk!\n");
+			ok = FALSE;
+			goto out;
+		}
+		drbd_md_clear_flag(mdev,MDF_FullSync);
+		drbd_md_write(mdev);
+	}
+
 	/*
 	 * maybe TODO use some simple compression scheme, nowadays there are
 	 * some such algorithms in the kernel anyways.
@@ -722,6 +759,7 @@
 		bm_i += num_words;
 	} while (ok && want);
 
+  out:
 	vfree(p);
 	return ok;
 }
@@ -826,6 +864,10 @@
 	return drop_it; /* && (mdev->state == Primary) */;
 }
 
+#if 0
+/* I suspect this zero copy code somehow is plain wrong!
+ * btw, uml network sockets don't have zero copy,
+ * and fall back to sock_no_sendpage in tcp_sendpage... */
 int _drbd_send_page(drbd_dev *mdev, struct page *page,
 		    int offset, size_t size)
 {
@@ -863,6 +905,16 @@
 		mdev->send_cnt += size>>9;
 	return ok;
 }
+#else
+/* sendmsg replacement for the disabled sendpage path; TRUE iff all @size bytes were sent */
+int _drbd_send_page(drbd_dev *mdev, struct page *page,
+		    int offset, size_t size)
+{
+	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, 0);
+	kunmap(page);
+	return sent == (int)size;	/* boolean, like the `ok` the #if 0 version returns */
+}
+#endif
 
 // Used to send write requests: bh->b_rsector !!
 int drbd_send_dblock(drbd_dev *mdev, drbd_request_t *req)
@@ -915,7 +967,7 @@
 		tl_add(mdev,req);
 		dump_packet(mdev,mdev->data.socket,0,(void*)&p, __FILE__, __LINE__);
 		set_bit(UNPLUG_REMOTE,&mdev->flags);
-		ok = (drbd_send(mdev,mdev->data.socket,&p,sizeof(p),MSG_MORE) == sizeof(p));
+		ok = drbd_send(mdev,mdev->data.socket,&p,sizeof(p),MSG_MORE) == sizeof(p);
 		if(ok) {
 			ok = _drbd_send_zc_bio(mdev,&req->private_bio);
 		}
@@ -1545,6 +1597,8 @@
 	SZO(struct bm_extent);
 	SZO(struct lc_element);
 	SZO(struct semaphore);
+	SZO(struct drbd_request);
+	SZO(struct bio);
 	SZO(wait_queue_head_t);
 	SZO(spinlock_t);
 	return -EBUSY;
@@ -1726,7 +1780,7 @@
 void drbd_free_ll_dev(drbd_dev *mdev)
 {
 	struct file *lo_file;
-	
+
 	lo_file = mdev->lo_file;
 	mdev->lo_file = 0;
 	wmb();
@@ -1794,17 +1848,18 @@
 	sector_t sector;
 	int i;
 
-	if(!inc_local_md_only(mdev)) return;
+	ERR_IF(!inc_local_md_only(mdev)) return;
 
 	down(&mdev->md_io_mutex);
 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
+	memset(buffer,0,512);
 
-	flags=mdev->gen_cnt[Flags] & ~(MDF_PrimaryInd|MDF_ConnectedInd);
-	if(mdev->state==Primary) flags |= MDF_PrimaryInd;
-	if(mdev->cstate>=WFReportParams) flags |= MDF_ConnectedInd;
-	mdev->gen_cnt[Flags]=flags;
+	flags = mdev->gen_cnt[Flags] & ~(MDF_PrimaryInd|MDF_ConnectedInd);
+	if (mdev->state  == Primary)        flags |= MDF_PrimaryInd;
+	if (mdev->cstate >= WFReportParams) flags |= MDF_ConnectedInd;
+	mdev->gen_cnt[Flags] = flags;
 
-	for(i=Flags;i<=ArbitraryCnt;i++)
+	for (i = Flags; i < GEN_CNT_SIZE; i++)
 		buffer->gc[i]=cpu_to_be32(mdev->gen_cnt[i]);
 	buffer->la_size=cpu_to_be64(drbd_get_capacity(mdev->this_bdev)>>1);
 	buffer->magic=cpu_to_be32(DRBD_MD_MAGIC);
@@ -1817,14 +1872,45 @@
 
 	sector = drbd_md_ss(mdev) + MD_GC_OFFSET;
 
-	/* FIXME what if this fails ?? */
-	drbd_md_sync_page_io(mdev,sector,WRITE);
+#if 0
+	/* FIXME sooner or later I'd like to use the MD_DIRTY flag everywhere,
+	 * so we can avoid unnecessary md writes.
+	 */
+	ERR_IF (!test_bit(MD_DIRTY,&mdev->flags)) {
+		dump_stack();
+	}
+#endif
+
+	if (drbd_md_sync_page_io(mdev,sector,WRITE)) {
+		clear_bit(MD_DIRTY,&mdev->flags);
+	} else {
+		if (test_bit(DISKLESS,&mdev->flags)) {
+			/* this was a try anyways ... */
+			ERR("meta data update failed!\n");
+		} else {
+			/* If we cannot write our meta data,
+			 * but we are supposed to be able to,
+			 * tough!
+			 */
+			drbd_panic("meta data update failed!\n");
+		}
+	}
+
+	// why is this here?? please EXPLAIN.
 	mdev->la_size = drbd_get_capacity(mdev->this_bdev)>>1;
 
 	up(&mdev->md_io_mutex);
 	dec_local(mdev);
 }
 
+/*
+ * return:
+ *   < 0 if we had an error (currently never ...)
+ *   = 0 if we need a FullSync because either the flag is set,
+ *       or the gen counts are invalid
+ *   > 0 if we could read valid gen counts,
+ *       and reading the bitmap and act log does make sense.
+ */
 int drbd_md_read(drbd_dev *mdev)
 {
 	struct meta_data_on_disk * buffer;
@@ -1854,7 +1940,7 @@
 	up(&mdev->md_io_mutex);
 	dec_local(mdev);
 
-	return 1;
+	return !drbd_md_test_flag(mdev,MDF_FullSync);
 
  err:
 	up(&mdev->md_io_mutex);
@@ -1862,8 +1948,16 @@
 
 	INFO("Creating state block\n");
 
-	for(i=HumanCnt;i<=ArbitraryCnt;i++) mdev->gen_cnt[i]=1;
-	mdev->gen_cnt[Flags]=MDF_Consistent;
+	/* if we need to create a state block, we are
+	 * not consistent, and need a sync of the full device!
+	 * if one knows what he is doing, he can manipulate gcs by hand,
+	 * and avoid the initial full sync...
+	 * otherwise, one of us will have to be forced (--do-what-I-say)
+	 * to be primary, before anything is usable.
+	 */
+	set_bit(MD_DIRTY,&mdev->flags);
+	mdev->gen_cnt[Flags] = MDF_FullSync;
+	for(i = HumanCnt; i < GEN_CNT_SIZE; i++) mdev->gen_cnt[i]=1;
 
 /* FIXME might have IO errors! */
 	drbd_md_write(mdev);
@@ -1896,6 +1990,8 @@
 			PeGC(ArbitraryCnt),
 			PeGC(Flags) & MDF_PrimaryInd   ? '1' : '0',
 			PeGC(Flags) & MDF_ConnectedInd ? '1' : '0');
+	} else {
+		INFO("Peer Unknown.\n");
 	}
 	if (verbose) {
 		/* TODO
@@ -1920,6 +2016,18 @@
 	int i;
 	u32 me,other;
 
+	/* FIXME
+	 * we should not only rely on the consistent bit, but at least check
+	 * whether the rest of the gencounts is plausible, to detect a previous
+	 * split brain situation, and refuse anything until we are told
+	 * otherwise!
+	 *
+	 * And we should refuse to become SyncSource if we are not consistent!
+	 *
+	 * though DRBD is not to blame for it,
+	 * someone eventually will try to blame it ...
+	 */
+
 	me=mdev->gen_cnt[Flags] & MDF_Consistent;
 	other=be32_to_cpu(partner->gen_cnt[Flags]) & MDF_Consistent;
 	if( me > other ) return 1;
@@ -1940,9 +2048,29 @@
 	return 0;
 }
 
+/* THINK do these have to be protected by some lock ? */
 void drbd_md_inc(drbd_dev *mdev, enum MetaDataIndex order)
 {
+	set_bit(MD_DIRTY,&mdev->flags);	/* the counter bumped below has to reach the on-disk meta data */
 	mdev->gen_cnt[order]++;
+}
+/* OR @flag into gen_cnt[Flags]; MD_DIRTY is raised only if some bit of @flag was still unset. */
+void drbd_md_set_flag(drbd_dev *mdev, int flag)
+{
+	if ((mdev->gen_cnt[Flags] & flag) == flag) return;
+	set_bit(MD_DIRTY,&mdev->flags);
+	mdev->gen_cnt[Flags] |= flag;
+}
+/* Drop @flag from gen_cnt[Flags]; MD_DIRTY is raised only if one of its bits was set. */
+void drbd_md_clear_flag(drbd_dev *mdev, int flag)
+{
+	if (!(mdev->gen_cnt[Flags] & flag)) return;
+	set_bit(MD_DIRTY,&mdev->flags);
+	mdev->gen_cnt[Flags] &= ~flag;
+}
+int drbd_md_test_flag(drbd_dev *mdev, int flag)
+{
+	return !!(mdev->gen_cnt[Flags] & flag);	/* 1 if any bit of @flag is set, else 0 */
 }
 
 module_init(drbd_init)
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_proc.c,v
retrieving revision 1.8.2.31
retrieving revision 1.8.2.32
diff -u -3 -r1.8.2.31 -r1.8.2.32
--- drbd_proc.c	15 Jun 2004 10:07:32 -0000	1.8.2.31
+++ drbd_proc.c	17 Jun 2004 01:44:37 -0000	1.8.2.32
@@ -205,6 +205,7 @@
 			   nodestate_to_name(drbd_conf[i].o_state),
 			   (drbd_conf[i].gen_cnt[Flags]
 			    & MDF_Consistent) ? "Consistent" : "Inconsistent",
+			// FIXME partner consistent?
 			   drbd_conf[i].send_cnt/2,
 			   drbd_conf[i].recv_cnt/2,
 			   drbd_conf[i].writ_cnt/2,
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.171
retrieving revision 1.97.2.172
diff -u -3 -r1.97.2.171 -r1.97.2.172
--- drbd_receiver.c	15 Jun 2004 10:07:32 -0000	1.97.2.171
+++ drbd_receiver.c	17 Jun 2004 01:44:37 -0000	1.97.2.172
@@ -308,7 +308,10 @@
 			schedule();
 			spin_lock_irq(&mdev->ee_lock);
 			finish_wait(&mdev->ee_wait, &wait);
-			if (signal_pending(current)) return 0;
+			if (signal_pending(current)) {
+				WARN("drbd_get_ee interrupted!\n");
+				return 0;
+			}
 			// finish wait is inside, so that we are TASK_RUNNING 
 			// in _drbd_process_ee (which might sleep by itself.)
 			_drbd_process_ee(mdev,&mdev->done_ee);
@@ -698,15 +701,6 @@
 
 	set_cstate(mdev,WFReportParams);
 
-	/* in case one of the other threads said: restart_nowait(receiver),
-	 * it may still hang around itself.  make sure threads are
-	 * really stopped before trying to restart them.
-	 * drbd_disconnect should have taken care of that, but I still
-	 * get these "resync inactive, but callback triggered".
-	 *
-	 * and I saw "connection lost... established", and no more
-	 * worker thread :(
-	 */
 	D_ASSERT(mdev->asender.task == NULL);
 
 	drbd_thread_start(&mdev->asender);
@@ -795,6 +789,8 @@
 		spin_lock_irq(&mdev->ee_lock);
 		drbd_put_ee(mdev,e);
 		spin_unlock_irq(&mdev->ee_lock);
+		WARN("short read receiving data block: read %d expected %d\n",
+			rr, data_size);
 		return 0;
 	}
 	mdev->recv_cnt+=data_size>>9;
@@ -1011,7 +1007,7 @@
 	sector = be64_to_cpu(p->sector);
 
 	e = read_in_block(mdev,data_size);
-	ERR_IF(!e) return FALSE;
+	if (!e) return FALSE;
 	e->block_id = p->block_id; // no meaning on this side, e* on partner
 
 	if(!inc_local(mdev)) {
@@ -1117,7 +1113,12 @@
 		 * resync data block.
 		 * the drbd_work_queue mechanism is made for this...
 		 */
-		drbd_rs_begin_io(mdev,sector);
+		if (!drbd_rs_begin_io(mdev,sector)) {
+			// we have been interrupted, probably connection lost!
+			D_ASSERT(signal_pending(current));
+			drbd_put_ee(mdev,e);
+			return 0;
+		}
 		break;
 	default:
 		D_ASSERT(0);
@@ -1157,6 +1158,117 @@
 	return ok;
 }
 
+/*
+ * Decide from our gen counts and the peer's ReportParams packet @p whether a
+ * resync is needed, and move cstate to Connected, WFBitMapS/T or SkippedSyncS/T.
+ * Returns TRUE to carry on; FALSE after going StandAlone and stopping the
+ * receiver (split brain, inconsistent sync source, Primary as sync target).
+ * NOTE(review): receive_param calls this after drbd_bm_lock(); confirm that
+ * the FALSE returns do not leave the bitmap locked.
+ */
+STATIC int drbd_sync_handshake(drbd_dev *mdev, Drbd_Parameter_Packet *p)
+{
+	int have_good,sync;
+
+	have_good = drbd_md_compare(mdev,p);
+
+	if(have_good==0) {
+		if (drbd_md_test_flag(mdev,MDF_PrimaryInd)) {
+			/* gen counts compare the same, but I have the
+			 * PrimaryIndicator set.  so the peer has, too
+			 * (otherwise this would not compare the same).
+			 * so we had a split brain!
+			 * FIXME maybe log MDF_SplitBrain into metadata,
+			 * and refuse to do anything until told otherwise!
+			 * for now: just go StandAlone.
+			 */
+			ALERT("Split-Brain detected, dropping connection!\n");
+			set_cstate(mdev,StandAlone);
+			drbd_thread_stop_nowait(&mdev->receiver);
+			return FALSE;
+		}
+		sync=0;
+	} else {
+		sync=1;
+	}
+
+	drbd_dump_md(mdev,p,0);
+
+	if (have_good > 0 && !drbd_md_test_flag(mdev,MDF_Consistent)) {
+		/* doh. I cannot become SyncSource when I am inconsistent! */
+		ERR("I shall become SyncSource, but I am inconsistent!\n");
+		set_cstate(mdev,StandAlone);
+		drbd_thread_stop_nowait(&mdev->receiver);
+		return FALSE;
+	}
+	if (have_good < 0 &&
+	    !(be32_to_cpu(p->gen_cnt[Flags]) & MDF_Consistent) ) {
+		/* doh. Peer cannot become SyncSource when inconsistent */
+		ERR("I shall become SyncTarget, but Peer is inconsistent!\n");
+		set_cstate(mdev,StandAlone);
+		drbd_thread_stop_nowait(&mdev->receiver);
+		return FALSE;
+	}
+
+	if ( mdev->sync_conf.skip && sync ) {
+		if (have_good == 1)
+			set_cstate(mdev,SkippedSyncS);
+		else // have_good == -1
+			set_cstate(mdev,SkippedSyncT);
+		return TRUE;
+	}
+
+	if( sync ) {
+		if(have_good == 1) {
+			D_ASSERT(drbd_md_test_flag(mdev,MDF_Consistent));
+			set_cstate(mdev,WFBitMapS);
+			wait_event(mdev->cstate_wait,
+			     atomic_read(&mdev->ap_bio_cnt)==0);
+			drbd_send_bitmap(mdev);
+		} else { // have_good == -1
+			if ( (mdev->state == Primary) &&
+			     drbd_md_test_flag(mdev,MDF_Consistent) ) {
+				/* FIXME allow Primary become SyncTarget if it
+				 * was diskless, and now had a storage
+				 * reattached. only somewhere the MDF_Consistent
+				 * flag is set where it should not... I think. */
+				ERR("Current Primary shall become sync TARGET!"
+				    " Aborting to prevent data corruption.\n");
+				set_cstate(mdev,StandAlone);
+				drbd_thread_stop_nowait(&mdev->receiver);
+				return FALSE;
+			}
+			drbd_md_clear_flag(mdev,MDF_Consistent);
+			set_cstate(mdev,WFBitMapT);
+		}
+	} else {
+		set_cstate(mdev,Connected);
+		if(mdev->rs_total) {
+			if (drbd_md_test_flag(mdev,MDF_Consistent)) {
+				/* We are not going to do a resync but there
+				   are marks in the bitmap.  (Could be from
+				   the AL, or someone used the write_gc.pl
+				   program.)  Clean the bitmap... */
+				INFO("No resync -> clearing bit map.\n");
+				/* clear, not set: set_all would request a full sync */
+				drbd_bm_clear_all(mdev);
+				drbd_bm_write(mdev);
+			} else {
+				WARN("I am inconsistent, but there is no sync? BOTH nodes inconsistent!\n");
+			}
+		}
+	}
+
+	if (have_good == -1) {
+		/* Sync-Target has to adopt source's gen_cnt. */
+		int i;
+		for(i=HumanCnt;i<=ArbitraryCnt;i++) {
+			mdev->gen_cnt[i]=be32_to_cpu(p->gen_cnt[i]);
+		}
+	}
+	return TRUE;
+}
+
 STATIC int receive_param(drbd_dev *mdev, Drbd_Header *h)
 {
 	Drbd_Parameter_Packet *p = (Drbd_Parameter_Packet*)h;
@@ -1194,6 +1306,7 @@
 
 	if(p_size == 0 && test_bit(DISKLESS,&mdev->flags)) {
 		ERR("some backing storage is needed\n");
+		set_cstate(mdev,StandAlone);
 		drbd_thread_stop_nowait(&mdev->receiver);
 		return FALSE;
 	}
@@ -1201,6 +1314,14 @@
 	drbd_bm_lock(mdev);
 	mdev->p_size=p_size;
 
+	set_bit(MD_DIRTY,&mdev->flags); // we are changing state!
+
+/*lge:
+ * FIXME
+ * please get the order of tests (re)settings for consider_sync
+ * right, and comment them!
+ */
+
 	consider_sync = (mdev->cstate == WFReportParams);
 	if(drbd_determin_dev_size(mdev)) consider_sync=0;
 
@@ -1226,8 +1347,23 @@
 	}
 
 	if(!p_size) {
-		if (!test_and_set_bit(PARTNER_DISKLESS, &mdev->flags))
+		/* no point in trying to sync a diskless peer: */
+		consider_sync = 0;
+		if (!test_and_set_bit(PARTNER_DISKLESS, &mdev->flags)) {
+			/* if we got here, we *do* have a disk.
+			 * but it may be inconsistent...
+			 * anyways, record that next time we need a full sync.
+			 */
+			clear_bit(PARTNER_CONSISTENT, &mdev->flags);
+			drbd_md_set_flag(mdev,MDF_FullSync);
+			drbd_md_write(mdev);
+			/* actually we'd need to bm_fill_bm(,-1); drbd_write_bm(mdev);
+			 * but this is not necessary _now_.
+			 * we have the MDF_FullSync bit on disk.
+			 * on the next _drbd_send_bitmap this will be done.
+			 */
 			WARN("PARTNER DISKLESS\n");
+		}
 		if(mdev->cstate >= Connected ) {
 			if(mdev->state == Primary) tl_clear(mdev);
 			if(mdev->state == Primary ||
@@ -1246,77 +1382,20 @@
 			WARN("Partner no longer diskless\n");
 	}
 
+	if (be32_to_cpu(p->gen_cnt[Flags]) & MDF_Consistent) {
+		set_bit(PARTNER_CONSISTENT, &mdev->flags);
+	} else {
+		clear_bit(PARTNER_CONSISTENT, &mdev->flags);
+	}
+
 	if (mdev->cstate == WFReportParams) {
 		INFO("Connection established.\n");
 	}
 
 	if (consider_sync) {
-		int have_good,sync;
-
-		have_good = drbd_md_compare(mdev,p);
-
-		if(have_good==0) sync=0;
-		else sync=1;
-
-		drbd_dump_md(mdev,p,0);
-		//INFO("have_good=%d sync=%d\n", have_good, sync);
-
-		if ( mdev->sync_conf.skip && sync ) {
-			if (have_good == 1)
-				set_cstate(mdev,SkippedSyncS);
-			else // have_good == -1
-				set_cstate(mdev,SkippedSyncT);
-			goto skipped;
-		}
-
-		if( sync ) {
-			if(have_good == 1) {
-				set_cstate(mdev,WFBitMapS);
-				wait_event(mdev->cstate_wait,
-				     atomic_read(&mdev->ap_bio_cnt)==0);
-				drbd_send_bitmap(mdev);
-			} else { // have_good == -1
-				if ( (mdev->state == Primary) &&
-				     (mdev->gen_cnt[Flags] & MDF_Consistent) ) {
-					/* FIXME
-					 * allow Primary become SyncTarget if it was diskless, and now had a storage reattached.
-					 * only somewhere the MDF_Consistent flag is set where it should not... I think.
-					 */
-					ERR("Current Primary shall become sync TARGET! Aborting to prevent data corruption.\n");
-					set_cstate(mdev,StandAlone);
-					drbd_thread_stop_nowait(&mdev->receiver);
-					drbd_bm_unlock(mdev);
-					return FALSE;
-				}
-				mdev->gen_cnt[Flags] &= ~MDF_Consistent;
-				set_cstate(mdev,WFBitMapT);
-			}
-		} else {
-			set_cstate(mdev,Connected);
-			if(drbd_bm_total_weight(mdev)) {
-				/* We are not going to do a resync but there
-				   are marks in the bitmap.
-				   (Could be from the AL, or someone used
-				   the write_gc.pl program)
-				   Clean the bitmap...
-				 */
-				INFO("No resync -> clearing bit map.\n");
-				drbd_bm_clear_all(mdev);
-				drbd_bm_write(mdev);
-			}
-		}
-
-		if (have_good == -1) {
-			/* Sync-Target has to adopt source's gen_cnt. */
-			int i;
-			for(i=HumanCnt;i<=ArbitraryCnt;i++) {
-				mdev->gen_cnt[i]=be32_to_cpu(p->gen_cnt[i]);
-			}
-		}
+		if (!drbd_sync_handshake(mdev,p)) return FALSE;
 	}
 
-skipped:	// do not adopt gen counts when sync was skipped ...
-
 	if (mdev->cstate == WFReportParams) set_cstate(mdev,Connected);
 	// see above. if (p_size && mdev->cstate==Connected) clear_bit(PARTNER_DISKLESS,&mdev->flags);
 
@@ -1326,8 +1405,11 @@
 		drbd_md_inc(mdev,ConnectedCnt);
 	}
 	if (oo_state != mdev->o_state) {
-		INFO( "now %s/%s\n", nodestate_to_name(mdev->state),
-				nodestate_to_name(mdev->o_state) );
+		INFO( "%s/%s --> %s/%s\n",
+		      nodestate_to_name(mdev->state),
+		      nodestate_to_name(oo_state),
+		      nodestate_to_name(mdev->state),
+		      nodestate_to_name(mdev->o_state) );
 	}
 
 	drbd_md_write(mdev); // update connected indicator, la_size, ...
@@ -1586,6 +1668,7 @@
 	D_ASSERT(mdev->oldest_barrier->n_req == 0);
 
 	// both
+	clear_bit(PARTNER_CONSISTENT, &mdev->flags);
 	clear_bit(PARTNER_DISKLESS,&mdev->flags);
 
 	D_ASSERT(mdev->ee_in_use == 0);
@@ -1622,7 +1705,7 @@
 
 	if ( mdev->state == Primary &&
 	    ( test_bit(DISKLESS,&mdev->flags)
-	    || !(mdev->gen_cnt[Flags] & MDF_Consistent) ) ) {
+	    || !drbd_md_test_flag(mdev,MDF_Consistent) ) ) {
 		drbd_panic("Sorry, I have no access to good data anymore.\n");
 	}
 
@@ -1748,10 +1831,11 @@
 
 STATIC int got_NegAck(drbd_dev *mdev, Drbd_Header* h)
 {
-#if 0
 	Drbd_BlockAck_Packet *p = (Drbd_BlockAck_Packet*)h;
+#if 0
 	sector_t sector = be64_to_cpu(p->sector);
 	int size = be32_to_cpu(p->blksize);
+#endif
 
 	/* do nothing here.
 	 * we expect to get a "report param" on the data socket soon,
@@ -1759,7 +1843,9 @@
 	 */
 	if(is_syncer_blk(mdev,p->block_id)) {
 		dec_rs_pending(mdev,HERE);
-	} else {
+	}
+#if 0
+	else {
 		D_ASSERT(bm_get_bit(mdev->mbds_id,sector,size));
 		// tl_clear() must have set this out of sync!
 		D_ASSERT(mdev->conf.wire_protocol != DRBD_PROT_A);
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_req-2.4.c,v
retrieving revision 1.33.2.85
retrieving revision 1.33.2.86
diff -u -3 -r1.33.2.85 -r1.33.2.86
--- drbd_req-2.4.c	15 Jun 2004 10:07:32 -0000	1.33.2.85
+++ drbd_req-2.4.c	17 Jun 2004 01:44:37 -0000	1.33.2.86
@@ -146,7 +146,7 @@
 	unsigned long sbnr,ebnr,bnr;
 	sector_t esector, nr_sectors;
 
-	if (mdev->gen_cnt[Flags] & MDF_Consistent) return 1;
+	if (drbd_md_test_flag(mdev,MDF_Consistent)) return 1;
 
 	nr_sectors = drbd_get_capacity(mdev->this_bdev);
 	esector = sector + (size>>9) -1;
@@ -192,7 +192,7 @@
 	 * the connection *after* we test for the cstate.
 	 */
 	if ( (    test_bit(DISKLESS,&mdev->flags)
-	      || !(mdev->gen_cnt[Flags] & MDF_Consistent)
+	      || !drbd_md_test_flag(mdev,MDF_Consistent)
 	     ) && mdev->cstate < Connected )
 	{
 		ERR("Sorry, I have no access to good data anymore.\n");
@@ -260,7 +260,7 @@
 				dec_local(mdev);
 			}
 		}
-		remote = !local;
+		remote = !local && test_bit(PARTNER_CONSISTENT, &mdev->flags);
 	} else {
 		remote = 1;
 	}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/lru_cache.c,v
retrieving revision 1.1.2.27
retrieving revision 1.1.2.28
diff -u -3 -r1.1.2.27 -r1.1.2.28
--- lru_cache.c	8 Jun 2004 12:04:35 -0000	1.1.2.27
+++ lru_cache.c	17 Jun 2004 01:44:37 -0000	1.1.2.28
@@ -259,7 +259,7 @@
 	if ( --e->refcnt == 0) {
 		list_move(&e->list,&lc->lru); // move it to the front of LRU.
 		clear_bit(__LC_STARVING,&lc->flags);
-		smp_mb__after_clear_bit();		
+		smp_mb__after_clear_bit();
 	}
 	RETURN(e->refcnt);
 }