[DRBD-cvs] r1740 - in trunk: . drbd

svn at svn.drbd.org
Fri Jan 28 16:02:26 CET 2005


Author: phil
Date: 2005-01-28 16:02:23 +0100 (Fri, 28 Jan 2005)
New Revision: 1740

Modified:
   trunk/ROADMAP
   trunk/drbd/drbd_int.h
   trunk/drbd/drbd_main.c
   trunk/drbd/drbd_receiver.c
Log:
Worked on item 9 of the ROADMAP. Most of the coding is done, although
a hash table over the sector numbers of the ee (epoch entries) is still missing.
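
The missing hash table would presumably mirror what tl_have_write() in the
drbd_main.c hunk below already does for requests via mdev->tl_hash. A minimal
sketch, assuming a hypothetical ee_hash array and EE_HASH_SIZE constant in
Drbd_Conf, an ee_hash_fn() helper and a "colision" hlist_node in
struct Tl_epoch_entry -- none of which exist in this revision:

/* Hypothetical sketch only: ee_hash, EE_HASH_SIZE, ee_hash_fn() and the
 * colision member are assumptions modelled on the existing tl_hash code.
 * Caller is expected to hold mdev->ee_lock.
 */
static inline unsigned int ee_hash_fn(drbd_dev *mdev, sector_t sector)
{
	/* one slot per 4k block (8 sectors of 512 byte); two lookups then
	 * cover any request that crosses a 4k boundary */
	return (unsigned int)(sector >> 3) % EE_HASH_SIZE;
}

STATIC struct Tl_epoch_entry *ee_have_write(drbd_dev *mdev, sector_t sector)
{
	struct hlist_head *slot = mdev->ee_hash + ee_hash_fn(mdev, sector);
	struct hlist_node *n;
	struct Tl_epoch_entry *e;

	hlist_for_each_entry(e, n, slot, colision) {
		if (drbd_ee_get_sector(e) == sector) return e;
	}
	return NULL;
}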


Modified: trunk/ROADMAP
===================================================================
--- trunk/ROADMAP	2005-01-28 11:01:30 UTC (rev 1739)
+++ trunk/ROADMAP	2005-01-28 15:02:23 UTC (rev 1740)
@@ -278,7 +278,7 @@
   data packet [drbd_receiver].
 
   *  If the sequence number of the data packet is higher than
-     last_seq+1 sleep until last_seq-1 == seq_num(data packet)
+     last_seq+1 sleep until last_seq+1 == seq_num(data packet)
 
   1. If the packet's sequence number is on the discard list,
      simply drop it.
@@ -313,7 +313,8 @@
         to find IO operations starting in the same 4k block of
        data quickly. -> With two lookups into the hash table we can
 	find any concurrent access.
-  10% DONE
+  70% DONE ; Implement real overlap check, implement discard info
+	     packets. See, for example, case 5.
 
 10 Change Sync-groups to sync-after
   

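The ROADMAP note above leaves the "real overlap check" as a TODO:
tl_have_write() in the drbd_main.c hunk further down still compares only the
start sector. A minimal sketch of such a check; drbd_overlaps() is a
hypothetical helper name, sizes are in bytes, sectors are 512 byte units:

/* Two ranges [s1, s1+size1) and [s2, s2+size2), given as start sector plus
 * size in bytes, overlap iff each one starts before the other one ends.
 * drbd_overlaps() is not part of this revision.
 */
static inline int drbd_overlaps(sector_t s1, int size1,
				sector_t s2, int size2)
{
	sector_t e1 = s1 + (size1 >> 9);  /* first sector after range 1 */
	sector_t e2 = s2 + (size2 >> 9);  /* first sector after range 2 */

	return s1 < e2 && s2 < e1;
}
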
Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h	2005-01-28 11:01:30 UTC (rev 1739)
+++ trunk/drbd/drbd_int.h	2005-01-28 15:02:23 UTC (rev 1740)
@@ -240,6 +240,7 @@
 #define RQ_DRBD_LOCAL     0x0020
 #define RQ_DRBD_DONE      0x0030
 #define RQ_DRBD_IN_TL     0x0040
+#define RQ_DRBD_RECVW     0x0080
 
 /* drbd_meta-data.c (still in drbd_main.c) */
 #define DRBD_MD_MAGIC (DRBD_MAGIC+4) // 4th incarnation of the disk layout.
@@ -389,6 +390,8 @@
 	Drbd_Header head;
 	u64         sector;    // 64 bits sector number
 	u64         block_id;  // Used in protocol B&C for the address of the req.
+	u32         seq_num;
+	u32         pad;
 } __attribute((packed)) Drbd_Data_Packet;
 
 /*
@@ -401,7 +404,7 @@
 	u64         sector;
 	u64         block_id;
 	u32         blksize;
-	u32         pad;	//make sure packet is a multiple of 8 Byte
+	u32         seq_num;
 } __attribute((packed)) Drbd_BlockAck_Packet;
 
 typedef struct {
@@ -656,6 +659,12 @@
 	Drbd_Polymorph_Packet rbuf;  // send/receive buffers off the stack
 };
 
+struct drbd_discard_note {
+	struct list_head list;
+	u64 block_id;
+	int seq_num;
+};
+
 struct Drbd_Conf {
 #ifdef PARANOIA
 	long magic;
@@ -748,6 +757,10 @@
 	int al_tr_cycle;
 	int al_tr_pos;     // position of the next transaction in the journal
 	struct crypto_tfm* cram_hmac_tfm;
+	atomic_t packet_seq;
+	int peer_seq;
+	spinlock_t peer_seq_lock;
+	struct list_head discard;
 };
 
 
@@ -773,6 +786,9 @@
 extern void tl_clear(drbd_dev *mdev);
 extern int tl_dependence(drbd_dev *mdev, drbd_request_t * item);
 extern int tl_verify(drbd_dev *mdev, drbd_request_t * item, sector_t sector);
+#define TLHW_FLAG_SENT   0x10000000
+#define TLHW_FLAG_RECVW  0x20000000
+extern int tl_have_write(drbd_dev *mdev, sector_t sector, int size_n_flags);
 extern void drbd_free_sock(drbd_dev *mdev);
 extern int drbd_send(drbd_dev *mdev, struct socket *sock,
 		     void* buf, size_t size, unsigned msg_flags);
@@ -1362,6 +1378,23 @@
 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt)>=0);
 }
 
+static inline void update_peer_seq(drbd_dev* mdev, int new_seq)
+{
+	spin_lock(&mdev->peer_seq_lock);
+	mdev->peer_seq = max(mdev->peer_seq, new_seq);
+	wake_up(&mdev->cstate_wait);
+	spin_unlock(&mdev->peer_seq_lock);
+}
+
+static inline int peer_seq(drbd_dev* mdev)
+{
+	int seq;
+	spin_lock(&mdev->peer_seq_lock);
+	seq = mdev->peer_seq;
+	spin_unlock(&mdev->peer_seq_lock);
+	return seq;
+}
+
 #ifdef DUMP_EACH_PACKET
 /*
  * enable to dump information about every packet exchange.

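The old pad field in Drbd_BlockAck_Packet existed only to keep the packet a
multiple of 8 bytes; seq_num now takes its place, and Drbd_Data_Packet gains
an explicit pad next to its new seq_num for the same reason. Assuming the
8 byte Drbd_Header, both structures stay at 32 bytes. A hypothetical
compile-time check (not in the tree) could pin that down:

/* Sketch only: DRBD_BUILD_BUG_ON is a hypothetical local macro in the style
 * of the kernel's BUILD_BUG_ON; it breaks the build if the packed structs
 * ever lose their 8 byte alignment.
 */
#define DRBD_BUILD_BUG_ON(cond) ((void)sizeof(char[1 - 2 * !!(cond)]))

static inline void drbd_check_packet_sizes(void)
{
	DRBD_BUILD_BUG_ON(sizeof(Drbd_Data_Packet)     % 8);
	DRBD_BUILD_BUG_ON(sizeof(Drbd_BlockAck_Packet) % 8);
}
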
Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c	2005-01-28 11:01:30 UTC (rev 1739)
+++ trunk/drbd/drbd_main.c	2005-01-28 15:02:23 UTC (rev 1740)
@@ -283,6 +283,41 @@
 	return rv;
 }
 
+/* Return values:
+ *
+ * 0 ... no conflicting write
+ * 1 ... a conflicting write, its ack not received yet.
+ * 2 ... a conflicting write, its ack already received.
+ */
+int tl_have_write(drbd_dev *mdev, sector_t sector, int size_n_flags)
+{
+	// PRE TODO: Real overlap check... using size etc...
+	struct hlist_head *slot = mdev->tl_hash + tl_hash_fn(mdev,sector);
+	struct hlist_node *n;
+	drbd_request_t * i;
+	int rv=0;
+
+	spin_lock_irq(&mdev->tl_lock);
+
+	hlist_for_each_entry(i, n, slot, colision) {
+		if (drbd_req_get_sector(i) == sector) {
+			rv=1;
+			if( i->rq_status & RQ_DRBD_SENT ) rv++;
+			if(size_n_flags & TLHW_FLAG_SENT) {
+				i->rq_status |= RQ_DRBD_SENT;
+			}
+			if(size_n_flags & TLHW_FLAG_RECVW) {
+				i->rq_status |= RQ_DRBD_RECVW;
+			}
+			break;
+		}
+	}
+
+	spin_unlock_irq(&mdev->tl_lock);
+
+	return rv;
+}
+
 /* tl_dependence reports if this sector was present in the current
    epoch.
    As side effect it clears also the pointer to the request if it
@@ -302,6 +337,8 @@
 	r = ( item->barrier == mdev->newest_barrier );
 	list_del(&item->w.list);
 
+	if( item->rq_status & RQ_DRBD_RECVW ) wake_up(&mdev->cstate_wait);
+
 	spin_unlock_irqrestore(&mdev->tl_lock,flags);
 	return r;
 }
@@ -1006,6 +1043,7 @@
 	p.sector   = cpu_to_be64(drbd_ee_get_sector(e));
 	p.block_id = e->block_id;
 	p.blksize  = cpu_to_be32(drbd_ee_get_size(e));
+	p.seq_num  = cpu_to_be32(atomic_add_return(1,&mdev->packet_seq));
 
 	if (!mdev->meta.socket || mdev->state.s.conn < Connected) return FALSE;
 	ok=drbd_send_cmd(mdev,mdev->meta.socket,cmd,(Drbd_Header*)&p,sizeof(p));
@@ -1205,6 +1243,7 @@
 
 	p.sector   = cpu_to_be64(drbd_req_get_sector(req));
 	p.block_id = (unsigned long)req;
+	p.seq_num  = cpu_to_be32(atomic_add_return(1,&mdev->packet_seq));
 
 	/* About tl_add():
 	1. This must be within the semaphor,
@@ -1280,6 +1319,7 @@
 
 	p.sector   = cpu_to_be64(drbd_ee_get_sector(e));
 	p.block_id = e->block_id;
+	p.seq_num  = cpu_to_be32(atomic_add_return(1,&mdev->packet_seq));
 
 	/* Only called by our kernel thread.
 	 * This one may be interupted by DRBD_SIG and/or DRBD_SIGKILL
@@ -1526,6 +1566,7 @@
 	spin_lock_init(&mdev->req_lock);
 	spin_lock_init(&mdev->pr_lock);
 	spin_lock_init(&mdev->send_task_lock);
+	spin_lock_init(&mdev->peer_seq_lock);
 
 	INIT_LIST_HEAD(&mdev->free_ee);
 	INIT_LIST_HEAD(&mdev->active_ee);
@@ -1541,6 +1582,7 @@
 	INIT_LIST_HEAD(&mdev->resync_work.list);
 	INIT_LIST_HEAD(&mdev->barrier_work.list);
 	INIT_LIST_HEAD(&mdev->unplug_work.list);
+	INIT_LIST_HEAD(&mdev->discard);
 	mdev->resync_work.cb  = w_resync_inactive;
 	mdev->barrier_work.cb = w_try_send_barrier;
 	mdev->unplug_work.cb  = w_send_write_hint;

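tl_have_write() packs its optional flags into the high bits of the size
argument: TLHW_FLAG_SENT and TLHW_FLAG_RECVW live at 0x10000000 and
0x20000000, well above any real request size, and are ORed into data_size by
the caller. The fragment below only illustrates the packing, with rv, mdev,
sector and data_size as in the new receive_Data() code:

	int rv;

	/* plain lookup, no side effects on a conflicting request */
	rv = tl_have_write(mdev, sector, data_size);

	/* lookup that also sets RQ_DRBD_RECVW on a conflicting request, so
	 * tl_dependence() wakes cstate_wait once that request leaves the TL */
	rv = tl_have_write(mdev, sector, data_size | TLHW_FLAG_RECVW);
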
Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c	2005-01-28 11:01:30 UTC (rev 1739)
+++ trunk/drbd/drbd_receiver.c	2005-01-28 15:02:23 UTC (rev 1740)
@@ -1011,13 +1011,32 @@
 	return ok;
 }
 
+STATIC int drbd_chk_discard(drbd_dev *mdev,u64 block_id,int seq_num)
+{
+	struct drbd_discard_note *dn;
+	struct list_head *le;
+
+	MUST_HOLD(&mdev->peer_seq_lock);
+
+	list_for_each(le,&mdev->discard) {
+		dn = list_entry(le, struct drbd_discard_note, list);
+		if( dn->seq_num == seq_num ) {
+			D_ASSERT( dn->block_id == block_id );
+			list_del(le);
+			kfree(dn);
+			return 1;
+		} 
+	}
+	return 0;
+}
+
 // mirrored write
 STATIC int receive_Data(drbd_dev *mdev,Drbd_Header* h)
 {
 	sector_t sector;
 	struct Tl_epoch_entry *e;
 	Drbd_Data_Packet *p = (Drbd_Data_Packet*)h;
-	int header_size,data_size;
+	int header_size,data_size, packet_seq,discard;
 
 	// FIXME merge this code dups into some helper function
 	header_size = sizeof(*p) - sizeof(*h);
@@ -1033,10 +1052,72 @@
 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
 		return FALSE;
 
+	/* This wait_event makes sure that a DATA packet traveling via
+	   sock can never overtake an ACK packet traveling on msock.
+	   PRE TODO: handle wrap-around of seq_num!
+	*/
+	packet_seq = be32_to_cpu(p->seq_num);
+	if( wait_event_interruptible(mdev->cstate_wait, 
+				     packet_seq <= peer_seq(mdev)+1) )
+		return FALSE;
+
+	spin_lock(&mdev->peer_seq_lock); 
+	mdev->peer_seq = max(mdev->peer_seq, packet_seq);
+	/* like update_peer_seq(mdev,packet_seq), but holding the lock
+	   across drbd_chk_discard() as well */
+	discard = drbd_chk_discard(mdev,p->block_id,packet_seq);
+	spin_unlock(&mdev->peer_seq_lock);
+
 	sector = be64_to_cpu(p->sector);
-
 	e = read_in_block(mdev,data_size);
 	if (!e) return FALSE;
+
+	if(discard) {
+		spin_lock_irq(&mdev->ee_lock);
+		drbd_put_ee(mdev,e);
+		spin_unlock_irq(&mdev->ee_lock);
+		WARN("Concurrent write! [DISCARD BY LIST] sec=%lu\n",
+		     (unsigned long)sector);
+		return TRUE;
+	}
+
+	switch( tl_have_write(mdev, sector, data_size) ) {
+	case 2: /* Conflicting write, got ACK */
+		/* write afterwards ...*/
+		WARN("Concurrent write! [W AFTERWARDS] sec=%lu\n",
+		     (unsigned long)sector);
+		if( wait_event_interruptible(mdev->cstate_wait,
+		     !tl_have_write(mdev,sector,data_size|TLHW_FLAG_RECVW))) {
+			spin_lock_irq(&mdev->ee_lock);
+			drbd_put_ee(mdev,e);
+			spin_unlock_irq(&mdev->ee_lock);
+			return FALSE;
+		}
+	case 1: /* Conflicting write, no ACK by now*/
+		if (test_bit(UNIQUE,&mdev->flags)) {
+			spin_lock_irq(&mdev->ee_lock);
+			drbd_put_ee(mdev,e);
+			spin_unlock_irq(&mdev->ee_lock);
+			WARN("Concurrent write! [DISCARD BY FLAG] sec=%lu\n",
+			     (unsigned long)sector);
+			return TRUE;
+		} else {
+			/* write afterwards, do not expect ACK */
+			WARN("Concurrent write! [W AFTERWARDS] sec=%lu\n",
+			     (unsigned long)sector);
+			if( wait_event_interruptible(mdev->cstate_wait,
+			      !tl_have_write(mdev,sector,data_size|
+				            TLHW_FLAG_RECVW|TLHW_FLAG_SENT))) {
+				spin_lock_irq(&mdev->ee_lock);
+				drbd_put_ee(mdev,e);
+				spin_unlock_irq(&mdev->ee_lock);
+				return FALSE;
+			}
+		}
+		/* case 0:   no conflicting write. */
+		/* write it to disk now... */
+	}
+
 	e->block_id = p->block_id; // no meaning on this side, e* on partner
 
 	if(!inc_local(mdev)) {
@@ -2069,6 +2150,8 @@
 	sector_t sector = be64_to_cpu(p->sector);
 	int blksize = be32_to_cpu(p->blksize);
 
+	update_peer_seq(mdev,be32_to_cpu(p->seq_num));
+
 	smp_rmb();
 	if(likely(mdev->state.s.pdsk >= Inconsistent )) {
 		// test_bit(PARTNER_DISKLESS,&mdev->flags)
@@ -2111,6 +2194,8 @@
 {
 	Drbd_BlockAck_Packet *p = (Drbd_BlockAck_Packet*)h;
 
+	update_peer_seq(mdev,be32_to_cpu(p->seq_num));
+
 	/* do nothing here.
 	 * we expect to get a "report param" on the data socket soon,
 	 * and will do the cleanup then and there.

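Note that drbd_chk_discard() only consumes entries from mdev->discard; nothing
in this revision adds them yet -- that is the "discard info packets" part still
open in the ROADMAP item. The producer side would presumably look roughly like
the hypothetical helper below (name, GFP flag and calling context are
assumptions, not part of this commit):

STATIC int drbd_add_discard_note(drbd_dev *mdev, u64 block_id, int seq_num)
{
	struct drbd_discard_note *dn;

	/* allocate outside the spinlock; GFP_KERNEL assumes process context */
	dn = kmalloc(sizeof(struct drbd_discard_note), GFP_KERNEL);
	if (dn == NULL) return FALSE;

	dn->block_id = block_id;
	dn->seq_num  = seq_num;

	spin_lock(&mdev->peer_seq_lock);
	list_add(&dn->list, &mdev->discard);
	spin_unlock(&mdev->peer_seq_lock);

	return TRUE;
}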

