[DRBD-cvs] svn commit by lars - r2548 - in trunk: drbd drbd/linux
user - implementation of the conflict detection
drbd-cvs at lists.linbit.com
drbd-cvs at lists.linbit.com
Thu Oct 19 15:22:57 CEST 2006
Author: lars
Date: 2006-10-19 15:22:52 +0200 (Thu, 19 Oct 2006)
New Revision: 2548
Modified:
trunk/drbd/drbd_int.h
trunk/drbd/drbd_main.c
trunk/drbd/drbd_nl.c
trunk/drbd/drbd_proc.c
trunk/drbd/drbd_receiver.c
trunk/drbd/drbd_req.c
trunk/drbd/drbd_req.h
trunk/drbd/drbd_worker.c
trunk/drbd/linux/drbd.h
trunk/drbd/linux/drbd_config.h
trunk/drbd/lru_cache.c
trunk/user/drbdsetup.c
Log:
implementation of the conflict detection
Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_int.h 2006-10-19 13:22:52 UTC (rev 2548)
@@ -285,19 +285,23 @@
PingAck,
RecvAck, // Used in protocol B
WriteAck, // Used in protocol C
+ DiscardAck, // Used in protocol C, two-primaries conflict detection
NegAck, // Sent if local disk is unusable
NegDReply, // Local disk is broken...
NegRSDReply, // Local disk is broken...
BarrierAck,
- DiscardNote,
StateChgReply,
MAX_CMD,
MayIgnore = 0x100, // Flag only to test if (cmd > MayIgnore) ...
MAX_OPT_CMD,
+ /* FIXME
+ * to get a more useful error message with drbd-8 <-> drbd 0.7.x,
+ * these could be reimplemented as special case of HandShake. */
HandShakeM = 0xfff1, // First Packet on the MetaSock
HandShakeS = 0xfff2, // First Packet on the Socket
+
HandShake = 0xfffe // FIXED for the next century!
} Drbd_Packet_Cmd;
@@ -329,11 +333,11 @@
[PingAck] = "PingAck",
[RecvAck] = "RecvAck",
[WriteAck] = "WriteAck",
+ [DiscardAck] = "DiscardAck",
[NegAck] = "NegAck",
[NegDReply] = "NegDReply",
[NegRSDReply] = "NegRSDReply",
[BarrierAck] = "BarrierAck",
- [DiscardNote] = "DiscardNote",
[StateChgRequest] = "StateChgRequest",
[StateChgReply] = "StateChgReply"
};
@@ -539,7 +543,6 @@
Drbd_Req_State_Packet ReqState;
Drbd_RqS_Reply_Packet RqSReply;
Drbd_BlockRequest_Packet BlockRequest;
- Drbd_Discard_Packet Discard;
} __attribute((packed)) Drbd_Polymorph_Packet;
/**********************************************************************/
@@ -650,9 +653,11 @@
/* ee flag bits */
enum {
- __CALL_AL_COMPLETE_IO,
+ __EE_CALL_AL_COMPLETE_IO,
+ __EE_CONFLICT_PENDING,
};
-#define CALL_AL_COMPLETE_IO (1<<__CALL_AL_COMPLETE_IO)
+#define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO)
+#define EE_CONFLICT_PENDING (1<<__EE_CONFLICT_PENDING)
/* global flag bits */
@@ -668,7 +673,7 @@
UNPLUG_REMOTE, // whether sending a "UnplugRemote" makes sense
MD_DIRTY, // current gen counts and flags not yet on disk
SYNC_STARTED, // Needed to agree on the exact point in time..
- UNIQUE, // Set on one node, cleared on the peer!
+ DISCARD_CONCURRENT, // Set on one node, cleared on the peer!
USE_DEGR_WFC_T, // Use degr-wfc-timeout instead of wfc-timeout.
CLUSTER_ST_CHANGE, // Cluster wide state change going on...
CL_ST_CHG_SUCCESS,
@@ -708,12 +713,6 @@
Drbd_Polymorph_Packet rbuf; // send/receive buffers off the stack
};
-struct drbd_discard_note {
- struct list_head list;
- u64 block_id;
- int seq_num;
-};
-
struct drbd_md {
u64 md_offset; /* sector offset to 'super' block */
@@ -843,10 +842,10 @@
int al_tr_cycle;
int al_tr_pos; // position of the next transaction in the journal
struct crypto_tfm* cram_hmac_tfm;
+ wait_queue_head_t seq_wait;
atomic_t packet_seq;
- int peer_seq;
+ unsigned int peer_seq;
spinlock_t peer_seq_lock;
- struct list_head discard;
int minor;
};
@@ -954,7 +953,6 @@
sector_t sector,int size, u64 block_id);
extern int drbd_send_bitmap(drbd_dev *mdev);
extern int _drbd_send_bitmap(drbd_dev *mdev);
-extern int drbd_send_discard(drbd_dev *mdev, drbd_request_t *req);
extern int drbd_send_sr_reply(drbd_dev *mdev, int retcode);
extern void drbd_free_bc(struct drbd_backing_dev* bc);
extern int drbd_io_error(drbd_dev* mdev, int forcedetach);
@@ -1618,11 +1616,11 @@
__func__ , __LINE__ , \
atomic_read(&mdev->which))
-#define dec_ap_pending(mdev) \
+#define dec_ap_pending(mdev) do { \
typecheck(drbd_dev*,mdev); \
if(atomic_dec_and_test(&mdev->ap_pending_cnt)) \
wake_up(&mdev->cstate_wait); \
- ERR_IF_CNT_IS_NEGATIVE(ap_pending_cnt)
+ ERR_IF_CNT_IS_NEGATIVE(ap_pending_cnt); } while (0)
/* counts how many resync-related answers we still expect from the peer
* increase decrease
@@ -1635,10 +1633,10 @@
atomic_inc(&mdev->rs_pending_cnt);
}
-#define dec_rs_pending(mdev) \
+#define dec_rs_pending(mdev) do { \
typecheck(drbd_dev*,mdev); \
atomic_dec(&mdev->rs_pending_cnt); \
- ERR_IF_CNT_IS_NEGATIVE(rs_pending_cnt)
+ ERR_IF_CNT_IS_NEGATIVE(rs_pending_cnt); } while (0)
/* counts how many answers we still need to send to the peer.
* increased on
@@ -1654,15 +1652,15 @@
atomic_inc(&mdev->unacked_cnt);
}
-#define dec_unacked(mdev) \
+#define dec_unacked(mdev) do { \
typecheck(drbd_dev*,mdev); \
atomic_dec(&mdev->unacked_cnt); \
- ERR_IF_CNT_IS_NEGATIVE(unacked_cnt)
+ ERR_IF_CNT_IS_NEGATIVE(unacked_cnt); } while (0)
-#define sub_unacked(mdev, n) \
+#define sub_unacked(mdev, n) do { \
typecheck(drbd_dev*,mdev); \
atomic_sub(n, &mdev->unacked_cnt); \
- ERR_IF_CNT_IS_NEGATIVE(unacked_cnt)
+ ERR_IF_CNT_IS_NEGATIVE(unacked_cnt); } while (0)
static inline void dec_net(drbd_dev* mdev)
@@ -1692,6 +1690,13 @@
* this is mood...
*/
+static inline void dec_local(drbd_dev* mdev)
+{
+ if(atomic_dec_and_test(&mdev->local_cnt)) {
+ wake_up(&mdev->cstate_wait);
+ }
+ D_ASSERT(atomic_read(&mdev->local_cnt)>=0);
+}
/**
* inc_local: Returns TRUE when local IO is possible. If it returns
* TRUE you should call dec_local() after IO is completed.
@@ -1701,9 +1706,9 @@
int io_allowed;
atomic_inc(&mdev->local_cnt);
- io_allowed = (mdev->state.disk >= mins );
+ io_allowed = (mdev->state.disk >= mins );
if( !io_allowed ) {
- atomic_dec(&mdev->local_cnt);
+ dec_local(mdev);
}
return io_allowed;
}
@@ -1712,17 +1717,6 @@
return inc_local_if_state(mdev, Inconsistent);
}
-
-static inline void dec_local(drbd_dev* mdev)
-{
- if(atomic_dec_and_test(&mdev->local_cnt) &&
- mdev->state.disk == Diskless && mdev->bc ) {
- wake_up(&mdev->cstate_wait);
- }
-
- D_ASSERT(atomic_read(&mdev->local_cnt)>=0);
-}
-
/* this throttles on-the-fly application requests
* according to max_buffers settings;
* maybe re-implement using semaphores? */
@@ -1785,24 +1779,30 @@
if (ap_bio < mxb) wake_up(&mdev->cstate_wait);
}
-/* FIXME does not handle wrap around yet */
-static inline void update_peer_seq(drbd_dev* mdev, int new_seq)
+static inline int seq_cmp(u32 a, u32 b)
{
- spin_lock(&mdev->peer_seq_lock);
- mdev->peer_seq = max(mdev->peer_seq, new_seq);
- spin_unlock(&mdev->peer_seq_lock);
- wake_up(&mdev->cstate_wait);
- /* FIXME introduce seq_wait, no point in waking up a number of
- * processes with each and every Ack received... */
+ /* we assume wrap around at 32bit.
+ * for wrap around at 24bit (old atomic_t),
+ * we'd have to
+ * a <<= 8; b <<= 8;
+ */
+ return ((s32)(a) - (s32)(b));
}
+#define seq_lt(a,b) (seq_cmp((a),(b)) < 0)
+#define seq_gt(a,b) (seq_cmp((a),(b)) > 0)
+#define seq_ge(a,b) (seq_cmp((a),(b)) >= 0)
+#define seq_le(a,b) (seq_cmp((a),(b)) <= 0)
+/* CAUTION: please no side effects in arguments! */
+#define seq_max(a,b) ((u32)(seq_gt((a),(b)) ? (a) : (b)))
-static inline int peer_seq(drbd_dev* mdev)
+static inline void update_peer_seq(drbd_dev* mdev, unsigned int new_seq)
{
- int seq;
+ unsigned int m;
spin_lock(&mdev->peer_seq_lock);
- seq = mdev->peer_seq;
+ m = seq_max(mdev->peer_seq, new_seq);
+ mdev->peer_seq = m;
spin_unlock(&mdev->peer_seq_lock);
- return seq;
+ if (m == new_seq) wake_up(&mdev->seq_wait);
}
static inline int drbd_queue_order_type(drbd_dev* mdev)
Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_main.c 2006-10-19 13:22:52 UTC (rev 2548)
@@ -300,40 +300,6 @@
spin_unlock_irq(&mdev->req_lock);
}
-// warning LGE "FIXME code missing"
-#if 0
-/* FIXME "wrong"
- * see comment in receive_Data */
-drbd_request_t * _req_have_write(drbd_dev *mdev, struct Tl_epoch_entry *e)
-{
- struct hlist_head *slot;
- struct hlist_node *n;
- drbd_request_t * req;
- sector_t sector = e->sector;
- int size = e->drbd_ee_get_size(e);
- int i;
-
- MUST_HOLD(&mdev->req_lock);
- D_ASSERT(size <= 1<<(HT_SHIFT+9) );
-
-#define OVERLAPS overlaps(req->sector, req->size, sector, size)
- slot = mdev->tl_hash + tl_hash_fn(mdev, sector);
- hlist_for_each_entry(req, n, slot, colision) {
- if (OVERLAPS) return req;
- } // hlist_for_each_entry()
-#undef OVERLAPS
- req = NULL;
- // Good, no conflict found
- INIT_HLIST_NODE(&e->colision);
- hlist_add_head( &e->colision, mdev->ee_hash +
- ee_hash_fn(mdev, sector) );
- out:
- spin_unlock_irq(&mdev->tl_lock);
-
- return req;
-}
-#endif
-
/**
* drbd_io_error: Handles the on_io_error setting, should be called in the
* unlikely(!drbd_bio_uptodate(e->bio)) case from kernel thread context.
@@ -1355,17 +1321,6 @@
return ok;
}
-int drbd_send_discard(drbd_dev *mdev, drbd_request_t *req)
-{
- Drbd_Discard_Packet p;
-
- p.block_id = (unsigned long)req;
- p.seq_num = cpu_to_be32(req->seq_num);
-
- return drbd_send_cmd(mdev,USE_META_SOCKET,DiscardNote,
- (Drbd_Header*)&p,sizeof(p));
-}
-
int drbd_send_state(drbd_dev *mdev)
{
Drbd_State_Packet p;
@@ -1924,7 +1879,7 @@
/* only if connected */
spin_lock_irq(&mdev->req_lock);
- if (mdev->state.pdsk >= Inconsistent) /* implies cs >= Connected */ {
+ if (mdev->state.pdsk >= Inconsistent && mdev->state.conn >= Connected) {
D_ASSERT(mdev->state.role == Primary);
if (test_and_clear_bit(UNPLUG_REMOTE,&mdev->flags)) {
/* add to the data.work queue,
@@ -1998,7 +1953,6 @@
INIT_LIST_HEAD(&mdev->resync_work.list);
INIT_LIST_HEAD(&mdev->unplug_work.list);
INIT_LIST_HEAD(&mdev->md_sync_work.list);
- INIT_LIST_HEAD(&mdev->discard);
mdev->resync_work.cb = w_resync_inactive;
mdev->unplug_work.cb = w_send_write_hint;
mdev->md_sync_work.cb = w_md_sync;
@@ -2012,6 +1966,7 @@
init_waitqueue_head(&mdev->cstate_wait);
init_waitqueue_head(&mdev->ee_wait);
init_waitqueue_head(&mdev->al_wait);
+ init_waitqueue_head(&mdev->seq_wait);
drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
drbd_thread_init(mdev, &mdev->worker, drbd_worker);
@@ -3140,6 +3095,7 @@
case RecvAck:
case WriteAck:
+ case DiscardAck:
case NegAck:
case NegRSDReply:
INFOP("%s (sector %llx, size %x, id %s, seq %x)\n", cmdname(cmd),
@@ -3171,7 +3127,7 @@
be64_to_cpu(p->GenCnt.uuid[History_start]),
be64_to_cpu(p->GenCnt.uuid[History_end]));
break;
-
+
case ReportSizes:
INFOP("%s (d %lluMiB, u %lluMiB, c %lldMiB, max bio %x, q order %x)\n", cmdname(cmd),
(long long)(be64_to_cpu(p->Sizes.d_size)>>(20-9)),
@@ -3200,12 +3156,6 @@
be32_to_cpu(p->RqSReply.retcode));
break;
- case DiscardNote:
- INFOP("%s (id %llx, seq %x)\n", cmdname(cmd),
- (long long)p->Discard.block_id,
- be32_to_cpu(p->Discard.seq_num));
- break;
-
case Ping:
case PingAck:
/*
Modified: trunk/drbd/drbd_nl.c
===================================================================
--- trunk/drbd/drbd_nl.c 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_nl.c 2006-10-19 13:22:52 UTC (rev 2548)
@@ -945,7 +945,7 @@
}
new_conf = kmalloc(sizeof(struct net_conf),GFP_KERNEL);
- if(!new_conf) {
+ if(!new_conf) {
retcode=KMallocFailed;
goto fail;
}
@@ -968,6 +968,7 @@
new_conf->after_sb_2p = DRBD_AFTER_SB_2P_DEF;
new_conf->want_lose = 0;
new_conf->two_primaries = 0;
+ new_conf->wire_protocol = DRBD_PROT_C;
}
if (!net_conf_from_tags(mdev,nlp->tag_list,new_conf)) {
@@ -975,6 +976,11 @@
goto fail;
}
+ if (new_conf->two_primaries && (new_conf->wire_protocol != DRBD_PROT_C)) {
+ retcode=ProtocolCRequired;
+ goto fail;
+ };
+
if( mdev->state.role == Primary && new_conf->want_lose ) {
retcode=DiscardNotAllowed;
goto fail;
Modified: trunk/drbd/drbd_proc.c
===================================================================
--- trunk/drbd/drbd_proc.c 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_proc.c 2006-10-19 13:22:52 UTC (rev 2548)
@@ -210,7 +210,7 @@
seq_printf( seq, "%2d: cs:Unconfigured\n", i);
} else {
seq_printf( seq,
- "%2d: cs:%s st:%s/%s ds:%s/%s %c%c%c%c\n"
+ "%2d: cs:%s st:%s/%s ds:%s/%s %c %c%c%c%c\n"
" ns:%u nr:%u dw:%u dr:%u al:%u bm:%u "
"lo:%d pe:%d ua:%d ap:%d\n",
i, sn,
@@ -218,6 +218,8 @@
roles_to_name(mdev->state.peer),
disks_to_name(mdev->state.disk),
disks_to_name(mdev->state.pdsk),
+ (mdev->net_conf == NULL ? ' ' :
+ (mdev->net_conf->wire_protocol - DRBD_PROT_A+'A')),
mdev->state.susp ? 's' : 'r',
mdev->state.aftr_isp ? 'a' : '-',
mdev->state.peer_isp ? 'p' : '-',
Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_receiver.c 2006-10-19 13:22:52 UTC (rev 2548)
@@ -316,6 +316,8 @@
bio_put(bio);
+ BUG_ON(!hlist_unhashed(&e->colision));
+
mempool_free(e, drbd_ee_mempool);
}
@@ -379,13 +381,9 @@
list_splice_init(&mdev->done_ee,&work_list);
spin_unlock_irq(&mdev->req_lock);
- /* XXX maybe wake_up here already?
- * or wake_up withing drbd_free_ee just after mempool_free?
- */
-
/* possible callbacks here:
- * e_end_block, and e_end_resync_block.
- * both ignore the last argument.
+ * e_end_block, and e_end_resync_block, e_send_discard_ack.
+ * all ignore the last argument.
*/
list_for_each_entry_safe(e, t, &work_list, w.list) {
MTRACE(TraceTypeEE,TraceLvlAll,
@@ -393,7 +391,7 @@
(long long)e->sector,e->size,e);
);
// list_del not necessary, next/prev members not touched
- ok = ok && e->w.cb(mdev,&e->w,0);
+ if (e->w.cb(mdev,&e->w,0) == 0) ok = 0;
drbd_free_ee(mdev,e);
}
if (do_clear_bit)
@@ -690,7 +688,7 @@
D_ASSERT(!mdev->data.socket);
if(drbd_request_state(mdev,NS(conn,WFConnection)) < SS_Success ) return 0;
- clear_bit(UNIQUE, &mdev->flags);
+ clear_bit(DISCARD_CONCURRENT, &mdev->flags);
sock = NULL;
msock = NULL;
@@ -737,7 +735,7 @@
case HandShakeM:
if(msock) sock_release(msock);
msock = s;
- set_bit(UNIQUE, &mdev->flags);
+ set_bit(DISCARD_CONCURRENT, &mdev->flags);
break;
default:
WARN("Error receiving initial packet\n");
@@ -1195,55 +1193,87 @@
* maybe assert this? */
}
dec_unacked(mdev);
- return ok;
} else if(unlikely(!drbd_bio_uptodate(e->private_bio))) {
ok = drbd_io_error(mdev, FALSE);
}
-// warning LGE "FIXME code missing"
-#if 0
/* we delete from the conflict detection hash _after_ we sent out the
* WriteAck / NegAck, to get the sequence number right. */
+ if (mdev->net_conf->two_primaries) {
+ spin_lock_irq(&mdev->req_lock);
+ D_ASSERT(!hlist_unhashed(&e->colision));
+ hlist_del_init(&e->colision);
+ spin_unlock_irq(&mdev->req_lock);
+ } else {
+ D_ASSERT(hlist_unhashed(&e->colision));
+ }
+
+ return ok;
+}
+
+STATIC int e_send_discard_ack(drbd_dev *mdev, struct drbd_work *w, int unused)
+{
+ struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
+ int ok=1;
+
+ D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
+ ok = drbd_send_ack(mdev,DiscardAck,e);
+
+ spin_lock_irq(&mdev->req_lock);
D_ASSERT(!hlist_unhashed(&e->colision));
- /* FIXME "wake" any conflicting requests
- * that have been waiting for this one to finish */
hlist_del_init(&e->colision);
-#endif
+ spin_unlock_irq(&mdev->req_lock);
+ dec_unacked(mdev);
+
return ok;
}
-/* FIXME implementation wrong.
- * For the algorithm to be correct, we need to send and store the
- * sector and size, not the block id. We have to check for overlap.
- * We may _only_ remove the info when its sequence number is less than
- * the current sequence number.
+/* Called from receive_Data.
+ * Synchronize packets on sock with packets on msock.
*
- * I think the "discard info" are the wrong way, anyways.
- * Instead of silently discarding such writes, we should send a DiscardAck,
- * and we should retard sending of the data until we get that Discard Ack
- * and thus the conflicting request is done.
- */
-STATIC int drbd_chk_discard(drbd_dev *mdev,struct Tl_epoch_entry *e)
+ * This is here so even when a Data packet traveling via sock overtook an Ack
+ * packet traveling on msock, they are still processed in the order they have
+ * been sent.
+ *
+ * Note: we don't care for Ack packets overtaking Data packets.
+ *
+ * In case packet_seq is larger than mdev->peer_seq number, there are
+ * outstanding packets on the msock. We wait for them to arrive.
+ * In case we are the logically next packet, we update mdev->peer_seq
+ * ourselves. Correctly handles 32bit wrap around.
+ * FIXME verify that atomic_t guarantees 32bit wrap around,
+ * otherwise we have to play tricks with << ...
+ *
+ * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
+ * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
+ * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
+ * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
+ *
+ * returns 0 if we may process the packet,
+ * -ERESTARTSYS if we were interrupted (by disconnect signal). */
+static int drbd_wait_peer_seq(drbd_dev *mdev, const u32 packet_seq)
{
- struct drbd_discard_note *dn;
- struct list_head *le,*tmp;
-
- MUST_HOLD(&mdev->peer_seq_lock);
- list_for_each_safe(le,tmp,&mdev->discard) {
- dn = list_entry(le, struct drbd_discard_note, list);
- if( dn->seq_num == mdev->peer_seq ) {
- D_ASSERT( dn->block_id == e->block_id );
- list_del(le);
- kfree(dn);
- return 1;
+ DEFINE_WAIT(wait);
+ int ret = 0;
+ spin_lock(&mdev->peer_seq_lock);
+ for (;;) {
+ prepare_to_wait(&mdev->seq_wait,&wait,TASK_INTERRUPTIBLE);
+ if (seq_le(packet_seq,mdev->peer_seq+1))
+ break;
+ spin_unlock(&mdev->peer_seq_lock);
+ if (signal_pending(current)) {
+ ret = -ERESTARTSYS;
+ break;
}
- if( dn->seq_num < mdev->peer_seq ) {
- list_del(le);
- kfree(dn);
- }
+ schedule();
+ spin_lock(&mdev->peer_seq_lock);
}
- return 0;
+ finish_wait(&mdev->seq_wait, &wait);
+ if (mdev->peer_seq+1 == packet_seq)
+ mdev->peer_seq++;
+ spin_unlock(&mdev->peer_seq_lock);
+ return ret;
}
// mirrored write
@@ -1251,11 +1281,8 @@
{
sector_t sector;
struct Tl_epoch_entry *e;
- /* FIXME currently some unused variable intended
- * for the now not-implemented conflict detection */
- drbd_request_t * req;
Drbd_Data_Packet *p = (Drbd_Data_Packet*)h;
- int header_size, data_size, packet_seq, discard, rv;
+ int header_size, data_size;
unsigned int barrier_nr = 0;
unsigned int epoch_size = 0;
u32 dp_flags;
@@ -1291,98 +1318,143 @@
e->private_bio->bi_end_io = drbd_endio_write_sec;
e->w.cb = e_end_block;
- /* FIXME drbd_al_begin_io in case we have two primaries... */
+ dp_flags = be32_to_cpu(p->dp_flags);
+ if ( dp_flags & DP_HARDBARRIER ) {
+ e->private_bio->bi_rw |= BIO_RW_BARRIER;
+ }
+ if ( dp_flags & DP_RW_SYNC ) {
+ e->private_bio->bi_rw |= BIO_RW_SYNC;
+ }
-// warning LGE "FIXME code missing"
-#if 0
-/* sorry.
- * to get this patch in a shape where it can be committed,
- * I need to disable the broken conflict detection code for now.
- * will implement the correct as soon as possible...
- * it is done in my head already, "only have to write it down",
- * which will take an other couple of days, probably.
- */
+ /* I'm the receiver, I do hold a net_cnt reference. */
+ if (!mdev->net_conf->two_primaries) {
+ spin_lock_irq(&mdev->req_lock);
+ } else {
+ /* don't get the req_lock yet,
+ * we may sleep in drbd_wait_peer_seq */
+ const sector_t sector = e->sector;
+ const int size = e->size;
+ const int discard = test_bit(DISCARD_CONCURRENT,&mdev->flags);
+ DEFINE_WAIT(wait);
+ drbd_request_t *i;
+ struct hlist_node *n;
+ struct hlist_head *slot;
+ int first;
- /* This wait_event is here so even when a DATA packet traveling via
- * sock overtook an ACK packet traveling on msock, they are still
- * processed in the order they have been sent.
- * FIXME TODO: Wrap around of seq_num !!!
- */
- if (mdev->net_conf->two_primaries) {
- packet_seq = be32_to_cpu(p->seq_num);
- if( wait_event_interruptible(mdev->cstate_wait,
- packet_seq <= peer_seq(mdev)+1)) {
- rv = FALSE;
- goto out2;
- }
+ D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
+ BUG_ON(mdev->ee_hash == NULL);
+ BUG_ON(mdev->tl_hash == NULL);
- /* FIXME current discard implementation is wrong */
- spin_lock(&mdev->peer_seq_lock);
- mdev->peer_seq = max(mdev->peer_seq, packet_seq);
- /* is update_peer_seq(mdev,packet_seq); */
- discard = drbd_chk_discard(mdev,e);
- spin_unlock(&mdev->peer_seq_lock);
+ /* conflict detection and handling:
+ * 1. wait on the sequence number,
+ * in case this data packet overtook ACK packets.
+ * 2. check our hash tables for conflicting requests.
+ * we only need to walk the tl_hash, since an ee can not
+ * have a conflict with an other ee: on the submitting
+ * node, the corresponding req had already been conflicting,
+ * and a conflicting req is never sent.
+ *
+ * Note: for two_primaries, we are protocol C,
+ * so there cannot be any request that is DONE
+ * but still on the transfer log.
+ *
+ * unconditionally add to the ee_hash.
+ *
+ * if no conflicting request is found:
+ * submit.
+ *
+ * if any conflicting request is found that has not yet been acked,
+ * AND I have the "discard concurrent writes" flag:
+ * queue (via done_ee) the DiscardAck; OUT.
+ *
+ * if any conflicting request is found:
+ * block the receiver, waiting on cstate_wait
+ * until no more conflicting requests are there,
+ * or we get interrupted (disconnect).
+ *
+ * we do not just write after local io completion of those
+ * requests, but only after req is done completely, i.e.
+ * we wait for the DiscardAck to arrive!
+ *
+ * then proceed normally, i.e. submit.
+ */
+ if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
+ goto out_interrupted;
- if(discard) {
- WARN("Concurrent write! [DISCARD BY LIST] sec=%lu\n",
- (unsigned long)sector);
- rv = TRUE;
- goto out2;
- }
+ spin_lock_irq(&mdev->req_lock);
- req = req_have_write(mdev, e);
+ hlist_add_head(&e->colision,ee_hash_slot(mdev,sector));
- if(req) {
- /* FIXME RACE
- * rq_status may be changing while we are looking.
- * in rare cases it could even disappear right now.
- * e.g. when it has already been ACK'ed, and the local
- * storage has been way too slow, and only now
- * completes the thing.
- */
- if( req->rq_status & RQ_DRBD_SENT ) {
- /* Conflicting write, got ACK */
- /* write afterwards ...*/
- WARN("Concurrent write! [W AFTERWARDS1] "
- "sec=%lu\n",(unsigned long)sector);
- if( wait_event_interruptible(mdev->cstate_wait,
- !req_have_write(mdev,e))) {
- rv = FALSE;
- goto out2;
- }
- } else {
- /* Conflicting write, no ACK by now*/
- if (test_bit(UNIQUE,&mdev->flags)) {
- WARN("Concurrent write! [DISCARD BY FLAG] sec=%lu\n",
- (unsigned long)sector);
- rv = TRUE;
- goto out2;
- } else {
- /* write afterwards do not exp ACK */
- WARN("Concurrent write! [W AFTERWARDS2] sec=%lu\n",
- (unsigned long)sector);
- drbd_send_discard(mdev,req);
- drbd_end_req(req, RQ_DRBD_SENT, 1, sector);
- dec_ap_pending(mdev);
- if( wait_event_interruptible(mdev->cstate_wait,
- !req_have_write(mdev,e))) {
- rv = FALSE;
- goto out2;
+#define OVERLAPS overlaps(i->sector, i->size, sector, size)
+ slot = tl_hash_slot(mdev,sector);
+ first = 1;
+ for(;;) {
+ int have_unacked = 0;
+ int have_conflict = 0;
+ prepare_to_wait(&mdev->cstate_wait,&wait,TASK_INTERRUPTIBLE);
+ hlist_for_each_entry(i, n, slot, colision) {
+ if (OVERLAPS) {
+ if (first) {
+ /* only ALERT on first iteration,
+ * we may be woken up early... */
+ ALERT("%s[%u] Concurrent local write detected!"
+ " new: %llu +%d; pending: %llu +%d\n",
+ current->comm, current->pid,
+ (unsigned long long)sector, size,
+ (unsigned long long)i->sector, i->size);
}
+ if (i->rq_state & RQ_NET_PENDING) ++have_unacked;
+ ++have_conflict;
}
}
+#undef OVERLAPS
+ if (!have_conflict) break;
+
+ /* Discard Ack only for the _first_ iteration */
+ if (first && discard && have_unacked) {
+ ALERT("Concurrent write! [DISCARD BY FLAG] sec=%llu\n",
+ (unsigned long long)sector);
+ inc_unacked(mdev);
+ mdev->epoch_size++;
+ e->w.cb = e_send_discard_ack;
+ list_add_tail(&e->w.list,&mdev->done_ee);
+
+ spin_unlock_irq(&mdev->req_lock);
+
+ /* we could probably send that DiscardAck ourselves,
+ * but I don't like the receiver using the msock */
+
+ dec_local(mdev);
+ wake_asender(mdev);
+ finish_wait(&mdev->cstate_wait, &wait);
+ return TRUE;
+ }
+
+ if (signal_pending(current)) {
+ hlist_del_init(&e->colision);
+
+ spin_unlock_irq(&mdev->req_lock);
+
+ finish_wait(&mdev->cstate_wait, &wait);
+ goto out_interrupted;
+ }
+
+ spin_unlock_irq(&mdev->req_lock);
+ if (first) {
+ first = 0;
+ ALERT("Concurrent write! [W AFTERWARDS] "
+ "sec=%llu\n",(unsigned long long)sector);
+ } else if (discard) {
+ /* we had none on the first iteration.
+ * there must be none now. */
+ D_ASSERT(have_unacked == 0);
+ }
+ schedule();
+ spin_lock_irq(&mdev->req_lock);
}
+ finish_wait(&mdev->cstate_wait, &wait);
}
-#endif
- dp_flags = be32_to_cpu(p->dp_flags);
- if ( dp_flags & DP_HARDBARRIER ) {
- e->private_bio->bi_rw |= BIO_RW_BARRIER;
- }
- if ( dp_flags & DP_RW_SYNC ) {
- e->private_bio->bi_rw |= BIO_RW_SYNC;
- }
-
/* when using TCQ:
* note that, when using tagged command queuing, we may
* have more than one reorder domain "active" at a time.
@@ -1427,8 +1499,6 @@
* otherwise, tag the write with the barrier number, so it
* will trigger the b_ack before its own ack.
*/
-
- spin_lock_irq(&mdev->req_lock);
if (mdev->next_barrier_nr) {
/* only when using TCQ */
if (list_empty(&mdev->active_ee)) {
@@ -1471,7 +1541,7 @@
if(mdev->state.pdsk == Diskless) {
// In case we have the only disk of the cluster,
drbd_set_out_of_sync(mdev,e->sector,e->size);
- e->flags |= CALL_AL_COMPLETE_IO;
+ e->flags |= EE_CALL_AL_COMPLETE_IO;
drbd_al_begin_io(mdev, e->sector);
}
@@ -1486,13 +1556,13 @@
maybe_kick_lo(mdev);
return TRUE;
- out2:
+ out_interrupted:
/* yes, the epoch_size now is imbalanced.
* but we drop the connection anyways, so we don't have a chance to
* receive a barrier... atomic_inc(&mdev->epoch_size); */
dec_local(mdev);
drbd_free_ee(mdev,e);
- return rv;
+ return FALSE;
}
STATIC int receive_DataRequest(drbd_dev *mdev,Drbd_Header *h)
@@ -2166,14 +2236,14 @@
mask.i = be32_to_cpu(p->mask);
val.i = be32_to_cpu(p->val);
- if (test_bit(UNIQUE,&mdev->flags)) drbd_state_lock(mdev);
+ if (test_bit(DISCARD_CONCURRENT,&mdev->flags)) drbd_state_lock(mdev);
mask = convert_state(mask);
val = convert_state(val);
rv = drbd_change_state(mdev,ChgStateVerbose,mask,val);
- if (test_bit(UNIQUE,&mdev->flags)) drbd_state_unlock(mdev);
+ if (test_bit(DISCARD_CONCURRENT,&mdev->flags)) drbd_state_unlock(mdev);
drbd_send_sr_reply(mdev,rv);
drbd_md_sync(mdev);
@@ -2407,6 +2477,8 @@
drbd_cmd_handler_f handler;
Drbd_Header *header = &mdev->data.rbuf.head;
+ /* FIXME test for thread state == RUNNING here,
+ * in case we missed some signal! */
for (;;) {
if (!drbd_recv_header(mdev,header))
break;
@@ -2974,10 +3046,25 @@
return FALSE;
}
- _req_mod(req,
- h->command == cpu_to_be16(WriteAck)
- ? write_acked_by_peer
- : recv_acked_by_peer);
+ switch (be16_to_cpu(h->command)) {
+ case WriteAck:
+ D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
+ _req_mod(req,write_acked_by_peer);
+ break;
+ case RecvAck:
+ D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_A);
+ _req_mod(req,recv_acked_by_peer);
+ break;
+ case DiscardAck:
+ D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
+ ALERT("Got DiscardAck packet %llu +%u!"
+ " DRBD is not a random data generator!\n",
+ (unsigned long long)req->sector, req->size);
+ _req_mod(req, conflict_discarded_by_peer);
+ break;
+ default:
+ D_ASSERT(0);
+ }
spin_unlock_irq(&mdev->req_lock);
}
}
@@ -3076,31 +3163,6 @@
return TRUE;
}
-/* FIXME implementation wrong.
- * For the algorithm to be correct, we need to send and store the
- * sector and size too.
- */
-STATIC int got_Discard(drbd_dev *mdev, Drbd_Header* h)
-{
- Drbd_Discard_Packet *p = (Drbd_Discard_Packet*)h;
- struct drbd_discard_note *dn;
-
- dn = kmalloc(sizeof(struct drbd_discard_note),GFP_KERNEL);
- if(!dn) {
- ERR("kmalloc(drbd_discard_note) failed.");
- return FALSE;
- }
-
- dn->block_id = p->block_id;
- dn->seq_num = be32_to_cpu(p->seq_num);
-
- spin_lock(&mdev->peer_seq_lock);
- list_add(&dn->list,&mdev->discard);
- spin_unlock(&mdev->peer_seq_lock);
-
- return TRUE;
-}
-
struct asender_cmd {
size_t pkt_size;
int (*process)(drbd_dev *mdev, Drbd_Header* h);
@@ -3123,11 +3185,11 @@
[PingAck] ={ sizeof(Drbd_Header), got_PingAck },
[RecvAck] ={ sizeof(Drbd_BlockAck_Packet), got_BlockAck },
[WriteAck] ={ sizeof(Drbd_BlockAck_Packet), got_BlockAck },
+ [DiscardAck]={ sizeof(Drbd_BlockAck_Packet), got_BlockAck },
[NegAck] ={ sizeof(Drbd_BlockAck_Packet), got_NegAck },
[NegDReply] ={ sizeof(Drbd_BlockAck_Packet), got_NegDReply },
[NegRSDReply]={sizeof(Drbd_BlockAck_Packet), got_NegRSDReply},
[BarrierAck]={ sizeof(Drbd_BarrierAck_Packet),got_BarrierAck },
- [DiscardNote]={sizeof(Drbd_Discard_Packet), got_Discard },
[StateChgReply]={sizeof(Drbd_RqS_Reply_Packet),got_RqSReply },
};
Modified: trunk/drbd/drbd_req.c
===================================================================
--- trunk/drbd/drbd_req.c 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_req.c 2006-10-19 13:22:52 UTC (rev 2548)
@@ -34,8 +34,8 @@
#include "drbd_req.h"
//#define VERBOSE_REQUEST_CODE
-#ifdef VERBOSE_REQUEST_CODE
-void print_req_mod(drbd_request_t *req,drbd_req_event_t what)
+#if defined(VERBOSE_REQUEST_CODE) || defined(ENABLE_DYNAMIC_TRACE)
+void _print_req_mod(drbd_request_t *req,drbd_req_event_t what)
{
drbd_dev *mdev = req->mdev;
const int rw = (req->master_bio == NULL ||
@@ -46,9 +46,6 @@
[created] = "created",
[to_be_send] = "to_be_send",
[to_be_submitted] = "to_be_submitted",
- [suspend_because_of_conflict] = "suspend_because_of_conflict",
- [conflicting_req_done] = "conflicting_req_done",
- [conflicting_ee_done] = "conflicting_ee_done",
[queue_for_net_write] = "queue_for_net_write",
[queue_for_net_read] = "queue_for_net_read",
[send_canceled] = "send_canceled",
@@ -58,6 +55,7 @@
[recv_acked_by_peer] = "recv_acked_by_peer",
[write_acked_by_peer] = "write_acked_by_peer",
[neg_acked] = "neg_acked",
+ [conflict_discarded_by_peer] = "conflict_discarded_by_peer",
[barrier_acked] = "barrier_acked",
[data_received] = "data_received",
[read_completed_with_error] = "read_completed_with_error",
@@ -68,7 +66,7 @@
INFO("_req_mod(%p %c ,%s)\n", req, rw, rq_event_names[what]);
}
-void print_rq_state(drbd_request_t *req, const char *txt)
+void _print_rq_state(drbd_request_t *req, const char *txt)
{
const unsigned long s = req->rq_state;
drbd_dev *mdev = req->mdev;
@@ -90,8 +88,16 @@
conns_to_name(mdev->state.conn));
}
+# ifdef ENABLE_DYNAMIC_TRACE
+# define print_rq_state(R,T) MTRACE(TraceTypeRq,TraceLvlMetrics,_print_rq_state(R,T);)
+# define print_req_mod(T,W) MTRACE(TraceTypeRq,TraceLvlMetrics,_print_req_mod(T,W);)
+# else
+# define print_rq_state(R,T) _print_rq_state(R,T)
+# define print_req_mod(T,W) _print_req_mod(T,W)
+# endif
+
#else
-#define print_rq_state(R,T)
+#define print_rq_state(R,T)
#define print_req_mod(T,W)
#endif
@@ -127,7 +133,13 @@
* what we need to do here is just: complete the master_bio.
*/
int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
- rw = bio_data_dir(req->master_bio);
+ rw = bio_data_dir(req->master_bio);
+
+ /* remove the request from the conflict detection
+ * respective block_id verification hash */
+ if (!hlist_unhashed(&req->colision)) hlist_del(&req->colision);
+ else D_ASSERT((s & RQ_NET_MASK) == 0);
+
if (rw == WRITE) {
drbd_request_t *i;
struct Tl_epoch_entry *e;
@@ -139,39 +151,51 @@
if (req->epoch == mdev->newest_barrier->br_number)
set_bit(ISSUE_BARRIER,&mdev->flags);
- /* and maybe "wake" those conflicting requests that
- * wait for this request to finish.
- * we just have to walk starting from req->next,
- * see _req_add_hash_check_colision(); */
-#define OVERLAPS overlaps(req->sector, req->size, i->sector, i->size)
- n = req->colision.next;
- /* hlist_del ... done below */
- hlist_for_each_entry_from(i, n, colision) {
- if (OVERLAPS)
- drbd_queue_work(&mdev->data.work,&i->w);
- }
+ /* we need to do the conflict detection stuff,
+ * if we have the ee_hash (two_primaries) and
+ * this has been on the network */
+ if ((s & RQ_NET_DONE) && mdev->ee_hash != NULL) {
+ const sector_t sector = req->sector;
+ const int size = req->size;
- /* and maybe "wake" those conflicting epoch entries
- * that wait for this request to finish */
- /* FIXME looks alot like we could consolidate some code
- * and maybe even hash tables? */
+ /* ASSERT:
+ * there must be no conflicting requests, since
+ * they must have been failed on the spot */
+#define OVERLAPS overlaps(sector, size, i->sector, i->size)
+ slot = tl_hash_slot(mdev,sector);
+ hlist_for_each_entry(i, n, slot, colision) {
+ if (OVERLAPS) {
+ ALERT("LOGIC BUG: completed: %p %llu +%d; other: %p %llu +%d\n",
+ req, (unsigned long long)sector, size,
+ i, (unsigned long long)i->sector, i->size);
+ }
+ }
+
+ /* maybe "wake" those conflicting epoch entries
+ * that wait for this request to finish.
+ *
+ * currently, there can be only _one_ such ee
+ * (well, or some more, which would be pending
+ * DiscardAck not yet sent by the asender...),
+ * since we block the receiver thread upon the
+ * first conflict detection, which will wait on
+ * cstate_wait. maybe we want to assert that?
+ *
+ * anyways, if we found one,
+ * we just have to do a wake_up. */
#undef OVERLAPS
-#define OVERLAPS overlaps(req->sector, req->size, e->sector, e->size)
- if(mdev->ee_hash_s) {
+#define OVERLAPS overlaps(sector, size, e->sector, e->size)
slot = ee_hash_slot(mdev,req->sector);
hlist_for_each_entry(e, n, slot, colision) {
- if (OVERLAPS)
- drbd_queue_work(&mdev->data.work,&e->w);
+ if (OVERLAPS) {
+ wake_up(&mdev->cstate_wait);
+ break;
+ }
}
}
#undef OVERLAPS
}
- /* else: READ, READA: nothing more to do */
- /* remove the request from the conflict detection
- * respective block_id verification hash */
- if(!hlist_unhashed(&req->colision)) hlist_del(&req->colision);
-
/* FIXME not yet implemented...
* in case we got "suspended" (on_disconnect: freeze io)
* we may not yet complete the request...
@@ -250,15 +274,14 @@
}
/*
- * checks whether there was an overlapping request already registered.
- * if so, add the request to the colision hash
- * _after_ the (first) overlapping request,
- * and return 1
- * if no overlap was found, add this request to the front of the chain,
- * and return 0
+ * checks whether there was an overlapping request
+ * or ee already registered.
*
- * corresponding hlist_del is in _req_may_be_done()
+ * if so, return 1, in which case this request is completed on the spot,
+ * without ever being submitted or send.
*
+ * return 0 if it is ok to submit this request.
+ *
* NOTE:
* paranoia: assume something above us is broken, and issues different write
* requests for the same block simultaneously...
@@ -273,7 +296,7 @@
* second hlist_for_each_entry becomes a noop. This is even simpler than to
* grab a reference on the net_conf, and check for the two_primaries flag...
*/
-STATIC int _req_add_hash_check_colision(drbd_request_t *req)
+STATIC int _req_conflicts(drbd_request_t *req)
{
drbd_dev *mdev = req->mdev;
const sector_t sector = req->sector;
@@ -285,6 +308,17 @@
MUST_HOLD(&mdev->req_lock);
D_ASSERT(hlist_unhashed(&req->colision));
+
+ /* FIXME should this inc_net/dec_net
+ * rather be done in drbd_make_request_common? */
+ if (!inc_net(mdev))
+ return 0;
+
+ /* BUG_ON */
+ ERR_IF (mdev->tl_hash_s == 0)
+ goto out_no_conflict;
+ BUG_ON(mdev->tl_hash == NULL);
+
#define OVERLAPS overlaps(i->sector, i->size, sector, size)
slot = tl_hash_slot(mdev,sector);
hlist_for_each_entry(i, n, slot, colision) {
@@ -294,16 +328,13 @@
current->comm, current->pid,
(unsigned long long)sector, size,
(unsigned long long)i->sector, i->size);
- hlist_add_after(n,&req->colision);
- return 1;
+ goto out_conflict;
}
}
- /* no overlapping request with local origin found,
- * register in front */
- hlist_add_head(&req->colision,slot);
if(mdev->ee_hash_s) {
/* now, check for overlapping requests with remote origin */
+ BUG_ON(mdev->ee_hash == NULL);
#undef OVERLAPS
#define OVERLAPS overlaps(e->sector, e->size, sector, size)
slot = ee_hash_slot(mdev,sector);
@@ -314,15 +345,21 @@
current->comm, current->pid,
(unsigned long long)sector, size,
(unsigned long long)e->sector, e->size);
- return 1;
+ goto out_conflict;
}
}
}
#undef OVERLAPS
+ out_no_conflict:
/* this is like it should be, and what we expected.
- * out users do behave after all... */
+ * our users do behave after all... */
+ dec_net(mdev);
return 0;
+
+ out_conflict:
+ dec_net(mdev);
+ return 1;
}
/* obviously this could be coded as many single functions
@@ -372,17 +409,6 @@
req->rq_state |= RQ_LOCAL_PENDING;
break;
-#if 0
- /* done inline below */
- case suspend_because_of_conflict:
- /* assert something? */
- /* reached via drbd_make_request_common */
- /* update state flag? why? which one? */
- req->w.cb = w_req_cancel_conflict;
- /* no queue here, see below! */
- break;
-#endif
-
/* FIXME these *_completed_* are basically the same.
* can probably be merged with some if (what == xy) */
@@ -474,44 +500,17 @@
/* assert something? */
/* from drbd_make_request_common only */
+ hlist_add_head(&req->colision,tl_hash_slot(mdev,req->sector));
+ /* corresponding hlist_del is in _req_may_be_done() */
+
/* NOTE
* In case the req ended up on the transfer log before being
* queued on the worker, it could lead to this request being
* missed during cleanup after connection loss.
* So we have to do both operations here,
* within the same lock that protects the transfer log.
- */
-
- /* register this request on the colison detection hash
- * tables. if we have a conflict, just leave here.
- * the request will be "queued" for faked "completion"
- * once the conflicting request is done.
- */
- if (_req_add_hash_check_colision(req)) {
- /* this is a conflicting request.
- * even though it may have been only _partially_
- * overlapping with one of the currently pending requests,
- * without even submitting or sending it,
- * we will pretend that it was successfully served
- * once the pending conflicting request is done.
- */
- /* _req_mod(req, suspend_because_of_conflict); */
- /* this callback is just for ASSERT purposes */
- req->w.cb = w_req_cancel_conflict;
-
- /* we don't add this to any epoch (barrier) object.
- * assign the "invalid" barrier_number 0.
- * it should be 0 anyways, still,
- * but being explicit won't harm. */
- req->epoch = 0;
-
- /*
- * EARLY break here!
- */
- break;
- }
-
- /* _req_add_to_epoch(req); this has to be after the
+ *
+ * _req_add_to_epoch(req); this has to be after the
* _maybe_start_new_epoch(req); which happened in
* drbd_make_request_common, because we now may set the bit
* again ourselves to close the current epoch.
@@ -535,31 +534,6 @@
drbd_queue_work(&mdev->data.work, &req->w);
break;
- case conflicting_req_done:
- case conflicting_ee_done:
- /* reached via bio_endio of the
- * conflicting request or epoch entry.
- * we now just "fake" completion of this request.
- * THINK: I'm going to _FAIL_ this request.
- */
- D_ASSERT(req->w.cb == w_req_cancel_conflict);
- D_ASSERT(req->epoch == 0);
- {
- const unsigned long s = req->rq_state;
- if (s & RQ_LOCAL_MASK) {
- D_ASSERT(s & RQ_LOCAL_PENDING);
- bio_put(req->private_bio);
- req->private_bio = NULL;
- dec_local(mdev);
- }
- D_ASSERT((s & RQ_NET_MASK) == RQ_NET_PENDING);
- dec_ap_pending(mdev);
- }
- /* no _OK ... this is going to be an io-error */
- req->rq_state = RQ_LOCAL_COMPLETED|RQ_NET_DONE;
- _req_may_be_done(req);
- break;
-
/* FIXME
* to implement freeze-io,
* we may not finish the request just yet.
@@ -619,6 +593,8 @@
_req_may_be_done(req);
break;
+ case conflict_discarded_by_peer:
+ /* interesstingly, this is the same thing! */
case write_acked_by_peer:
/* assert something? */
/* protocol C; successfully written on peer */
@@ -936,61 +912,51 @@
if (remote) _req_mod(req, to_be_send);
if (local) _req_mod(req, to_be_submitted);
- /* NOTE remote first: to get he concurrent write detection right, we
- * must register the request before start of local IO. */
+ /* check this request on the colison detection hash tables.
+ * if we have a conflict, just complete it here.
+ * THINK do we want to check reads, too? (I don't think so...) */
+ if (rw == WRITE && _req_conflicts(req)) {
+ /* this is a conflicting request.
+ * even though it may have been only _partially_
+ * overlapping with one of the currently pending requests,
+ * without even submitting or sending it, we will
+ * pretend that it was successfully served right now.
+ */
+ if (local) {
+ bio_put(req->private_bio);
+ req->private_bio = NULL;
+ drbd_al_complete_io(mdev, req->sector);
+ dec_local(mdev);
+ local = 0;
+ }
+ if (remote) dec_ap_pending(mdev);
+ dump_bio(mdev,req->master_bio,1);
+ /* THINK: do we want to fail it (-EIO), or pretend success? */
+ bio_endio(req->master_bio, req->master_bio->bi_size, 0);
+ req->master_bio = NULL;
+ dec_ap_bio(mdev);
+ drbd_req_free(req);
+ local = remote = 0;
+ }
+
+ /* NOTE remote first: to get the concurrent write detection right,
+ * we must register the request before start of local IO. */
if (remote) {
/* either WRITE and Connected,
* or READ, and no local disk,
* or READ, but not in sync.
*/
- _req_mod(req, rw == WRITE
- ? queue_for_net_write
- : queue_for_net_read);
+ if (rw == WRITE) _req_mod(req,queue_for_net_write);
+ else _req_mod(req,queue_for_net_read);
}
+ spin_unlock_irq(&mdev->req_lock);
+ if (b) kfree(b); /* if someone else has beaten us to it... */
- /* still holding the req_lock.
- * not strictly neccessary, but for the statistic counters... */
-
-#if 0
if (local) {
- /* FIXME I think this branch can go completely. */
- if (rw == WRITE) {
- /* we defer the drbd_set_out_of_sync to the bio_endio
- * function. we only need to make sure the bit is set
- * before we do drbd_al_complete_io. */
- if (!remote) drbd_set_out_of_sync(mdev,sector,size);
- } else {
- D_ASSERT(!remote); /* we should not read from both */
- }
- /* FIXME
- * Should we add even local reads to some list, so
- * they can be grabbed and freed somewhen?
- *
- * They already have a reference count (sort of...)
- * on mdev via inc_local()
- */
-
- /* XXX we probably should not update these here but in bio_endio.
- * especially the read_cnt could go wrong for all the READA
- * that may just be failed because of "overload"... */
- if(rw == WRITE) mdev->writ_cnt += size>>9;
- else mdev->read_cnt += size>>9;
-
/* FIXME what ref count do we have to ensure the backing_bdev
* was not detached below us? */
- req->private_bio->bi_rw = rw; /* redundant */
req->private_bio->bi_bdev = mdev->bc->backing_bdev;
- }
-#endif
- spin_unlock_irq(&mdev->req_lock);
- if (b) kfree(b); /* if someone else has beaten us to it... */
-
- /* extra if branch so I don't need to write spin_unlock_irq twice */
-
- if (local) {
- req->private_bio->bi_bdev = mdev->bc->backing_bdev;
-
if (FAULT_ACTIVE(rw==WRITE? DRBD_FAULT_DT_WR : DRBD_FAULT_DT_RD))
bio_endio(req->private_bio, req->private_bio->bi_size, -EIO);
else
@@ -1004,6 +970,7 @@
return 0;
fail_and_free_req:
+ if (b) kfree(b);
bio_endio(bio, bio->bi_size, -EIO);
drbd_req_free(req);
return 0;
@@ -1041,7 +1008,9 @@
*/
if ( mdev->state.disk <= Inconsistent &&
mdev->state.conn < Connected) {
- ERR("Sorry, I have no access to good data anymore.\n");
+ if (DRBD_ratelimit(5*HZ,5)) {
+ ERR("Sorry, I have no access to good data anymore.\n");
+ }
/*
* FIXME suspend, loop waiting on cstate wait?
*/
Modified: trunk/drbd/drbd_req.h
===================================================================
--- trunk/drbd/drbd_req.h 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_req.h 2006-10-19 13:22:52 UTC (rev 2548)
@@ -88,10 +88,6 @@
to_be_send,
to_be_submitted,
- suspend_because_of_conflict,
- conflicting_req_done,
- conflicting_ee_done,
-
/* XXX yes, now I am inconsistent...
* these two are not "events" but "actions"
* oh, well... */
@@ -104,6 +100,7 @@
connection_lost_while_pending,
recv_acked_by_peer,
write_acked_by_peer,
+ conflict_discarded_by_peer,
neg_acked,
barrier_acked, /* in protocol A and B */
data_received, /* (remote read) */
Modified: trunk/drbd/drbd_worker.c
===================================================================
--- trunk/drbd/drbd_worker.c 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/drbd_worker.c 2006-10-19 13:22:52 UTC (rev 2548)
@@ -141,7 +141,7 @@
if (do_wake) wake_up(&mdev->ee_wait);
- if(e->flags & CALL_AL_COMPLETE_IO) drbd_al_complete_io(mdev,e->sector);
+ if(e->flags & EE_CALL_AL_COMPLETE_IO) drbd_al_complete_io(mdev,e->sector);
wake_asender(mdev);
dec_local(mdev);
@@ -233,14 +233,6 @@
return 1; // Simply ignore this!
}
-/* for debug assertion only */
-int w_req_cancel_conflict(drbd_dev *mdev, struct drbd_work *w, int cancel)
-{
- ERR("w_req_cancel_conflict: this callback should never be called!\n");
- if (cancel) return 1; /* does it matter? */
- return 0;
-}
-
void resync_timer_fn(unsigned long data)
{
unsigned long flags;
Modified: trunk/drbd/linux/drbd.h
===================================================================
--- trunk/drbd/linux/drbd.h 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/linux/drbd.h 2006-10-19 13:22:52 UTC (rev 2548)
@@ -106,7 +106,10 @@
FailedToClaimMyself,
UnknownNetLinkPacket,
HaveNoDiskConfig,
- AfterLastRetCode,
+ ProtocolCRequired,
+
+ /* insert new ones above this line */
+ AfterLastRetCode
};
#define DRBD_PROT_A 1
Modified: trunk/drbd/linux/drbd_config.h
===================================================================
--- trunk/drbd/linux/drbd_config.h 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/linux/drbd_config.h 2006-10-19 13:22:52 UTC (rev 2548)
@@ -23,8 +23,8 @@
extern const char * drbd_buildtag(void);
#define REL_VERSION "8.0pre5"
-#define API_VERSION 84
-#define PRO_VERSION 83
+#define API_VERSION 85
+#define PRO_VERSION 84
// undef if you need the workaround in drbd_receiver
#define HAVE_UML_TO_VIRT 1
Modified: trunk/drbd/lru_cache.c
===================================================================
--- trunk/drbd/lru_cache.c 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/drbd/lru_cache.c 2006-10-19 13:22:52 UTC (rev 2548)
@@ -284,6 +284,7 @@
PARANOIA_ENTRY();
BUG_ON(e->refcnt == 0);
+ BUG_ON(e == lc->changing_element);
if ( --e->refcnt == 0) {
list_move(&e->list,&lc->lru); // move it to the front of LRU.
lc->used--;
Modified: trunk/user/drbdsetup.c
===================================================================
--- trunk/user/drbdsetup.c 2006-10-18 20:50:49 UTC (rev 2547)
+++ trunk/user/drbdsetup.c 2006-10-19 13:22:52 UTC (rev 2548)
@@ -369,7 +369,16 @@
EM(DiskLowerThanOutdated) = "DiskLowerThanOutdated",
EM(FailedToClaimMyself) = "FailedToClaimMyself",
EM(HaveNoDiskConfig) = "HaveNoDiskConfig",
+ EM(ProtocolCRequired) = "ProtocolCRequired"
};
+#define MAX_ERROR (sizeof(error_messages)/sizeof(*error_messages))
+/* Map a drbd netlink return code (RetCodeBase + n) to a printable name.
+ *
+ * error_messages[] is indexed by (code - RetCodeBase) via EM(); any code
+ * without an EM() line (UnknownNetLinkPacket is missing from the table
+ * above) leaves a NULL slot. Such slots are treated like out-of-range
+ * codes, so the result is never NULL and is always safe to hand to
+ * fprintf("%s").
+ */
+const char * error_to_string(int err_no)
+{
+	/* a code below RetCodeBase wraps to a huge unsigned index
+	 * and is rejected by the same range check */
+	const unsigned int idx = err_no - RetCodeBase;
+	if (idx >= MAX_ERROR || error_messages[idx] == NULL)
+		return "Unknown... maybe API_VERSION mismatch?";
+	return error_messages[idx];
+}
+#undef MAX_ERROR
char* cmdname = 0;
@@ -705,7 +714,7 @@
} else {
if(err_no > RetCodeBase ) {
fprintf(stderr,"Failure: (%d) %s\n",err_no,
- error_messages[err_no-RetCodeBase]);
+ error_to_string(err_no));
rv = 10;
} else if (err_no == SS_UnknownError) {
fprintf(stderr,"State change failed: (%d)"
@@ -1606,9 +1615,10 @@
/* == '-' catches -h, --help, and similar */
if (argc > 1 && (!strcmp(argv[1],"help") || argv[1][0] == '-')) {
- if(argc == 3) {
+ if(argc >= 3) {
cmd=find_cmd_by_name(argv[2]);
if(cmd) print_command_usage(cmd-commands,NULL,0);
+ else print_usage("unknown command");
exit(0);
}
}
More information about the drbd-cvs
mailing list