[DRBD-cvs] r1740 - in trunk: . drbd
svn at svn.drbd.org
svn at svn.drbd.org
Fri Jan 28 16:02:26 CET 2005
Author: phil
Date: 2005-01-28 16:02:23 +0100 (Fri, 28 Jan 2005)
New Revision: 1740
Modified:
trunk/ROADMAP
trunk/drbd/drbd_int.h
trunk/drbd/drbd_main.c
trunk/drbd/drbd_receiver.c
Log:
Worked on item 9 of the Roadmap. Most of the coding is done, although
a hash table over the sector numbers of the ee is still missing.
Modified: trunk/ROADMAP
===================================================================
--- trunk/ROADMAP 2005-01-28 11:01:30 UTC (rev 1739)
+++ trunk/ROADMAP 2005-01-28 15:02:23 UTC (rev 1740)
@@ -278,7 +278,7 @@
data packet [drbd_receiver].
* If the sequence number of the data packet is higher than
- last_seq+1 sleep until last_seq-1 == seq_num(data packet)
+ last_seq+1 sleep until last_seq+1 == seq_num(data packet)
1. If the packet's sequence number is on the discard list,
simply drop it.
@@ -313,7 +313,8 @@
to find IO operations starting in the same 4k block of
data quickly. -> With two lookups the hash table we can
find any concurrent access.
- 10% DONE
+ 70% DONE ; Implement real overlap check, Implement discard info
+ Packets. Look for example case 5.
10 Change Sync-groups to sync-after
Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h 2005-01-28 11:01:30 UTC (rev 1739)
+++ trunk/drbd/drbd_int.h 2005-01-28 15:02:23 UTC (rev 1740)
@@ -240,6 +240,7 @@
#define RQ_DRBD_LOCAL 0x0020
#define RQ_DRBD_DONE 0x0030
#define RQ_DRBD_IN_TL 0x0040
+#define RQ_DRBD_RECVW 0x0080
/* drbd_meta-data.c (still in drbd_main.c) */
#define DRBD_MD_MAGIC (DRBD_MAGIC+4) // 4th incarnation of the disk layout.
@@ -389,6 +390,8 @@
Drbd_Header head;
u64 sector; // 64 bits sector number
u64 block_id; // Used in protocol B&C for the address of the req.
+ u32 seq_num;
+ u32 pad;
} __attribute((packed)) Drbd_Data_Packet;
/*
@@ -401,7 +404,7 @@
u64 sector;
u64 block_id;
u32 blksize;
- u32 pad; //make sure packet is a multiple of 8 Byte
+ u32 seq_num;
} __attribute((packed)) Drbd_BlockAck_Packet;
typedef struct {
@@ -656,6 +659,12 @@
Drbd_Polymorph_Packet rbuf; // send/receive buffers off the stack
};
+struct drbd_discard_note {
+ struct list_head list;
+ u64 block_id;
+ int seq_num;
+};
+
struct Drbd_Conf {
#ifdef PARANOIA
long magic;
@@ -748,6 +757,10 @@
int al_tr_cycle;
int al_tr_pos; // position of the next transaction in the journal
struct crypto_tfm* cram_hmac_tfm;
+ atomic_t packet_seq;
+ int peer_seq;
+ spinlock_t peer_seq_lock;
+ struct list_head discard;
};
@@ -773,6 +786,9 @@
extern void tl_clear(drbd_dev *mdev);
extern int tl_dependence(drbd_dev *mdev, drbd_request_t * item);
extern int tl_verify(drbd_dev *mdev, drbd_request_t * item, sector_t sector);
+#define TLHW_FLAG_SENT 0x10000000
+#define TLHW_FLAG_RECVW 0x20000000
+extern int tl_have_write(drbd_dev *mdev, sector_t sector, int size_n_flags);
extern void drbd_free_sock(drbd_dev *mdev);
extern int drbd_send(drbd_dev *mdev, struct socket *sock,
void* buf, size_t size, unsigned msg_flags);
@@ -1362,6 +1378,23 @@
D_ASSERT(atomic_read(&mdev->ap_bio_cnt)>=0);
}
+static inline void update_peer_seq(drbd_dev* mdev, int new_seq)
+{
+ spin_lock(&mdev->peer_seq_lock);
+ mdev->peer_seq = max(mdev->peer_seq, new_seq);
+ wake_up(&mdev->cstate_wait);
+ spin_unlock(&mdev->peer_seq_lock);
+}
+
+static inline int peer_seq(drbd_dev* mdev)
+{
+ int seq;
+ spin_lock(&mdev->peer_seq_lock);
+ seq = mdev->peer_seq;
+ spin_unlock(&mdev->peer_seq_lock);
+ return seq;
+}
+
#ifdef DUMP_EACH_PACKET
/*
* enable to dump information about every packet exchange.
Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c 2005-01-28 11:01:30 UTC (rev 1739)
+++ trunk/drbd/drbd_main.c 2005-01-28 15:02:23 UTC (rev 1740)
@@ -283,6 +283,41 @@
return rv;
}
+/* Return values:
+ *
+ * 0 ... no conflicting write
+ * 1 ... a conflicting write, have not got ack by now.
+ * 2 ... a conflicting write, have also got the ack.
+ */
+int tl_have_write(drbd_dev *mdev, sector_t sector, int size_n_flags)
+{
+ // PRE TODO: Real overlap check... using size etc...
+ struct hlist_head *slot = mdev->tl_hash + tl_hash_fn(mdev,sector);
+ struct hlist_node *n;
+ drbd_request_t * i;
+ int rv=0;
+
+ spin_lock_irq(&mdev->tl_lock);
+
+ hlist_for_each_entry(i, n, slot, colision) {
+ if (drbd_req_get_sector(i) == sector) {
+ rv=1;
+ if( i->rq_status & RQ_DRBD_SENT ) rv++;
+ if(size_n_flags & TLHW_FLAG_SENT) {
+ i->rq_status |= RQ_DRBD_SENT;
+ }
+ if(size_n_flags & TLHW_FLAG_RECVW) {
+ i->rq_status |= RQ_DRBD_RECVW;
+ }
+ break;
+ }
+ }
+
+ spin_unlock_irq(&mdev->tl_lock);
+
+ return rv;
+}
+
/* tl_dependence reports if this sector was present in the current
epoch.
As side effect it clears also the pointer to the request if it
@@ -302,6 +337,8 @@
r = ( item->barrier == mdev->newest_barrier );
list_del(&item->w.list);
+ if( item->rq_status & RQ_DRBD_RECVW ) wake_up(&mdev->cstate_wait);
+
spin_unlock_irqrestore(&mdev->tl_lock,flags);
return r;
}
@@ -1006,6 +1043,7 @@
p.sector = cpu_to_be64(drbd_ee_get_sector(e));
p.block_id = e->block_id;
p.blksize = cpu_to_be32(drbd_ee_get_size(e));
+ p.seq_num = cpu_to_be32(atomic_add_return(1,&mdev->packet_seq));
if (!mdev->meta.socket || mdev->state.s.conn < Connected) return FALSE;
ok=drbd_send_cmd(mdev,mdev->meta.socket,cmd,(Drbd_Header*)&p,sizeof(p));
@@ -1205,6 +1243,7 @@
p.sector = cpu_to_be64(drbd_req_get_sector(req));
p.block_id = (unsigned long)req;
+ p.seq_num = cpu_to_be32(atomic_add_return(1,&mdev->packet_seq));
/* About tl_add():
1. This must be within the semaphor,
@@ -1280,6 +1319,7 @@
p.sector = cpu_to_be64(drbd_ee_get_sector(e));
p.block_id = e->block_id;
+ p.seq_num = cpu_to_be32(atomic_add_return(1,&mdev->packet_seq));
/* Only called by our kernel thread.
* This one may be interupted by DRBD_SIG and/or DRBD_SIGKILL
@@ -1526,6 +1566,7 @@
spin_lock_init(&mdev->req_lock);
spin_lock_init(&mdev->pr_lock);
spin_lock_init(&mdev->send_task_lock);
+ spin_lock_init(&mdev->peer_seq_lock);
INIT_LIST_HEAD(&mdev->free_ee);
INIT_LIST_HEAD(&mdev->active_ee);
@@ -1541,6 +1582,7 @@
INIT_LIST_HEAD(&mdev->resync_work.list);
INIT_LIST_HEAD(&mdev->barrier_work.list);
INIT_LIST_HEAD(&mdev->unplug_work.list);
+ INIT_LIST_HEAD(&mdev->discard);
mdev->resync_work.cb = w_resync_inactive;
mdev->barrier_work.cb = w_try_send_barrier;
mdev->unplug_work.cb = w_send_write_hint;
Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c 2005-01-28 11:01:30 UTC (rev 1739)
+++ trunk/drbd/drbd_receiver.c 2005-01-28 15:02:23 UTC (rev 1740)
@@ -1011,13 +1011,32 @@
return ok;
}
+STATIC int drbd_chk_discard(drbd_dev *mdev,u64 block_id,int seq_num)
+{
+ struct drbd_discard_note *dn;
+ struct list_head *le;
+
+ MUST_HOLD(&mdev->peer_seq_lock);
+
+ list_for_each(le,&mdev->discard) {
+ dn = list_entry(le, struct drbd_discard_note, list);
+ if( dn->seq_num == seq_num ) {
+ D_ASSERT( dn->block_id == block_id );
+ list_del(le);
+ kfree(dn);
+ return 1;
+ }
+ }
+ return 0;
+}
+
// mirrored write
STATIC int receive_Data(drbd_dev *mdev,Drbd_Header* h)
{
sector_t sector;
struct Tl_epoch_entry *e;
Drbd_Data_Packet *p = (Drbd_Data_Packet*)h;
- int header_size,data_size;
+ int header_size,data_size, packet_seq,discard;
// FIXME merge this code dups into some helper function
header_size = sizeof(*p) - sizeof(*h);
@@ -1033,10 +1052,72 @@
if (drbd_recv(mdev, h->payload, header_size) != header_size)
return FALSE;
+ /* This wait_event is here to make sure that never ever an
+ DATA packet traveling via sock can overtake an ACK packet
+ traveling on msock
+ PRE TODO: Wrap around of seq_num !!!
+ */
+ packet_seq = be32_to_cpu(p->seq_num);
+ if( wait_event_interruptible(mdev->cstate_wait,
+ packet_seq <= peer_seq(mdev)+1) )
+ return FALSE;
+
+ spin_lock(&mdev->peer_seq_lock);
+ mdev->peer_seq = max(mdev->peer_seq, packet_seq);
+ /* is update_peer_seq(mdev,packet_seq); */
+ discard = drbd_chk_discard(mdev,p->block_id,packet_seq);
+ spin_unlock(&mdev->peer_seq_lock);
+
sector = be64_to_cpu(p->sector);
-
e = read_in_block(mdev,data_size);
if (!e) return FALSE;
+
+ if(discard) {
+ spin_lock_irq(&mdev->ee_lock);
+ drbd_put_ee(mdev,e);
+ spin_unlock_irq(&mdev->ee_lock);
+ WARN("Concurrent write! [DISCARD BY LIST] sec=%lu\n",
+ (unsigned long)sector);
+ return TRUE;
+ }
+
+ switch( tl_have_write(mdev, sector, data_size) ) {
+ case 2: /* Conflicting write, got ACK */
+ /* write afterwards ...*/
+ WARN("Concurrent write! [W AFTERWARDS] sec=%lu\n",
+ (unsigned long)sector);
+ if( wait_event_interruptible(mdev->cstate_wait,
+ !tl_have_write(mdev,sector,data_size|TLHW_FLAG_RECVW))) {
+ spin_lock_irq(&mdev->ee_lock);
+ drbd_put_ee(mdev,e);
+ spin_unlock_irq(&mdev->ee_lock);
+ return FALSE;
+ }
+ case 1: /* Conflicting write, no ACK by now*/
+ if (test_bit(UNIQUE,&mdev->flags)) {
+ spin_lock_irq(&mdev->ee_lock);
+ drbd_put_ee(mdev,e);
+ spin_unlock_irq(&mdev->ee_lock);
+ WARN("Concurrent write! [DISCARD BY FLAG] sec=%lu\n",
+ (unsigned long)sector);
+ return TRUE;
+ } else {
+ /* write afterwards, do not expect ACK */
+ WARN("Concurrent write! [W AFTERWARDS] sec=%lu\n",
+ (unsigned long)sector);
+ if( wait_event_interruptible(mdev->cstate_wait,
+ !tl_have_write(mdev,sector,data_size|
+ TLHW_FLAG_RECVW|TLHW_FLAG_SENT))) {
+ spin_lock_irq(&mdev->ee_lock);
+ drbd_put_ee(mdev,e);
+ spin_unlock_irq(&mdev->ee_lock);
+ return FALSE;
+ }
+ }
+ /* case 0: no conflicting write. */
+ /* write it to disk now... */
+ }
+
e->block_id = p->block_id; // no meaning on this side, e* on partner
if(!inc_local(mdev)) {
@@ -2069,6 +2150,8 @@
sector_t sector = be64_to_cpu(p->sector);
int blksize = be32_to_cpu(p->blksize);
+ update_peer_seq(mdev,be32_to_cpu(p->seq_num));
+
smp_rmb();
if(likely(mdev->state.s.pdsk >= Inconsistent )) {
// test_bit(PARTNER_DISKLESS,&mdev->flags)
@@ -2111,6 +2194,8 @@
{
Drbd_BlockAck_Packet *p = (Drbd_BlockAck_Packet*)h;
+ update_peer_seq(mdev,be32_to_cpu(p->seq_num));
+
/* do nothing here.
* we expect to get a "report param" on the data socket soon,
* and will do the cleanup then and there.
More information about the drbd-cvs
mailing list