[DRBD-cvs] drbd by phil; * Removed DRBD_IOCTL_UNCONFIG_BOTH ioctl...

drbd-user@lists.linbit.com drbd-user@lists.linbit.com
Sun, 8 Feb 2004 13:26:19 +0100 (CET)


DRBD CVS committal

Author  : phil
Module  : drbd

Dir     : drbd/drbd


Modified Files:
      Tag: rel-0_7-branch
	drbd.h drbd_actlog.c drbd_dsender.c drbd_fs.c drbd_int.h 
	drbd_main.c drbd_proc.c drbd_receiver.c drbd_req-2.4.c 


Log Message:
* Removed DRBD_IOCTL_UNCONFIG_BOTH ioctl()
* introduced inc_local() and dec_local() to track usage of local
  disk.
* removed state_wait. Made the remaining users use cstate_wait.
* Introduced various NegAck Packets, to handle disk failures on
  seconary nodes.

drbdsetup /dev/nbd/x detach 
drbdsetup /dev/nbd/x disk

should now be free of races. 

[Completely untested!]

===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd.h,v
retrieving revision 1.34.2.20
retrieving revision 1.34.2.21
diff -u -3 -r1.34.2.20 -r1.34.2.21
--- drbd.h	6 Feb 2004 15:43:55 -0000	1.34.2.20
+++ drbd.h	8 Feb 2004 12:26:13 -0000	1.34.2.21
@@ -187,7 +187,6 @@
 #define DRBD_IOCTL_SET_DISK_CONFIG _IOW( 'D', 0x06, struct ioctl_disk_config )
 #define DRBD_IOCTL_SET_NET_CONFIG _IOW( 'D', 0x07, struct ioctl_net_config )
 #define DRBD_IOCTL_UNCONFIG_NET  _IO ( 'D', 0x08 )
-#define DRBD_IOCTL_UNCONFIG_BOTH _IO ( 'D', 0x09 )
 #define DRBD_IOCTL_GET_CONFIG    _IOW( 'D', 0x0A, struct ioctl_get_config)
 #define DRBD_IOCTL_SECONDARY_REM _IOR( 'D', 0x0C, int )
 #define DRBD_IOCTL_INVALIDATE    _IO ( 'D', 0x0D )
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_actlog.c,v
retrieving revision 1.1.2.69
retrieving revision 1.1.2.70
diff -u -3 -r1.1.2.69 -r1.1.2.70
--- drbd_actlog.c	7 Feb 2004 16:37:49 -0000	1.1.2.69
+++ drbd_actlog.c	8 Feb 2004 12:26:14 -0000	1.1.2.70
@@ -144,6 +144,7 @@
 	unsigned int enr = (sector >> (AL_EXTENT_SIZE_B-9));
 	struct lc_element *al_ext;
 
+	D_ASSERT(atomic_read(&mdev->local_cnt)>0);
 	wait_event(mdev->al_wait, (al_ext = _al_get(mdev,enr)) );
 
 	if (al_ext->lc_number != enr) {
@@ -425,12 +426,13 @@
 {
 	unsigned int exts,i;
 
-	if( mdev->lo_file == 0) return;
+	if( !inc_local_md_only(mdev) ) return;
 	exts = div_ceil(mdev->mbds_id->size,BM_EXTENT_SIZE);
 
 	for(i=0;i<exts;i++) {
 		drbd_update_on_disk_bm(mdev,i);
 	}
+	dec_local(mdev);
 }
 
 static inline int _try_lc_del(struct Drbd_Conf *mdev,struct lc_element *al_ext)
@@ -528,6 +530,7 @@
 	int want,buf_i,bm_words,bm_i;
 	sector_t sector;
 
+	D_ASSERT(atomic_read(&mdev->local_cnt)>0);
 	enr = (enr & ~(EXTENTS_PER_SECTOR-1) );
 
 	bm = mdev->mbds_id->bm;
@@ -558,7 +561,13 @@
 {
 	struct update_odbm_work *udw = (struct update_odbm_work*)w;
 
+	if( !inc_local_md_only(mdev) ) {
+		WARN("Can not update on disk bitmap, local IO disabled.\n");
+		return 1;
+	}
+
 	drbd_update_on_disk_bm(mdev,udw->enr);
+	dec_local(mdev);
 
 	kfree(udw);
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.69
retrieving revision 1.1.2.70
diff -u -3 -r1.1.2.69 -r1.1.2.70
--- drbd_dsender.c	7 Feb 2004 16:37:49 -0000	1.1.2.69
+++ drbd_dsender.c	8 Feb 2004 12:26:14 -0000	1.1.2.70
@@ -149,6 +149,7 @@
 
 	drbd_end_req(req, RQ_DRBD_WRITTEN, uptodate, drbd_req_get_sector(req));
 	drbd_al_complete_io(mdev,drbd_req_get_sector(req));
+	dec_local(mdev);
 }
 
 #else
@@ -371,6 +372,7 @@
 
 	ok=drbd_send_block(mdev, DataReply, e);
 	dec_unacked(mdev,HERE); // THINK unconditional?
+	dec_local(mdev);
 
 	spin_lock_irq(&mdev->ee_lock);
 	drbd_put_ee(mdev,e);
@@ -389,6 +391,7 @@
 	inc_rs_pending(mdev);
 	ok=drbd_send_block(mdev, DataReply, e);
 	dec_unacked(mdev,HERE); // THINK unconditional?
+	dec_local(mdev);
 
 	spin_lock_irq(&mdev->ee_lock);
 	drbd_put_ee(mdev,e);
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_fs.c,v
retrieving revision 1.28.2.66
retrieving revision 1.28.2.67
diff -u -3 -r1.28.2.66 -r1.28.2.67
--- drbd_fs.c	7 Feb 2004 16:37:49 -0000	1.28.2.66
+++ drbd_fs.c	8 Feb 2004 12:26:14 -0000	1.28.2.67
@@ -180,7 +180,7 @@
 	minor=(int)(mdev-drbd_conf);
 
 	/* if you want to reconfigure, please tear down first */
-	if (mdev->lo_file)
+	if (!test_bit(DISKLESS,&mdev->flags))
 		return -EBUSY;
 
 	/* FIXME if this was "adding" a lo dev to a previously "diskless" node,
@@ -326,6 +326,7 @@
 })
 #undef min_not_zero
 
+	set_bit(MD_IO_ALLOWED,&mdev->flags);
 	i = drbd_md_read(mdev);
 	drbd_determin_dev_size(mdev);
 	if(i) drbd_read_bm(mdev);
@@ -360,6 +361,10 @@
 	if(mdev->cstate == Unconfigured ) set_cstate(mdev,StandAlone);
 	if(mdev->cstate >= Connected ) {
 		drbd_send_param(mdev,1);
+	} else {
+		clear_bit(DISKLESS,&mdev->flags);
+		smp_wmb();
+		clear_bit(MD_IO_ALLOWED,&mdev->flags);
 	}
 
 	return 0;
@@ -544,7 +549,7 @@
 	drbd_sync_me(mdev);
 
 	/* Wait until nothing is on the fly :) */
-	if ( wait_event_interruptible( mdev->state_wait,
+	if ( wait_event_interruptible( mdev->cstate_wait,
 			atomic_read(&mdev->ap_pending_cnt) == 0 ) ) {
 ONLY_IN_26(
 		if ( newstate & Secondary )
@@ -760,45 +765,35 @@
 		set_cstate(mdev,StandAlone);
 		break;
 
-	case DRBD_IOCTL_UNCONFIG_BOTH:
+	case DRBD_IOCTL_UNCONFIG_DISK:
 		if (mdev->cstate == Unconfigured) break;
 
+		if ( mdev->state == Primary && mdev->cstate < Connected) {
+			err=-EBUSY; // TODO error printf in drbdsetup.c
+			break;
+		}
+		/*
 		if (mdev->open_cnt > 1) {
 			err=-EBUSY;
 			break;
 		}
-
-		drbd_sync_me(mdev);
-		set_bit(DO_NOT_INC_CONCNT,&mdev->flags);
-		drbd_thread_stop(&mdev->worker);
-		drbd_thread_stop(&mdev->asender);
-		drbd_thread_stop(&mdev->receiver);
-		drbd_free_resources(mdev);
-		if (mdev->mbds_id) {
-			bm_resize(mdev->mbds_id,0);
-			drbd_set_my_capacity(mdev,0);
-		}
-
-		set_cstate(mdev,Unconfigured);
-		mdev->state = Secondary;
-
-		break;
-
-	case DRBD_IOCTL_UNCONFIG_DISK:
-		if (mdev->cstate == Unconfigured) break;
-
+		*/
 		if (mdev->cstate > Connected) {
 			err=-EBUSY;
 			break;
 		}
-		if (!mdev->lo_file || 
+		if (test_bit(DISKLESS,&mdev->flags) ||
 		    test_bit(PARTNER_DISKLESS,&mdev->flags) ) {
 			err=-ENXIO;
 			break;
 		}
-		// TODO: Fix all this. Currently it is the 
-		// blissfully ignorant implementation.
-		drbd_free_ll_dev(mdev); 
+		drbd_sync_me(mdev);
+
+		set_bit(DISKLESS,&mdev->flags);
+		smp_mb__after_clear_bit();
+		wait_event(mdev->cstate_wait,atomic_read(&mdev->local_cnt)==0);
+		drbd_free_ll_dev(mdev);
+
 		if (mdev->cstate == Connected) drbd_send_param(mdev,0);
 		if (mdev->cstate == StandAlone) set_cstate(mdev,Unconfigured);
 
@@ -850,7 +845,7 @@
 
 	case DRBD_IOCTL_INVALIDATE:
 		if( mdev->cstate != Connected ||
-		    !mdev->lo_file || 
+		    test_bit(DISKLESS,&mdev->flags) || 
 		    test_bit(PARTNER_DISKLESS,&mdev->flags) ) {
 			err = -EINPROGRESS;
 			break;
@@ -865,7 +860,7 @@
 
 	case DRBD_IOCTL_INVALIDATE_REM:
 		if( mdev->cstate != Connected ||
-		    !mdev->lo_file || 
+		    test_bit(DISKLESS,&mdev->flags) || 
 		    test_bit(PARTNER_DISKLESS,&mdev->flags) ) {
 			err = -EINPROGRESS;
 			break;
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.117
retrieving revision 1.58.2.118
diff -u -3 -r1.58.2.117 -r1.58.2.118
--- drbd_int.h	7 Feb 2004 16:37:49 -0000	1.58.2.117
+++ drbd_int.h	8 Feb 2004 12:26:14 -0000	1.58.2.118
@@ -294,21 +294,25 @@
 typedef enum {
 	Data,
 	DataReply,
-	RecvAck,      // Used in protocol B
-	WriteAck,     // Used in protocol C
 	Barrier,
-	BarrierAck,
 	ReportParams,
 	ReportBitMap,
-	Ping,
-	PingAck,
 	BecomeSyncTarget,
 	BecomeSyncSource,
 	BecomeSec,     // Secondary asking primary to become secondary
-	WriteHint,     // Used in protocol C to hint the secondary to call tq_disk
+	WriteHint,     // Used in protocol C to hint the secondary to hurry up
 	DataRequest,   // Used to ask for a data block
 	RSDataRequest, // Used to ask for a data block
 	SyncParam,
+
+	Ping,         // These are sent on the meta socket...
+	PingAck,
+	RecvAck,      // Used in protocol B
+	WriteAck,     // Used in protocol C
+	NegAck,       // Sent if local disk is unusable
+	NegDReply,    // Local disk is broken...
+	BarrierAck,
+
 	MAX_CMD,
 	MayIgnore = 0x100, // Flag only to test if (cmd > MayIgnore) ...
 	MAX_OPT_CMD,
@@ -570,8 +574,10 @@
 #define STOP_SYNC_TIMER    4
 #define DO_NOT_INC_CONCNT  5
 #define WRITE_HINT_QUEUED  6
-#define PARTNER_DISKLESS   7
+#define DISKLESS           7
+#define PARTNER_DISKLESS   8
 #define PROCESS_EE_RUNNING 9
+#define MD_IO_ALLOWED     10
 
 struct BitMap {
 	unsigned long dev_size;
@@ -658,8 +664,7 @@
 	unsigned long p_size;     /* partner's disk size */
 	Drbd_State state;
 	Drbd_CState cstate;
-	wait_queue_head_t cstate_wait;
-	wait_queue_head_t state_wait;  // TODO: Remove state_wait.
+	wait_queue_head_t cstate_wait; // TODO Rename into "misc_wait". 
 	Drbd_State o_state;
 	unsigned long int la_size; // last agreed disk size
 	unsigned int send_cnt;
@@ -671,6 +676,7 @@
 	atomic_t ap_pending_cnt;
 	atomic_t rs_pending_cnt;
 	atomic_t unacked_cnt;
+	atomic_t local_cnt;
 	spinlock_t req_lock;
 	spinlock_t tl_lock;
 	struct drbd_barrier* newest_barrier;
@@ -1029,7 +1035,7 @@
 static inline void dec_ap_pending(drbd_dev* mdev, const char* where)
 {
 	if(atomic_dec_and_test(&mdev->ap_pending_cnt))
-		wake_up_interruptible(&mdev->state_wait);
+		wake_up_interruptible(&mdev->cstate_wait);
 
 	if(atomic_read(&mdev->ap_pending_cnt)<0)
 		ERR("in %s: pending_cnt = %d < 0 !\n",
@@ -1065,6 +1071,44 @@
 		ERR("in %s: unacked_cnt = %d < 0 !\n",
 		    where,
 		    atomic_read(&mdev->unacked_cnt));
+}
+
+/**
+ * inc_local: Returns TRUE when local IO is possible. If it returns
+ * TRUE you should call dec_local() after IO is completed.
+ */
+static inline int inc_local(drbd_dev* mdev)
+{
+	int io_allowed;
+	atomic_inc(&mdev->local_cnt);
+	io_allowed = !test_bit(DISKLESS,&mdev->flags);
+	if( !io_allowed ) {
+		atomic_dec(&mdev->local_cnt);
+	}
+	return io_allowed;
+}
+
+static inline int inc_local_md_only(drbd_dev* mdev)
+{
+	int io_allowed;
+	atomic_inc(&mdev->local_cnt);
+	io_allowed = !test_bit(DISKLESS,&mdev->flags) ||
+		test_bit(MD_IO_ALLOWED,&mdev->flags);
+	if( !io_allowed ) {
+		atomic_dec(&mdev->local_cnt);
+	}
+	return io_allowed;
+}
+
+static inline void dec_local(drbd_dev* mdev)
+{
+	if(atomic_dec_and_test(&mdev->local_cnt) && 
+	   test_bit(DISKLESS,&mdev->flags) &&
+	   mdev->lo_file) {
+		wake_up(&mdev->cstate_wait);
+	}
+
+	D_ASSERT(atomic_read(&mdev->local_cnt)>0);
 }
 
 static inline void drbd_set_out_of_sync(drbd_dev* mdev,
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.124
retrieving revision 1.73.2.125
diff -u -3 -r1.73.2.124 -r1.73.2.125
--- drbd_main.c	7 Feb 2004 16:37:49 -0000	1.73.2.124
+++ drbd_main.c	8 Feb 2004 12:26:14 -0000	1.73.2.125
@@ -341,6 +341,12 @@
 		mdev->resync_work.cb = w_resume_next_sg;
 		_drbd_queue_work(&mdev->data.work,&mdev->resync_work);
 	}
+	if(test_bit(MD_IO_ALLOWED,&mdev->flags) &&
+	   test_bit(DISKLESS,&mdev->flags) && ns < Connected) {
+		clear_bit(DISKLESS,&mdev->flags);
+		smp_wmb();
+		clear_bit(MD_IO_ALLOWED,&mdev->flags);
+	}
 }
 
 STATIC int drbd_thread_setup(void* arg)
@@ -550,7 +556,7 @@
 	int ok,i;
 	unsigned long m_size; // sector_t ??
 
-	if(mdev->lo_file) {
+	if(!test_bit(DISKLESS,&mdev->flags)) {
 		if (mdev->md_index == -1 ) m_size = drbd_md_ss(mdev)>>1;
 		else m_size = drbd_get_capacity(mdev->backing_bdev)>>1;
 	} else m_size = 0;
@@ -1021,10 +1027,11 @@
 #ifdef PARANOIA
 	SET_MDEV_MAGIC(mdev);
 #endif
+	mdev->flags = 1<<DISKLESS;
 
 	/* If the WRITE_HINT_QUEUED flag is set but it is not
 	   actually queued the functionality is completely disabled */
-	if (disable_io_hints) mdev->flags=1<<WRITE_HINT_QUEUED;
+	if (disable_io_hints) mdev->flags |= 1<<WRITE_HINT_QUEUED;
 
 	mdev->sync_conf.rate       = 250;
 	mdev->sync_conf.al_extents = 128; // 512 MB active set
@@ -1035,6 +1042,7 @@
 	atomic_set(&mdev->ap_pending_cnt,0);
 	atomic_set(&mdev->rs_pending_cnt,0);
 	atomic_set(&mdev->unacked_cnt,0);
+	atomic_set(&mdev->local_cnt,0);
 
 	init_MUTEX(&mdev->device_mutex);
 	init_MUTEX(&mdev->md_io_mutex);
@@ -1064,7 +1072,6 @@
 	mdev->resync_work.cb = w_resync_inactive;
 	init_timer(&mdev->resync_timer);
 
-	init_waitqueue_head(&mdev->state_wait);
 	init_waitqueue_head(&mdev->cstate_wait);
 	init_waitqueue_head(&mdev->ee_wait);
 	init_waitqueue_head(&mdev->al_wait);
@@ -1471,9 +1478,15 @@
 
 void drbd_free_ll_dev(drbd_dev *mdev)
 {
-	if (mdev->lo_file) {
+	struct file *lo_file;
+	
+	lo_file = mdev->lo_file;
+	mdev->lo_file = 0;
+	wmb();
+
+	if (lo_file) {
 NOT_IN_26(
-		blkdev_put(mdev->lo_file->f_dentry->d_inode->i_bdev,BDEV_FILE);
+		blkdev_put(lo_file->f_dentry->d_inode->i_bdev,BDEV_FILE);
 		blkdev_put(mdev->md_file->f_dentry->d_inode->i_bdev,BDEV_FILE);
 )
 ONLY_IN_26(
@@ -1483,9 +1496,9 @@
 		mdev->md_bdev =
 		mdev->backing_bdev = 0;
 
-		fput(mdev->lo_file);
+		fput(lo_file);
 		fput(mdev->md_file);
-		mdev->lo_file = 0;
+		// mdev->lo_file = 0;
 		mdev->md_file = 0;
 	}
 }
@@ -1856,7 +1869,7 @@
 	}
 )
 
-	if( mdev->lo_file == 0) return;
+	if(!inc_local_md_only(mdev)) return;
 
 	down(&mdev->md_io_mutex);
 	buffer = (struct meta_data_on_disk *)kmap(mdev->md_io_page);
@@ -1884,6 +1897,7 @@
 	drbd_md_sync_page_io(mdev,sector,WRITE);
 
 	up(&mdev->md_io_mutex);
+	dec_local(mdev);
 }
 
 int drbd_md_read(drbd_dev *mdev)
@@ -1892,7 +1906,7 @@
 	sector_t sector;
 	int i;
 
-	if( mdev->lo_file == 0) return -1;
+	if(!inc_local_md_only(mdev)) return -1;
 
 	down(&mdev->md_io_mutex);
 
@@ -1911,11 +1925,14 @@
 
 	kunmap(mdev->md_io_page);
 	up(&mdev->md_io_mutex);
+	dec_local(mdev);
+	
 	return 1;
 
  err:
 	kunmap(mdev->md_io_page);
 	up(&mdev->md_io_mutex);
+	dec_local(mdev);
 
 	INFO("Creating state block\n");
 
@@ -1923,7 +1940,6 @@
 	mdev->gen_cnt[Flags]=MDF_Consistent;
 
 	drbd_md_write(mdev);
-
 	return 0;
 }
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_proc.c,v
retrieving revision 1.8.2.17
retrieving revision 1.8.2.18
diff -u -3 -r1.8.2.17 -r1.8.2.18
--- drbd_proc.c	6 Feb 2004 15:43:55 -0000	1.8.2.17
+++ drbd_proc.c	8 Feb 2004 12:26:14 -0000	1.8.2.18
@@ -179,7 +179,7 @@
 			rlen += sprintf( buf + rlen,
 			   "%2d: cs:%s st:%s/%s ld:%s\n"
 			   "    ns:%u nr:%u dw:%u dr:%u al:%u bm:%u "
-			   "pe:%u ua:%u\n",
+			   "lo:%d pe:%d ua:%d\n",
 			   i, sn,
 			   state_names[drbd_conf[i].state],
 			   state_names[drbd_conf[i].o_state],
@@ -191,6 +191,7 @@
 			   drbd_conf[i].read_cnt/2,
 			   drbd_conf[i].al_writ_cnt,
  			   drbd_conf[i].bm_writ_cnt,
+			   atomic_read(&drbd_conf[i].local_cnt),
 			   atomic_read(&drbd_conf[i].ap_pending_cnt) +
 			   atomic_read(&drbd_conf[i].rs_pending_cnt),
 			   atomic_read(&drbd_conf[i].unacked_cnt)
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.104
retrieving revision 1.97.2.105
diff -u -3 -r1.97.2.104 -r1.97.2.105
--- drbd_receiver.c	7 Feb 2004 16:37:49 -0000	1.97.2.104
+++ drbd_receiver.c	8 Feb 2004 12:26:14 -0000	1.97.2.105
@@ -772,6 +772,7 @@
 	drbd_set_in_sync(mdev, sector, drbd_ee_get_size(e));
 	drbd_send_ack(mdev,WriteAck,e);
 	dec_unacked(mdev,HERE); // FIXME unconditional ??
+	dec_local(mdev);
 	return TRUE;
 }
 
@@ -780,13 +781,22 @@
 {
 	struct Tl_epoch_entry *e;
 
-	// DBG("%s\n", __func__);
-
 	D_ASSERT( pr->d.sector == sector);
 
 	e = read_in_block(mdev,data_size);
 	ERR_IF(!e) return FALSE;
 
+	dec_rs_pending(mdev,HERE);
+
+	if(!inc_local(mdev)) {
+		ERR("Can not write resync data to local disk.\n");
+		drbd_send_ack(mdev,NegAck,e);
+		spin_lock_irq(&mdev->ee_lock);
+		drbd_put_ee(mdev,e);
+		spin_unlock_irq(&mdev->ee_lock);
+		return TRUE;
+	}
+
 	drbd_ee_prepare_write(mdev,e,sector,data_size);
 	e->block_id = ID_SYNCER;
 	e->w.cb     = e_end_resync_block;
@@ -795,7 +805,6 @@
 	list_add(&e->w.list,&mdev->sync_ee);
 	spin_unlock_irq(&mdev->ee_lock);
 
-	dec_rs_pending(mdev,HERE);
 	inc_unacked(mdev);
 
 	drbd_generic_make_request(WRITE,&e->private_bio);
@@ -835,6 +844,18 @@
 
 	drbd_bio_endio(bio,1); // propagate success for application read
 
+	dec_rs_pending(mdev,HERE);
+	dec_ap_pending(mdev,HERE);
+
+	if(!inc_local(mdev)) {
+		ERR("Can not write resync data to local disk.\n");
+		drbd_send_ack(mdev,NegAck,e);
+		spin_lock_irq(&mdev->ee_lock);
+		drbd_put_ee(mdev,e);
+		spin_unlock_irq(&mdev->ee_lock);
+		return TRUE;
+	}
+
 	drbd_ee_prepare_write(mdev, e, sector, data_size);
 	e->block_id = ID_SYNCER;
 	e->w.cb     = e_end_resync_block;
@@ -843,8 +864,6 @@
 	list_add(&e->w.list,&mdev->sync_ee);
 	spin_unlock_irq(&mdev->ee_lock);
 
-	dec_rs_pending(mdev,HERE);
-	dec_ap_pending(mdev,HERE);
 	inc_unacked(mdev);
 
 	drbd_generic_make_request(WRITE,&e->private_bio);
@@ -922,6 +941,7 @@
 		dec_unacked(mdev,HERE); // FIXME unconditional ??
 	}
 
+	dec_local(mdev);
 	return ok;
 }
 
@@ -933,8 +953,6 @@
 	Drbd_Data_Packet *p = (Drbd_Data_Packet*)h;
 	int header_size,data_size;
 
-	// DBG("%s\n", __func__);
-
 	// FIXME merge this code dups into some helper function
 	header_size = sizeof(*p) - sizeof(*h);
 	data_size   = h->length  - header_size;
@@ -954,6 +972,15 @@
 	e = read_in_block(mdev,data_size);
 	ERR_IF(!e) return FALSE;
 
+	if(!inc_local(mdev)) {
+		ERR("Can not write mirrored data block to local disk.\n");
+		drbd_send_ack(mdev,NegAck,e);
+		spin_lock_irq(&mdev->ee_lock);
+		drbd_put_ee(mdev,e);
+		spin_unlock_irq(&mdev->ee_lock);
+		return TRUE;
+	}
+
 	drbd_ee_prepare_write(mdev, e, sector, data_size);
 	e->block_id = p->block_id; // no meaning on this side, e* on partner
 	e->w.cb     = e_end_block;
@@ -998,11 +1025,21 @@
 	spin_lock_irq(&mdev->ee_lock);
 	e=drbd_get_ee(mdev);
 	// can we move it outside the lock?
-	drbd_ee_prepare_read(mdev,e,sector,data_size);
 	e->block_id = p->block_id; // no meaning on this side, pr* on partner
 	list_add(&e->w.list,&mdev->read_ee);
 	spin_unlock_irq(&mdev->ee_lock);
 
+	if(!inc_local(mdev)) {
+		ERR("Can not satisfy peer's read request, no local disk.\n");
+		drbd_send_ack(mdev,NegDReply,e);
+		spin_lock_irq(&mdev->ee_lock);
+		drbd_put_ee(mdev,e);
+		spin_unlock_irq(&mdev->ee_lock);
+		return TRUE;
+	}
+
+	drbd_ee_prepare_read(mdev,e,sector,data_size);
+
 	switch (h->command) {
 	case DataRequest:
 		e->w.cb = w_e_end_data_req;
@@ -1020,8 +1057,6 @@
 		D_ASSERT(0);
 	}
 
-	// FIXME do statistics here, or better within the end_io handler ?
-	// what about concurrent access to *_cnt ?
 	mdev->read_cnt += data_size >> 9;
 	inc_unacked(mdev);
 	drbd_generic_make_request(READ,&e->private_bio);
@@ -1091,7 +1126,7 @@
 
 	p_size=be64_to_cpu(p->p_size);
 
-	if(p_size == 0 && mdev->lo_file == 0) {
+	if(p_size == 0 && test_bit(DISKLESS,&mdev->flags)) {
 		ERR("some backing storage is needed\n");
 		set_cstate(mdev,Unconfigured);
 		mdev->receiver.t_state = Exiting;
@@ -1244,6 +1279,13 @@
 		D_ASSERT(0);
 	}
 
+	if(test_bit(MD_IO_ALLOWED,&mdev->flags) &&
+	   test_bit(DISKLESS,&mdev->flags)) {
+		clear_bit(DISKLESS,&mdev->flags);
+		smp_wmb();
+		clear_bit(MD_IO_ALLOWED,&mdev->flags);
+	}
+
 	ok=TRUE;
  out:
 	vfree(buffer);
@@ -1481,7 +1523,7 @@
 	   pending_cnt is zero. */
 	atomic_set(&mdev->ap_pending_cnt,0);
 	atomic_set(&mdev->rs_pending_cnt,0);
-	wake_up_interruptible(&mdev->state_wait);
+	wake_up_interruptible(&mdev->cstate_wait);
 
 	clear_bit(DO_NOT_INC_CONCNT,&mdev->flags);
 
@@ -1567,10 +1609,6 @@
 
 		drbd_end_req(req, RQ_DRBD_SENT, 1, sector);
 	}
-	/*WARN("BlockAck: %lx %lx %x\n",
-	    (long) p->block_id,
-	    (long) be64_to_cpu(p->sector),
-	    be32_to_cpu(p->blksize));*/
 
 	// TODO: Make sure that the block is in an active epoch!!
 	if(is_syncer_blk(mdev,p->block_id)) {
@@ -1582,6 +1620,50 @@
 	return TRUE;
 }
 
+STATIC int got_NegAck(drbd_dev *mdev, Drbd_Header* h)
+{
+	drbd_request_t *req;
+	Drbd_BlockAck_Packet *p = (Drbd_BlockAck_Packet*)h;
+	sector_t sector = be64_to_cpu(p->sector);
+	int blksize = be32_to_cpu(p->blksize);
+
+	WARN("Got NegAck packet. Peer is in troubles?\n");
+
+	if( !is_syncer_blk(mdev,p->block_id)) {
+		req=(drbd_request_t*)(long)p->block_id;
+
+		ERR_IF (!VALID_POINTER(req)) return FALSE;
+		drbd_set_out_of_sync(mdev,sector,blksize);
+		drbd_end_req(req, RQ_DRBD_SENT, 1, sector);
+	}
+
+	// TODO: Make sure that the block is in an active epoch!!
+	if(is_syncer_blk(mdev,p->block_id)) {
+		dec_rs_pending(mdev,HERE);
+	} else {
+		D_ASSERT(mdev->conf.wire_protocol != DRBD_PROT_A);
+		dec_ap_pending(mdev,HERE);
+	}
+	return TRUE;
+}
+
+STATIC int got_NegDReply(drbd_dev *mdev, Drbd_Header* h)
+{
+	struct Pending_read *pr;
+	Drbd_BlockAck_Packet *p = (Drbd_BlockAck_Packet*)h;
+
+	pr = (struct Pending_read *)(long)p->block_id;
+	ERR_IF(!VALID_POINTER(pr)) return FALSE;
+
+	spin_lock(&mdev->pr_lock);
+	list_del(&pr->w.list);
+	spin_unlock(&mdev->pr_lock);
+
+	ERR("Get NegDReply. WE ARE LOST. We lost our up-to-date disk.\n");
+	// TODO: Do simething like panic() or shut_down_cluster(). 
+	return TRUE;
+}
+
 STATIC int got_BarrierAck(drbd_dev *mdev, Drbd_Header* h)
 {
 	Drbd_BarrierAck_Packet *p = (Drbd_BarrierAck_Packet*)h;
@@ -1614,6 +1696,8 @@
 		[PingAck]   ={ sizeof(Drbd_Header),           got_PingAck },
 		[RecvAck]   ={ sizeof(Drbd_BlockAck_Packet),  got_BlockAck },
 		[WriteAck]  ={ sizeof(Drbd_BlockAck_Packet),  got_BlockAck },
+		[NegAck]    ={ sizeof(Drbd_BlockAck_Packet),  got_NegAck },
+		[NegDReply] ={ sizeof(Drbd_BlockAck_Packet),  got_NegDReply },
 		[BarrierAck]={ sizeof(Drbd_BarrierAck_Packet),got_BarrierAck },
 	};
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_req-2.4.c,v
retrieving revision 1.33.2.51
retrieving revision 1.33.2.52
diff -u -3 -r1.33.2.51 -r1.33.2.52
--- drbd_req-2.4.c	7 Feb 2004 16:37:49 -0000	1.33.2.51
+++ drbd_req-2.4.c	8 Feb 2004 12:26:14 -0000	1.33.2.52
@@ -200,7 +200,8 @@
 	nr_sectors = bio_sectors(bio); */
 #endif
 
-	if( mdev->lo_file == 0 ) {
+	    
+	if( !inc_local(mdev) ) {
 		if( mdev->cstate < Connected ) {
 			drbd_bio_IO_error(bio);
 			return 0;
@@ -284,6 +285,7 @@
 
 	if( rw == READ || rw == READA ) {
 		mdev->read_cnt += size >> 9;
+		dec_local(mdev);  // FIXME TODO -> completion handler
 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)
 		bio->b_rdev  = mdev->backing_bdev;
 #else
@@ -297,13 +299,16 @@
 	if(mdev->cstate<Connected || test_bit(PARTNER_DISKLESS,&mdev->flags)) {
 		drbd_set_out_of_sync(mdev,sector,size);
 
+		/* This should be changed. We should not remap in this 
+		   case !!!!! FIXME TODO FIXME TODO 
+		 */
 		drbd_al_begin_io(mdev, sector);
-		drbd_al_complete_io(mdev, sector); // FIXME TODO
+		drbd_al_complete_io(mdev, sector); // FIXME TODO -> completion handler
+		dec_local(mdev);  // FIXME TODO -> completion handler
 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)
 		bio->b_rdev  = mdev->backing_bdev;
 #else
 		bio->bi_bdev = mdev->backing_bdev;
-		/* I want to change it anyways so we never remap ... */
 #endif
 		return 1; // Not arranged for transfer ( but remapped :)
 	}
@@ -315,6 +320,7 @@
 	if (!req) {
 		ERR("could not kmalloc() req\n");
 		drbd_bio_IO_error(bio);
+		dec_local(mdev);
 		return 0;
 	}
 	SET_MAGIC(req);