[DRBD-cvs] svn commit by lars - r2548 - in trunk: drbd drbd/linux user - implementation of the conflict detection

drbd-cvs at lists.linbit.com drbd-cvs at lists.linbit.com
Thu Oct 19 15:22:57 CEST 2006


Author: lars
Date: 2006-10-19 15:22:52 +0200 (Thu, 19 Oct 2006)
New Revision: 2548

Modified:
   trunk/drbd/drbd_int.h
   trunk/drbd/drbd_main.c
   trunk/drbd/drbd_nl.c
   trunk/drbd/drbd_proc.c
   trunk/drbd/drbd_receiver.c
   trunk/drbd/drbd_req.c
   trunk/drbd/drbd_req.h
   trunk/drbd/drbd_worker.c
   trunk/drbd/linux/drbd.h
   trunk/drbd/linux/drbd_config.h
   trunk/drbd/lru_cache.c
   trunk/user/drbdsetup.c
Log:
implementation of the conflict detection



Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_int.h	2006-10-19 13:22:52 UTC (rev 2548)
@@ -285,19 +285,23 @@
 	PingAck,
 	RecvAck,      // Used in protocol B
 	WriteAck,     // Used in protocol C
+	DiscardAck,   // Used in protocol C, two-primaries conflict detection
 	NegAck,       // Sent if local disk is unusable
 	NegDReply,    // Local disk is broken...
 	NegRSDReply,  // Local disk is broken...
 	BarrierAck,
-	DiscardNote,
 	StateChgReply,
 
 	MAX_CMD,
 	MayIgnore = 0x100, // Flag only to test if (cmd > MayIgnore) ...
 	MAX_OPT_CMD,
 
+	/* FIXME
+	 * to get a more useful error message with drbd-8 <-> drbd 0.7.x,
+	 * these could be reimplemented as special case of HandShake. */
 	HandShakeM = 0xfff1, // First Packet on the MetaSock
 	HandShakeS = 0xfff2, // First Packet on the Socket
+
 	HandShake  = 0xfffe  // FIXED for the next century!
 } Drbd_Packet_Cmd;
 
@@ -329,11 +333,11 @@
 		[PingAck]          = "PingAck",
 		[RecvAck]          = "RecvAck",
 		[WriteAck]         = "WriteAck",
+		[DiscardAck]       = "DiscardAck",
 		[NegAck]           = "NegAck",
 		[NegDReply]        = "NegDReply",
 		[NegRSDReply]      = "NegRSDReply",
 		[BarrierAck]       = "BarrierAck",
-		[DiscardNote]      = "DiscardNote",
 		[StateChgRequest]  = "StateChgRequest",
 		[StateChgReply]    = "StateChgReply"
 	};
@@ -539,7 +543,6 @@
 	Drbd_Req_State_Packet	 ReqState;
 	Drbd_RqS_Reply_Packet	 RqSReply;
 	Drbd_BlockRequest_Packet BlockRequest;
-	Drbd_Discard_Packet	 Discard;
 } __attribute((packed)) Drbd_Polymorph_Packet;
 
 /**********************************************************************/
@@ -650,9 +653,11 @@
 
 /* ee flag bits */
 enum {
-	__CALL_AL_COMPLETE_IO,
+	__EE_CALL_AL_COMPLETE_IO,
+	__EE_CONFLICT_PENDING,
 };
-#define CALL_AL_COMPLETE_IO (1<<__CALL_AL_COMPLETE_IO)
+#define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO)
+#define EE_CONFLICT_PENDING    (1<<__EE_CONFLICT_PENDING)
 
 
 /* global flag bits */
@@ -668,7 +673,7 @@
 	UNPLUG_REMOTE,		// whether sending a "UnplugRemote" makes sense
 	MD_DIRTY,		// current gen counts and flags not yet on disk
 	SYNC_STARTED,		// Needed to agree on the exact point in time..
-	UNIQUE,                 // Set on one node, cleared on the peer!
+	DISCARD_CONCURRENT,     // Set on one node, cleared on the peer!
 	USE_DEGR_WFC_T,		// Use degr-wfc-timeout instead of wfc-timeout.
 	CLUSTER_ST_CHANGE,      // Cluster wide state change going on...
 	CL_ST_CHG_SUCCESS,
@@ -708,12 +713,6 @@
 	Drbd_Polymorph_Packet rbuf;  // send/receive buffers off the stack
 };
 
-struct drbd_discard_note {
-	struct list_head list;
-	u64 block_id;
-	int seq_num;
-};
-
 struct drbd_md {
 	u64 md_offset;		/* sector offset to 'super' block */
 
@@ -843,10 +842,10 @@
 	int al_tr_cycle;
 	int al_tr_pos;     // position of the next transaction in the journal
 	struct crypto_tfm* cram_hmac_tfm;
+	wait_queue_head_t seq_wait;
 	atomic_t packet_seq;
-	int peer_seq;
+	unsigned int peer_seq;
 	spinlock_t peer_seq_lock;
-	struct list_head discard;
 	int minor;
 };
 
@@ -954,7 +953,6 @@
 			      sector_t sector,int size, u64 block_id);
 extern int drbd_send_bitmap(drbd_dev *mdev);
 extern int _drbd_send_bitmap(drbd_dev *mdev);
-extern int drbd_send_discard(drbd_dev *mdev, drbd_request_t *req);
 extern int drbd_send_sr_reply(drbd_dev *mdev, int retcode);
 extern void drbd_free_bc(struct drbd_backing_dev* bc);
 extern int drbd_io_error(drbd_dev* mdev, int forcedetach);
@@ -1618,11 +1616,11 @@
 		    __func__ , __LINE__ ,			\
 		    atomic_read(&mdev->which))
 
-#define dec_ap_pending(mdev)					\
+#define dec_ap_pending(mdev)	do {				\
 	typecheck(drbd_dev*,mdev);				\
 	if(atomic_dec_and_test(&mdev->ap_pending_cnt))		\
 		wake_up(&mdev->cstate_wait);			\
-	ERR_IF_CNT_IS_NEGATIVE(ap_pending_cnt)
+	ERR_IF_CNT_IS_NEGATIVE(ap_pending_cnt); } while (0)
 
 /* counts how many resync-related answers we still expect from the peer
  *                   increase                   decrease
@@ -1635,10 +1633,10 @@
 	atomic_inc(&mdev->rs_pending_cnt);
 }
 
-#define dec_rs_pending(mdev)					\
+#define dec_rs_pending(mdev)	do {				\
 	typecheck(drbd_dev*,mdev);				\
 	atomic_dec(&mdev->rs_pending_cnt);			\
-	ERR_IF_CNT_IS_NEGATIVE(rs_pending_cnt)
+	ERR_IF_CNT_IS_NEGATIVE(rs_pending_cnt); } while (0)
 
 /* counts how many answers we still need to send to the peer.
  * increased on
@@ -1654,15 +1652,15 @@
 	atomic_inc(&mdev->unacked_cnt);
 }
 
-#define dec_unacked(mdev)					\
+#define dec_unacked(mdev)	do {				\
 	typecheck(drbd_dev*,mdev);				\
 	atomic_dec(&mdev->unacked_cnt);				\
-	ERR_IF_CNT_IS_NEGATIVE(unacked_cnt)
+	ERR_IF_CNT_IS_NEGATIVE(unacked_cnt); } while (0)
 
-#define sub_unacked(mdev, n)					\
+#define sub_unacked(mdev, n)	do {				\
 	typecheck(drbd_dev*,mdev);				\
 	atomic_sub(n, &mdev->unacked_cnt);			\
-	ERR_IF_CNT_IS_NEGATIVE(unacked_cnt)
+	ERR_IF_CNT_IS_NEGATIVE(unacked_cnt); } while (0)
 
 
 static inline void dec_net(drbd_dev* mdev)
@@ -1692,6 +1690,13 @@
  * this is mood...
  */
 
+static inline void dec_local(drbd_dev* mdev)
+{
+	if(atomic_dec_and_test(&mdev->local_cnt)) {
+		wake_up(&mdev->cstate_wait);
+	}
+	D_ASSERT(atomic_read(&mdev->local_cnt)>=0);
+}
 /**
  * inc_local: Returns TRUE when local IO is possible. If it returns
  * TRUE you should call dec_local() after IO is completed.
@@ -1701,9 +1706,9 @@
 	int io_allowed;
 
 	atomic_inc(&mdev->local_cnt);
-	io_allowed = (mdev->state.disk >= mins ); 
+	io_allowed = (mdev->state.disk >= mins );
 	if( !io_allowed ) {
-		atomic_dec(&mdev->local_cnt);
+		dec_local(mdev);
 	}
 	return io_allowed;
 }
@@ -1712,17 +1717,6 @@
 	return inc_local_if_state(mdev, Inconsistent);
 }
 
-
-static inline void dec_local(drbd_dev* mdev)
-{
-	if(atomic_dec_and_test(&mdev->local_cnt) &&
-	   mdev->state.disk == Diskless && mdev->bc ) {
-		wake_up(&mdev->cstate_wait);
-	}
-
-	D_ASSERT(atomic_read(&mdev->local_cnt)>=0);
-}
-
 /* this throttles on-the-fly application requests
  * according to max_buffers settings;
  * maybe re-implement using semaphores? */
@@ -1785,24 +1779,30 @@
 	if (ap_bio < mxb) wake_up(&mdev->cstate_wait);
 }
 
-/* FIXME does not handle wrap around yet */
-static inline void update_peer_seq(drbd_dev* mdev, int new_seq)
+static inline int seq_cmp(u32 a, u32 b)
 {
-	spin_lock(&mdev->peer_seq_lock);
-	mdev->peer_seq = max(mdev->peer_seq, new_seq);
-	spin_unlock(&mdev->peer_seq_lock);
-	wake_up(&mdev->cstate_wait);
-	/* FIXME introduce seq_wait, no point in waking up a number of
-	 * processes with each and every Ack received... */
+	/* we assume wrap around at 32bit. subtract as u32 (defined modulo
+	 * 2^32), only then convert to s32; subtracting two s32 can overflow.
+	 * for wrap around at 24bit (old atomic_t), we'd have to
+	 *  a <<= 8; b <<= 8;
+	 */
+	return (s32)(a - b);
 }
+#define seq_lt(a,b) (seq_cmp((a),(b)) < 0)
+#define seq_gt(a,b) (seq_cmp((a),(b)) > 0)
+#define seq_ge(a,b) (seq_cmp((a),(b)) >= 0)
+#define seq_le(a,b) (seq_cmp((a),(b)) <= 0)
+/* CAUTION: please no side effects in arguments! */
+#define seq_max(a,b) ((u32)(seq_gt((a),(b)) ? (a) : (b)))
 
-static inline int peer_seq(drbd_dev* mdev)
+static inline void update_peer_seq(drbd_dev* mdev, unsigned int new_seq)
 {
-	int seq;
+	unsigned int m;
 	spin_lock(&mdev->peer_seq_lock);
-	seq = mdev->peer_seq;
+	m = seq_max(mdev->peer_seq, new_seq);
+	mdev->peer_seq = m;
 	spin_unlock(&mdev->peer_seq_lock);
-	return seq;
+	if (m == new_seq) wake_up(&mdev->seq_wait);
 }
 
 static inline int drbd_queue_order_type(drbd_dev* mdev)

Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_main.c	2006-10-19 13:22:52 UTC (rev 2548)
@@ -300,40 +300,6 @@
 	spin_unlock_irq(&mdev->req_lock);
 }
 
-// warning LGE "FIXME code missing"
-#if 0
-/* FIXME "wrong"
- * see comment in receive_Data */
-drbd_request_t * _req_have_write(drbd_dev *mdev, struct Tl_epoch_entry *e)
-{
-	struct hlist_head *slot;
-	struct hlist_node *n;
-	drbd_request_t * req;
-	sector_t sector = e->sector;
-	int size = e->drbd_ee_get_size(e);
-	int i;
-
-	MUST_HOLD(&mdev->req_lock);
-	D_ASSERT(size <= 1<<(HT_SHIFT+9) );
-
-#define OVERLAPS overlaps(req->sector, req->size, sector, size)
-	slot = mdev->tl_hash + tl_hash_fn(mdev, sector);
-	hlist_for_each_entry(req, n, slot, colision) {
-		if (OVERLAPS) return req;
-	} // hlist_for_each_entry()
-#undef OVERLAPS
-	req = NULL;
-	// Good, no conflict found
-	INIT_HLIST_NODE(&e->colision);
-	hlist_add_head( &e->colision, mdev->ee_hash +
-			ee_hash_fn(mdev, sector) );
- out:
-	spin_unlock_irq(&mdev->tl_lock);
-
-	return req;
-}
-#endif
-
 /**
  * drbd_io_error: Handles the on_io_error setting, should be called in the
  * unlikely(!drbd_bio_uptodate(e->bio)) case from kernel thread context.
@@ -1355,17 +1321,6 @@
 	return ok;
 }
 
-int drbd_send_discard(drbd_dev *mdev, drbd_request_t *req)
-{
-	Drbd_Discard_Packet p;
-
-	p.block_id = (unsigned long)req;
-	p.seq_num  = cpu_to_be32(req->seq_num);
-
-	return drbd_send_cmd(mdev,USE_META_SOCKET,DiscardNote,
-			     (Drbd_Header*)&p,sizeof(p));
-}
-
 int drbd_send_state(drbd_dev *mdev)
 {
 	Drbd_State_Packet p;
@@ -1924,7 +1879,7 @@
 
 	/* only if connected */
 	spin_lock_irq(&mdev->req_lock);
-	if (mdev->state.pdsk >= Inconsistent) /* implies cs >= Connected */ {
+	if (mdev->state.pdsk >= Inconsistent && mdev->state.conn >= Connected) {
 		D_ASSERT(mdev->state.role == Primary);
 		if (test_and_clear_bit(UNPLUG_REMOTE,&mdev->flags)) {
 			/* add to the data.work queue,
@@ -1998,7 +1953,6 @@
 	INIT_LIST_HEAD(&mdev->resync_work.list);
 	INIT_LIST_HEAD(&mdev->unplug_work.list);
 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
-	INIT_LIST_HEAD(&mdev->discard);
 	mdev->resync_work.cb  = w_resync_inactive;
 	mdev->unplug_work.cb  = w_send_write_hint;
 	mdev->md_sync_work.cb = w_md_sync;
@@ -2012,6 +1966,7 @@
 	init_waitqueue_head(&mdev->cstate_wait);
 	init_waitqueue_head(&mdev->ee_wait);
 	init_waitqueue_head(&mdev->al_wait);
+	init_waitqueue_head(&mdev->seq_wait);
 
 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
@@ -3140,6 +3095,7 @@
 
 	case RecvAck:
 	case WriteAck:
+	case DiscardAck:
 	case NegAck:
 	case NegRSDReply:
 		INFOP("%s (sector %llx, size %x, id %s, seq %x)\n", cmdname(cmd),
@@ -3171,7 +3127,7 @@
 		      be64_to_cpu(p->GenCnt.uuid[History_start]),
 		      be64_to_cpu(p->GenCnt.uuid[History_end]));
 		break;
-		      
+
 	case ReportSizes:
 		INFOP("%s (d %lluMiB, u %lluMiB, c %lldMiB, max bio %x, q order %x)\n", cmdname(cmd), 
 		      (long long)(be64_to_cpu(p->Sizes.d_size)>>(20-9)),
@@ -3200,12 +3156,6 @@
 		      be32_to_cpu(p->RqSReply.retcode));
 		break;
 
-	case DiscardNote:
-		INFOP("%s (id %llx, seq %x)\n", cmdname(cmd),
-		      (long long)p->Discard.block_id,
-		      be32_to_cpu(p->Discard.seq_num));
-		break;
-
 	case Ping:
 	case PingAck:
 		/*

Modified: trunk/drbd/drbd_nl.c
===================================================================
--- trunk/drbd/drbd_nl.c	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_nl.c	2006-10-19 13:22:52 UTC (rev 2548)
@@ -945,7 +945,7 @@
 	}
 
 	new_conf = kmalloc(sizeof(struct net_conf),GFP_KERNEL);
-	if(!new_conf) {			
+	if(!new_conf) {
 		retcode=KMallocFailed;
 		goto fail;
 	}
@@ -968,6 +968,7 @@
 		new_conf->after_sb_2p     = DRBD_AFTER_SB_2P_DEF;
 		new_conf->want_lose       = 0;
 		new_conf->two_primaries   = 0;
+		new_conf->wire_protocol   = DRBD_PROT_C;
 	}
 
 	if (!net_conf_from_tags(mdev,nlp->tag_list,new_conf)) {
@@ -975,6 +976,11 @@
 		goto fail;
 	}
 
+	if (new_conf->two_primaries && (new_conf->wire_protocol != DRBD_PROT_C)) {
+		retcode=ProtocolCRequired;
+		goto fail;
+	}
+
 	if( mdev->state.role == Primary && new_conf->want_lose ) {
 		retcode=DiscardNotAllowed;
 		goto fail;

Modified: trunk/drbd/drbd_proc.c
===================================================================
--- trunk/drbd/drbd_proc.c	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_proc.c	2006-10-19 13:22:52 UTC (rev 2548)
@@ -210,7 +210,7 @@
 			seq_printf( seq, "%2d: cs:Unconfigured\n", i);
 		} else {
 			seq_printf( seq,
-			   "%2d: cs:%s st:%s/%s ds:%s/%s %c%c%c%c\n"
+			   "%2d: cs:%s st:%s/%s ds:%s/%s %c %c%c%c%c\n"
 			   "    ns:%u nr:%u dw:%u dr:%u al:%u bm:%u "
 			   "lo:%d pe:%d ua:%d ap:%d\n",
 			   i, sn,
@@ -218,6 +218,8 @@
 			   roles_to_name(mdev->state.peer),
 			   disks_to_name(mdev->state.disk),
 			   disks_to_name(mdev->state.pdsk),
+			   (mdev->net_conf == NULL ? ' ' :
+			    (mdev->net_conf->wire_protocol - DRBD_PROT_A+'A')),
 			   mdev->state.susp ? 's' : 'r',
 			   mdev->state.aftr_isp ? 'a' : '-',
 			   mdev->state.peer_isp ? 'p' : '-',

Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_receiver.c	2006-10-19 13:22:52 UTC (rev 2548)
@@ -316,6 +316,8 @@
 
 	bio_put(bio);
 
+	BUG_ON(!hlist_unhashed(&e->colision));
+
 	mempool_free(e, drbd_ee_mempool);
 }
 
@@ -379,13 +381,9 @@
 	list_splice_init(&mdev->done_ee,&work_list);
 	spin_unlock_irq(&mdev->req_lock);
 
-	/* XXX maybe wake_up here already?
-	 * or wake_up withing drbd_free_ee just after mempool_free?
-	 */
-
 	/* possible callbacks here:
-	 * e_end_block, and e_end_resync_block.
-	 * both ignore the last argument.
+	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
+	 * all ignore the last argument.
 	 */
 	list_for_each_entry_safe(e, t, &work_list, w.list) {
 		MTRACE(TraceTypeEE,TraceLvlAll,
@@ -393,7 +391,7 @@
 			    (long long)e->sector,e->size,e);
 			);
 		// list_del not necessary, next/prev members not touched
-		ok = ok && e->w.cb(mdev,&e->w,0);
+		if (e->w.cb(mdev,&e->w,0) == 0) ok = 0;
 		drbd_free_ee(mdev,e);
 	}
 	if (do_clear_bit)
@@ -690,7 +688,7 @@
 	D_ASSERT(!mdev->data.socket);
 
 	if(drbd_request_state(mdev,NS(conn,WFConnection)) < SS_Success ) return 0;
-	clear_bit(UNIQUE, &mdev->flags);
+	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
 
 	sock  = NULL;
 	msock = NULL;
@@ -737,7 +735,7 @@
 			case HandShakeM:
 				if(msock) sock_release(msock);
 				msock = s;
-				set_bit(UNIQUE, &mdev->flags);
+				set_bit(DISCARD_CONCURRENT, &mdev->flags);
 				break;
 			default:
 				WARN("Error receiving initial packet\n");
@@ -1195,55 +1193,87 @@
 			 * maybe assert this?  */
 		}
 		dec_unacked(mdev);
-		return ok;
 	} else if(unlikely(!drbd_bio_uptodate(e->private_bio))) {
 		ok = drbd_io_error(mdev, FALSE);
 	}
 
-// warning LGE "FIXME code missing"
-#if 0
 	/* we delete from the conflict detection hash _after_ we sent out the
 	 * WriteAck / NegAck, to get the sequence number right.  */
+	if (mdev->net_conf->two_primaries) {
+		spin_lock_irq(&mdev->req_lock);
+		D_ASSERT(!hlist_unhashed(&e->colision));
+		hlist_del_init(&e->colision);
+		spin_unlock_irq(&mdev->req_lock);
+	} else {
+		D_ASSERT(hlist_unhashed(&e->colision));
+	}
+
+	return ok;
+}
+
+STATIC int e_send_discard_ack(drbd_dev *mdev, struct drbd_work *w, int unused)
+{
+	struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
+	int ok=1;
+
+	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
+	ok = drbd_send_ack(mdev,DiscardAck,e);
+
+	spin_lock_irq(&mdev->req_lock);
 	D_ASSERT(!hlist_unhashed(&e->colision));
-	/* FIXME "wake" any conflicting requests
-	 * that have been waiting for this one to finish */
 	hlist_del_init(&e->colision);
-#endif
+	spin_unlock_irq(&mdev->req_lock);
 
+	dec_unacked(mdev);
+
 	return ok;
 }
 
-/* FIXME implementation wrong.
- * For the algorithm to be correct, we need to send and store the
- * sector and size, not the block id. We have to check for overlap.
- * We may _only_ remove the info when its sequence number is less than
- * the current sequence number.
+/* Called from receive_Data.
+ * Synchronize packets on sock with packets on msock.
  *
- * I think the "discard info" are the wrong way, anyways.
- * Instead of silently discarding such writes, we should send a DiscardAck,
- * and we should retard sending of the data until we get that Discard Ack
- * and thus the conflicting request is done.
- */
-STATIC int drbd_chk_discard(drbd_dev *mdev,struct Tl_epoch_entry *e)
+ * This is here so even when a Data packet traveling via sock overtook an Ack
+ * packet traveling on msock, they are still processed in the order they have
+ * been sent.
+ *
+ * Note: we don't care for Ack packets overtaking Data packets.
+ *
+ * In case packet_seq is larger than mdev->peer_seq number, there are
+ * outstanding packets on the msock. We wait for them to arrive.
+ * In case we are the logically next packet, we update mdev->peer_seq
+ * ourselves. Correctly handles 32bit wrap around.
+ * FIXME verify that atomic_t guarantees 32bit wrap around,
+ * otherwise we have to play tricks with << ...
+ *
+ * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
+ * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
+ * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
+ * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
+ *
+ * returns 0 if we may process the packet,
+ * -ERESTARTSYS if we were interrupted (by disconnect signal). */
+static int drbd_wait_peer_seq(drbd_dev *mdev, const u32 packet_seq)
 {
-	struct drbd_discard_note *dn;
-	struct list_head *le,*tmp;
-
-	MUST_HOLD(&mdev->peer_seq_lock);
-	list_for_each_safe(le,tmp,&mdev->discard) {
-		dn = list_entry(le, struct drbd_discard_note, list);
-		if( dn->seq_num == mdev->peer_seq ) {
-			D_ASSERT( dn->block_id == e->block_id );
-			list_del(le);
-			kfree(dn);
-			return 1;
+	DEFINE_WAIT(wait);
+	int ret = 0;
+	spin_lock(&mdev->peer_seq_lock);
+	for (;;) {
+		prepare_to_wait(&mdev->seq_wait,&wait,TASK_INTERRUPTIBLE);
+		if (seq_le(packet_seq,mdev->peer_seq+1))
+			break;
+		if (signal_pending(current)) { /* leave loop with lock held */
+			ret = -ERESTARTSYS;
+			break;
 		}
-		if( dn->seq_num < mdev->peer_seq ) {
-			list_del(le);
-			kfree(dn);
-		}
+		spin_unlock(&mdev->peer_seq_lock);
+		schedule();
+		spin_lock(&mdev->peer_seq_lock);
 	}
-	return 0;
+	finish_wait(&mdev->seq_wait, &wait);
+	if (mdev->peer_seq+1 == packet_seq)
+		mdev->peer_seq++;
+	spin_unlock(&mdev->peer_seq_lock);
+	return ret;
 }
 
 // mirrored write
@@ -1251,11 +1281,8 @@
 {
 	sector_t sector;
 	struct Tl_epoch_entry *e;
-	/* FIXME currently some unused variable intended
-	 * for the now not-implemented conflict detection */
-	drbd_request_t * req;
 	Drbd_Data_Packet *p = (Drbd_Data_Packet*)h;
-	int header_size, data_size, packet_seq, discard, rv;
+	int header_size, data_size;
 	unsigned int barrier_nr = 0;
 	unsigned int epoch_size = 0;
 	u32 dp_flags;
@@ -1291,98 +1318,143 @@
 	e->private_bio->bi_end_io = drbd_endio_write_sec;
 	e->w.cb = e_end_block;
 
-	/* FIXME drbd_al_begin_io in case we have two primaries... */
+	dp_flags = be32_to_cpu(p->dp_flags);
+	if ( dp_flags & DP_HARDBARRIER ) {
+		e->private_bio->bi_rw |= BIO_RW_BARRIER;
+	}
+	if ( dp_flags & DP_RW_SYNC ) {
+		e->private_bio->bi_rw |= BIO_RW_SYNC;
+	}
 
-// warning LGE "FIXME code missing"
-#if 0 
-/* sorry.
- * to get this patch in a shape where it can be committed,
- * I need to disable the broken conflict detection code for now.
- * will implement the correct as soon as possible...
- * it is done in my head already, "only have to write it down",
- * which will take an other couple of days, probably.
- */
+	/* I'm the receiver, I do hold a net_cnt reference. */
+	if (!mdev->net_conf->two_primaries) {
+		spin_lock_irq(&mdev->req_lock);
+	} else {
+		/* don't get the req_lock yet,
+		 * we may sleep in drbd_wait_peer_seq */
+		const sector_t sector = e->sector;
+		const int size = e->size;
+		const int discard = test_bit(DISCARD_CONCURRENT,&mdev->flags);
+		DEFINE_WAIT(wait);
+		drbd_request_t *i;
+		struct hlist_node *n;
+		struct hlist_head *slot;
+		int first;
 
-	/* This wait_event is here so even when a DATA packet traveling via
-	 * sock overtook an ACK packet traveling on msock, they are still
-	 * processed in the order they have been sent.
-	 * FIXME TODO: Wrap around of seq_num !!!
-	 */
-	if (mdev->net_conf->two_primaries) {
-		packet_seq = be32_to_cpu(p->seq_num);
-		if( wait_event_interruptible(mdev->cstate_wait,
-					     packet_seq <= peer_seq(mdev)+1)) {
-			rv = FALSE;
-			goto out2;
-		}
+		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
+		BUG_ON(mdev->ee_hash == NULL);
+		BUG_ON(mdev->tl_hash == NULL);
 
-		/* FIXME current discard implementation is wrong */
-		spin_lock(&mdev->peer_seq_lock);
-		mdev->peer_seq = max(mdev->peer_seq, packet_seq);
-		/* is update_peer_seq(mdev,packet_seq); */
-		discard = drbd_chk_discard(mdev,e);
-		spin_unlock(&mdev->peer_seq_lock);
+		/* conflict detection and handling:
+		 * 1. wait on the sequence number,
+		 *    in case this data packet overtook ACK packets.
+		 * 2. check our hash tables for conflicting requests.
+		 *    we only need to walk the tl_hash, since an ee can not
+		 *    have a conflict with an other ee: on the submitting
+		 *    node, the corresponding req had already been conflicting,
+		 *    and a conflicting req is never sent.
+		 *
+		 * Note: for two_primaries, we are protocol C,
+		 * so there cannot be any request that is DONE
+		 * but still on the transfer log.
+		 *
+		 * unconditionally add to the ee_hash.
+		 *
+		 * if no conflicting request is found:
+		 *    submit.
+		 *
+		 * if any conflicting request is found that has not yet been acked,
+		 * AND I have the "discard concurrent writes" flag:
+		 *       queue (via done_ee) the DiscardAck; OUT.
+		 *
+		 * if any conflicting request is found:
+		 *       block the receiver, waiting on cstate_wait
+		 *       until no more conflicting requests are there,
+		 *       or we get interrupted (disconnect).
+		 *
+		 *       we do not just write after local io completion of those
+		 *       requests, but only after req is done completely, i.e.
+		 *       we wait for the DiscardAck to arrive!
+		 *
+		 *       then proceed normally, i.e. submit.
+		 */
+		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
+			goto out_interrupted;
 
-		if(discard) {
-			WARN("Concurrent write! [DISCARD BY LIST] sec=%lu\n",
-			     (unsigned long)sector);
-			rv = TRUE;
-			goto out2;
-		}
+		spin_lock_irq(&mdev->req_lock);
 
-		req = req_have_write(mdev, e);
+		hlist_add_head(&e->colision,ee_hash_slot(mdev,sector));
 
-		if(req) {
-			/* FIXME RACE
-			 * rq_status may be changing while we are looking.
-			 * in rare cases it could even disappear right now.
-			 * e.g. when it has already been ACK'ed, and the local
-			 * storage has been way too slow, and only now
-			 * completes the thing.
-			 */
-			if( req->rq_status & RQ_DRBD_SENT ) {
-				/* Conflicting write, got ACK */
-				/* write afterwards ...*/
-				WARN("Concurrent write! [W AFTERWARDS1] "
-				     "sec=%lu\n",(unsigned long)sector);
-				if( wait_event_interruptible(mdev->cstate_wait,
-					       !req_have_write(mdev,e))) {
-					rv = FALSE;
-					goto out2;
-				}
-			} else {
-				/* Conflicting write, no ACK by now*/
-				if (test_bit(UNIQUE,&mdev->flags)) {
-					WARN("Concurrent write! [DISCARD BY FLAG] sec=%lu\n",
-					     (unsigned long)sector);
-					rv = TRUE;
-					goto out2;
-				} else {
-					/* write afterwards do not exp ACK */
-					WARN("Concurrent write! [W AFTERWARDS2] sec=%lu\n",
-					     (unsigned long)sector);
-					drbd_send_discard(mdev,req);
-					drbd_end_req(req, RQ_DRBD_SENT, 1, sector);
-					dec_ap_pending(mdev);
-					if( wait_event_interruptible(mdev->cstate_wait,
-								     !req_have_write(mdev,e))) {
-						rv = FALSE;
-						goto out2;
+#define OVERLAPS overlaps(i->sector, i->size, sector, size)
+		slot = tl_hash_slot(mdev,sector);
+		first = 1;
+		for(;;) {
+			int have_unacked = 0;
+			int have_conflict = 0;
+			prepare_to_wait(&mdev->cstate_wait,&wait,TASK_INTERRUPTIBLE);
+			hlist_for_each_entry(i, n, slot, colision) {
+				if (OVERLAPS) {
+					if (first) {
+						/* only ALERT on first iteration,
+						 * we may be woken up early... */
+						ALERT("%s[%u] Concurrent local write detected!"
+						      "	new: %llu +%d; pending: %llu +%d\n",
+						      current->comm, current->pid,
+						      (unsigned long long)sector, size,
+						      (unsigned long long)i->sector, i->size);
 					}
+					if (i->rq_state & RQ_NET_PENDING) ++have_unacked;
+					++have_conflict;
 				}
 			}
+#undef OVERLAPS
+			if (!have_conflict) break;
+
+			/* Discard Ack only for the _first_ iteration */
+			if (first && discard && have_unacked) {
+				ALERT("Concurrent write! [DISCARD BY FLAG] sec=%llu\n",
+				     (unsigned long long)sector);
+				inc_unacked(mdev);
+				mdev->epoch_size++;
+				e->w.cb = e_send_discard_ack;
+				list_add_tail(&e->w.list,&mdev->done_ee);
+
+				spin_unlock_irq(&mdev->req_lock);
+
+				/* we could probably send that DiscardAck ourselves,
+				 * but I don't like the receiver using the msock */
+
+				dec_local(mdev);
+				wake_asender(mdev);
+				finish_wait(&mdev->cstate_wait, &wait);
+				return TRUE;
+			}
+
+			if (signal_pending(current)) {
+				hlist_del_init(&e->colision);
+
+				spin_unlock_irq(&mdev->req_lock);
+
+				finish_wait(&mdev->cstate_wait, &wait);
+				goto out_interrupted;
+			}
+
+			spin_unlock_irq(&mdev->req_lock);
+			if (first) {
+				first = 0;
+				ALERT("Concurrent write! [W AFTERWARDS] "
+				     "sec=%llu\n",(unsigned long long)sector);
+			} else if (discard) {
+				/* we had none on the first iteration.
+				 * there must be none now. */
+				D_ASSERT(have_unacked == 0);
+			}
+			schedule();
+			spin_lock_irq(&mdev->req_lock);
 		}
+		finish_wait(&mdev->cstate_wait, &wait);
 	}
-#endif
 
-	dp_flags = be32_to_cpu(p->dp_flags);
-	if ( dp_flags & DP_HARDBARRIER ) {
-		e->private_bio->bi_rw |= BIO_RW_BARRIER;
-	}
-	if ( dp_flags & DP_RW_SYNC ) {
-		e->private_bio->bi_rw |= BIO_RW_SYNC;
-	}
-
 	/* when using TCQ:
 	 * note that, when using tagged command queuing, we may
 	 * have more than one reorder domain "active" at a time.
@@ -1427,8 +1499,6 @@
 	 * otherwise, tag the write with the barrier number, so it
 	 * will trigger the b_ack before its own ack.
 	 */
-
-	spin_lock_irq(&mdev->req_lock);
 	if (mdev->next_barrier_nr) {
 		/* only when using TCQ */
 		if (list_empty(&mdev->active_ee)) {
@@ -1471,7 +1541,7 @@
 	if(mdev->state.pdsk == Diskless) {
 		// In case we have the only disk of the cluster, 
 		drbd_set_out_of_sync(mdev,e->sector,e->size);
-		e->flags |= CALL_AL_COMPLETE_IO;
+		e->flags |= EE_CALL_AL_COMPLETE_IO;
 		drbd_al_begin_io(mdev, e->sector);
 	}
 
@@ -1486,13 +1556,13 @@
 	maybe_kick_lo(mdev);
 	return TRUE;
 
- out2:
+  out_interrupted:
 	/* yes, the epoch_size now is imbalanced.
 	 * but we drop the connection anyways, so we don't have a chance to
 	 * receive a barrier... atomic_inc(&mdev->epoch_size); */
 	dec_local(mdev);
 	drbd_free_ee(mdev,e);
-	return rv;
+	return FALSE;
 }
 
 STATIC int receive_DataRequest(drbd_dev *mdev,Drbd_Header *h)
@@ -2166,14 +2236,14 @@
 	mask.i = be32_to_cpu(p->mask);
 	val.i = be32_to_cpu(p->val);
 
-	if (test_bit(UNIQUE,&mdev->flags)) drbd_state_lock(mdev);
+	if (test_bit(DISCARD_CONCURRENT,&mdev->flags)) drbd_state_lock(mdev);
 
 	mask = convert_state(mask);
 	val = convert_state(val);
 
 	rv = drbd_change_state(mdev,ChgStateVerbose,mask,val);
 
-	if (test_bit(UNIQUE,&mdev->flags)) drbd_state_unlock(mdev);
+	if (test_bit(DISCARD_CONCURRENT,&mdev->flags)) drbd_state_unlock(mdev);
 
 	drbd_send_sr_reply(mdev,rv);
 	drbd_md_sync(mdev);
@@ -2407,6 +2477,8 @@
 	drbd_cmd_handler_f handler;
 	Drbd_Header *header = &mdev->data.rbuf.head;
 
+	/* FIXME test for thread state == RUNNING here,
+	 * in case we missed some signal! */
 	for (;;) {
 		if (!drbd_recv_header(mdev,header))
 			break;
@@ -2974,10 +3046,25 @@
 				return FALSE;
 			}
 
-			_req_mod(req,
-				 h->command == cpu_to_be16(WriteAck)
-				 ? write_acked_by_peer
-				 : recv_acked_by_peer);
+			switch (be16_to_cpu(h->command)) {
+			case WriteAck:
+				D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
+				_req_mod(req,write_acked_by_peer);
+				break;
+			case RecvAck: /* only sent in protocol B */
+				D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
+				_req_mod(req,recv_acked_by_peer);
+				break;
+			case DiscardAck:
+				D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
+				ALERT("Got DiscardAck packet %llu +%u!"
+				      " DRBD is not a random data generator!\n",
+				      (unsigned long long)req->sector, req->size);
+				_req_mod(req, conflict_discarded_by_peer);
+				break;
+			default:
+				D_ASSERT(0);
+			}
 			spin_unlock_irq(&mdev->req_lock);
 		}
 	}
@@ -3076,31 +3163,6 @@
 	return TRUE;
 }
 
-/* FIXME implementation wrong.
- * For the algorithm to be correct, we need to send and store the
- * sector and size too.
- */
-STATIC int got_Discard(drbd_dev *mdev, Drbd_Header* h)
-{
-	Drbd_Discard_Packet *p = (Drbd_Discard_Packet*)h;
-	struct drbd_discard_note *dn;
-
-	dn = kmalloc(sizeof(struct drbd_discard_note),GFP_KERNEL);
-	if(!dn) {
-		ERR("kmalloc(drbd_discard_note) failed.");
-		return FALSE;
-	}
-
-	dn->block_id = p->block_id;
-	dn->seq_num = be32_to_cpu(p->seq_num);
-
-	spin_lock(&mdev->peer_seq_lock);
-	list_add(&dn->list,&mdev->discard);
-	spin_unlock(&mdev->peer_seq_lock);
-
-	return TRUE;
-}
-
 struct asender_cmd {
 	size_t pkt_size;
 	int (*process)(drbd_dev *mdev, Drbd_Header* h);
@@ -3123,11 +3185,11 @@
 		[PingAck]   ={ sizeof(Drbd_Header),           got_PingAck },
 		[RecvAck]   ={ sizeof(Drbd_BlockAck_Packet),  got_BlockAck },
 		[WriteAck]  ={ sizeof(Drbd_BlockAck_Packet),  got_BlockAck },
+		[DiscardAck]={ sizeof(Drbd_BlockAck_Packet),  got_BlockAck },
 		[NegAck]    ={ sizeof(Drbd_BlockAck_Packet),  got_NegAck },
 		[NegDReply] ={ sizeof(Drbd_BlockAck_Packet),  got_NegDReply },
 		[NegRSDReply]={sizeof(Drbd_BlockAck_Packet),  got_NegRSDReply},
 		[BarrierAck]={ sizeof(Drbd_BarrierAck_Packet),got_BarrierAck },
-		[DiscardNote]={sizeof(Drbd_Discard_Packet),   got_Discard },
 		[StateChgReply]={sizeof(Drbd_RqS_Reply_Packet),got_RqSReply },
 	};
 

Modified: trunk/drbd/drbd_req.c
===================================================================
--- trunk/drbd/drbd_req.c	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_req.c	2006-10-19 13:22:52 UTC (rev 2548)
@@ -34,8 +34,8 @@
 #include "drbd_req.h"
 
 //#define VERBOSE_REQUEST_CODE
-#ifdef VERBOSE_REQUEST_CODE
-void print_req_mod(drbd_request_t *req,drbd_req_event_t what)
+#if defined(VERBOSE_REQUEST_CODE) || defined(ENABLE_DYNAMIC_TRACE)
+void _print_req_mod(drbd_request_t *req,drbd_req_event_t what)
 {
 	drbd_dev *mdev = req->mdev;
 	const int rw = (req->master_bio == NULL ||
@@ -46,9 +46,6 @@
 		[created] = "created",
 		[to_be_send] = "to_be_send",
 		[to_be_submitted] = "to_be_submitted",
-		[suspend_because_of_conflict] = "suspend_because_of_conflict",
-		[conflicting_req_done] = "conflicting_req_done",
-		[conflicting_ee_done] = "conflicting_ee_done",
 		[queue_for_net_write] = "queue_for_net_write",
 		[queue_for_net_read] = "queue_for_net_read",
 		[send_canceled] = "send_canceled",
@@ -58,6 +55,7 @@
 		[recv_acked_by_peer] = "recv_acked_by_peer",
 		[write_acked_by_peer] = "write_acked_by_peer",
 		[neg_acked] = "neg_acked",
+		[conflict_discarded_by_peer] = "conflict_discarded_by_peer",
 		[barrier_acked] = "barrier_acked",
 		[data_received] = "data_received",
 		[read_completed_with_error] = "read_completed_with_error",
@@ -68,7 +66,7 @@
 	INFO("_req_mod(%p %c ,%s)\n", req, rw, rq_event_names[what]);
 }
 
-void print_rq_state(drbd_request_t *req, const char *txt)
+void _print_rq_state(drbd_request_t *req, const char *txt)
 {
 	const unsigned long s = req->rq_state;
 	drbd_dev *mdev = req->mdev;
@@ -90,8 +88,16 @@
 	     conns_to_name(mdev->state.conn));
 }
 
+# ifdef ENABLE_DYNAMIC_TRACE
+#  define print_rq_state(R,T) MTRACE(TraceTypeRq,TraceLvlMetrics,_print_rq_state(R,T);)
+#  define print_req_mod(T,W)  MTRACE(TraceTypeRq,TraceLvlMetrics,_print_req_mod(T,W);)
+# else
+#  define print_rq_state(R,T) _print_rq_state(R,T)
+#  define print_req_mod(T,W)  _print_req_mod(T,W)
+# endif
+
 #else
-#define print_rq_state(R,T) 
+#define print_rq_state(R,T)
 #define print_req_mod(T,W)
 #endif
 
@@ -127,7 +133,13 @@
 		 * what we need to do here is just: complete the master_bio.
 		 */
 		int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
-		rw = bio_data_dir(req->master_bio); 
+		rw = bio_data_dir(req->master_bio);
+
+		/* remove the request from the conflict detection
+		 * respective block_id verification hash */
+		if (!hlist_unhashed(&req->colision)) hlist_del(&req->colision);
+		else D_ASSERT((s & RQ_NET_MASK) == 0);
+
 		if (rw == WRITE) {
 			drbd_request_t *i;
 			struct Tl_epoch_entry *e;
@@ -139,39 +151,51 @@
 			if (req->epoch == mdev->newest_barrier->br_number)
 				set_bit(ISSUE_BARRIER,&mdev->flags);
 
-			/* and maybe "wake" those conflicting requests that
-			 * wait for this request to finish.
-			 * we just have to walk starting from req->next,
-			 * see _req_add_hash_check_colision(); */
-#define OVERLAPS overlaps(req->sector, req->size, i->sector, i->size)
-			n = req->colision.next;
-			/* hlist_del ... done below */
-			hlist_for_each_entry_from(i, n, colision) {
-				if (OVERLAPS)
-					drbd_queue_work(&mdev->data.work,&i->w);
-			}
+			/* we need to do the conflict detection stuff,
+			 * if we have the ee_hash (two_primaries) and
+			 * this has been on the network */
+			if ((s & RQ_NET_DONE) && mdev->ee_hash != NULL) {
+				const sector_t sector = req->sector;
+				const int size = req->size;
 
-			/* and maybe "wake" those conflicting epoch entries
-			 * that wait for this request to finish */
-			/* FIXME looks alot like we could consolidate some code
-			 * and maybe even hash tables? */
+				/* ASSERT:
+				 * there must be no conflicting requests, since
+				 * they must have been failed on the spot */
+#define OVERLAPS overlaps(sector, size, i->sector, i->size)
+				slot = tl_hash_slot(mdev,sector);
+				hlist_for_each_entry(i, n, slot, colision) {
+					if (OVERLAPS) {
+						ALERT("LOGIC BUG: completed: %p %llu +%d; other: %p %llu +%d\n",
+						      req, (unsigned long long)sector, size,
+						      i,   (unsigned long long)i->sector, i->size);
+					}
+				}
+
+				/* maybe "wake" those conflicting epoch entries
+				 * that wait for this request to finish.
+				 *
+				 * currently, there can be only _one_ such ee
+				 * (well, or some more, which would be pending
+				 * DiscardAck not yet sent by the asender...),
+				 * since we block the receiver thread upon the
+				 * first conflict detection, which will wait on
+				 * cstate_wait.  maybe we want to assert that?
+				 *
+				 * anyways, if we found one,
+				 * we just have to do a wake_up.  */
 #undef OVERLAPS
-#define OVERLAPS overlaps(req->sector, req->size, e->sector, e->size)
-			if(mdev->ee_hash_s) {
+#define OVERLAPS overlaps(sector, size, e->sector, e->size)
 				slot = ee_hash_slot(mdev,req->sector);
 				hlist_for_each_entry(e, n, slot, colision) {
-					if (OVERLAPS)
-						drbd_queue_work(&mdev->data.work,&e->w);
+					if (OVERLAPS) {
+						wake_up(&mdev->cstate_wait);
+						break;
+					}
 				}
 			}
 #undef OVERLAPS
 		}
-		/* else: READ, READA: nothing more to do */
 
-		/* remove the request from the conflict detection
-		 * respective block_id verification hash */
-		if(!hlist_unhashed(&req->colision)) hlist_del(&req->colision);
-
 		/* FIXME not yet implemented...
 		 * in case we got "suspended" (on_disconnect: freeze io)
 		 * we may not yet complete the request...
@@ -250,15 +274,14 @@
 }
 
 /*
- * checks whether there was an overlapping request already registered.
- * if so, add the request to the colision hash
- *        _after_ the (first) overlapping request,
- *	  and return 1
- * if no overlap was found, add this request to the front of the chain,
- *        and return 0
+ * checks whether there was an overlapping request
+ * or ee already registered.
  *
- * corresponding hlist_del is in _req_may_be_done()
+ * if so, return 1, in which case this request is completed on the spot,
+ * without ever being submitted or send.
  *
+ * return 0 if it is ok to submit this request.
+ *
  * NOTE:
  * paranoia: assume something above us is broken, and issues different write
  * requests for the same block simultaneously...
@@ -273,7 +296,7 @@
  * second hlist_for_each_entry becomes a noop. This is even simpler than to
  * grab a reference on the net_conf, and check for the two_primaries flag...
  */
-STATIC int _req_add_hash_check_colision(drbd_request_t *req)
+STATIC int _req_conflicts(drbd_request_t *req)
 {
 	drbd_dev *mdev = req->mdev;
 	const sector_t sector = req->sector;
@@ -285,6 +308,17 @@
 
 	MUST_HOLD(&mdev->req_lock);
 	D_ASSERT(hlist_unhashed(&req->colision));
+
+	/* FIXME should this inc_net/dec_net
+	 * rather be done in drbd_make_request_common? */
+	if (!inc_net(mdev))
+		return 0;
+
+	/* BUG_ON */
+	ERR_IF (mdev->tl_hash_s == 0)
+		goto out_no_conflict;
+	BUG_ON(mdev->tl_hash == NULL);
+
 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
 	slot = tl_hash_slot(mdev,sector);
 	hlist_for_each_entry(i, n, slot, colision) {
@@ -294,16 +328,13 @@
 			      current->comm, current->pid,
 			      (unsigned long long)sector, size,
 			      (unsigned long long)i->sector, i->size);
-			hlist_add_after(n,&req->colision);
-			return 1;
+			goto out_conflict;
 		}
 	}
-	/* no overlapping request with local origin found,
-	 * register in front */
-	hlist_add_head(&req->colision,slot);
 
 	if(mdev->ee_hash_s) {
 		/* now, check for overlapping requests with remote origin */
+		BUG_ON(mdev->ee_hash == NULL);
 #undef OVERLAPS
 #define OVERLAPS overlaps(e->sector, e->size, sector, size)
 		slot = ee_hash_slot(mdev,sector);
@@ -314,15 +345,21 @@
 				      current->comm, current->pid,
 				      (unsigned long long)sector, size,
 				      (unsigned long long)e->sector, e->size);
-				return 1;
+				goto out_conflict;
 			}
 		}
 	}
 #undef OVERLAPS
 
+  out_no_conflict:
 	/* this is like it should be, and what we expected.
-	 * out users do behave after all... */
+	 * our users do behave after all... */
+	dec_net(mdev);
 	return 0;
+
+  out_conflict:
+	dec_net(mdev);
+	return 1;
 }
 
 /* obviously this could be coded as many single functions
@@ -372,17 +409,6 @@
 		req->rq_state |= RQ_LOCAL_PENDING;
 		break;
 
-#if 0
-		/* done inline below */
-	case suspend_because_of_conflict:
-		/* assert something? */
-		/* reached via drbd_make_request_common */
-		/* update state flag? why? which one? */
-		req->w.cb = w_req_cancel_conflict;
-		/* no queue here, see below! */
-		break;
-#endif
-
 	/* FIXME these *_completed_* are basically the same.
 	 * can probably be merged with some if (what == xy) */
 
@@ -474,44 +500,17 @@
 		/* assert something? */
 		/* from drbd_make_request_common only */
 
+		hlist_add_head(&req->colision,tl_hash_slot(mdev,req->sector));
+		/* corresponding hlist_del is in _req_may_be_done() */
+
 		/* NOTE
 		 * In case the req ended up on the transfer log before being
 		 * queued on the worker, it could lead to this request being
 		 * missed during cleanup after connection loss.
 		 * So we have to do both operations here,
 		 * within the same lock that protects the transfer log.
-		 */
-
-		/* register this request on the colison detection hash
-		 * tables. if we have a conflict, just leave here.
-		 * the request will be "queued" for faked "completion"
-		 * once the conflicting request is done.
-		 */
-		if (_req_add_hash_check_colision(req)) {
-			/* this is a conflicting request.
-			 * even though it may have been only _partially_
-			 * overlapping with one of the currently pending requests,
-			 * without even submitting or sending it,
-			 * we will pretend that it was successfully served
-			 * once the pending conflicting request is done.
-			 */
-			/* _req_mod(req, suspend_because_of_conflict); */
-			/* this callback is just for ASSERT purposes */
-			req->w.cb = w_req_cancel_conflict;
-
-			/* we don't add this to any epoch (barrier) object.
-			 * assign the "invalid" barrier_number 0.
-			 * it should be 0 anyways, still,
-			 * but being explicit won't harm. */
-			req->epoch = 0;
-
-			/*
-			 * EARLY break here!
-			 */
-			break;
-		}
-
-		/* _req_add_to_epoch(req); this has to be after the
+		 *
+		 * _req_add_to_epoch(req); this has to be after the
 		 * _maybe_start_new_epoch(req); which happened in
 		 * drbd_make_request_common, because we now may set the bit
 		 * again ourselves to close the current epoch.
@@ -535,31 +534,6 @@
 		drbd_queue_work(&mdev->data.work, &req->w);
 		break;
 
-	case conflicting_req_done:
-	case conflicting_ee_done:
-		/* reached via bio_endio of the
-		 * conflicting request or epoch entry.
-		 * we now just "fake" completion of this request.
-		 * THINK: I'm going to _FAIL_ this request.
-		 */
-		D_ASSERT(req->w.cb == w_req_cancel_conflict);
-		D_ASSERT(req->epoch == 0);
-		{
-			const unsigned long s = req->rq_state;
-			if (s & RQ_LOCAL_MASK) {
-				D_ASSERT(s & RQ_LOCAL_PENDING);
-				bio_put(req->private_bio);
-				req->private_bio = NULL;
-				dec_local(mdev);
-			}
-			D_ASSERT((s & RQ_NET_MASK) == RQ_NET_PENDING);
-			dec_ap_pending(mdev);
-		}
-		/* no _OK ... this is going to be an io-error */
-		req->rq_state = RQ_LOCAL_COMPLETED|RQ_NET_DONE;
-		_req_may_be_done(req);
-		break;
-
 	/* FIXME
 	 * to implement freeze-io,
 	 * we may not finish the request just yet.
@@ -619,6 +593,8 @@
 			_req_may_be_done(req);
 		break;
 
+	case conflict_discarded_by_peer:
+		/* interesstingly, this is the same thing! */
 	case write_acked_by_peer:
 		/* assert something? */
 		/* protocol C; successfully written on peer */
@@ -936,61 +912,51 @@
 	if (remote) _req_mod(req, to_be_send);
 	if (local)  _req_mod(req, to_be_submitted);
 
-	/* NOTE remote first: to get he concurrent write detection right, we
-	 * must register the request before start of local IO.  */
+	/* check this request on the colison detection hash tables.
+	 * if we have a conflict, just complete it here.
+	 * THINK do we want to check reads, too? (I don't think so...) */
+	if (rw == WRITE && _req_conflicts(req)) {
+		/* this is a conflicting request.
+		 * even though it may have been only _partially_
+		 * overlapping with one of the currently pending requests,
+		 * without even submitting or sending it, we will
+		 * pretend that it was successfully served right now.
+		 */
+		if (local) {
+			bio_put(req->private_bio);
+			req->private_bio = NULL;
+			drbd_al_complete_io(mdev, req->sector);
+			dec_local(mdev);
+			local = 0;
+		}
+		if (remote) dec_ap_pending(mdev);
+		dump_bio(mdev,req->master_bio,1);
+		/* THINK: do we want to fail it (-EIO), or pretend success? */
+		bio_endio(req->master_bio, req->master_bio->bi_size, 0);
+		req->master_bio = NULL;
+		dec_ap_bio(mdev);
+		drbd_req_free(req);
+		local = remote = 0;
+	}
+
+	/* NOTE remote first: to get the concurrent write detection right,
+	 * we must register the request before start of local IO.  */
 	if (remote) {
 		/* either WRITE and Connected,
 		 * or READ, and no local disk,
 		 * or READ, but not in sync.
 		 */
-		_req_mod(req, rw == WRITE
-				? queue_for_net_write
-				: queue_for_net_read);
+		if (rw == WRITE) _req_mod(req,queue_for_net_write);
+		else		 _req_mod(req,queue_for_net_read);
 	}
+	spin_unlock_irq(&mdev->req_lock);
+	if (b) kfree(b); /* if someone else has beaten us to it... */
 
-	/* still holding the req_lock.
-	 * not strictly neccessary, but for the statistic counters... */
-
-#if 0
 	if (local) {
-		/* FIXME I think this branch can go completely.  */
-		if (rw == WRITE) {
-			/* we defer the drbd_set_out_of_sync to the bio_endio
-			 * function. we only need to make sure the bit is set
-			 * before we do drbd_al_complete_io. */
-			 if (!remote) drbd_set_out_of_sync(mdev,sector,size);
-		} else {
-			D_ASSERT(!remote); /* we should not read from both */
-		}
-		/* FIXME
-		 * Should we add even local reads to some list, so
-		 * they can be grabbed and freed somewhen?
-		 *
-		 * They already have a reference count (sort of...)
-		 * on mdev via inc_local()
-		 */
-
-		/* XXX we probably should not update these here but in bio_endio.
-		 * especially the read_cnt could go wrong for all the READA
-		 * that may just be failed because of "overload"... */
-		if(rw == WRITE) mdev->writ_cnt += size>>9;
-		else            mdev->read_cnt += size>>9;
-
 		/* FIXME what ref count do we have to ensure the backing_bdev
 		 * was not detached below us? */
-		req->private_bio->bi_rw = rw; /* redundant */
 		req->private_bio->bi_bdev = mdev->bc->backing_bdev;
-	}
-#endif
 
-	spin_unlock_irq(&mdev->req_lock);
-	if (b) kfree(b); /* if someone else has beaten us to it... */
-
-	/* extra if branch so I don't need to write spin_unlock_irq twice */
-
-	if (local) {
-		req->private_bio->bi_bdev = mdev->bc->backing_bdev;
-
 		if (FAULT_ACTIVE(rw==WRITE? DRBD_FAULT_DT_WR : DRBD_FAULT_DT_RD))
 			bio_endio(req->private_bio, req->private_bio->bi_size, -EIO);
 		else
@@ -1004,6 +970,7 @@
 	return 0;
 
   fail_and_free_req:
+	if (b) kfree(b);
 	bio_endio(bio, bio->bi_size, -EIO);
 	drbd_req_free(req);
 	return 0;
@@ -1041,7 +1008,9 @@
 	 */
 	if ( mdev->state.disk <= Inconsistent &&
 	     mdev->state.conn < Connected) {
-		ERR("Sorry, I have no access to good data anymore.\n");
+		if (DRBD_ratelimit(5*HZ,5)) {
+			ERR("Sorry, I have no access to good data anymore.\n");
+		}
 		/*
 		 * FIXME suspend, loop waiting on cstate wait?
 		 */

Modified: trunk/drbd/drbd_req.h
===================================================================
--- trunk/drbd/drbd_req.h	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_req.h	2006-10-19 13:22:52 UTC (rev 2548)
@@ -88,10 +88,6 @@
 	to_be_send,
 	to_be_submitted,
 
-	suspend_because_of_conflict,
-	conflicting_req_done,
-	conflicting_ee_done,
-
 	/* XXX yes, now I am inconsistent...
 	 * these two are not "events" but "actions"
 	 * oh, well... */
@@ -104,6 +100,7 @@
 	connection_lost_while_pending,
 	recv_acked_by_peer,
 	write_acked_by_peer,
+	conflict_discarded_by_peer,
 	neg_acked,
 	barrier_acked, /* in protocol A and B */
 	data_received, /* (remote read) */

Modified: trunk/drbd/drbd_worker.c
===================================================================
--- trunk/drbd/drbd_worker.c	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_worker.c	2006-10-19 13:22:52 UTC (rev 2548)
@@ -141,7 +141,7 @@
 
 	if (do_wake) wake_up(&mdev->ee_wait);
 
-	if(e->flags & CALL_AL_COMPLETE_IO) drbd_al_complete_io(mdev,e->sector);
+	if(e->flags & EE_CALL_AL_COMPLETE_IO) drbd_al_complete_io(mdev,e->sector);
 
 	wake_asender(mdev);
 	dec_local(mdev);
@@ -233,14 +233,6 @@
 	return 1; // Simply ignore this!
 }
 
-/* for debug assertion only */
-int w_req_cancel_conflict(drbd_dev *mdev, struct drbd_work *w, int cancel)
-{
-	ERR("w_req_cancel_conflict: this callback should never be called!\n");
-	if (cancel) return 1; /* does it matter? */
-	return 0;
-}
-
 void resync_timer_fn(unsigned long data)
 {
 	unsigned long flags;

Modified: trunk/drbd/linux/drbd.h
===================================================================
--- trunk/drbd/linux/drbd.h	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/linux/drbd.h	2006-10-19 13:22:52 UTC (rev 2548)
@@ -106,7 +106,10 @@
 	FailedToClaimMyself,
 	UnknownNetLinkPacket,
 	HaveNoDiskConfig,
-	AfterLastRetCode,
+	ProtocolCRequired,
+
+	/* insert new ones above this line */
+	AfterLastRetCode
 };
 
 #define DRBD_PROT_A   1

Modified: trunk/drbd/linux/drbd_config.h
===================================================================
--- trunk/drbd/linux/drbd_config.h	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/linux/drbd_config.h	2006-10-19 13:22:52 UTC (rev 2548)
@@ -23,8 +23,8 @@
 extern const char * drbd_buildtag(void);
 
 #define REL_VERSION "8.0pre5"
-#define API_VERSION 84
-#define PRO_VERSION 83
+#define API_VERSION 85
+#define PRO_VERSION 84
 
 // undef if you need the workaround in drbd_receiver
 #define HAVE_UML_TO_VIRT 1

Modified: trunk/drbd/lru_cache.c
===================================================================
--- trunk/drbd/lru_cache.c	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/lru_cache.c	2006-10-19 13:22:52 UTC (rev 2548)
@@ -284,6 +284,7 @@
 
 	PARANOIA_ENTRY();
 	BUG_ON(e->refcnt == 0);
+	BUG_ON(e == lc->changing_element);
 	if ( --e->refcnt == 0) {
 		list_move(&e->list,&lc->lru); // move it to the front of LRU.
 		lc->used--;

Modified: trunk/user/drbdsetup.c
===================================================================
--- trunk/user/drbdsetup.c	2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/user/drbdsetup.c	2006-10-19 13:22:52 UTC (rev 2548)
@@ -369,7 +369,16 @@
 	EM(DiskLowerThanOutdated) = "DiskLowerThanOutdated",
 	EM(FailedToClaimMyself) = "FailedToClaimMyself",
 	EM(HaveNoDiskConfig) = "HaveNoDiskConfig",
+	EM(ProtocolCRequired) = "ProtocolCRequired"
 };
+#define MAX_ERROR (sizeof(error_messages)/sizeof(*error_messages))
+const char * error_to_string(int err_no) /* map a ret_code to its name; never returns NULL */
+{	/* error_messages uses designated EM() initializers: codes without an entry (e.g. UnknownNetLinkPacket) are NULL holes */
+	const unsigned int idx = (unsigned int)(err_no - RetCodeBase); /* err_no < RetCodeBase wraps to a huge idx */
+	if (idx >= MAX_ERROR || error_messages[idx] == NULL) return "Unknown... maybe API_VERSION mismatch?";
+	return error_messages[idx];
+}
+#undef MAX_ERROR
 
 char* cmdname = 0;
 
@@ -705,7 +714,7 @@
 	} else {
 		if(err_no > RetCodeBase ) {
 			fprintf(stderr,"Failure: (%d) %s\n",err_no,
-				error_messages[err_no-RetCodeBase]);
+					error_to_string(err_no));
 			rv = 10;
 		} else if (err_no == SS_UnknownError) {
 			fprintf(stderr,"State change failed: (%d)"
@@ -1606,9 +1615,10 @@
 
 	/* == '-' catches -h, --help, and similar */
 	if (argc > 1 && (!strcmp(argv[1],"help") || argv[1][0] == '-')) {
-		if(argc == 3) {
+		if(argc >= 3) {
 			cmd=find_cmd_by_name(argv[2]);
 			if(cmd) print_command_usage(cmd-commands,NULL,0);
+			else print_usage("unknown command");
 			exit(0);
 		}
 	}



More information about the drbd-cvs mailing list