[DRBD-cvs] svn commit by lars - r2394 - in trunk: . drbd drbd/linux user - This huge patch took me the better part of the last cou

drbd-cvs at lists.linbit.com drbd-cvs at lists.linbit.com
Sun Sep 10 01:04:21 CEST 2006


Author: lars
Date: 2006-09-10 01:04:18 +0200 (Sun, 10 Sep 2006)
New Revision: 2394

Added:
   trunk/drbd/drbd_req.h
Modified:
   trunk/ChangeLog
   trunk/drbd/drbd_actlog.c
   trunk/drbd/drbd_bitmap.c
   trunk/drbd/drbd_compat_wrappers.h
   trunk/drbd/drbd_fs.c
   trunk/drbd/drbd_int.h
   trunk/drbd/drbd_main.c
   trunk/drbd/drbd_nl.c
   trunk/drbd/drbd_receiver.c
   trunk/drbd/drbd_req.c
   trunk/drbd/drbd_worker.c
   trunk/drbd/linux/drbd.h
   trunk/drbd/linux/drbd_config.h
   trunk/user/Makefile
   trunk/user/drbdadm_main.c
Log:
This huge patch took me the better part of the last couple of weeks,
especially the last week was (more than) full-time dedicated to
this stuff, consolidating/fixing the various parts of the code that
handle "drbd_request" objects.

Even so, this is NOT TESTED.  Sorry.
well, it
 compiles,
 loads
 unloads...

but beyond that, it may be seriously broken still (should be mostly obvious
bugs/thinkos/"forgot to initialize something"s).
I have to get my uml working again.

But since I start traveling again, I want this committed.
In case it really is broken beyond some "obvious" fixes,
maybe move this into some "lge-test" branch until I can fix it.

so what is in here:

 * we get a new file, drbd_req.h
 * hopefully the code can be audited more easily now,
   when "everything" is in one place
 * removal of some functions the functionality of which
   was reimplemented in (mainly) drbd_req.h
 * removal of some left-over wrapper functions
 * drbd_request now has explicit sector and size members,
   needed to implement protocol A and B correctly
 * ee_lock, tl_lock and pr_lock have gone.
   all occurrences replaced with req_lock where applicable.
 * fixes for
   * uncounted race conditions and
   * reference count imbalances all over the code
     for various counters and
   * bio leak when diskless
   * protocol A and B "forgetting" to dirty blocks
   * some other logic bugs (wrong order of actions)
   * probably some other things I forgot about right now...
 * lots of comments about how I think it is or should be
 * lots of new /* FIXME */ and "#warning FIXME"

 * unfortunately, detection of conflicting writes is unimplemented (or
   half-implemented) now; I'll fix that asap.
   it was broken anyway.
 
 some whitespace changes :)

userland:
 drbdadm got a new -v "verbose" flag to echo the commands that are executed
 drbdadm dump includes the config file name

 I feel uneasy about using kernel headers directly when compiling userland stuff.
 user/Makefile: only use the -I$(KDIR)/include for drbdsetup



Modified: trunk/ChangeLog
===================================================================
--- trunk/ChangeLog	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/ChangeLog	2006-09-09 23:04:18 UTC (rev 2394)
@@ -13,7 +13,7 @@
    context that calls make_request().
  * The worker thread no longer gets restarted upon loss of connection.
  * A testsuite developed by students of 'FH Hagenberg' was added.
-	
+
 8.0pre3 (api:82/proto:80)
 --------
  * Now it works on device mapper (LVM) as well as on "real" block devices.

Modified: trunk/drbd/drbd_actlog.c
===================================================================
--- trunk/drbd/drbd_actlog.c	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/drbd_actlog.c	2006-09-09 23:04:18 UTC (rev 2394)
@@ -160,7 +160,7 @@
 struct update_odbm_work {
 	struct drbd_work w;
 	unsigned int enr;
-};
+} ;
 
 struct update_al_work {
 	struct drbd_work w;
@@ -203,6 +203,9 @@
 	return al_ext;
 }
 
+/* FIXME
+ * this should be able to return failure when meta data update has failed.
+ */
 void drbd_al_begin_io(struct Drbd_Conf *mdev, sector_t sector)
 {
 	unsigned int enr = (sector >> (AL_EXTENT_SIZE_B-9));
@@ -319,6 +322,7 @@
 	buffer->xor_sum = cpu_to_be32(xor_sum);
 
 #warning check outcome of addition u64/sector_t/s32
+#warning "FIXME code missing"
 	sector = mdev->bc->md.md_offset + mdev->bc->md.al_offset + mdev->al_tr_pos;
 
 	if(!drbd_md_sync_page_io(mdev,mdev->bc,sector,WRITE)) {
@@ -496,7 +500,7 @@
 
 	wait_event(mdev->al_wait, lc_try_lock(mdev->act_log));
 
-	i=inc_md_only(mdev,Attaching);
+	i=inc_local_if_state(mdev,Attaching);
 	D_ASSERT( i ); // Assertions should not have side effects.
 	// I do not want to have D_ASSERT( inc_md_only(mdev,Attaching) );
 
@@ -579,7 +583,7 @@
 {
 	struct update_odbm_work *udw = (struct update_odbm_work*)w;
 
-	if( !inc_md_only(mdev,Attaching) ) {
+	if( !inc_local_if_state(mdev,Attaching) ) {
 		if (DRBD_ratelimit(5*HZ,5))
 			WARN("Can not update on disk bitmap, local IO disabled.\n");
 		return 1;
@@ -768,14 +772,14 @@
 
 /*
  * this is intended to set one request worth of data out of sync.
- * affects at least 1 bit, and at most 1+PAGE_SIZE/BM_BLOCK_SIZE bits.
+ * affects at least 1 bit, and at most 1+DRBD_MAX_SEGMENT_SIZE/BM_BLOCK_SIZE bits.
  *
  * called by tl_clear and drbd_send_dblock (==drbd_make_request).
  * so this can be _any_ process.
  */
 void __drbd_set_out_of_sync(drbd_dev* mdev, sector_t sector, int size, const char* file, const unsigned int line)
 {
-	unsigned long sbnr,ebnr,lbnr,bnr;
+	unsigned long sbnr,ebnr,lbnr;
 	sector_t esector, nr_sectors;
 
 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
@@ -803,11 +807,9 @@
 	}
 #endif
 
-	/*
-	 * ok, (capacity & 7) != 0 sometimes, but who cares...
-	 * we count rs_{total,left} in bits, not sectors.
-	 */
-	for(bnr=sbnr; bnr <= ebnr; bnr++) drbd_bm_set_bit(mdev,bnr);
+	/* ok, (capacity & 7) != 0 sometimes, but who cares...
+	 * we count rs_{total,left} in bits, not sectors.  */
+	drbd_bm_set_bits_in_irq(mdev,sbnr,ebnr);
 }
 
 static inline
@@ -946,7 +948,8 @@
 
 	spin_lock_irq(&mdev->al_lock);
 
-	if(inc_md_only(mdev,Failed)) { // Makes sure mdev->resync is there.
+	/* inc_local to make sure mdev->resync is there */
+	if(inc_local_if_state(mdev,Failed)) {
 		for(i=0;i<mdev->resync->nr_elements;i++) {
 			bm_ext = (struct bm_extent*) lc_entry(mdev->resync,i);
 			if(bm_ext->lce.lc_number == LC_FREE) continue;

Modified: trunk/drbd/drbd_bitmap.c
===================================================================
--- trunk/drbd/drbd_bitmap.c	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/drbd_bitmap.c	2006-09-09 23:04:18 UTC (rev 2394)
@@ -27,7 +27,9 @@
 #include <linux/bitops.h>
 #include <linux/vmalloc.h>
 #include <linux/string.h> // for memset
+#include <linux/hardirq.h> /* for D_ASSERT(in_interrupt()) */
 
+
 #include <linux/drbd.h>
 #include "drbd_int.h"
 
@@ -53,14 +55,20 @@
  * or wether I want it to do all the sector<->bit calculation in here.
  */
 
+#warning "verify all spin_lock_irq here, and their call path"
+#warning "and change to irqsave where applicable"
+#warning "so we don't accidentally nest spin_lock_irq()"
 /*
  * NOTE
  *  Access to the *bm is protected by bm_lock.
  *  It is safe to read the other members within the lock.
  *
  *  drbd_bm_set_bit is called from bio_endio callbacks,
- *  so there we need a spin_lock_irqsave.
- *  Everywhere else we need a spin_lock_irq.
+ *  We may be called with irq already disabled,
+ *  so we need spin_lock_irqsave().
+ * FIXME
+ *  for performance reasons, when we _know_ we have irq disabled, we should
+ *  probably introduce some _in_irq variants, so we know to only spin_lock().
  *
  * FIXME
  *  Actually you need to serialize all resize operations.
@@ -83,6 +91,7 @@
 	 * it will blow up if we make the bitmap bigger...
 	 * not that it makes much sense to have a bitmap that large,
 	 * rather change the granularity to 16k or 64k or something.
+	 * (that implies other problems, however...)
 	 */
 	unsigned long bm_fo;        // next offset for drbd_bm_find_next
 	unsigned long bm_set;       // nr of set bits; THINK maybe atomic_t ?
@@ -590,10 +599,6 @@
 	ERR_IF(!b) return;
 	ERR_IF(!b->bm) return;
 
-	D_BUG_ON(!b);
-	if (b->bm_bits == 0) return;
-	D_BUG_ON(!b->bm);
-
 	MUST_BE_LOCKED();
 
 	spin_lock_irq(&b->bm_lock);
@@ -969,6 +974,33 @@
 	return i;
 }
 
+/* returns number of bits actually changed (0->1)
+ * wants bitnr, not sector */
+int drbd_bm_set_bits_in_irq(drbd_dev *mdev, const unsigned long s, const unsigned long e)
+{
+	struct drbd_bitmap *b = mdev->bitmap;
+	unsigned long bitnr;
+	int c = 0;
+	ERR_IF(!b) return 1;
+	ERR_IF(!b->bm) return 1;
+
+	D_BUG_ON(!in_interrupt()); /* called within spin_lock_irq(&mdev->req_lock) */
+
+	spin_lock(&b->bm_lock);
+	BM_PARANOIA_CHECK();
+	MUST_NOT_BE_LOCKED();
+	for (bitnr = s; bitnr <=e; bitnr++) {
+		ERR_IF (bitnr >= b->bm_bits) {
+			ERR("bitnr=%lu bm_bits=%lu\n",bitnr, b->bm_bits);
+		} else {
+			c += (0 == __test_and_set_bit(bitnr, b->bm));
+		}
+	}
+	b->bm_set += c;
+	spin_unlock(&b->bm_lock);
+	return c;
+}
+
 /* returns previous bit state
  * wants bitnr, NOT sector.
  */

Modified: trunk/drbd/drbd_compat_wrappers.h
===================================================================
--- trunk/drbd/drbd_compat_wrappers.h	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/drbd_compat_wrappers.h	2006-09-09 23:04:18 UTC (rev 2394)
@@ -27,8 +27,7 @@
 
 extern int drbd_endio_read_sec (struct bio *bio, unsigned int bytes_done, int error);
 extern int drbd_endio_write_sec(struct bio *bio, unsigned int bytes_done, int error);
-extern int drbd_endio_read_pri (struct bio *bio, unsigned int bytes_done, int error);
-extern int drbd_endio_write_pri(struct bio *bio, unsigned int bytes_done, int error);
+extern int drbd_endio_pri      (struct bio *bio, unsigned int bytes_done, int error);
 
 static inline sector_t drbd_get_hardsect(struct block_device *bdev)
 {
@@ -56,48 +55,6 @@
 
 #define drbd_bio_uptodate(bio) bio_flagged(bio,BIO_UPTODATE)
 
-static inline void drbd_bio_IO_error(struct bio *bio)
-{
-	bio_endio(bio,bio->bi_size,-EIO);
-}
-
-static inline void drbd_bio_endio(struct bio *bio, int uptodate)
-{
-	bio_endio(bio,bio->bi_size,uptodate ? 0 : -EIO);
-}
-
-static inline drbd_dev* drbd_req_get_mdev(struct drbd_request *req)
-{
-	return (drbd_dev*) req->mdev;
-}
-
-static inline sector_t drbd_req_get_sector(struct drbd_request *req)
-{
-	return req->master_bio->bi_sector;
-}
-
-static inline unsigned short drbd_req_get_size(struct drbd_request *req)
-{
-	drbd_dev* mdev = req->mdev;
-	D_ASSERT(req->master_bio->bi_size);
-	return req->master_bio->bi_size;
-}
-
-static inline struct bio* drbd_req_private_bio(struct drbd_request *req)
-{
-	return req->private_bio;
-}
-
-static inline sector_t drbd_ee_get_sector(struct Tl_epoch_entry *ee)
-{
-	return ee->ee_sector;
-}
-
-static inline unsigned short drbd_ee_get_size(struct Tl_epoch_entry *ee)
-{
-	return ee->ee_size;
-}
-
 #ifdef CONFIG_HIGHMEM
 /*
  * I don't know why there is no bvec_kmap, only bvec_kmap_irq ...
@@ -179,7 +136,7 @@
 	if (!bio->bi_bdev) {
 		printk(KERN_ERR "drbd_generic_make_request: bio->bi_bdev == NULL\n");
 		dump_stack();
-		drbd_bio_IO_error(bio);
+		bio_endio(bio, bio->bi_size, -ENODEV);
 		return;
 	}
 

Modified: trunk/drbd/drbd_fs.c
===================================================================
--- trunk/drbd/drbd_fs.c	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/drbd_fs.c	2006-09-09 23:04:18 UTC (rev 2394)
@@ -756,6 +756,10 @@
 
 	/* As soon as mdev->state.conn < Unconnected nobody can increase
 	   the net_cnt. Wait until the net_cnt is 0. */
+	/* FIXME this is buggy by design.
+	 * And, we currently do not wrap all dereferences to net_conf in
+	 * inc_net...  this needs to become a rw_semaphore!
+	 */
 	if ( wait_event_interruptible( mdev->cstate_wait,
 				       atomic_read(&mdev->net_cnt) == 0 ) ) {
 		retcode=GotSignal;

Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/drbd_int.h	2006-09-09 23:04:18 UTC (rev 2394)
@@ -23,6 +23,10 @@
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 
 */
+
+#ifndef _DRBD_INT_H
+#define _DRBD_INT_H
+
 #include <linux/compiler.h>
 #include <linux/types.h>
 #include <linux/version.h>
@@ -214,10 +218,6 @@
  * our structs
  *************************/
 
-#define SET_MAGIC(x)       ((x)->magic = (long)(x) ^ DRBD_MAGIC)
-#define VALID_POINTER(x)   ((x) ? (((x)->magic ^ DRBD_MAGIC) == (long)(x)):0)
-#define INVALIDATE_MAGIC(x) (x->magic--)
-
 #define SET_MDEV_MAGIC(x) \
 	({ typecheck(struct Drbd_Conf*,x); \
 	  (x)->magic = (long)(x) ^ DRBD_MAGIC; })
@@ -225,15 +225,6 @@
 	( typecheck(struct Drbd_Conf*,x) && \
 	  ((x) ? (((x)->magic ^ DRBD_MAGIC) == (long)(x)):0))
 
-/* these defines should go into blkdev.h
-   (if it will be ever includet into linus' linux) */
-#define RQ_DRBD_NOTHING	  0x0001
-#define RQ_DRBD_SENT      0x0010   // We got an ack
-#define RQ_DRBD_LOCAL     0x0020   // We wrote it to the local disk
-#define RQ_DRBD_IN_TL     0x0040   // Set when it is in the TL
-#define RQ_DRBD_ON_WIRE   0x0080   // Set as soon as it is on the socket...
-#define RQ_DRBD_DONE      ( RQ_DRBD_SENT + RQ_DRBD_LOCAL + RQ_DRBD_ON_WIRE )
-
 /* drbd_meta-data.c (still in drbd_main.c) */
 #define DRBD_MD_MAGIC (DRBD_MAGIC+4) // 4th incarnation of the disk layout.
 
@@ -266,6 +257,7 @@
 	printk(KERN_EMERG DEVICE_NAME "%d: " fmt,			\
 			mdev_to_minor(mdev) , ##args);		\
 } while (0)
+#warning "drbd_panic() does nothing but printk()!"
 #endif
 #undef DRBD_PANIC
 
@@ -413,6 +405,7 @@
  */
 
 #define DP_HARDBARRIER 1
+/* FIXME map BIO_RW_SYNC, too ... */
 
 typedef struct {
 	Drbd_Header head;
@@ -466,6 +459,10 @@
 } __attribute((packed)) Drbd_HandShake_Packet;
 // 80 bytes, FIXED for the next century
 
+/* FIXME do we actually send a barrier packet with "0" as barrier number?
+ * what for?
+ * couldn't we send the pointer as handle as well, as we do with block_id?
+ */
 typedef struct {
 	Drbd_Header head;
 	u32         barrier;   // may be 0 or a barrier number
@@ -604,14 +601,24 @@
 struct drbd_barrier;
 struct drbd_request {
 	struct drbd_work w;
-	struct list_head tl_requests; // double linked list in the TL
-	struct drbd_barrier *barrier; // The next barrier.
-	struct bio *master_bio;       // master bio pointer
+	drbd_dev *mdev;
 	struct bio *private_bio;
 	struct hlist_node colision;
-	drbd_dev *mdev;
-	long magic;
-	int rq_status;
+	sector_t sector;
+	unsigned int size;
+	unsigned int epoch; /* barrier_nr */
+
+	/* barrier_nr: used to check on "completion" whether this req was in
+	 * the current epoch, and we therefore have to close it,
+	 * starting a new epoch...
+	 */
+
+	/* up to here, the struct layout is identical to Tl_epoch_entry;
+	 * we might be able to use that to our advantage...  */
+
+	struct list_head tl_requests; /* ring list in the transfer log */
+	struct bio *master_bio;       /* master bio pointer */
+	unsigned long rq_state; /* see comments above _req_mod() */
 	int seq_num;
 };
 
@@ -619,7 +626,7 @@
 	struct drbd_work w;
 	struct list_head requests; // requests before
 	struct drbd_barrier *next; // pointer to the next barrier
-	int br_number;  // the barriers identifier.
+	unsigned int br_number;  // the barriers identifier.
 	int n_req;      // number of requests attached before this barrier
 };
 
@@ -634,20 +641,24 @@
 
 struct Tl_epoch_entry {
 	struct drbd_work    w;
+	drbd_dev *mdev;
 	struct bio *private_bio;
-	u64    block_id;
-	long magic;
-	unsigned int ee_size;
-	sector_t ee_sector;
 	struct hlist_node colision;
-	drbd_dev *mdev;
+	sector_t sector;
+	unsigned int size;
 	unsigned int barrier_nr;
+
+	/* up to here, the struct layout is identical to drbd_request;
+	 * we might be able to use that to our advantage...  */
+
 	unsigned int barrier_nr2;
 	/* If we issue the bio with BIO_RW_BARRIER we have to
 	   send a barrier ACK before we send the ACK to this
 	   write. We store the barrier number in here.
 	   In case the barrier after this write has been coalesced
 	   as well, we set it's barrier_nr into barrier_nr2 */
+
+	u64    block_id;
 };
 
 /* flag bits */
@@ -787,7 +798,7 @@
 	atomic_t local_cnt;      // Waiting for local disk to signal completion
 	atomic_t net_cnt;        // Users of net_conf
 	spinlock_t req_lock;
-	spinlock_t tl_lock;
+	struct drbd_barrier* unused_spare_barrier; /* for pre-allocation */
 	struct drbd_barrier* newest_barrier;
 	struct drbd_barrier* oldest_barrier;
 	struct hlist_head * tl_hash;
@@ -807,19 +818,22 @@
 	atomic_t resync_locked;   // Number of locked elements in resync LRU
 	int open_cnt;
 	u64 *p_uuid;
-	spinlock_t ee_lock;
+	/* no more ee_lock
+	 * we had to grab both req_lock _and_ ee_lock in almost every place we
+	 * needed one of them. so why bother having too spinlocks?
+	 * FIXME clean comments, restructure so it is more obvious which
+	 * members areprotected by what */
 	unsigned int epoch_size;
 	struct list_head active_ee; // IO in progress
 	struct list_head sync_ee;   // IO in progress
 	struct list_head done_ee;   // send ack
 	struct list_head read_ee;   // IO in progress
 	struct list_head net_ee;    // zero-copy network send in progress
-	struct hlist_head * ee_hash; // is proteced by tl_lock!
+	struct hlist_head * ee_hash; // is proteced by req_lock!
 	unsigned int ee_hash_s;
 	struct Tl_epoch_entry * last_write_w_barrier; // ee_lock, single thread
 	int next_barrier_nr;  // ee_lock, single thread
-	spinlock_t pr_lock;
-	struct hlist_head * app_reads_hash; // is proteced by pr_lock
+	struct hlist_head * app_reads_hash; // is proteced by req_lock
 	struct list_head resync_reads;
 	atomic_t pp_in_use;
 	wait_queue_head_t ee_wait;
@@ -882,14 +896,7 @@
 extern void tl_release(drbd_dev *mdev,unsigned int barrier_nr,
 		       unsigned int set_size);
 extern void tl_clear(drbd_dev *mdev);
-extern void tl_add(drbd_dev *mdev, drbd_request_t *req);
-extern void _tl_add(drbd_dev *mdev, drbd_request_t *req);
-extern struct drbd_barrier *tl_add_barrier(drbd_dev *mdev);
 extern struct drbd_barrier *_tl_add_barrier(drbd_dev *,struct drbd_barrier *);
-extern struct Tl_epoch_entry * _ee_have_write(drbd_dev *mdev,drbd_request_t * req);
-extern int tl_dependence(drbd_dev *mdev, drbd_request_t * item);
-extern int tl_verify(drbd_dev *mdev, drbd_request_t * item, sector_t sector);
-extern drbd_request_t * req_have_write(drbd_dev *, struct Tl_epoch_entry *);
 extern void drbd_free_sock(drbd_dev *mdev);
 extern int drbd_send(drbd_dev *mdev, struct socket *sock,
 		     void* buf, size_t size, unsigned msg_flags);
@@ -1081,6 +1088,8 @@
 extern void drbd_bm_clear_all (drbd_dev *mdev);
 extern void drbd_bm_reset_find(drbd_dev *mdev);
 extern int  drbd_bm_set_bit   (drbd_dev *mdev, unsigned long bitnr);
+extern int  drbd_bm_set_bits_in_irq(
+		drbd_dev *mdev, unsigned long s, unsigned long e);
 extern int  drbd_bm_test_bit  (drbd_dev *mdev, unsigned long bitnr);
 extern int  drbd_bm_clear_bit (drbd_dev *mdev, unsigned long bitnr);
 extern int  drbd_bm_e_weight  (drbd_dev *mdev, unsigned long enr);
@@ -1127,12 +1136,10 @@
 extern drbd_dev *drbd_new_device(int minor);
 
 // drbd_req
-#define ERF_NOTLD    2   /* do not call tl_dependence */
-extern void drbd_end_req(drbd_request_t *, int, int, sector_t);
 extern int drbd_make_request_26(request_queue_t *q, struct bio *bio);
 extern int drbd_read_remote(drbd_dev *mdev, drbd_request_t *req);
 extern int drbd_merge_bvec(request_queue_t *, struct bio *, struct bio_vec *);
-extern int drbd_pr_verify(drbd_dev *, drbd_request_t *, sector_t);
+extern int is_valid_ar_handle(drbd_request_t *, sector_t);
 
 
 // drbd_fs.c
@@ -1164,7 +1171,7 @@
 extern int drbd_md_sync_page_io(drbd_dev *mdev, struct drbd_backing_dev *bdev,
 				sector_t sector, int rw);
 // worker callbacks
-extern int w_is_resync_read      (drbd_dev *, struct drbd_work *, int);
+extern int w_req_cancel_conflict (drbd_dev *, struct drbd_work *, int);
 extern int w_read_retry_remote   (drbd_dev *, struct drbd_work *, int);
 extern int w_e_end_data_req      (drbd_dev *, struct drbd_work *, int);
 extern int w_e_end_rsdata_req    (drbd_dev *, struct drbd_work *, int);
@@ -1273,37 +1280,39 @@
 	return _drbd_request_state(mdev, mask, val, ChgStateVerbose);
 }
 
-static inline void drbd_req_free(drbd_request_t *req)
-{
-	INVALIDATE_MAGIC(req);
-	mempool_free(req,drbd_request_mempool);
-}
-
 /**
  * drbd_chk_io_error: Handles the on_io_error setting, should be called from
  * all io completion handlers. See also drbd_io_error().
  */
+static inline void __drbd_chk_io_error(drbd_dev* mdev)
+{
+	/* FIXME cleanup the messages here */
+	switch(mdev->bc->dc.on_io_error) {
+	case PassOn: /* FIXME should the better be named "Ignore"? */
+		ERR("Ignoring local IO error!\n");
+		break;
+	case Panic:
+		_drbd_set_state(mdev,_NS(disk,Failed),ChgStateHard);
+		/* FIXME this is very ugly anyways.
+		 * but in case we panic, we should at least not panic
+		 * while holding the req_lock hand with irq disabled. */
+		drbd_panic("IO error on backing device!\n");
+		break;
+	case Detach:
+		if (_drbd_set_state(mdev,_NS(disk,Failed),ChgStateHard) 
+		    == SS_Success) {
+			ERR("Local IO failed. Detaching...\n");
+		}
+		break;
+	}
+}
+
 static inline void drbd_chk_io_error(drbd_dev* mdev, int error)
 {
 	if (error) {
 		unsigned long flags;
 		spin_lock_irqsave(&mdev->req_lock,flags);
-
-		switch(mdev->bc->dc.on_io_error) {
-		case PassOn:
-			ERR("Ignoring local IO error!\n");
-			break;
-		case Panic:
-			_drbd_set_state(mdev,_NS(disk,Failed),ChgStateHard);
-			drbd_panic("IO error on backing device!\n");
-			break;
-		case Detach:
-			if (_drbd_set_state(mdev,_NS(disk,Failed),ChgStateHard) 
-			    == SS_Success) {
-				ERR("Local IO failed. Detaching...\n");
-			}
-			break;
-		}
+		__drbd_chk_io_error(mdev);
 		spin_unlock_irqrestore(&mdev->req_lock,flags);
 	}
 }
@@ -1461,6 +1470,29 @@
 	_drbd_thread_stop(thi,TRUE,FALSE);
 }
 
+/* counts how many answer packets packets we expect from our peer,
+ * for either explicit application requests,
+ * or implicit barrier packets as necessary.
+ * increased:
+ *  w_send_barrier
+ *  _req_mod(req, queue_for_net_write or queue_for_net_read);
+ *    it is much easier and equally valid to count what we queue for the
+ *    worker, even before it actually was queued or send.
+ *    (drbd_make_request_common; recovery path on read io-error)
+ * decreased:
+ *  got_BarrierAck (respective tl_clear, tl_clear_barrier)
+ *  _req_mod(req, data_received)
+ *     [from receive_DataReply]
+ *  _req_mod(req, write_acked_by_peer or recv_acked_by_peer or neg_acked)
+ *     [from got_BlockAck (WriteAck, RecvAck)]
+ *     FIXME
+ *     for some reason it is NOT decreased in got_NegAck,
+ *     but in the resulting cleanup code from report_params.
+ *     we should try to remember the reason for that...
+ *  _req_mod(req, send_failed or send_canceled)
+ *  _req_mod(req, connection_lost_while_pending)
+ *     [from tl_clear_barrier]
+ */
 static inline void inc_ap_pending(drbd_dev* mdev)
 {
 	atomic_inc(&mdev->ap_pending_cnt);
@@ -1478,6 +1510,12 @@
 		wake_up(&mdev->cstate_wait);			\
 	ERR_IF_CNT_IS_NEGATIVE(ap_pending_cnt)
 
+/* counts how many resync-related answers we still expect from the peer
+ *                   increase                   decrease
+ * SyncTarget sends RSDataRequest (and expects RSDataReply)
+ * SyncSource sends RSDataReply   (and expects WriteAck whith ID_SYNCER)
+ *                                         (or NegAck with ID_SYNCER)
+ */
 static inline void inc_rs_pending(drbd_dev* mdev)
 {
 	atomic_inc(&mdev->rs_pending_cnt);
@@ -1488,52 +1526,22 @@
 	atomic_dec(&mdev->rs_pending_cnt);			\
 	ERR_IF_CNT_IS_NEGATIVE(rs_pending_cnt)
 
+/* counts how many answers we still need to send to the peer.
+ * increased on
+ *  receive_Data        unless protocol A;
+ *                      we need to send a RecvAck (proto B)
+ *                      or WriteAck (proto C)
+ *  receive_RSDataReply (recv_resync_read) we need to send a WriteAck
+ *  receive_DataRequest (receive_RSDataRequest) we need to send back Data
+ *  receive_Barrier_*   we need to send a BarrierAck
+ */ 
 static inline void inc_unacked(drbd_dev* mdev)
 {
 	atomic_inc(&mdev->unacked_cnt);
 }
 
-#if 0 && LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
-/*
- * idea was to forcefully push the tcp stack whenever the
- * currently last pending packet is in the buffer.
- * should be benchmarked on some real box to see if it has any
- * effect on overall latency.
- */
-
-/* this only works with 2.6 kernels because of some conflicting defines
- * in header files included from net.tcp.h.
- */
-
-#include <net/tcp.h>
-static inline void drbd_push_msock(drbd_dev* mdev)
-{
-	struct sock    *sk;
-	struct tcp_opt *tp;
-	if (mdev->meta.socket == NULL) return;
-	sk = mdev->meta.socket->sk;
-	tp = tcp_sk(sk);
-	lock_sock(sk);
-	__tcp_push_pending_frames(sk, tp, tcp_current_mss(sk, 1), TCP_NAGLE_PUSH);
-	release_sock(sk);
-}
-
 #define dec_unacked(mdev)					\
-	might_sleep();						\
 	typecheck(drbd_dev*,mdev);				\
-	if (atomic_dec_and_test(&mdev->unacked_cnt))		\
-		drbd_push_msock(mdev);				\
-	ERR_IF_CNT_IS_NEGATIVE(unacked_cnt);
-
-#define sub_unacked(mdev, n)					\
-	might_sleep();						\
-	typecheck(drbd_dev*,mdev);				\
-	if (atomic_sub_and_test(n, &mdev->unacked_cnt))		\
-		drbd_push_msock(mdev);				\
-	ERR_IF_CNT_IS_NEGATIVE(unacked_cnt);
-#else
-#define dec_unacked(mdev)					\
-	typecheck(drbd_dev*,mdev);				\
 	atomic_dec(&mdev->unacked_cnt);				\
 	ERR_IF_CNT_IS_NEGATIVE(unacked_cnt)
 
@@ -1541,9 +1549,9 @@
 	typecheck(drbd_dev*,mdev);				\
 	atomic_sub(n, &mdev->unacked_cnt);			\
 	ERR_IF_CNT_IS_NEGATIVE(unacked_cnt)
-#endif
 
 
+#warning "FIXME inherently racy. this is buggy by design :("
 /**
  * inc_net: Returns TRUE when it is ok to access mdev->net_conf. You
  * should call dec_net() when finished looking at mdev->net_conf.
@@ -1565,34 +1573,33 @@
 	}
 }
 
+/* strictly speaking,
+ * these would have to hold the req_lock while looking at
+ * the disk state. But since we cannot submit within a spinlock,
+ * this is mood...
+ */
+
 /**
  * inc_local: Returns TRUE when local IO is possible. If it returns
  * TRUE you should call dec_local() after IO is completed.
  */
-static inline int inc_local(drbd_dev* mdev)
+static inline int inc_local_if_state(drbd_dev* mdev, drbd_disks_t mins)
 {
 	int io_allowed;
 
 	atomic_inc(&mdev->local_cnt);
-	io_allowed = (mdev->state.disk >= Inconsistent);
+	io_allowed = (mdev->state.disk >= mins ); 
 	if( !io_allowed ) {
 		atomic_dec(&mdev->local_cnt);
 	}
 	return io_allowed;
 }
-
-static inline int inc_md_only(drbd_dev* mdev, drbd_disks_t mins)
+static inline int inc_local(drbd_dev* mdev)
 {
-	int io_allowed;
-
-	atomic_inc(&mdev->local_cnt);
-	io_allowed = (mdev->state.disk >= mins ); 
-	if( !io_allowed ) {
-		atomic_dec(&mdev->local_cnt);
-	}
-	return io_allowed;
+	return inc_local_if_state(mdev, Inconsistent);
 }
 
+
 static inline void dec_local(drbd_dev* mdev)
 {
 	if(atomic_dec_and_test(&mdev->local_cnt) &&
@@ -1603,12 +1610,24 @@
 	D_ASSERT(atomic_read(&mdev->local_cnt)>=0);
 }
 
-/* 
+/* this throttles on-the-fly application requests
+ * according to max_buffers settings;
+ * maybe re-implement using semaphores? */
 static inline void inc_ap_bio(drbd_dev* mdev)
 {
-	atomic_inc(&mdev->ap_bio_cnt);
+	int mxb = 1000000; /* arbitrary limit on open requests */
+
+	if(inc_net(mdev)) {
+		mxb = mdev->net_conf->max_buffers;
+		dec_net(mdev);
+		/* decrease here already, so you can reconfigure
+		 * the max buffer setting even under load.
+		 * alternative: use rw_semaphore. I'd like that.
+		 */
+	}
+
+	wait_event( mdev->rq_wait,atomic_add_unless(&mdev->ap_bio_cnt,1,mxb) );
 }
-*/
 
 static inline void dec_ap_bio(drbd_dev* mdev)
 {
@@ -1625,12 +1644,15 @@
 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt)>=0);
 }
 
+/* FIXME does not handle wrap around yet */
 static inline void update_peer_seq(drbd_dev* mdev, int new_seq)
 {
 	spin_lock(&mdev->peer_seq_lock);
 	mdev->peer_seq = max(mdev->peer_seq, new_seq);
 	spin_unlock(&mdev->peer_seq_lock);
 	wake_up(&mdev->cstate_wait);
+	/* FIXME introduce seq_wait, no point in waking up a number of
+	 * processes with each and every Ack received... */
 }
 
 static inline int peer_seq(drbd_dev* mdev)
@@ -1729,3 +1751,4 @@
 		drbd_blk_run_queue(bdev_get_queue(mdev->bc->backing_bdev));
 	}
 }
+#endif

Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/drbd_main.c	2006-09-09 23:04:18 UTC (rev 2394)
@@ -58,6 +58,7 @@
 #include <linux/drbd.h>
 #include <linux/drbd_limits.h>
 #include "drbd_int.h"
+#include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
 
 /* YES. We got an official device major from lanana
  */
@@ -187,42 +188,10 @@
 	}
 }
 
-STATIC unsigned int tl_hash_fn(drbd_dev *mdev, sector_t sector)
-{
-	BUG_ON(mdev->tl_hash_s == 0);
-	return (unsigned int)(sector>>HT_SHIFT) % mdev->tl_hash_s;
-}
-
-
-void _tl_add(drbd_dev *mdev, drbd_request_t *req)
-{
-	struct drbd_barrier *b;
-
-	b=mdev->newest_barrier;
-
-	req->barrier = b;
-	req->rq_status |= RQ_DRBD_IN_TL; /* BUG, not holding req_lock */
-	list_add(&req->tl_requests,&b->requests);
-
-	if( b->n_req++ > mdev->net_conf->max_epoch_size ) {
-		set_bit(ISSUE_BARRIER,&mdev->flags);
-	}
-
-	INIT_HLIST_NODE(&req->colision);
-	hlist_add_head( &req->colision, mdev->tl_hash +
-			tl_hash_fn(mdev, drbd_req_get_sector(req) ));
-}
-
-void tl_add(drbd_dev *mdev, drbd_request_t * req)
-{
-	spin_lock_irq(&mdev->tl_lock);
-	_tl_add(mdev,req);
-	spin_unlock_irq(&mdev->tl_lock);
-}
-
 /**
- * _tl_add_barrier: Adds a barrier to the TL. It returns the the newest 
- * (but not the just created barrier) to the caller.
+ * _tl_add_barrier: Adds a barrier to the TL.
+ * It returns the previously newest barrier
+ * (not the just created barrier) to the caller.
  */
 struct drbd_barrier *_tl_add_barrier(drbd_dev *mdev,struct drbd_barrier *new)
 {
@@ -234,7 +203,8 @@
 
 	/* mdev->newest_barrier == NULL "cannot happen". but anyways... */
 	newest_before = mdev->newest_barrier;
-	/* never send a barrier number == 0 */
+	/* never send a barrier number == 0, because that is special-cased
+	 * when using TCQ for our write ordering code */
 	new->br_number = (newest_before->br_number+1) ?: 1;
 	mdev->newest_barrier->next = new;
 	mdev->newest_barrier = new;
@@ -242,22 +212,33 @@
 	return newest_before;
 }
 
+/* when we receive a barrier ack */
 void tl_release(drbd_dev *mdev,unsigned int barrier_nr,
 		       unsigned int set_size)
 {
 	struct drbd_barrier *b;
+	struct list_head *le, *tle;
+	struct drbd_request *r;
 
-	spin_lock_irq(&mdev->tl_lock);
+	spin_lock_irq(&mdev->req_lock);
 
 	b = mdev->oldest_barrier;
 	mdev->oldest_barrier = b->next;
 
+	/* in protocol C this list should be empty,
+	 * unless there is local io pending.
+	 * in protocol A and B, this should not be empty, even though the
+	 * master_bio's could already been completed.  */
+	list_for_each_safe(le, tle, &b->requests) {
+		r = list_entry(le, struct drbd_request,tl_requests);
+		_req_mod(r, barrier_acked);
+	}
 	list_del(&b->requests);
 	/* There could be requests on the list waiting for completion
 	   of the write to the local disk, to avoid corruptions of
 	   slab's data structures we have to remove the lists head */
 
-	spin_unlock_irq(&mdev->tl_lock);
+	spin_unlock_irq(&mdev->req_lock);
 
 	D_ASSERT(b->br_number == barrier_nr);
 	D_ASSERT(b->n_req == set_size);
@@ -276,162 +257,68 @@
 	kfree(b);
 }
 
-int tl_verify(drbd_dev *mdev, drbd_request_t * req, sector_t sector)
+/* FIXME called by whom? worker only? */
+void tl_clear(drbd_dev *mdev)
 {
-	struct hlist_head *slot = mdev->tl_hash + tl_hash_fn(mdev,sector);
-	struct hlist_node *n;
-	drbd_request_t * i;
-	int rv=0;
+	struct drbd_barrier *b, *tmp;
 
-	spin_lock_irq(&mdev->tl_lock);
+	WARN("tl_clear()\n");
 
-	hlist_for_each_entry(i, n, slot, colision) {
-		if (i==req) {
-		  if (drbd_req_get_sector(i) != sector) {
-			  ERR("tl_verify: found req %p but it has wrong sector (%llx versus %llx)\n",
-			      req, (long long)drbd_req_get_sector(i), (long long)sector);
-		  }
-		  rv=1;
-		  break;
+	spin_lock_irq(&mdev->req_lock);
+	b = mdev->oldest_barrier;
+	while ( b ) {
+		struct list_head *le, *tle;
+		struct drbd_request *r;
+		list_for_each_safe(le, tle, &b->requests) {
+			r = list_entry(le, struct drbd_request,tl_requests);
+			_req_mod(r, connection_lost_while_pending);
 		}
-	}
-
-	spin_unlock_irq(&mdev->tl_lock);
-
-	// Really better find it!
-	if (!rv) {
-		ERR("tl_verify: failed to find req %p, sector %llx in list\n", 
-		    req, (long long)sector);
-	}
-
-	return rv;
-}
-
-/* tl_dependence reports if this sector was present in the current
-   epoch.
-   As side effect it clears also the pointer to the request if it
-   was present in the transfert log. (Since tl_dependence indicates
-   that IO is complete and that drbd_end_req() should not be called
-   in case tl_clear has to be called due to interruption of the
-   communication)
-*/
-/* bool */
-int tl_dependence(drbd_dev *mdev, drbd_request_t * req)
-{
-	unsigned long flags;
-	int r=TRUE;
-
-	spin_lock_irqsave(&mdev->tl_lock,flags);
-
-	r = ( req->barrier == mdev->newest_barrier );
-	list_del(&req->tl_requests);
-	hlist_del(&req->colision);
-	// req->barrier->n_req--; // Barrier migh be free'ed already!
-
-	spin_unlock_irqrestore(&mdev->tl_lock,flags);
-	return r;
-}
-
-STATIC void tl_clear_barrier(drbd_dev *mdev, struct list_head *requests)
-{
-	struct list_head *le, *tle;
-	struct drbd_request *r;
-	sector_t sector;
-	unsigned int size;
-
-	list_for_each_safe(le, tle, requests) {
-		r = list_entry(le, struct drbd_request,tl_requests);
-		// bi_size and bi_sector are modified in bio_endio!
-		sector = drbd_req_get_sector(r);
-		size   = drbd_req_get_size(r);
-		
-		if( r->rq_status & RQ_DRBD_ON_WIRE &&
-		    mdev->net_conf->wire_protocol != DRBD_PROT_A ) {
-			dec_ap_pending(mdev);
+		tmp = b->next;
+		/* FIXME can there still be requests on that ring list now?
+		 * funny race conditions ... */
+		if (!list_empty(&b->requests)) {
+			WARN("FIXME explain this race...");
+			list_del(&b->requests);
 		}
-		
-		if( !(r->rq_status & RQ_DRBD_SENT) ) {
-			drbd_end_req(r,RQ_DRBD_SENT,ERF_NOTLD|1, sector);
-			goto mark;
+		dec_ap_pending(mdev); /* for the barrier */
+		if (b == mdev->newest_barrier) {
+			D_ASSERT(tmp == NULL);
+			b->br_number=4711;
+			b->n_req=0;
+			INIT_LIST_HEAD(&b->requests);
+			mdev->oldest_barrier = b;
+			break;
 		}
-		if(mdev->net_conf->wire_protocol != DRBD_PROT_C ) {
-		mark:
-			drbd_set_out_of_sync(mdev, sector, size);
-		}
+		kfree(b);
+		b = tmp;
 	}
+	D_ASSERT(mdev->newest_barrier == mdev->oldest_barrier);
+	D_ASSERT(mdev->newest_barrier->br_number == 4711);
+	spin_unlock_irq(&mdev->req_lock);
 }
 
-void tl_clear(drbd_dev *mdev)
+#warning "FIXME code missing"
+#if 0
+/* FIXME "wrong"
+ * see comment in receive_Data */
+drbd_request_t * _req_have_write(drbd_dev *mdev, struct Tl_epoch_entry *e)
 {
-	struct list_head tmp;
-	struct drbd_barrier *b,*f;
-
-	WARN("tl_clear()\n");
-	INIT_LIST_HEAD(&tmp);
-
-	spin_lock_irq(&mdev->tl_lock);
-
-	f = mdev->oldest_barrier;
-	b = f->next;
-
-	// mdev->oldest_barrier = f;
-	mdev->newest_barrier = f;
-
-	list_add(&tmp,&f->requests);
-	list_del_init(&f->requests);
-
-	f->next=NULL;
-	f->br_number=4711;
-	f->n_req=0;
-
-	spin_unlock_irq(&mdev->tl_lock);
-
-	tl_clear_barrier(mdev,&tmp);
-
-	while ( b ) {
-		tl_clear_barrier(mdev,&b->requests);
-		f=b;
-		b=b->next;
-		list_del(&f->requests);
-		kfree(f);
-		dec_ap_pending(mdev); // for the barrier
-	}
-}
-
-STATIC unsigned int ee_hash_fn(drbd_dev *mdev, sector_t sector)
-{
-	BUG_ON(mdev->ee_hash_s == 0);
-	return (unsigned int)(sector>>HT_SHIFT) % mdev->ee_hash_s;
-}
-
-STATIC int overlaps(sector_t s1, int l1, sector_t s2, int l2)
-{
-	return !( ( s1 + (l1>>9) <= s2 ) || ( s1 >= s2 + (l2>>9) ) );
-}
-
-drbd_request_t * req_have_write(drbd_dev *mdev, struct Tl_epoch_entry *e)
-{
 	struct hlist_head *slot;
 	struct hlist_node *n;
 	drbd_request_t * req;
-	sector_t sector = drbd_ee_get_sector(e);
-	int size = drbd_ee_get_size(e);
+	sector_t sector = e->sector;
+	int size = e->drbd_ee_get_size(e);
 	int i;
 
+	MUST_HOLD(&mdev->req_lock);
 	D_ASSERT(size <= 1<<(HT_SHIFT+9) );
 
-	spin_lock_irq(&mdev->tl_lock);
-
-	for(i=-1;i<=1;i++ ) {
-		slot = mdev->tl_hash + tl_hash_fn(mdev,
-						  sector + i*(1<<(HT_SHIFT)));
-		hlist_for_each_entry(req, n, slot, colision) {
-			if( overlaps(drbd_req_get_sector(req),
-				     drbd_req_get_size(req),
-				     sector,
-				     size) ) goto out;
-		} // hlist_for_each_entry()
-	}
+#define OVERLAPS overlaps(req->sector, req->size, sector, size)
+	slot = mdev->tl_hash + tl_hash_fn(mdev, sector);
+	hlist_for_each_entry(req, n, slot, colision) {
+		if (OVERLAPS) return req;
+	} // hlist_for_each_entry()
+#undef OVERLAPS
 	req = NULL;
 	// Good, no conflict found
 	INIT_HLIST_NODE(&e->colision);
@@ -442,36 +329,8 @@
 
 	return req;
 }
+#endif
 
-struct Tl_epoch_entry * _ee_have_write(drbd_dev *mdev, drbd_request_t * req)
-{
-	struct hlist_head *slot;
-	struct hlist_node *n;
-	struct Tl_epoch_entry *ee;
-	sector_t sector = drbd_req_get_sector(req);
-	int size = drbd_req_get_size(req);
-	int i;
-
-	D_ASSERT(size <= 1<<(HT_SHIFT+9) );
-
-	for(i=-1;i<=1;i++ ) {
-		slot = mdev->ee_hash + ee_hash_fn(mdev,
-						  sector + i*(1<<(HT_SHIFT)));
-		hlist_for_each_entry(ee, n, slot, colision) {
-			if( overlaps(drbd_ee_get_sector(ee),
-				     drbd_ee_get_size(ee),
-				     sector,
-				     size) ) goto out;
-		} // hlist_for_each_entry()
-	}
-	ee = NULL;
-	// Good, no conflict found
-	_tl_add(mdev,req);
- out:
-
-	return ee;
-}
-
 /**
  * drbd_io_error: Handles the on_io_error setting, should be called in the
  * unlikely(!drbd_bio_uptodate(e->bio)) case from kernel thread context.
@@ -500,7 +359,8 @@
 	if(!send) return ok;
 
 	ok = drbd_send_state(mdev);
-	WARN("Notified peer that my disk is broken.\n");
+	if (ok) WARN("Notified peer that my disk is broken.\n");
+	else ERR("Sending state in drbd_io_error() failed\n");
 
 	D_ASSERT(drbd_md_test_flag(mdev->bc,MDF_FullSync));
 	D_ASSERT(!drbd_md_test_flag(mdev->bc,MDF_Consistent));
@@ -889,6 +749,8 @@
 
 	if ( ns.role == Primary && ns.conn < Connected &&
 	     ns.disk < Consistent ) {
+#warning "ugly and wrong"
+#warning "FIXME code missing"
 		drbd_panic("No access to good data anymore.\n");
 	}
 
@@ -1353,7 +1215,7 @@
 	sector_t d_size;
 	int ok;
 
-	if(inc_md_only(mdev,Attaching)) {
+	if(inc_local_if_state(mdev,Attaching)) {
 		D_ASSERT(mdev->bc->backing_bdev);
 		d_size = drbd_get_max_capacity(mdev->bc);
 		p.u_size = cpu_to_be64(mdev->bc->dc.disk_size);
@@ -1435,6 +1297,7 @@
 			/* write_bm did fail! panic.
 			 * FIXME can we do something better than panic?
 			 */
+#warning "ugly and wrong"
 			drbd_panic("Failed to write bitmap to disk\n!");
 			ok = FALSE;
 			goto out;
@@ -1473,19 +1336,6 @@
 	return ok;
 }
 
-int _drbd_send_barrier(drbd_dev *mdev, struct drbd_barrier *barrier)
-{
-	int ok;
-	Drbd_Barrier_Packet p;
-
-	p.barrier=barrier->br_number;
-
-	inc_ap_pending(mdev);
-	ok = _drbd_send_cmd(mdev,mdev->data.socket,Barrier,(Drbd_Header*)&p,sizeof(p),0);
-
-	return ok;
-}
-
 int drbd_send_b_ack(drbd_dev *mdev, u32 barrier_nr,u32 set_size)
 {
 	int ok;
@@ -1539,8 +1389,8 @@
 int drbd_send_ack(drbd_dev *mdev, Drbd_Packet_Cmd cmd, struct Tl_epoch_entry *e)
 {
 	return _drbd_send_ack(mdev,cmd,
-			      cpu_to_be64(drbd_ee_get_sector(e)),
-			      cpu_to_be32(drbd_ee_get_size(e)),
+			      cpu_to_be64(e->sector),
+			      cpu_to_be32(e->size),
 			      e->block_id);
 }
 
@@ -1554,6 +1404,8 @@
 	p.block_id = block_id;
 	p.blksize  = cpu_to_be32(size);
 
+	/* FIXME BIO_RW_SYNC ? */
+
 	ok = drbd_send_cmd(mdev,USE_DATA_SOCKET,cmd,(Drbd_Header*)&p,sizeof(p));
 	return ok;
 }
@@ -1714,16 +1566,17 @@
 
 	p.head.magic   = BE_DRBD_MAGIC;
 	p.head.command = cpu_to_be16(Data);
-	p.head.length  = cpu_to_be16( sizeof(p)-sizeof(Drbd_Header)
-					      + drbd_req_get_size(req) );
+	p.head.length  = cpu_to_be16(sizeof(p)-sizeof(Drbd_Header)+req->size);
 
-	p.sector   = cpu_to_be64(drbd_req_get_sector(req));
+	p.sector   = cpu_to_be64(req->sector);
 	p.block_id = (unsigned long)req;
 	p.seq_num  = cpu_to_be32( req->seq_num =
 				  atomic_add_return(1,&mdev->packet_seq) );
 	if(req->master_bio->bi_rw & BIO_RW_BARRIER) {
 		dp_flags = DP_HARDBARRIER;
 	}
+	/* FIXME BIO_RW_SYNC */
+
 	p.dp_flags = cpu_to_be32(dp_flags);
 	dump_packet(mdev,mdev->data.socket,0,(void*)&p, __FILE__, __LINE__);
 	set_bit(UNPLUG_REMOTE,&mdev->flags);
@@ -1752,10 +1605,9 @@
 
 	p.head.magic   = BE_DRBD_MAGIC;
 	p.head.command = cpu_to_be16(cmd);
-	p.head.length  = cpu_to_be16( sizeof(p)-sizeof(Drbd_Header)
-				     + drbd_ee_get_size(e) );
+	p.head.length  = cpu_to_be16( sizeof(p)-sizeof(Drbd_Header) + e->size);
 
-	p.sector   = cpu_to_be64(drbd_ee_get_sector(e));
+	p.sector   = cpu_to_be64(e->sector);
 	p.block_id = e->block_id;
 	/* p.seq_num  = 0;    No sequence numbers here.. */
 
@@ -1997,10 +1849,7 @@
 	spin_lock_init(&mdev->meta.work.q_lock);
 
 	spin_lock_init(&mdev->al_lock);
-	spin_lock_init(&mdev->tl_lock);
-	spin_lock_init(&mdev->ee_lock);
 	spin_lock_init(&mdev->req_lock);
-	spin_lock_init(&mdev->pr_lock);
 	spin_lock_init(&mdev->peer_seq_lock);
 
 	INIT_LIST_HEAD(&mdev->active_ee);
@@ -2239,6 +2088,8 @@
 			if(!mdev) continue;
 
 			down(&mdev->device_mutex);
+			/* shouldn't this be an assert only?
+			 * we are removing the module here! */
 			drbd_set_role(mdev,Secondary,0);
 			up(&mdev->device_mutex);
 			drbd_sync_me(mdev);
@@ -2593,7 +2444,7 @@
 
 	// We use here Failed and not Attaching because we try to write
 	// metadata even if we detach due to a disk failure!
-	if(!inc_md_only(mdev,Failed)) return;
+	if(!inc_local_if_state(mdev,Failed)) return;
 
 	INFO("Writing meta data super block now.\n");
 
@@ -2638,6 +2489,7 @@
 			 * but we are supposed to be able to,
 			 * tough!
 			 */
+#warning "ugly and wrong"
 			drbd_panic("meta data update failed!\n");
 		}
 	}
@@ -2661,7 +2513,7 @@
 	struct meta_data_on_disk * buffer;
 	int i,rv = NoError;
 
-	if(!inc_md_only(mdev,Attaching)) return MDIOError;
+	if(!inc_local_if_state(mdev,Attaching)) return MDIOError;
 
 	down(&mdev->md_io_mutex);
 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);

Modified: trunk/drbd/drbd_nl.c
===================================================================
--- trunk/drbd/drbd_nl.c	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/drbd_nl.c	2006-09-09 23:04:18 UTC (rev 2394)
@@ -440,7 +440,7 @@
 	}
 
 	if ( la_size_changed || md_moved ) {
-		if( inc_md_only(mdev,Attaching) ) {
+		if( inc_local_if_state(mdev,Attaching) ) {
 			drbd_al_shrink(mdev); // All extents inactive.
 			drbd_bm_write(mdev);  // write bitmap
 			// Write mdev->la_size to on disk.

Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/drbd_receiver.c	2006-09-09 23:04:18 UTC (rev 2394)
@@ -49,6 +49,7 @@
 #include <linux/random.h>
 #include <linux/drbd.h>
 #include "drbd_int.h"
+#include "drbd_req.h"
 
 #if defined(__arch_um__) && !defined(HAVE_UML_TO_VIRT)
 static inline void *to_virt(unsigned long phys)
@@ -133,8 +134,11 @@
 	 */
 
 	spin_lock_irqsave(&drbd_pp_lock,flags);
-	/* This lock needs to be IRQ save because we might call drdb_pp_free()
-	   from IRQ context. */
+	/* This lock needs to lock out irq because we might call drbd_pp_free()
+	   from IRQ context.
+	   FIXME but why irq _save_ ?
+	   this is only called from drbd_alloc_ee,
+	   and that is strictly process context! */
 	if ( (page = drbd_pp_pool) ) {
 		drbd_pp_pool = (struct page*)page->U_PRIVATE;
 		drbd_pp_vacant--;
@@ -147,7 +151,7 @@
 	for (;;) {
 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 
-		/* try the pool again, maybe the drbd_kick_log set some free */
+		/* try the pool again, maybe the drbd_kick_lo set some free */
 		spin_lock_irqsave(&drbd_pp_lock,flags);
 		if ( (page = drbd_pp_pool) ) {
 			drbd_pp_pool = (struct page*)page->U_PRIVATE;
@@ -215,11 +219,11 @@
 }
 
 /*
-You need to hold the ee_lock:
+You need to hold the req_lock:
  drbd_free_ee()
  _drbd_wait_ee_list_empty()
 
-You must not have the ee_lock:
+You must not have the req_lock:
  drbd_alloc_ee()
  drbd_init_ee()
  drbd_release_ee()
@@ -256,80 +260,19 @@
 		page = drbd_pp_alloc(mdev, gfp_mask);
 		if (!page) goto fail2;
 		if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
-			/*
-			 * actually:
-			drbd_pp_free(page);
+			drbd_pp_free(mdev,page);
 			goto fail2;
-			 * But see below.
-			 */
 			break;
 		}
 		ds -= min_t(int, ds, PAGE_SIZE);
 	}
 
-	/* D_ASSERT( data_size == bio->bi_size); */
-	if (ds) {
-		/*
-		 * bio_add_page failed.
-		 *
-		 * if this happens, it indicates we are not doing correct
-		 * stacking of device limits.
-		 *
-		 * ---
-		 * this may also happen on the SyncSource for syncer requests:
-		 * for performance, the syncer may choose to ignore the
-		 * agreed-upon device limits (max_segment_size may be
-		 * arbitrarily set to PAGE_SIZE because the lower level device
-		 * happens to have a merge_bvec_fn).
-		 *
-		 * we would then just "split" the request here,
-		 * and then send multiple RSDataReply packets to the peer.
-		 *
-		 * FIXME to implement that, we'd need ot be able to
-		 * "return several Tl_epoch_entry" here,
-		 * so we'd need to either recurse, or add more state to the
-		 * return valud of this function.
-		 * or let the caller check e->ee_size against what he requested,
-		 * and reiterate there.
-		 *
-		 * It is probably just not worth the hassle,
-		 * but I'll check it in unfinished now anyways.
-		 *
-		 * TODO
-		 * either complete support on this side, or rip it out
-		 * and do the one-liner patch in w_make_resync_request
-		 * exchanging DRBD_MAX_SEGMENT_SIZE with q->max_segment_size
-		 * ---
-		 *
-		 * if this happens for the _first_ page, however, my
-		 * understanding is it would indicate a bug in the lower levels,
-		 * since adding _one_ page is "guaranteed" to work.
-		 */
-		if (ds < data_size && id == ID_SYNCER) {
-			/* this currently only serves the first part of that
-			 * request. you have to restart the syncer...
-			 * this is currently still very buggy and get our
-			 * housekeeping about in-sync areas completely wrong.
-			 */
-			ERR("should have split resync request: "
-			    "sector/data_size/rest %llu %u %u\n",
-			    (unsigned long long)sector, data_size, ds);
-		} else {
-			/* this should not happen,
-			 * if we correctly stacked limits. */
-			ERR("bio_add_page failed: "
-			    "id/sector/data_size/rest 0x%llx %llu %u %u\n",
-			    (unsigned long long)id,
-			    (unsigned long long)sector, data_size, ds);
-			drbd_pp_free(mdev,page);
-			goto fail2;
-		}
-	}
+	D_ASSERT( data_size == bio->bi_size);
 
 	bio->bi_private = e;
 	e->mdev = mdev;
-	e->ee_sector = sector;
-	e->ee_size = bio->bi_size;
+	e->sector = sector;
+	e->size = bio->bi_size;
 
 	e->private_bio = bio;
 	e->block_id = id;
@@ -338,6 +281,7 @@
 	e->barrier_nr2 = 0;
 
 	return e;
+
  fail2:
 	__bio_for_each_segment(bvec, bio, i, 0) {
 		drbd_pp_free(mdev,bvec->bv_page);
@@ -364,20 +308,21 @@
 	mempool_free(e, drbd_ee_mempool);
 }
 
+/* currently on module unload only */
 int drbd_release_ee(drbd_dev *mdev,struct list_head* list)
 {
 	int count=0;
 	struct Tl_epoch_entry* e;
 	struct list_head *le;
 
-	spin_lock_irq(&mdev->ee_lock);
+	spin_lock_irq(&mdev->req_lock);
 	while(!list_empty(list)) {
 		le = list->next;
 		e = list_entry(le, struct Tl_epoch_entry, w.list);
 		drbd_free_ee(mdev,e);
 		count++;
 	}
-	spin_unlock_irq(&mdev->ee_lock);
+	spin_unlock_irq(&mdev->req_lock);
 
 	return count;
 }
@@ -415,10 +360,10 @@
 	struct Tl_epoch_entry *e, *t;
 	int ok=1;
 
-	spin_lock_irq(&mdev->ee_lock);
+	spin_lock_irq(&mdev->req_lock);
 	reclaim_net_ee(mdev);
 	list_splice_init(&mdev->done_ee,&work_list);
-	spin_unlock_irq(&mdev->ee_lock);
+	spin_unlock_irq(&mdev->req_lock);
 
 	/* XXX maybe wake_up here already?
 	 * or wake_up withing drbd_free_ee just after mempool_free?
@@ -438,6 +383,8 @@
 	return ok;
 }
 
+
+
 /* clean-up helper for drbd_disconnect */
 void _drbd_clear_done_ee(drbd_dev *mdev)
 {
@@ -471,19 +418,19 @@
 	/* avoids spin_lock/unlock and calling prepare_to_wait in the fast path */
 	while (!list_empty(head)) {
 		prepare_to_wait(&mdev->ee_wait,&wait,TASK_UNINTERRUPTIBLE);
-		spin_unlock_irq(&mdev->ee_lock);
+		spin_unlock_irq(&mdev->req_lock);
 		drbd_kick_lo(mdev);
 		schedule();
 		finish_wait(&mdev->ee_wait, &wait);
-		spin_lock_irq(&mdev->ee_lock);
+		spin_lock_irq(&mdev->req_lock);
 	}
 }
 
 void drbd_wait_ee_list_empty(drbd_dev *mdev,struct list_head *head)
 {
-	spin_lock_irq(&mdev->ee_lock);
+	spin_lock_irq(&mdev->req_lock);
 	_drbd_wait_ee_list_empty(mdev, head);
-	spin_unlock_irq(&mdev->ee_lock);
+	spin_unlock_irq(&mdev->req_lock);
 }
 
 STATIC struct socket* drbd_accept(drbd_dev *mdev,struct socket* sock)
@@ -934,11 +881,11 @@
 	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
 		drbd_kick_lo(mdev);
 
-	spin_lock_irq(&mdev->ee_lock);
+	spin_lock_irq(&mdev->req_lock);
 	_drbd_wait_ee_list_empty(mdev,&mdev->active_ee);
 	epoch_size = mdev->epoch_size;
 	mdev->epoch_size = 0;
-	spin_unlock_irq(&mdev->ee_lock);
+	spin_unlock_irq(&mdev->req_lock);
 
 	rv = drbd_send_b_ack(mdev, p->barrier, epoch_size);
 	dec_unacked(mdev);
@@ -946,6 +893,8 @@
 	return rv;
 }
 
+/* used from receive_RSDataReply (recv_resync_read)
+ * and from receive_Data */
 STATIC struct Tl_epoch_entry *
 read_in_block(drbd_dev *mdev, u64 id, sector_t sector, int data_size)
 {
@@ -965,7 +914,7 @@
 		kunmap(page);
 		if( rr != min_t(int,ds,PAGE_SIZE) ) {
 			drbd_free_ee(mdev,e);
-			WARN("short read recev data: read %d expected %d\n",
+			WARN("short read receiving data: read %d expected %d\n",
 			     rr, min_t(int,ds,PAGE_SIZE));
 			return 0;
 		}
@@ -976,16 +925,15 @@
 	return e;
 }
 
-STATIC void receive_data_tail(drbd_dev *mdev,int data_size)
+/* kick lower level device, if we have more than (arbitrary number)
+ * reference counts on it, which typically are locally submitted io
+ * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
+static void maybe_kick_lo(drbd_dev *mdev)
 {
-	/* kick lower level device, if we have more than (arbitrary number)
-	 * reference counts on it, which typically are locally submitted io
-	 * requests.  don't use unacked_cnt, so we speed up proto A and B, too.
-	 */
 	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark ) {
+		/* FIXME hysteresis ?? */
 		drbd_kick_lo(mdev);
 	}
-	mdev->writ_cnt+=data_size>>9;
 }
 
 STATIC int recv_dless_read(drbd_dev *mdev, drbd_request_t *req,
@@ -993,10 +941,10 @@
 {
 	struct bio_vec *bvec;
 	struct bio *bio;
-	int rr,i,expect,ok=1;
+	int rr,i,expect;
 
 	bio = req->master_bio;
-	D_ASSERT( sector == drbd_req_get_sector(req) );
+	D_ASSERT( sector == bio->bi_sector );
 
 	bio_for_each_segment(bvec, bio, i) {
 		expect = min_t(int,data_size,bvec->bv_len);
@@ -1005,41 +953,50 @@
 			     expect);
 		kunmap(bvec->bv_page);
 		if (rr != expect) {
-			ok = 0;
-			break;
+			WARN("short read receiving data reply: read %d expected %d\n",
+			     rr, expect);
+			return 0;
 		}
 		data_size -= rr;
 	}
 
-	D_ASSERT(data_size == 0 || !ok);
-	drbd_bio_endio(bio,ok);
-	dec_ap_bio(mdev);
-	dec_ap_pending(mdev);
-	return ok;
+	D_ASSERT(data_size == 0);
+	/* FIXME recv_cnt accounting ?? */
+	return 1;
 }
 
-/* e_end_resync_block() is called via drbd_process_done_ee().
- * this means this function only runs in the asender thread
+/* e_end_resync_block() is called via
+ * drbd_process_done_ee() or _drbd_clear_done_ee().
+ * only runs in the asender thread
  */
 STATIC int e_end_resync_block(drbd_dev *mdev, struct drbd_work *w, int unused)
 {
 	struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
-	sector_t sector = drbd_ee_get_sector(e);
+	sector_t sector = e->sector;
 	int ok;
 
-	drbd_rs_complete_io(mdev,sector); // before set_in_sync() !
+	D_ASSERT(hlist_unhashed(&e->colision));
+
+	/* before set_in_sync()
+	 * FIXME because ... */
+	drbd_rs_complete_io(mdev,sector);
 	if (likely( drbd_bio_uptodate(e->private_bio) )) {
+		/* "optimization" only...  state could still change anytime
+		 * while we are calling drbd_set_in_sync */
 		ok = mdev->state.disk >= Inconsistent &&
 			mdev->state.pdsk >= Inconsistent;
 		if (likely( ok )) {
-			drbd_set_in_sync(mdev, sector, drbd_ee_get_size(e));
-			/* THINK maybe don't send ack either
-			 * when we are suddenly diskless?
-			 * Dropping it here should do no harm,
-			 * since peer has no structs referencing this.
-			 */
+			drbd_set_in_sync(mdev, sector, e->size);
+			ok = drbd_send_ack(mdev,WriteAck,e);
+		} else {
+			/* FIXME think:
+			 * send a WriteAck anyways?
+			 * send a NegAck?
+			 * just ignore it?  (ignoring it is valid, peer has no
+			 * structs referencing this) */
 		}
-		ok = drbd_send_ack(mdev,WriteAck,e);
+		/* FIXME what exactly do we need this flag for, again??
+		 * and why do we set it only in the "up-to-date" branch? */
 		set_bit(SYNC_STARTED,&mdev->flags);
 	} else {
 		ok = drbd_send_ack(mdev,NegAck,e);
@@ -1055,25 +1012,25 @@
 	struct Tl_epoch_entry *e;
 
 	e = read_in_block(mdev,ID_SYNCER,sector,data_size);
-	if(!e) {
-		dec_local(mdev);
-		return FALSE;
-	}
+	if(!e) return FALSE;
 
 	dec_rs_pending(mdev);
 
 	drbd_ee_prepare_write(mdev,e);
 	e->w.cb     = e_end_resync_block;
 
-	spin_lock_irq(&mdev->ee_lock);
-	list_add(&e->w.list,&mdev->sync_ee);
-	spin_unlock_irq(&mdev->ee_lock);
-
 	inc_unacked(mdev);
+	/* corresponding dec_unacked() is in e_end_resync_block(),
+	 * or in _drbd_clear_done_ee, respectively */
 
+	spin_lock_irq(&mdev->req_lock);
+	list_add(&e->w.list,&mdev->sync_ee);
+	spin_unlock_irq(&mdev->req_lock);
+
 	drbd_generic_make_request(WRITE,e->private_bio);
+	/* accounting done in endio */
 
-	receive_data_tail(mdev,data_size);
+	maybe_kick_lo(mdev);
 	return TRUE;
 }
 
@@ -1088,9 +1045,9 @@
 	header_size = sizeof(*p) - sizeof(*h);
 	data_size   = h->length  - header_size;
 
-	/* I expect a block to be a multiple of 512 byte, and
-	 * no more than 4K (PAGE_SIZE). is this too restrictive?
-	 */
+	/* I expect a block to be a multiple of 512 byte,
+	 * and no more than DRBD_MAX_SEGMENT_SIZE.
+	 * is this too restrictive?  */
 	ERR_IF(data_size == 0) return FALSE;
 	ERR_IF(data_size &  0x1ff) return FALSE;
 	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return FALSE;
@@ -1100,19 +1057,23 @@
 
 	sector = be64_to_cpu(p->sector);
 
-	req = (drbd_request_t *)(unsigned long)p->block_id;
-	if(unlikely(!drbd_pr_verify(mdev,req,sector))) {
+	spin_lock_irq(&mdev->req_lock);
+	req = _ar_id_to_req(mdev,p->block_id, sector);
+	spin_unlock_irq(&mdev->req_lock);
+	if (unlikely(!req)) {
 		ERR("Got a corrupt block_id/sector pair(1).\n");
 		return FALSE;
 	}
 
-	spin_lock(&mdev->pr_lock);
-	hlist_del(&req->colision);
-	spin_unlock(&mdev->pr_lock);
-
+	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
+	 * special casing it there for the various failure cases.
+	 * still no race with drbd_fail_pending_reads */
 	ok = recv_dless_read(mdev,req,sector,data_size);
 
-	drbd_req_free(req);
+	if (ok) req_mod(req, data_received);
+	/* else: nothing. handled from drbd_disconnect...
+	 * I don't think we may complete this just yet
+	 * in case we are "on-disconnect: freeze" */
 
 	return ok;
 }
@@ -1127,9 +1088,9 @@
 	header_size = sizeof(*p) - sizeof(*h);
 	data_size   = h->length  - header_size;
 
-	/* I expect a block to be a multiple of 512 byte, and
-	 * no more than 4K (PAGE_SIZE). is this too restrictive?
-	 */
+	/* I expect a block to be a multiple of 512 byte,
+	 * and no more than DRBD_MAX_SEGMENT_SIZE.
+	 * is this too restrictive?  */
 	ERR_IF(data_size == 0) return FALSE;
 	ERR_IF(data_size &  0x1ff) return FALSE;
 	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return FALSE;
@@ -1140,117 +1101,100 @@
 	sector = be64_to_cpu(p->sector);
 	D_ASSERT(p->block_id == ID_SYNCER);
 
-	if(!inc_local(mdev)) {
+	if(inc_local(mdev)) {
+		/* data is submitted to disk within recv_resync_read.
+		 * corresponding dec_local done below on error,
+		 * or in drbd_endio_write_sec. */
+		/* FIXME paranoia:
+		 * verify that the corresponding bit is set.
+		 * in case we are Primary SyncTarget,
+		 * verify there are no pending write requests to that area.
+		 */
+		ok = recv_resync_read(mdev,sector,data_size);
+		if (!ok) dec_local(mdev);
+	} else {
 		if (DRBD_ratelimit(5*HZ,5))
 			ERR("Can not write resync data to local disk.\n");
 		drbd_send_ack_dp(mdev,NegAck,p);
-		return TRUE;
+		/* FIXME:
+		 * we need to drain the data.  only then can we keep the
+		 * connection open.
+		 * without draining, we'd see an invalid packet header next,
+		 * and drop the connection there. */
+		/* ok = 1; not yet: keep connection open */
+		ok = 0;
 	}
 
-	ok = recv_resync_read(mdev,sector,data_size);
-
 	return ok;
 }
 
 /* e_end_block() is called via drbd_process_done_ee().
  * this means this function only runs in the asender thread
+ *
+ * for a broken example implementation of the TCQ barrier version of
+ * e_end_block see older revisions...
  */
-#if 0
-
-	/* disabled for now.
-	 * barrier handling via tcq currently broken!
-	 */
 STATIC int e_end_block(drbd_dev *mdev, struct drbd_work *w, int unused)
 {
 	struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
-	sector_t sector = drbd_ee_get_sector(e);
+	sector_t sector = e->sector;
 	// unsigned int epoch_size;
 	int ok=1;
 
 	if(mdev->net_conf->wire_protocol == DRBD_PROT_C) {
 		if(likely(drbd_bio_uptodate(e->private_bio))) {
-			if(e->barrier_nr) {
-# warning "epoch_size no more atomic_t"
-				/* only when using TCQ */
-				epoch_size = atomic_read(&mdev->epoch_size);
-				atomic_set(&mdev->epoch_size,0);
-				ok&=drbd_send_b_ack(mdev,
-						    cpu_to_be32(e->barrier_nr),
-						    epoch_size);
-				dec_unacked(mdev);
-			}
 			ok &= drbd_send_ack(mdev,WriteAck,e);
-			if(e->barrier_nr2) {
-				/* only when using TCQ */
-				atomic_set(&mdev->epoch_size,0);
-				ok&=drbd_send_b_ack(mdev,
-						   cpu_to_be32(e->barrier_nr2),
-						    1);
-				dec_unacked(mdev);
-			}
-			if (test_bit(SYNC_STARTED,&mdev->flags) )
-				drbd_set_in_sync(mdev,sector,drbd_ee_get_size(e));
-		} else {
-			ok = drbd_send_ack(mdev,NegAck,e);
-			ok&= drbd_io_error(mdev);
-			/* we expect it to be marked out of sync anyways...
-			 * maybe assert this?
-			 */
-		}
-		dec_unacked(mdev);
-
-		return ok;
-	}
-
-	if(unlikely(!drbd_bio_uptodate(e->private_bio))) {
-		ok = drbd_io_error(mdev);
-	}
-
-	return ok;
-}
-#else
-
-STATIC int e_end_block(drbd_dev *mdev, struct drbd_work *w, int unused)
-{
-	struct Tl_epoch_entry *e = (struct Tl_epoch_entry*)w;
-	sector_t sector = drbd_ee_get_sector(e);
-	// unsigned int epoch_size;
-	int ok=1;
-
-	if(mdev->net_conf->wire_protocol == DRBD_PROT_C) {
-		if(likely(drbd_bio_uptodate(e->private_bio))) {
-			ok &= drbd_send_ack(mdev,WriteAck,e);
 			if (test_bit(SYNC_STARTED,&mdev->flags))
-				drbd_set_in_sync(mdev,sector,drbd_ee_get_size(e));
+				drbd_set_in_sync(mdev,sector,e->size);
 		} else {
+			/* FIXME I think we should send a NegAck regardless of
+			 * which protocol is in effect.
+			 * In which case we would need to make sure that any
+			 * NegAck is sent. basically that means that drbd_process_done_ee
+			 * may not list_del() the ee before this callback did run...
+			 * maybe even move the list_del(e) in here... */
 			ok = drbd_send_ack(mdev,NegAck,e);
 			ok&= drbd_io_error(mdev);
 			/* we expect it to be marked out of sync anyways...
-			 * maybe assert this?
-			 */
+			 * maybe assert this?  */
 		}
 		dec_unacked(mdev);
-
 		return ok;
-	}
-
-	if(unlikely(!drbd_bio_uptodate(e->private_bio))) {
+	} else if(unlikely(!drbd_bio_uptodate(e->private_bio))) {
 		ok = drbd_io_error(mdev);
 	}
 
+#warning "FIXME code missing"
+#if 0
+	/* we delete from the conflict detection hash _after_ we sent out the
+	 * WriteAck / NegAck, to get the sequence number right.  */
+	D_ASSERT(!hlist_unhashed(&e->colision));
+	/* FIXME "wake" any conflicting requests
+	 * that have been waiting for this one to finish */
+	hlist_del_init(&e->colision);
+#endif
+
 	return ok;
 }
 
-#endif
-
+/* FIXME implementation wrong.
+ * For the algorithm to be correct, we need to send and store the
+ * sector and size, not the block id. We have to check for overlap.
+ * We may _only_ remove the info when its sequence number is less than
+ * the current sequence number.
+ *
+ * I think the "discard info" are the wrong way, anyways.
+ * Instead of silently discarding such writes, we should send a DiscardAck,
+ * and we should retard sending of the data until we get that Discard Ack
+ * and thus the conflicting request is done.
+ */
 STATIC int drbd_chk_discard(drbd_dev *mdev,struct Tl_epoch_entry *e)
 {
 	struct drbd_discard_note *dn;
-	struct list_head *le;
+	struct list_head *le,*tmp;
 
 	MUST_HOLD(&mdev->peer_seq_lock);
- start_over:
-	list_for_each(le,&mdev->discard) {
+	list_for_each_safe(le,tmp,&mdev->discard) {
 		dn = list_entry(le, struct drbd_discard_note, list);
 		if( dn->seq_num == mdev->peer_seq ) {
 			D_ASSERT( dn->block_id == e->block_id );
@@ -1261,7 +1205,6 @@
 		if( dn->seq_num < mdev->peer_seq ) {
 			list_del(le);
 			kfree(dn);
-			goto start_over;
 		}
 	}
 	return 0;
@@ -1272,6 +1215,8 @@
 {
 	sector_t sector;
 	struct Tl_epoch_entry *e;
+	/* FIXME currently some unused variable intended
+	 * for the now not-implemented conflict detection */
 	drbd_request_t * req;
 	Drbd_Data_Packet *p = (Drbd_Data_Packet*)h;
 	int header_size, data_size, packet_seq, discard, rv;
@@ -1290,6 +1235,9 @@
 		return FALSE;
 
 	if(!inc_local(mdev)) {
+		/* data is submitted to disk at the end of this function.
+		 * corresponding dec_local done either below (on error),
+		 * or in drbd_endio_write_sec. */
 		if (DRBD_ratelimit(5*HZ,5))
 			ERR("Can not write mirrored data block to local disk.\n");
 		drbd_send_ack_dp(mdev,NegAck,p);
@@ -1306,23 +1254,32 @@
 	drbd_ee_prepare_write(mdev, e);
 	e->w.cb     = e_end_block;
 
-	/* This wait_event is here to make sure that never ever an
-	   DATA packet traveling via sock can overtake an ACK packet
-	   traveling on msock
-	   PRE TODO: Wrap around of seq_num !!!
-	*/
+	/* FIXME drbd_al_begin_io in case we have two primaries... */
+
+#warning "FIXME code missing"
+#if 0 
+/* sorry.
+ * to get this patch in a shape where it can be committed,
+ * I need to disable the broken conflict detection code for now.
+ * will implement the correct version as soon as possible...
+ * it is done in my head already, "only have to write it down",
+ * which will take another couple of days, probably.
+ */
+
+	/* This wait_event is here so even when a DATA packet traveling via
+	 * sock overtook an ACK packet traveling on msock, they are still
+	 * processed in the order they have been sent.
+	 * FIXME TODO: Wrap around of seq_num !!!
+	 */
 	if (mdev->net_conf->two_primaries) {
 		packet_seq = be32_to_cpu(p->seq_num);
-		/* if( packet_seq > peer_seq(mdev)+1 ) {
-			WARN(" will wait till (packet_seq) %d <= %d\n",
-			     packet_seq,peer_seq(mdev)+1);
-			     } */
 		if( wait_event_interruptible(mdev->cstate_wait,
 					     packet_seq <= peer_seq(mdev)+1)) {
 			rv = FALSE;
 			goto out2;
 		}
 
+		/* FIXME current discard implementation is wrong */
 		spin_lock(&mdev->peer_seq_lock);
 		mdev->peer_seq = max(mdev->peer_seq, packet_seq);
 		/* is update_peer_seq(mdev,packet_seq); */
@@ -1339,6 +1296,13 @@
 		req = req_have_write(mdev, e);
 
 		if(req) {
+			/* FIXME RACE
+			 * rq_status may be changing while we are looking.
+			 * in rare cases it could even disappear right now.
+			 * e.g. when it has already been ACK'ed, and the local
+			 * storage has been way too slow, and only now
+			 * completes the thing.
+			 */
 			if( req->rq_status & RQ_DRBD_SENT ) {
 				/* Conflicting write, got ACK */
 				/* write afterwards ...*/
@@ -1372,6 +1336,7 @@
 			}
 		}
 	}
+#endif
 
 	if ( be32_to_cpu(p->dp_flags) & DP_HARDBARRIER ) {
 		e->private_bio->bi_rw |= BIO_RW_BARRIER;
@@ -1422,7 +1387,7 @@
 	 * will trigger the b_ack before its own ack.
 	 */
 
-	spin_lock_irq(&mdev->ee_lock);
+	spin_lock_irq(&mdev->req_lock);
 	if (mdev->next_barrier_nr) {
 		/* only when using TCQ */
 		if (list_empty(&mdev->active_ee)) {
@@ -1436,7 +1401,7 @@
 		mdev->next_barrier_nr = 0;
 	}
 	list_add(&e->w.list,&mdev->active_ee);
-	spin_unlock_irq(&mdev->ee_lock);
+	spin_unlock_irq(&mdev->req_lock);
 
 	if (barrier_nr) {
 		/* only when using TCQ
@@ -1449,8 +1414,12 @@
 	switch(mdev->net_conf->wire_protocol) {
 	case DRBD_PROT_C:
 		inc_unacked(mdev);
+		/* corresponding dec_unacked() is done in e_end_block()
+		 * or in _drbd_clear_done_ee, respectively */
 		break;
 	case DRBD_PROT_B:
+		/* I really don't like it that the receiver thread
+		 * sends on the msock, but anyways */
 		drbd_send_ack(mdev, RecvAck, e);
 		break;
 	case DRBD_PROT_A:
@@ -1458,9 +1427,11 @@
 		break;
 	}
 
+	/* FIXME drbd_al_begin_io in case we have two primaries... */
 	drbd_generic_make_request(WRITE,e->private_bio);
+	/* accounting done in endio */
 
-	receive_data_tail(mdev,data_size);
+	maybe_kick_lo(mdev);
 	return TRUE;
 
  out2:
@@ -1499,7 +1470,7 @@
 		return FALSE;
 	}
 
-	if(!inc_local(mdev) || mdev->state.disk < UpToDate ) {
+	if(!inc_local_if_state(mdev, UpToDate)) {
 		if (DRBD_ratelimit(5*HZ,5))
 			ERR("Can not satisfy peer's read request, no local data.\n");
 		drbd_send_ack_rp(mdev,h->command == DataRequest ? NegDReply :
@@ -1513,9 +1484,9 @@
 		return FALSE;
 	}
 
-	spin_lock_irq(&mdev->ee_lock);
+	spin_lock_irq(&mdev->req_lock);
 	list_add(&e->w.list,&mdev->read_ee);
-	spin_unlock_irq(&mdev->ee_lock);
+	spin_unlock_irq(&mdev->req_lock);
 
 	drbd_ee_prepare_read(mdev,e);
 
@@ -1531,25 +1502,22 @@
 		 * the drbd_work_queue mechanism is made for this...
 		 */
 		if (!drbd_rs_begin_io(mdev,sector)) {
-			// we have been interrupted, probably connection lost!
+			/* we have been interrupted,
+			 * probably connection lost! */
 			D_ASSERT(signal_pending(current));
+			dec_local(mdev);
 			drbd_free_ee(mdev,e);
 			return 0;
 		}
 		break;
-	default:
-		ERR("unexpected command (%s) in receive_DataRequest\n",
-		    cmdname(h->command));
+	default:; /* avoid compiler warning */
 	}
 
-	mdev->read_cnt += size >> 9;
 	inc_unacked(mdev);
+	/* FIXME actually, it could be a READA originating from the peer ... */
 	drbd_generic_make_request(READ,e->private_bio);
-	if (atomic_read(&mdev->local_cnt) >= (mdev->net_conf->max_epoch_size>>4) ) {
-		drbd_kick_lo(mdev);
-	}
+	maybe_kick_lo(mdev);
 
-
 	return TRUE;
 }
 
@@ -2154,7 +2122,7 @@
 	peer_state.i = be32_to_cpu(p->state);
 
 	if (mdev->p_uuid && mdev->state.conn <= Connected && 
-	    inc_md_only(mdev,Attaching) ) {
+	    inc_local_if_state(mdev,Attaching) ) {
 		nconn=drbd_sync_handshake(mdev,peer_state.role,peer_state.disk);
 		dec_local(mdev);
 
@@ -2466,13 +2434,13 @@
 	}
 
 	// Receiving side (may be primary, in case we had two primaries)
-	spin_lock_irq(&mdev->ee_lock);
+	spin_lock_irq(&mdev->req_lock);
 	_drbd_wait_ee_list_empty(mdev,&mdev->read_ee);
 	_drbd_wait_ee_list_empty(mdev,&mdev->active_ee);
 	_drbd_wait_ee_list_empty(mdev,&mdev->sync_ee);
 	_drbd_clear_done_ee(mdev);
 	mdev->epoch_size = 0;
-	spin_unlock_irq(&mdev->ee_lock);
+	spin_unlock_irq(&mdev->req_lock);
 	// Needs to happen before we schedule the disconnect work callback,
 	// Since they might have something for the worker's queue as well.
 
@@ -2822,27 +2790,44 @@
 	update_peer_seq(mdev,be32_to_cpu(p->seq_num));
 
 	smp_rmb();
+	/* FIXME smp_rmb() is probably not good enough.
+	 * we have to make sure that, no matter what,
+	 * we do not set something "in sync" when
+	 * the peer has no disk (anymore)
+	 * I think this has to be looked at under the req_lock.
+	 * since we need to grab that anyways, let's do that.
+	 */
 	if(likely(mdev->state.pdsk >= Inconsistent )) {
-		// test_bit(PARTNER_DISKLESS,&mdev->flags)
-		// This happens if one a few IO requests on the peer
-		// failed, and some subsequest completed sucessfull
-		// afterwards.
+		/*
+		 * If one of a few IO requests on the peer failed (got_NegAck),
+		 * but some subsequent requests completed successfully
+		 * afterwards, verification of the block_id below would fail,
+		 * since we killed everything out of the transferlog when we
+		 * got the news that IO is broken on the peer.
+		 *
+		 * FIXME
+		 * could this be handled better?
+		 * do we need to look over this again for freeze-io?
+		 */
 
-		// But we killed everything out of the transferlog
-		// as we got the news hat IO is broken on the peer.
-
 		if( is_syncer_block_id(p->block_id)) {
 			drbd_set_in_sync(mdev,sector,blksize);
 			set_bit(SYNC_STARTED,&mdev->flags);
 		} else {
-			req=(drbd_request_t*)(unsigned long)p->block_id;
+			spin_lock_irq(&mdev->req_lock);
+			req = _ack_id_to_req(mdev, p->block_id, sector);
 
-			if (unlikely(!tl_verify(mdev,req,sector))) {
+			if (unlikely(!req)) {
+				spin_unlock_irq(&mdev->req_lock);
 				ERR("Got a corrupt block_id/sector pair(2).\n");
 				return FALSE;
 			}
 
-			drbd_end_req(req, RQ_DRBD_SENT, 1, sector);
+			_req_mod(req,
+				 h->command == WriteAck
+				 ? write_acked_by_peer
+				 : recv_acked_by_peer);
+			spin_unlock_irq(&mdev->req_lock);
 
 			if (test_bit(SYNC_STARTED,&mdev->flags) &&
 			    mdev->net_conf->wire_protocol == DRBD_PROT_C)
@@ -2852,10 +2837,9 @@
 
 	if(is_syncer_block_id(p->block_id)) {
 		dec_rs_pending(mdev);
-	} else {
-		D_ASSERT(mdev->net_conf->wire_protocol != DRBD_PROT_A);
-		dec_ap_pending(mdev);
 	}
+	/* dec_ap_pending is handled within _req_mod */
+
 	return TRUE;
 }
 
@@ -2884,18 +2868,19 @@
 	Drbd_BlockAck_Packet *p = (Drbd_BlockAck_Packet*)h;
 	sector_t sector = be64_to_cpu(p->sector);
 
-	req = (drbd_request_t *)(unsigned long)p->block_id;
-	if(unlikely(!drbd_pr_verify(mdev,req,sector))) {
+	spin_lock_irq(&mdev->req_lock);
+	req = _ar_id_to_req(mdev,p->block_id, sector);
+	if (unlikely(!req)) {
+		spin_unlock_irq(&mdev->req_lock);
 		ERR("Got a corrupt block_id/sector pair(3).\n");
 		return FALSE;
 	}
 
-	spin_lock(&mdev->pr_lock);
-	list_del(&req->w.list);
-	spin_unlock(&mdev->pr_lock);
+	/* FIXME what for ?? list_del(&req->w.list); */
+	_req_mod(req, neg_acked);
+	spin_unlock_irq(&mdev->req_lock);
 
-	drbd_req_free(req);
-
+#warning "ugly and wrong"
 	drbd_khelper(mdev,"pri-on-incon-degr");
 	drbd_panic("Got NegDReply. WE ARE LOST. We lost our up-to-date disk.\n");
 
@@ -2918,6 +2903,7 @@
 
 	// In case we are not primary, we could simply live on...
 
+#warning "ugly and wrong"
 	drbd_panic("Got NegRSDReply. WE ARE LOST. We lost our up-to-date disk.\n");
 
 	// THINK do we have other options, but panic?
@@ -2939,6 +2925,10 @@
 	return TRUE;
 }
 
+/* FIXME implementation wrong.
+ * For the algorithm to be correct, we need to send and store the
+ * sector and size too.
+ */
 STATIC int got_Discard(drbd_dev *mdev, Drbd_Header* h)
 {
 	Drbd_Discard_Packet *p = (Drbd_Discard_Packet*)h;

Modified: trunk/drbd/drbd_req.c
===================================================================
--- trunk/drbd/drbd_req.c	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/drbd_req.c	2006-09-09 23:04:18 UTC (rev 2394)
@@ -31,162 +31,8 @@
 #include <linux/slab.h>
 #include <linux/drbd.h>
 #include "drbd_int.h"
+#include "drbd_req.h"
 
-/*
-void drbd_show_req(struct Drbd_Conf* mdev, char *txt, drbd_request_t *req)
-{
-	INFO("req %s %p %c%c%c%c%c %p\n",
-	     txt,
-	     req,
-	     req->rq_status & RQ_DRBD_ON_WIRE ? 'w' :'_',
-	     req->rq_status & RQ_DRBD_IN_TL   ? 't' :'_',
-	     req->rq_status & RQ_DRBD_SENT    ? 's' :'_',
-	     req->rq_status & RQ_DRBD_LOCAL   ? 'l' :'_',
-	     req->rq_status & RQ_DRBD_NOTHING ? 'u' :'_',
-	     req->barrier
-	     );
-}
-*/
-
-void drbd_end_req(drbd_request_t *req, int nextstate, int er_flags,
-		  sector_t rsector)
-{
-	/* This callback will be called in irq context by the IDE drivers,
-	   and in Softirqs/Tasklets/BH context by the SCSI drivers.
-	   This function is called by the receiver in kernel-thread context.
-	   Try to get the locking right :) */
-
-	struct Drbd_Conf* mdev = drbd_req_get_mdev(req);
-	struct drbd_barrier *b;
-	unsigned long flags=0;
-	int uptodate;
-
-	PARANOIA_BUG_ON(!IS_VALID_MDEV(mdev));
-	PARANOIA_BUG_ON(drbd_req_get_sector(req) != rsector);
-	spin_lock_irqsave(&mdev->req_lock,flags);
-
-	if(req->rq_status & nextstate) {
-		ERR("request state error(%d)\n", req->rq_status);
-	}
-
-	req->rq_status |= nextstate;
-	req->rq_status &= er_flags | ~0x0001;
-	if( (req->rq_status & RQ_DRBD_DONE) == RQ_DRBD_DONE ) {
-		goto end_it;
-	}
-
-	spin_unlock_irqrestore(&mdev->req_lock,flags);
-
-	return;
-
-/* We only report uptodate == TRUE if both operations (WRITE && SEND)
-   reported uptodate == TRUE
- */
-
-	end_it:
-	spin_unlock_irqrestore(&mdev->req_lock,flags);
-
-	if( req->rq_status & RQ_DRBD_IN_TL ) {
-		if( ! ( er_flags & ERF_NOTLD ) ) {
-			/*If this call is from tl_clear() we may not call
-			  tl_dependene, otherwhise we have a homegrown
-			  spinlock deadlock.   */
-			if(tl_dependence(mdev,req))
-				set_bit(ISSUE_BARRIER,&mdev->flags);
-		} else {
-			/* FIXME not longer true!
-			 * we don't have the tl_lock here anymore...
-			 * sorry sir.
-			 **/
-			MUST_HOLD(&mdev->tl_lock);
-			list_del(&req->tl_requests); // we have the tl_lock...
-			hlist_del(&req->colision);
-			// req->barrier->n_req--; // Barrier migh be free'ed !
-		}
-	}
-
-	uptodate = req->rq_status & 0x0001;
-	if( !uptodate && mdev->bc->dc.on_io_error == Detach) {
-		drbd_set_out_of_sync(mdev,rsector, drbd_req_get_size(req));
-		// It should also be as out of sync on
-		// the other side!  See w_io_error()
-
-		drbd_bio_endio(req->master_bio,1);
-		dec_ap_bio(mdev);
-		// The assumption is that we wrote it on the peer.
-
-// FIXME proto A and diskless :)
-
-		req->w.cb = w_io_error;
-		drbd_queue_work(&mdev->data.work,&req->w);
-
-		goto out;
-
-	}
-
-	drbd_bio_endio(req->master_bio,uptodate);
-	dec_ap_bio(mdev);
-
-	drbd_req_free(req);
-
- out:
-	b = kmalloc(sizeof(struct drbd_barrier),GFP_NOIO);
-	if(b) {
-		spin_lock_irq(&mdev->tl_lock);
-		if(test_and_clear_bit(ISSUE_BARRIER,&mdev->flags)) {
-			b = _tl_add_barrier(mdev,b);
-			b->w.cb =  w_send_barrier;
-			drbd_queue_work(&mdev->data.work, &b->w);
-		} else {
-			kfree(b);
-		}
-		spin_unlock_irq(&mdev->tl_lock);
-	}
-}
-
-static unsigned int ar_hash_fn(drbd_dev *mdev, sector_t sector)
-{
-	return (unsigned int)(sector) % APP_R_HSIZE;
-}
-
-int drbd_read_remote(drbd_dev *mdev, drbd_request_t *req)
-{
-	req->w.cb = w_send_read_req;
-	spin_lock(&mdev->pr_lock);
-	INIT_HLIST_NODE(&req->colision);
-	hlist_add_head( &req->colision, mdev->app_reads_hash +
-			ar_hash_fn(mdev, drbd_req_get_sector(req) ));
-	spin_unlock(&mdev->pr_lock);
-	set_bit(UNPLUG_REMOTE,&mdev->flags);
-
-	drbd_queue_work(&mdev->data.work, &req->w);
-
-	return 1;
-}
-
-int drbd_pr_verify(drbd_dev *mdev, drbd_request_t * req, sector_t sector)
-{
-	struct hlist_head *slot = mdev->app_reads_hash+ar_hash_fn(mdev,sector);
-	struct hlist_node *n;
-	drbd_request_t * i;
-	int rv=0;
-
-	spin_lock(&mdev->pr_lock);
-
-	hlist_for_each_entry(i, n, slot, colision) {
-		if (i==req) {
-			D_ASSERT(drbd_req_get_sector(i) == sector);
-			rv=1;
-			break;
-		}
-	}
-
-	spin_unlock(&mdev->pr_lock);
-
-	return rv;
-}
-
-
 /* we may do a local read if:
  * - we are consistent (of course),
  * - or we are generally inconsistent,
@@ -218,73 +64,63 @@
 	return 1;
 }
 
-static inline drbd_request_t* drbd_req_new(drbd_dev *mdev, struct bio *bio_src)
-{
-	struct bio *bio;
-	drbd_request_t *req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
-	if (req) {
-		SET_MAGIC(req);
+/*
+ * general note:
+ * looking at the state (conn, disk, susp, pdsk) outside of the spinlock that
+ * protects the state changes is inherently racy.
+ *
+ * FIXME verify this rationale why we may do so anyways:
+ *
+ * I think it "should" be like this:
+ * as soon as we have a "ap_bio_cnt" reference we may test for "bad" states,
+ * because the transition from "bad" to "good" states may only happen while no
+ * application request is on the fly, so once we are positive about a "bad"
+ * state, we know it won't get better during the lifetime of this request.
+ *
+ * In case we think we are ok, but "asynchronously" some interrupt or other thread
+ * marks some operation as impossible, we are still ok, since we would just try
+ * anyways, and then see that it does not work there and then.
+ */
 
-		bio = bio_clone(bio_src, GFP_NOIO); /* XXX cannot fail?? */
-
-		req->rq_status   = RQ_DRBD_NOTHING;
-		req->mdev        = mdev;
-		req->master_bio  = bio_src;
-		req->private_bio = bio;
-		req->barrier     = NULL;
-
-		bio->bi_private  = req;
-		bio->bi_end_io   =
-			bio_data_dir(bio) == WRITE
-			? drbd_endio_write_pri
-			: drbd_endio_read_pri;
-		bio->bi_next    = 0;
-	}
-	return req;
-}
-
 STATIC int
 drbd_make_request_common(drbd_dev *mdev, int rw, int size,
 			 sector_t sector, struct bio *bio)
 {
-	struct drbd_barrier *b;
+	struct drbd_barrier *b = NULL;
 	drbd_request_t *req;
 	int local, remote;
-	int mxb;
 
-	mxb = 1000000; /* Artificial limit on open requests */
-	if(inc_net(mdev)) {
-		mxb = mdev->net_conf->max_buffers;
-		dec_net(mdev);
-	}
-
-	/* allocate outside of all locks
-	 */
+	/* allocate outside of all locks; get a "reference count" (ap_bio_cnt)
+	 * to avoid races with the disconnect/reconnect code.  */
+	inc_ap_bio(mdev);
 	req = drbd_req_new(mdev,bio);
 	if (!req) {
+		dec_ap_bio(mdev);
 		/* only pass the error to the upper layers.
-		 * if user cannot handle io errors, thats not our business.
-		 */
+		 * if user cannot handle io errors, thats not our business. */
 		ERR("could not kmalloc() req\n");
-		drbd_bio_IO_error(bio);
+		bio_endio(bio, bio->bi_size, -ENOMEM);
 		return 0;
 	}
 
-	// down_read(mdev->device_lock);
-
+	/* we wait here
+	 *  - as long as the device is suspended, and
+	 *  - until the bitmap is no longer on the fly during connection handshake
+	 */
 	wait_event( mdev->cstate_wait,
 		    (volatile int)((mdev->state.conn < WFBitMapS ||
 				    mdev->state.conn > WFBitMapT) &&
 				   !mdev->state.susp ) );
-	/* FIXME RACE
-	 * the wait condition may already be wrong again...
-	 * ok, thats "academic" atm, but probably not good in the long term.
-	 *
-	 * we should have a function that does wait for the condition,
-	 * and do the inc_local within what ever lock is necessary...
-	 */
+
 	local = inc_local(mdev);
-	if (rw == READ || rw == READA) {
+	if (!local) {
+		bio_put(req->private_bio); /* or we get a bio leak */
+		req->private_bio = NULL;
+	}
+	if (rw == WRITE) {
+		remote = 1;
+	} else {
+		/* READ || READA */
 		if (local) {
 			if (!drbd_may_do_local_read(mdev,sector,size)) {
 				/* we could kick the syncer to
@@ -312,12 +148,12 @@
  * think this over again for two primaries */
 
 				local = 0;
+				bio_put(req->private_bio);
+				req->private_bio = NULL;
 				dec_local(mdev);
 			}
 		}
 		remote = !local && mdev->state.pdsk >= UpToDate;//Consistent;
-	} else {
-		remote = 1;
 	}
 
 	/* If we have a disk, but a READA request is mapped to remote,
@@ -332,11 +168,16 @@
 		goto fail_and_free_req;
 	}
 
+	/* For WRITES going to the local disk, grab a reference on the target extent.
+	 * This waits for any resync activity in the corresponding resync
+	 * extent to finish, and, if necessary, pulls in the target extent into
+	 * the activity log, which involves further disk io because of transactional
+	 * on-disk meta data updates. */
 	if (rw == WRITE && local)
 		drbd_al_begin_io(mdev, sector);
 
-	remote = remote && (mdev->state.pdsk == UpToDate || 
-			    ( mdev->state.pdsk == Inconsistent && 
+	remote = remote && (mdev->state.pdsk == UpToDate ||
+			    ( mdev->state.pdsk == Inconsistent &&
 			      mdev->state.conn >= Connected ) );
 
 	D_ASSERT( (rw != WRITE) || (remote == (mdev->state.conn >= Connected)) );
@@ -346,80 +187,108 @@
 		goto fail_and_free_req;
 	}
 
-	/* do this first, so I do not need to call drbd_end_req,
-	 * but can set the rq_status directly.
-	 */
-	if (!local)
-		req->rq_status |= RQ_DRBD_LOCAL;
-	if (!remote)
-		req->rq_status |= RQ_DRBD_SENT | RQ_DRBD_ON_WIRE;
-
-	/* we need to plug ALWAYS since we possibly need to kick lo_dev */
+	/* we need to plug ALWAYS since we possibly need to kick lo_dev
+	 * FIXME I'd like to put this within the req_lock, too... */
 	drbd_plug_device(mdev);
 
-	// inc_ap_bio(mdev); do not allow more open requests than max_buffers!
-	wait_event( mdev->rq_wait,atomic_add_unless(&mdev->ap_bio_cnt,1,mxb) );
+	/* For WRITE requests, we have to make sure that we have an
+	 * unused_spare_barrier, in case we need to start a new epoch.
+	 * I try to be smart and avoid always pre-allocating "just in case",
+	 * but there is a race between testing the bit and pointer outside the
+	 * spinlock, and grabbing the spinlock.
+	 * if we lost that race, we retry.  */
+	if (rw == WRITE && remote &&
+	    mdev->unused_spare_barrier == NULL &&
+	    test_bit(ISSUE_BARRIER,&mdev->flags))
+	{
+  allocate_barrier:
+		b = kmalloc(sizeof(struct drbd_barrier),GFP_NOIO);
+		if(!b) {
+			ERR("Failed to alloc barrier.");
+			goto fail_and_free_req;
+		}
+	}
 
-	/* remote might be already wrong here, since we might slept after
-	   looking at the connection state, but this is ok. */
-	if (remote) {
-		/* either WRITE and Connected,
-		 * or READ, and no local disk,
-		 * or READ, but not in sync.
-		 */
-		if (rw == WRITE) {
+	/* GOOD, everything prepared, grab the spin_lock */
+	spin_lock_irq(&mdev->req_lock);
 
-			b = kmalloc(sizeof(struct drbd_barrier),GFP_NOIO);
-			if(!b) {
-				ERR("Failed to alloc barrier.");
-				goto fail_and_free_req;
-			}
+	if (b && mdev->unused_spare_barrier == NULL) {
+		mdev->unused_spare_barrier = b;
+		b = NULL;
+	}
+	if (rw == WRITE && remote &&
+	    mdev->unused_spare_barrier == NULL &&
+	    test_bit(ISSUE_BARRIER,&mdev->flags)) {
+		/* someone closed the current epoch
+		 * while we were grabbing the spinlock */
+		spin_unlock_irq(&mdev->req_lock);
+		goto allocate_barrier;
+	}
 
-			spin_lock_irq(&mdev->tl_lock);
+	/* _maybe_start_new_epoch(mdev);
+	 * If we need to generate a write barrier packet, we have to add the
+	 * new epoch (barrier) object, and queue the barrier packet for sending,
+	 * and queue the req's data after it _within the same lock_, otherwise
+	 * we have race conditions where the reorder domains could be mixed up.
+	 *
+	 * Even read requests may start a new epoch and queue the corresponding
+	 * barrier packet.  To get the write ordering right, we only have to
+	 * make sure that, if this is a write request and it triggered a
+	 * barrier packet, this request is queued within the same spinlock. */
+	if (mdev->unused_spare_barrier &&
+            test_and_clear_bit(ISSUE_BARRIER,&mdev->flags)) {
+		struct drbd_barrier *b = mdev->unused_spare_barrier;
+		b = _tl_add_barrier(mdev,b);
+		b->w.cb =  w_send_barrier;
+		drbd_queue_work(&mdev->data.work, &b->w);
+	} else {
+		D_ASSERT(!(remote && rw == WRITE &&
+			   test_bit(ISSUE_BARRIER,&mdev->flags)));
+	}
 
-			if(test_and_clear_bit(ISSUE_BARRIER,&mdev->flags)) {
-				b = _tl_add_barrier(mdev,b);
-				b->w.cb =  w_send_barrier;
-				drbd_queue_work(&mdev->data.work, &b->w);
-				b = NULL;
-			}
+	/* NOTE
+	 * Actually, 'local' may be wrong here already, since we may have failed
+	 * to write to the meta data, and may become wrong anytime because of
+	 * local io-error for some other request, which would lead to us
+	 * "detaching" the local disk.
+	 *
+	 * 'remote' may become wrong any time because the network could fail.
+	 *
+	 * This is a harmless race condition, though, since it is handled
+	 * correctly at the appropriate places; so it just defers the failure
+	 * of the respective operation.
+	 */
 
-			if (mdev->net_conf->two_primaries) {
-				if(_ee_have_write(mdev,req)) { // tl_add() here
-					spin_unlock_irq(&mdev->tl_lock);
+	/* mark them early for readability.
+	 * this just sets some state flags. */
+	if (remote) _req_mod(req, to_be_send);
+	if (local)  _req_mod(req, to_be_submitted);
 
-					WARN("Concurrent write! [DISCARD L] sec=%lu\n",
-					     (unsigned long)sector);
-					dec_local(mdev);
-					local=0;
-
-					drbd_end_req(req, RQ_DRBD_DONE, 1, sector);
-					if(b) kfree(b);
-					return 0;
-				}
-			} else {
-				_tl_add(mdev,req);
-			}
-			req->w.cb =  w_send_dblock;
-			drbd_queue_work(&mdev->data.work, &req->w);
-
-			spin_unlock_irq(&mdev->tl_lock);
-
-			if(b) kfree(b);
-		} else {
-			// this node is diskless ...
-			drbd_read_remote(mdev,req);
-		}
+	/* NOTE remote first: to get the concurrent write detection right, we
+	 * must register the request before start of local IO.  */
+	if (remote) {
+		/* either WRITE and Connected,
+		 * or READ, and no local disk,
+		 * or READ, but not in sync.
+		 */
+		_req_mod(req, rw == WRITE
+				? queue_for_net_write
+				: queue_for_net_read);
 	}
 
-	/* NOTE: drbd_send_dlobck() must happen before start of local IO,
-	         to get he concurrent write detection right. */
+	/* still holding the req_lock.
+	 * not strictly necessary, but for the statistic counters... */
 
+#if 0
 	if (local) {
+		/* FIXME I think this branch can go completely.  */
 		if (rw == WRITE) {
-			if (!remote) drbd_set_out_of_sync(mdev,sector,size);
+			/* we defer the drbd_set_out_of_sync to the bio_endio
+			 * function. we only need to make sure the bit is set
+			 * before we do drbd_al_complete_io. */
+			 if (!remote) drbd_set_out_of_sync(mdev,sector,size);
 		} else {
-			D_ASSERT(!remote);
+			D_ASSERT(!remote); /* we should not read from both */
 		}
 		/* FIXME
 		 * Should we add even local reads to some list, so
@@ -428,26 +297,40 @@
 		 * They already have a reference count (sort of...)
 		 * on mdev via inc_local()
 		 */
+
+		/* XXX we probably should not update these here but in bio_endio.
+		 * especially the read_cnt could go wrong for all the READA
+		 * that may just be failed because of "overload"... */
 		if(rw == WRITE) mdev->writ_cnt += size>>9;
 		else            mdev->read_cnt += size>>9;
 
-		// in 2.4.X, READA are submitted as READ.
-		req->private_bio->bi_rw = rw;
+		/* FIXME what ref count do we have to ensure the backing_bdev
+		 * was not detached below us? */
+		req->private_bio->bi_rw = rw; /* redundant */
 		req->private_bio->bi_bdev = mdev->bc->backing_bdev;
-		generic_make_request(req->private_bio);
 	}
+#endif
 
-	// up_read(mdev->device_lock);
+	req->private_bio->bi_bdev = mdev->bc->backing_bdev;
+	spin_unlock_irq(&mdev->req_lock);
+	if (b) kfree(b); /* if someone else has beaten us to it... */
+
+	/* extra if branch so I don't need to write spin_unlock_irq twice */
+
+	if (local) {
+		BUG_ON(req->private_bio->bi_bdev == NULL);
+		generic_make_request(req->private_bio);
+	}
 	return 0;
 
   fail_and_free_req:
-	drbd_bio_IO_error(bio);
+	bio_endio(bio, bio->bi_size, -EIO);
 	drbd_req_free(req);
 	return 0;
 }
 
 /* helper function for drbd_make_request
- * if we can determine just by the mdev (state) that this reques will fail,
+ * if we can determine just by the mdev (state) that this request will fail,
  * return 1
  * otherwise return 0
  */
@@ -498,14 +381,8 @@
 	unsigned int s_enr,e_enr;
 	struct Drbd_Conf* mdev = (drbd_dev*) q->queuedata;
 
-/* FIXME
- * I think we need to grab some sort of reference count right here.
- * Would make it easier to serialize with size changes and other funny stuff.
- * Maybe move inc_ap_bio right here?
- */
-
 	if (drbd_fail_request_early(mdev, bio_data_dir(bio) & WRITE)) {
-		drbd_bio_IO_error(bio);
+		bio_endio(bio, bio->bi_size, -EPERM);
 		return 0;
 	}
 
@@ -514,17 +391,26 @@
 	 */
 	D_ASSERT(bio->bi_size > 0);
 	D_ASSERT( (bio->bi_size & 0x1ff) == 0);
-	D_ASSERT(bio->bi_size <= DRBD_MAX_SEGMENT_SIZE);
+	D_ASSERT(bio->bi_size <= q->max_segment_size);
 	D_ASSERT(bio->bi_idx == 0);
 
+#if 1
+	/* to make some things easier, force alignment of requests within the
+	 * granularity of our hash tables */
+	s_enr = bio->bi_sector >> HT_SHIFT;
+	e_enr = (bio->bi_sector+(bio->bi_size>>9)-1) >> HT_SHIFT;
+#else
+	/* when not using two primaries (and not being as paranoid as lge),
+	 * actually there is no need to be as strict.
+	 * only force alignment within AL_EXTENT boundaries */
 	s_enr = bio->bi_sector >> (AL_EXTENT_SIZE_B-9);
 	e_enr = (bio->bi_sector+(bio->bi_size>>9)-1) >> (AL_EXTENT_SIZE_B-9);
+#endif
 	D_ASSERT(e_enr >= s_enr);
 
 	if(unlikely(s_enr != e_enr)) {
-		/* This bio crosses an AL_EXTENT boundary, so we have to
-		 * split it. [So far, only XFS is known to do this...]
-		 */
+		/* This bio crosses some boundary, so we have to split it.
+		 * [So far, only XFS is known to do this...] */
 		struct bio_pair *bp;
 		bp = bio_split(bio, bio_split_pool,
 			       (e_enr<<(AL_EXTENT_SIZE_B-9)) - bio->bi_sector);
@@ -538,31 +424,30 @@
 					bio->bi_sector,bio);
 }
 
-/* This is called by bio_add_page(). With this function we prevent
-   that we get BIOs that span over multiple AL_EXTENTs.
+/* This is called by bio_add_page().  With this function we reduce
+ * the number of BIOs that span over multiple AL_EXTENTs.
+ *
+ * we do the calculation within the lower 32bit of the byte offsets,
+ * since we don't care for actual offset, but only check whether it
+ * would cross "activity log extent" boundaries.
+ *
+ * As long as the BIO is empty we have to allow at least one bvec,
+ * regardless of size and offset.  so the resulting bio may still
+ * cross extent boundaries.  those are dealt with (bio_split) in
+ * drbd_make_request_26.
  */
+/* FIXME for two_primaries,
+ * we should use DRBD_MAX_SEGMENT_SIZE instead of AL_EXTENT_SIZE */
 int drbd_merge_bvec(request_queue_t *q, struct bio *bio, struct bio_vec *bvec)
 {
-	unsigned int s = (unsigned int)bio->bi_sector << 9; // 32 bit...
-	unsigned int t;
+	unsigned int bio_offset = (unsigned int)bio->bi_sector << 9; // 32 bit...
+	unsigned int bio_size = bio->bi_size;
+	int max;
 
-	if (bio->bi_size == 0) {
-		s = max_t(unsigned int,
-			  AL_EXTENT_SIZE - (s & (AL_EXTENT_SIZE-1)),
-			  PAGE_SIZE);
-		// As long as the BIO is emtpy we allow at least one page.
-	} else {
-		t = s & ~(AL_EXTENT_SIZE-1);
-		s = (s + bio->bi_size);
-
-		if( ( s & ~(AL_EXTENT_SIZE-1) ) != t ) {
-			s = 0;
-			// This BIO already spans over an AL_EXTENTs boundary.
-		} else {
-			s = AL_EXTENT_SIZE - ( s & (AL_EXTENT_SIZE-1) );
-			// Bytes to the next AL_EXTENT boundary.
-		}
-	}
-
-	return s;
+	max = AL_EXTENT_SIZE - ((bio_offset & (AL_EXTENT_SIZE-1)) + bio_size);
+	if (max < 0) max = 0;
+	if (max <= bvec->bv_len && bio_size == 0)
+		return bvec->bv_len;
+	else
+		return max;
 }

Added: trunk/drbd/drbd_req.h
===================================================================
--- trunk/drbd/drbd_req.h	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/drbd_req.h	2006-09-09 23:04:18 UTC (rev 2394)
@@ -0,0 +1,855 @@
+/*
+   drbd_req.h
+   Kernel module for 2.6.x Kernels
+
+   This file is part of DRBD
+
+   Copyright (C) 2006, Lars Ellenberg <lars.ellenberg at linbit.com>.
+   Copyright (C) 2006, LINBIT Information Technologies GmbH.
+
+   DRBD is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2, or (at your option)
+   any later version.
+
+   DRBD is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with drbd; see the file COPYING.  If not, write to
+   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef _DRBD_REQ_H
+#define _DRBD_REQ_H
+
+#include <linux/config.h>
+#include <linux/module.h>
+
+#include <linux/slab.h>
+#include <linux/drbd.h>
+#include "drbd_int.h"
+
+/* The request callbacks will be called in irq context by the IDE drivers,
+   and in Softirqs/Tasklets/BH context by the SCSI drivers,
+   and by the receiver and worker in kernel-thread context.
+   Try to get the locking right :) */
+
+/*
+ * Objects of type drbd_request_t do only exist on a Primary node, and are
+ * associated with IO requests originating from the block layer above us.
+ *
+ * There are quite a few things that may happen to a drbd request
+ * during its lifetime.
+ *
+ *  It will be created.
+ *  It will be marked with the intention to be
+ *    submitted to local disk and/or
+ *    sent via the network.
+ *
+ *  It has to be placed on the transfer log and other housekeeping lists,
+ *  In case we have a network connection.
+ *    FIXME I believe that for consistency we should place even READ requests
+ *    on these lists, so we can moan when we detect that the other node is
+ *    writing to an area that we currently read from (when this happens, our
+ *    users are broken).
+ *
+ *  It may be identified as a concurrent (write) request
+ *    and be handled accordingly.
+ *
+ *  It may be handed over to the local disk subsystem.
+ *  It may be completed by the local disk subsystem,
+ *    either successfully or with io-error.
+ *  In case it is a READ request, and it failed locally,
+ *    it may be retried remotely.
+ *
+ *  It may be queued for sending.
+ *  It may be handed over to the network stack,
+ *    which may fail.
+ *  It may be acknowledged by the "peer" according to the wire_protocol in use.
+ *    this may be a negative ack.
+ *  It may receive a faked ack when the network connection is lost and the
+ *  transfer log is cleaned up.
+ *  Sending may be canceled due to network connection loss.
+ *  When it finally has outlived its time,
+ *    corresponding dirty bits in the resync-bitmap may be cleared or set,
+ *    it will be destroyed,
+ *    and completion will be signalled to the originator,
+ *      with or without "success".
+ *
+ * See also documentation/drbd-request-state-overview.dot
+ *  (dot -Tps2 documentation/drbd-request-state-overview.dot | display -)
+ */
+
+/* Events (and, for the two queue_for_net_* entries, actions) that are passed
+ * as the "what" argument to _req_mod() to advance a drbd_request_t. */
+typedef enum {
+	created,		/* not dispatched by _req_mod(); see drbd_req_new() */
+	to_be_send,		/* will be sent via the network */
+	to_be_submitted,	/* will be submitted locally */
+
+	suspend_because_of_conflict, /* case is "#if 0" in _req_mod(),
+				      * handled inline in queue_for_net_write */
+	conflicting_req_done,
+	conflicting_ee_done,
+
+	/* XXX yes, now I am inconsistent...
+	 * these two are not "events" but "actions"
+	 * oh, well... */
+	queue_for_net_write,
+	queue_for_net_read,
+
+	send_canceled,
+	send_failed,
+	handed_over_to_network,
+	connection_lost_while_pending,
+	recv_acked_by_peer,
+	write_acked_by_peer,
+	neg_acked,
+	barrier_acked, /* in protocol A and B */
+	data_received, /* (remote read) */
+
+	/* completion of the local (private) bio */
+	read_completed_with_error,
+	write_completed_with_error,
+	completed_ok,
+} drbd_req_event_t;
+
+/* encoding of request states for now.  we don't actually need that many bits.
+ * we don't need to do atomic bit operations either, since most of the time we
+ * need to look at the connection state and/or manipulate some lists at the
+ * same time, so we should hold the request lock anyways.
+ */
+/* Bit numbers within rq_state.  Bits 0..2 describe the local disk part of a
+ * request, bits 3..7 its network part; the RQ_* masks are built from these
+ * numbers by shifting. */
+enum drbd_req_state_bits {
+	/* 210
+	 * 000: no local possible
+	 * 001: to be submitted
+	 *    UNUSED, we could map: 011: submitted, completion still pending
+	 * 110: completed ok
+	 * 010: completed with error
+	 */
+	__RQ_LOCAL_PENDING,	/* bit 0 */
+	__RQ_LOCAL_COMPLETED,	/* bit 1 */
+	__RQ_LOCAL_OK,		/* bit 2 */
+
+	/* 76543
+	 * 00000: no network possible
+	 * 00001: to be sent
+	 * 00011: to be sent, on worker queue
+	 * 00101: sent, expecting recv_ack (B) or write_ack (C)
+	 * 11101: sent,
+	 *        recv_ack (B) or implicit "ack" (A),
+	 *        still waiting for the barrier ack.
+	 *        master_bio may already be completed and invalidated.
+	 * 11100: write_acked (C),
+	 *        data_received (for remote read, any protocol)
+	 *        or finally the barrier ack has arrived (B,A)...
+	 *        request can be freed
+	 * 01100: neg-acked (write, protocol C)
+	 *        or neg-d-acked (read, any protocol)
+	 *        or killed from the transfer log
+	 *        during cleanup after connection loss
+	 *        request can be freed
+	 * 01000: canceled or send failed...
+	 *        request can be freed
+	 */
+
+	/* if "SENT" is not set, yet, this can still fail or be canceled.
+	 * if "SENT" is set already, we still wait for an Ack packet.
+	 * when cleared, the master_bio may be completed.
+	 * in (B,A) the request object may still linger on the transaction log
+	 * until the corresponding barrier ack comes in */
+	__RQ_NET_PENDING,	/* bit 3 */
+
+	/* If it is QUEUED, and it is a WRITE, it is also registered in the
+	 * transfer log. Currently we need this flag to avoid conflicts between
+	 * worker canceling the request and tl_clear_barrier killing it from
+	 * transfer log.  We should restructure the code so this conflict does
+	 * no longer occur. */
+	__RQ_NET_QUEUED,	/* bit 4 */
+
+	/* well, actually only "handed over to the network stack" */
+	__RQ_NET_SENT,		/* bit 5 */
+
+	/* when set, the request may be freed.
+	 * in (C) this happens when WriteAck is received,
+	 * in (B,A) when the corresponding BarrierAck is received */
+	__RQ_NET_DONE,		/* bit 6 */
+
+	/* whether or not we know (C) or pretend (B,A) that the write
+	 * was successfully written on the peer.
+	 */
+	__RQ_NET_OK,		/* bit 7 */
+};
+
+/* single-bit masks for the local disk part of rq_state */
+#define RQ_LOCAL_PENDING   (1UL << __RQ_LOCAL_PENDING)
+#define RQ_LOCAL_COMPLETED (1UL << __RQ_LOCAL_COMPLETED)
+#define RQ_LOCAL_OK        (1UL << __RQ_LOCAL_OK)
+
+/* all local bits: every bit up to and including RQ_LOCAL_OK */
+#define RQ_LOCAL_MASK      ((RQ_LOCAL_OK << 1)-1) /* 0x07 */
+
+/* single-bit masks for the network part of rq_state */
+#define RQ_NET_PENDING     (1UL << __RQ_NET_PENDING)
+#define RQ_NET_QUEUED      (1UL << __RQ_NET_QUEUED)
+#define RQ_NET_SENT        (1UL << __RQ_NET_SENT)
+#define RQ_NET_DONE        (1UL << __RQ_NET_DONE)
+#define RQ_NET_OK          (1UL << __RQ_NET_OK)
+
+/* all network bits: every bit up to and including RQ_NET_OK,
+ * with the local bits masked out */
+#define RQ_NET_MASK        (((RQ_NET_OK << 1)-1) & ~RQ_LOCAL_MASK) /* 0xf8 */
+
+/* epoch entries:
+ * return the ee_hash chain that @sector hashes to.
+ * BUGs if the table size (ee_hash_s) is zero.
+ * "static inline" like the other helpers in this header, so a file that
+ * includes it without calling this does not get an unused-function warning. */
+static inline struct hlist_head* ee_hash_slot(drbd_dev *mdev, sector_t sector)
+{
+	BUG_ON(mdev->ee_hash_s == 0);
+	return mdev->ee_hash + ((unsigned int)(sector>>HT_SHIFT) % mdev->ee_hash_s);
+}
+
+/* transfer log (drbd_request objects):
+ * return the tl_hash chain that @sector hashes to.
+ * BUGs if the table size (tl_hash_s) is zero.
+ * "static inline" like the other helpers in this header, so a file that
+ * includes it without calling this does not get an unused-function warning. */
+static inline struct hlist_head* tl_hash_slot(drbd_dev *mdev, sector_t sector)
+{
+	BUG_ON(mdev->tl_hash_s == 0);
+	return mdev->tl_hash +
+		((unsigned int)(sector>>HT_SHIFT) % mdev->tl_hash_s);
+}
+
+/* Translate the @id / @sector pair into the request it names, verifying
+ * that we actually know about it: @id is compared against the address of
+ * every request on the transfer log hash chain for @sector.
+ * Returns the request, or NULL (after logging an error) if no request on
+ * the chain has that address, or if the one that has is registered for a
+ * different sector. */
+static inline drbd_request_t* _ack_id_to_req(drbd_dev *mdev,u64 id, sector_t sector)
+{
+	struct hlist_head *chain = tl_hash_slot(mdev,sector);
+	struct hlist_node *pos;
+	drbd_request_t *req;
+
+	hlist_for_each_entry(req, pos, chain, colision) {
+		if ((unsigned long)req != (unsigned long)id)
+			continue;
+		if (req->sector == sector)
+			return req;
+		/* right address, wrong sector: complain and give up */
+		ERR("_ack_id_to_req: found req %p but it has "
+		    "wrong sector (%llx versus %llx)\n", req,
+		    (unsigned long long)req->sector,
+		    (unsigned long long)sector);
+		break;
+	}
+	ERR("_ack_id_to_req: failed to find req %p, sector %llx in list\n", 
+		(void*)(unsigned long)id, (unsigned long long)sector);
+	return NULL;
+}
+
+/* application reads (drbd_request objects):
+ * return the app_reads_hash chain that @sector hashes to.
+ * "static inline" like the other helpers in this header, so a file that
+ * includes it without calling this does not get an unused-function warning. */
+static inline struct hlist_head* ar_hash_slot(drbd_dev *mdev, sector_t sector)
+{
+	return mdev->app_reads_hash
+		+ ((unsigned int)(sector) % APP_R_HSIZE);
+}
+
+/* When we receive the answer for a read request, verify that we actually
+ * know about it: @id is compared against the address of every request on
+ * the application-reads hash chain for @sector.
+ * Returns the matching request, or NULL if there is none. */
+static inline drbd_request_t* _ar_id_to_req(drbd_dev *mdev,u64 id, sector_t sector)
+{
+	struct hlist_head *chain = ar_hash_slot(mdev,sector);
+	struct hlist_node *pos;
+	drbd_request_t *req;
+
+	hlist_for_each_entry(req, pos, chain, colision) {
+		if ((unsigned long)req != (unsigned long)id)
+			continue;
+		D_ASSERT(req->sector == sector);
+		return req;
+	}
+	return NULL;
+}
+
+/* Allocate and initialize a request object for the upper layer bio @bio_src.
+ * A private clone of @bio_src is attached, with drbd_endio_pri as its
+ * completion handler and the new request as its bi_private.
+ * Returns NULL if either the request or the clone could not be allocated. */
+static inline drbd_request_t* drbd_req_new(drbd_dev *mdev, struct bio *bio_src)
+{
+	struct bio *bio;
+	drbd_request_t *req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
+
+	if (unlikely(!req))
+		return NULL;
+
+	bio = bio_clone(bio_src, GFP_NOIO);
+	if (unlikely(!bio)) {
+		/* do not dereference a failed clone below;
+		 * report it the same way as a failed request allocation */
+		mempool_free(req, drbd_request_mempool);
+		return NULL;
+	}
+
+	req->rq_state    = 0;
+	req->mdev        = mdev;
+	req->master_bio  = bio_src;
+	req->private_bio = bio;
+	req->epoch       = 0;
+	req->sector      = bio->bi_sector;
+	req->size        = bio->bi_size;
+	INIT_HLIST_NODE(&req->colision);
+
+	bio->bi_private  = req;
+	bio->bi_end_io   = drbd_endio_pri;
+	bio->bi_next     = NULL;
+
+	return req;
+}
+
+/* Hand @req back to drbd_request_mempool. */
+static inline void drbd_req_free(drbd_request_t *req)
+{
+	mempool_free(req, drbd_request_mempool);
+}
+
+/* Nonzero iff the two ranges intersect.  s1 and s2 are start sectors;
+ * l1 and l2 are shifted right by 9 before being added to the sector
+ * number, giving the (exclusive) end of each range. */
+static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
+{
+	const sector_t end1 = s1 + (l1>>9);
+	const sector_t end2 = s2 + (l2>>9);
+
+	return ( end1 > s2 ) && ( s1 < end2 );
+}
+
+/* Complete and/or destroy @req, once neither its local nor its network part
+ * is pending any longer.  Caller must hold mdev->req_lock.
+ *
+ * If the master_bio is still attached, it is completed here (reporting
+ * success if the local or the network part was ok).  For a WRITE,
+ * overlapping requests and epoch entries are queued on the worker first.
+ * The request is then removed from the hash it was registered on, if any.
+ *
+ * If the request has no network part at all, or its network part is DONE,
+ * it is destroyed: either freed right here, or, after a local write
+ * error, handed to the worker (w_io_error), which frees it.
+ *
+ * Because the request may be freed in here, callers must not touch @req
+ * after this function returns. */
+static inline void _req_may_be_done(drbd_request_t *req)
+{
+	const unsigned long s = req->rq_state;
+	drbd_dev *mdev = req->mdev;
+	int rw;
+
+	MUST_HOLD(&mdev->req_lock);
+
+	if (s & RQ_NET_PENDING) return;
+	if (s & RQ_LOCAL_PENDING) return;
+
+	if (req->master_bio) {
+		/* this is data_received (remote read)
+		 * or protocol C WriteAck
+		 * or protocol B RecvAck
+		 * or protocol A "handed_over_to_network" (SendAck)
+		 * or canceled or failed,
+		 * or killed from the transfer log due to connection loss.
+		 */
+
+		/*
+		 * figure out whether to report success or failure.
+		 *
+		 * report success when at least one of the operations succeeded.
+		 * or, to put the other way,
+		 * only report failure, when both operations failed.
+		 *
+		 * what to do about the failures is handled elsewhere.
+		 * what we need to do here is just: complete the master_bio.
+		 */
+		int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
+		rw = bio_data_dir(req->master_bio); 
+		if (rw == WRITE) {
+			drbd_request_t *i;
+			struct Tl_epoch_entry *e;
+			struct hlist_node *n;
+			struct hlist_head *slot;
+
+			/* before we can signal completion to the upper layers,
+			 * we may need to close the current epoch */
+			if (req->epoch == mdev->newest_barrier->br_number)
+				set_bit(ISSUE_BARRIER,&mdev->flags);
+
+			/* and maybe "wake" those conflicting requests that
+			 * wait for this request to finish.
+			 * we just have to walk starting from req->next,
+			 * see _req_add_hash_check_colision(); */
+#define OVERLAPS overlaps(req->sector, req->size, i->sector, i->size)
+			n = req->colision.next;
+			/* hlist_del ... done below */
+			hlist_for_each_entry_from(i, n, colision) {
+				if (OVERLAPS)
+					drbd_queue_work(&mdev->data.work,&i->w);
+			}
+
+			/* and maybe "wake" those conflicting epoch entries
+			 * that wait for this request to finish */
+			/* FIXME looks alot like we could consolidate some code
+			 * and maybe even hash tables? */
+#undef OVERLAPS
+#define OVERLAPS overlaps(req->sector, req->size, e->sector, e->size)
+			slot = ee_hash_slot(mdev,req->sector);
+			hlist_for_each_entry(e, n, slot, colision) {
+				if (OVERLAPS)
+					drbd_queue_work(&mdev->data.work,&e->w);
+			}
+#undef OVERLAPS
+		}
+		/* else: READ, READA: nothing more to do */
+
+		/* remove the request from the conflict detection
+		 * respective block_id verification hash.
+		 * A request that never was queued for the network has not
+		 * been hashed at all (its node is still as INIT_HLIST_NODE
+		 * left it), and hlist_del() must not be used on such a node. */
+		if (!hlist_unhashed(&req->colision))
+			hlist_del(&req->colision);
+
+		/* FIXME not yet implemented...
+		 * in case we got "suspended" (on_disconnect: freeze io)
+		 * we may not yet complete the request...
+		 * though, this is probably best handled elsewhere by not
+		 * walking the transfer log until "unfreeze", so we won't end
+		 * up here anyways during the freeze ...
+		 * then again, if it is a READ, it is not in the TL at all.
+		 * is it still legal to complete a READ during freeze? */
+		bio_endio(req->master_bio, req->master_bio->bi_size, ok ? 0 : -EIO);
+		req->master_bio = NULL;
+	} else {
+		/* only WRITE requests can end up here without a master_bio */
+		rw = WRITE;
+	}
+
+	/* note: this has to test the network bits with "&".
+	 * comparing s against the whole mask would be true for almost any
+	 * state, and free requests that still wait for their barrier ack. */
+	if ((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)) {
+		/* this is disconnected (local only) operation,
+		 * or protocol C WriteAck,
+		 * or protocol A or B BarrierAck,
+		 * or killed from the transfer log due to connection loss. */
+
+		/* if it was a write, we may have to set the corresponding
+		 * bit(s) out-of-sync first. If it had a local part, we need to
+		 * release the reference to the activity log. */
+		if (rw == WRITE) {
+			/* remove it from the transfer log */
+			list_del(&req->tl_requests);
+			/* Set out-of-sync unless both OK flags are set 
+			 * (local only or remote failed).
+			 * Other places where we set out-of-sync:
+			 * READ with local io-error */
+			if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
+				drbd_set_out_of_sync(mdev,req->sector,req->size);
+			if (s & RQ_LOCAL_MASK) {
+				drbd_al_complete_io(mdev, req->sector);
+			}
+		}
+
+		/* if it was an io error, we want to notify our
+		 * peer about that, and see if we need to
+		 * detach the disk and stuff.
+		 * to avoid allocating some special work
+		 * struct, reuse the request. */
+		if (rw == WRITE &&
+		    (( s & RQ_LOCAL_MASK) && !(s & RQ_LOCAL_OK))) {
+			if (!(req->w.list.next == LIST_POISON1 ||
+			      list_empty(&req->w.list))) {
+				/* DEBUG ASSERT only; if this triggers, we
+				 * probably corrupt the worker list here */
+				DUMPP(req->w.list.next);
+				DUMPP(req->w.list.prev);
+			}
+			req->w.cb = w_io_error;
+			drbd_queue_work(&mdev->data.work, &req->w);
+			/* drbd_req_free() is done in w_io_error */
+		} else {
+			drbd_req_free(req);
+		}
+	}
+	/* else: network part and not DONE yet. that is
+	 * protocol A or B, barrier ack still pending... */
+}
+
+/*
+ * checks whether there was an overlapping request already registered.
+ * if so, add the request to the colision hash
+ *        _after_ the (first) overlapping request,
+ * 	  and return 1
+ * if no overlap was found, add this request to the front of the chain,
+ *        and return 0
+ *
+ * corresponding hlist_del is in _req_may_be_done()
+ *
+ * NOTE:
+ * paranoia: assume something above us is broken, and issues different write
+ * requests for the same block simultaneously...
+ *
+ * To ensure these won't be reordered differently on both nodes, resulting in
+ * diverging data sets, we discard the later one(s). Not that this is supposed
+ * to happen, but this is the rationale why we also have to check for
+ * conflicting requests with local origin, and why we have to do so regardless
+ * of whether we allowed multiple primaries.
+ *
+ * BTW, in case we only have one primary, the ee_hash is empty anyways, and the
+ * second hlist_for_each_entry becomes a noop. This is even simpler than to
+ * grab a reference on the net_conf, and check for the two_primaries flag...
+ */
+/* Register @req on the transfer log hash and report conflicts.
+ * Returns 1 if @req overlaps a request already on its tl_hash chain (it is
+ * then inserted right after that request) or an epoch entry on the
+ * corresponding ee_hash chain; returns 0 otherwise.
+ * Caller must hold mdev->req_lock; @req must not be hashed yet.
+ * "static inline" like the other helpers in this header, so a file that
+ * includes it without calling this does not get an unused-function warning. */
+static inline int _req_add_hash_check_colision(drbd_request_t *req)
+{
+	drbd_dev *mdev = req->mdev;
+	const sector_t sector = req->sector;
+	const int size = req->size;
+	drbd_request_t *i;
+	struct Tl_epoch_entry *e;
+	struct hlist_node *n;
+	struct hlist_head *slot;
+
+	MUST_HOLD(&mdev->req_lock);
+	D_ASSERT(hlist_unhashed(&req->colision));
+#define OVERLAPS overlaps(i->sector, i->size, sector, size)
+	slot = tl_hash_slot(mdev,sector);
+	hlist_for_each_entry(i, n, slot, colision) {
+		if (OVERLAPS) {
+			ALERT("%s[%u] Concurrent local write detected!"
+			      "	[DISCARD L] new: %llu +%d; pending: %llu +%d\n",
+			      current->comm, current->pid,
+			      (unsigned long long)sector, size,
+			      (unsigned long long)i->sector, i->size);
+			hlist_add_after(n,&req->colision);
+			return 1;
+		}
+	}
+	/* no overlapping request with local origin found,
+	 * register in front */
+	hlist_add_head(&req->colision,slot);
+
+	/* now, check for overlapping requests with remote origin */
+#undef OVERLAPS
+#define OVERLAPS overlaps(e->sector, e->size, sector, size)
+	slot = ee_hash_slot(mdev,sector);
+	hlist_for_each_entry(e, n, slot, colision) {
+		if (OVERLAPS) {
+			/* sector numbers are printed with %llu: cast, as for
+			 * the local case above, so the argument matches the
+			 * format whatever the width of the sector type is */
+			ALERT("%s[%u] Concurrent remote write detected!"
+			      "	[DISCARD L] new: %llu +%d; pending: %llu +%d\n",
+			      current->comm, current->pid,
+			      (unsigned long long)sector, size,
+			      (unsigned long long)e->sector, e->size);
+			return 1;
+		}
+	}
+#undef OVERLAPS
+
+	/* this is like it should be, and what we expected.
+	 * our users do behave after all... */
+	return 0;
+}
+
+/* obviously this could be coded as many single functions
+ * instead of one huge switch,
+ * or by putting the code directly in the respective locations
+ * (as it has been before).
+ *
+ * but having it this way
+ *  enforces that it is all in this one place, where it is easier to audit,
+ *  it makes it obvious that whatever "event" "happens" to a request should
+ *  happen "atomically" within the req_lock,
+ *  and it enforces that we have to think in a very structured manner
+ *  about the "events" that may happen to a request during its life time ...
+ *
+ * Though I think it is likely that we break this again into many
+ * static inline void _req_mod_ ## what (req) ...
+ */
+/* Apply the event (or action) @what to @req: update rq_state and the
+ * related counters, lists and hashes.  Caller must hold mdev->req_lock.
+ * Most events end in _req_may_be_done(), which may complete the master_bio
+ * and free the request: do not touch @req after this function returns. */
+static inline void _req_mod(drbd_request_t *req, drbd_req_event_t what)
+{
+	drbd_dev *mdev = req->mdev;
+	MUST_HOLD(&mdev->req_lock);
+
+	switch(what) {
+	default:
+		ERR("LOGIC BUG in %s:%u\n", __FILE__ , __LINE__ );
+		return;
+
+	/* does not happen...
+	 * initialization done in drbd_req_new
+	case created:
+		break;
+		*/
+
+	case to_be_send: /* via network */
+		/* reached via drbd_make_request_common
+		 * and from FIXME w_read_retry_remote */
+		D_ASSERT(!(req->rq_state & RQ_NET_MASK));
+		req->rq_state |= RQ_NET_PENDING;
+		inc_ap_pending(mdev);
+		break;
+
+	case to_be_submitted: /* locally */
+		/* reached via drbd_make_request_common */
+		D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK));
+		req->rq_state |= RQ_LOCAL_PENDING;
+		break;
+
+#if 0
+		/* done inline below */
+	case suspend_because_of_conflict:
+		/* assert something? */
+		/* reached via drbd_make_request_common */
+		/* update state flag? why? which one? */
+		req->w.cb = w_req_cancel_conflict;
+		/* no queue here, see below! */
+		break;
+#endif
+
+	/* FIXME these *_completed_* are basically the same.
+	 * can probably be merged with some if (what == xy) */
+
+	case completed_ok:
+		req->rq_state |= (RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
+		req->rq_state &= ~RQ_LOCAL_PENDING;
+
+		/* account the io while we still hold the private bio;
+		 * it must not be looked at after bio_put() */
+		if (bio_data_dir(req->private_bio) == WRITE)
+			mdev->writ_cnt += req->size>>9;
+		else
+			mdev->read_cnt += req->size>>9;
+
+		bio_put(req->private_bio);
+		req->private_bio = NULL;
+		dec_local(mdev);
+
+		_req_may_be_done(req);
+		break;
+
+	case write_completed_with_error:
+		req->rq_state |= RQ_LOCAL_COMPLETED;
+		req->rq_state &= ~RQ_LOCAL_PENDING;
+
+		bio_put(req->private_bio);
+		req->private_bio = NULL;
+		dec_local(mdev);
+		ALERT("Local WRITE failed sec=%llu size=%u\n",
+					(unsigned long long)req->sector, req->size);
+		/* and now: check how to handle local io error.
+		 * FIXME see comment below in read_completed_with_error */
+		__drbd_chk_io_error(mdev);
+		_req_may_be_done(req);
+		break;
+
+	case read_completed_with_error:
+		drbd_set_out_of_sync(mdev,req->sector,req->size);
+		req->rq_state |= RQ_LOCAL_COMPLETED;
+		req->rq_state &= ~RQ_LOCAL_PENDING;
+
+		bio_put(req->private_bio);
+		req->private_bio = NULL;
+		dec_local(mdev);
+		if (bio_rw(req->master_bio) == READA) {
+			/* it is legal to fail READA.
+			 * no remote retry, but the master_bio still has to
+			 * be completed (with error) and the request freed. */
+			_req_may_be_done(req);
+			break;
+		}
+		/* else */
+		ALERT("Local READ failed sec=%llu size=%u\n",
+					(unsigned long long)req->sector, req->size);
+		/* _req_mod(req,to_be_send); oops, recursion in static inline */
+		D_ASSERT(!(req->rq_state & RQ_NET_MASK));
+		req->rq_state |= RQ_NET_PENDING;
+		inc_ap_pending(mdev);
+
+		/* and now: check how to handle local io error.
+		 *
+		 * FIXME we should not handle WRITE and READ io errors
+		 * the same. When we retry the READ, and then write
+		 * the answer, that might succeed because modern drives
+		 * would relocate the sectors. We'd need to keep our
+		 * private bio then, and round the offset and size so
+		 * we get back enough data to be able to clear the bits again.
+		 */
+		__drbd_chk_io_error(mdev);
+		/* fall through: _req_mod(req,queue_for_net_read); */
+
+	case queue_for_net_read:
+		/* READ or READA, and
+		 * no local disk,
+		 * or target area marked as invalid,
+		 * or just got an io-error. */
+		/* from drbd_make_request_common
+		 * or from bio_endio during read io-error recovery */
+
+		/* so we can verify the handle in the answer packet
+		 * corresponding hlist_del is in _req_may_be_done() */
+		hlist_add_head(&req->colision, ar_hash_slot(mdev,req->sector));
+
+		set_bit(UNPLUG_REMOTE,&mdev->flags); /* why? */
+
+		D_ASSERT(req->rq_state & RQ_NET_PENDING);
+		req->rq_state |= RQ_NET_QUEUED;
+		req->w.cb = (req->rq_state & RQ_LOCAL_MASK)
+			? w_read_retry_remote
+			: w_send_read_req;
+		drbd_queue_work(&mdev->data.work, &req->w);
+		break;
+
+	case queue_for_net_write:
+		/* assert something? */
+		/* from drbd_make_request_common only */
+
+		/* NOTE
+		 * In case the req ended up on the transfer log before being
+		 * queued on the worker, it could lead to this request being
+		 * missed during cleanup after connection loss.
+		 * So we have to do both operations here,
+		 * within the same lock that protects the transfer log.
+		 */
+
+		/* register this request on the collision detection hash
+		 * tables. if we have a conflict, just leave here.
+		 * the request will be "queued" for faked "completion"
+		 * once the conflicting request is done.
+		 */
+		if (_req_add_hash_check_colision(req)) {
+			/* this is a conflicting request.
+			 * even though it may have been only _partially_
+			 * overlapping with one of the currently pending requests,
+			 * without even submitting or sending it,
+			 * we will pretend that it was successfully served
+			 * once the pending conflicting request is done.
+			 */
+			/* _req_mod(req, suspend_because_of_conflict); */
+			/* this callback is just for ASSERT purposes */
+			req->w.cb = w_req_cancel_conflict;
+
+			/* we don't add this to any epoch (barrier) object.
+			 * assign the "invalid" barrier_number 0.
+			 * it should be 0 anyways, still,
+			 * but being explicit won't harm. */
+			req->epoch = 0;
+
+			/*
+			 * EARLY break here!
+			 */
+			break;
+		}
+
+		/* _req_add_to_epoch(req); this has to be after the
+		 * _maybe_start_new_epoch(req); which happened in
+		 * drbd_make_request_common, because we now may set the bit
+		 * again ourselves to close the current epoch.
+		 *
+		 * Add req to the (now) current epoch (barrier). */
+
+		/* see drbd_make_request_common just after it grabs the req_lock */
+		D_ASSERT(test_bit(ISSUE_BARRIER, &mdev->flags) == 0);
+
+		req->epoch = mdev->newest_barrier->br_number;
+		list_add(&req->tl_requests,&mdev->newest_barrier->requests);
+
+		/* mark the current epoch as closed,
+		 * in case it outgrew the limit */
+		if( ++mdev->newest_barrier->n_req >= mdev->net_conf->max_epoch_size )
+			set_bit(ISSUE_BARRIER,&mdev->flags);
+
+		D_ASSERT(req->rq_state & RQ_NET_PENDING);
+		req->rq_state |= RQ_NET_QUEUED;
+		req->w.cb =  w_send_dblock;
+		drbd_queue_work(&mdev->data.work, &req->w);
+		break;
+
+	case conflicting_req_done:
+	case conflicting_ee_done:
+		/* reached via bio_endio of the
+		 * conflicting request or epoch entry.
+		 * we now just "fake" completion of this request.
+		 * THINK: I'm going to _FAIL_ this request.
+		 */
+		D_ASSERT(req->w.cb == w_req_cancel_conflict);
+		D_ASSERT(req->epoch == 0);
+		{
+			const unsigned long s = req->rq_state;
+			if (s & RQ_LOCAL_MASK) {
+				D_ASSERT(s & RQ_LOCAL_PENDING);
+				bio_put(req->private_bio);
+				req->private_bio = NULL;
+				dec_local(mdev);
+			}
+			D_ASSERT((s & RQ_NET_MASK) == RQ_NET_PENDING);
+			dec_ap_pending(mdev);
+		}
+		/* no _OK ... this is going to be an io-error */
+		req->rq_state = RQ_LOCAL_COMPLETED|RQ_NET_DONE;
+		_req_may_be_done(req);
+		break;
+
+	/* FIXME
+	 * to implement freeze-io,
+	 * we may not finish the request just yet.
+	 */
+	case send_canceled:
+		/* for the request, this is the same thing */
+	case send_failed:
+		D_ASSERT(req->rq_state & RQ_NET_PENDING);
+		dec_ap_pending(mdev);
+		req->rq_state &= ~(RQ_NET_PENDING|RQ_NET_QUEUED|RQ_NET_OK);
+		req->rq_state |= RQ_NET_DONE;
+		_req_may_be_done(req);
+		break;
+
+	case handed_over_to_network:
+		/* assert something? */
+		if ( bio_data_dir(req->master_bio) == WRITE &&
+		     mdev->net_conf->wire_protocol == DRBD_PROT_A ) {
+			/* this is what is dangerous about protocol A:
+			 * pretend it was successfully written on the peer.
+			 * FIXME in case we get a local io-error in
+			 * protocol != C, we might want to defer completion
+			 * until we get the barrier ack, and send a NegAck
+			 * in case the other node had an io-error, too...
+			 * That way we would at least not report "success"
+			 * if it was not written at all. */
+			if (req->rq_state & RQ_NET_PENDING) {
+				dec_ap_pending(mdev);
+				req->rq_state &= ~RQ_NET_PENDING;
+				req->rq_state |= RQ_NET_OK;
+			} /* else: neg-ack was faster... */
+			/* it is still not yet RQ_NET_DONE until the
+			 * corresponding epoch barrier got acked as well,
+			 * so we know what to dirty on connection loss */
+		}
+		req->rq_state &= ~RQ_NET_QUEUED;
+		req->rq_state |= RQ_NET_SENT;
+		/* because _drbd_send_zc_bio could sleep, and may want to
+		 * dereference the bio even after the "write_acked_by_peer" and
+		 * "completed_ok" events came in, once we return from
+		 * _drbd_send_zc_bio (drbd_send_dblock), we have to check
+		 * whether it is done already, and end it.  */
+		_req_may_be_done(req);
+		break;
+
+	case connection_lost_while_pending:
+		/* transfer log cleanup after connection loss */
+		/* assert something? */
+		if (req->rq_state & RQ_NET_PENDING) dec_ap_pending(mdev);
+		req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
+		req->rq_state |= RQ_NET_DONE;
+		/* if it is still queued, we may not complete it here.
+		 * it will be canceled soon.
+		 * FIXME we should change the code so this can not happen. */
+		if (!(req->rq_state & RQ_NET_QUEUED)) 
+			_req_may_be_done(req);
+		break;
+
+	case write_acked_by_peer:
+		/* assert something? */
+		/* protocol C; successfully written on peer */
+		req->rq_state |= RQ_NET_DONE;
+		/* rest is the same as for: */
+	case recv_acked_by_peer:
+		/* protocol B; pretends to be successfully written on peer.
+		 * see also notes above in handed_over_to_network about
+		 * protocol != C */
+		req->rq_state |= RQ_NET_OK;
+		D_ASSERT(req->rq_state & RQ_NET_PENDING);
+		dec_ap_pending(mdev);
+		req->rq_state &= ~RQ_NET_PENDING;
+		if (req->rq_state & RQ_NET_SENT)
+			_req_may_be_done(req);
+		/* else: done by handed_over_to_network */
+		break;
+
+	case neg_acked:
+		/* assert something? */
+		if (req->rq_state & RQ_NET_PENDING) dec_ap_pending(mdev);
+		req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
+		/* FIXME THINK! is it DONE now, or is it not? */
+		req->rq_state |= RQ_NET_DONE;
+		if (req->rq_state & RQ_NET_SENT)
+			_req_may_be_done(req);
+		/* else: done by handed_over_to_network */
+		break;
+
+	case barrier_acked:
+		/* can even happen for protocol C,
+		 * when local io is still pending.
+		 * in which case it does nothing. */
+		D_ASSERT(req->rq_state & RQ_NET_SENT);
+		req->rq_state |= RQ_NET_DONE;
+		_req_may_be_done(req);
+		break;
+
+	case data_received:
+		D_ASSERT(req->rq_state & RQ_NET_PENDING);
+		dec_ap_pending(mdev);
+		req->rq_state &= ~RQ_NET_PENDING;
+		req->rq_state |= (RQ_NET_OK|RQ_NET_DONE);
+		/* can it happen that we receive the DataReply
+		 * before the send DataRequest function returns? */
+		if (req->rq_state & RQ_NET_SENT)
+			_req_may_be_done(req);
+		/* else: done by handed_over_to_network */
+		break;
+	}
+}
+
+/* Locked wrapper around _req_mod(): takes mdev->req_lock with
+ * spin_lock_irq() for the duration of the call.
+ * If you need it irqsave, do it your self!
+ *
+ * _req_mod() may free the request, so the device pointer is fetched
+ * before the call; @req must not be dereferenced for the unlock. */
+static inline void req_mod(drbd_request_t *req, drbd_req_event_t what)
+{
+	drbd_dev *mdev = req->mdev;
+
+	spin_lock_irq(&mdev->req_lock);
+	_req_mod(req,what);
+	spin_unlock_irq(&mdev->req_lock);
+}
+#endif

Modified: trunk/drbd/drbd_worker.c
===================================================================
--- trunk/drbd/drbd_worker.c	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/drbd_worker.c	2006-09-09 23:04:18 UTC (rev 2394)
@@ -40,6 +40,7 @@
 
 #include <linux/drbd.h>
 #include "drbd_int.h"
+#include "drbd_req.h"
 
 /* I choose to have all block layer end_io handlers defined here.
 
@@ -75,17 +76,16 @@
 
 	/* We are called each time a part of the bio is finished, but
 	 * we are only interested when the whole bio is finished, therefore
-	 * return as long as bio->bio_size is positive.
-	 */
+	 * return as long as bio->bio_size is positive.  */
 	if (bio->bi_size) return 1;
 
-	PARANOIA_BUG_ON(!VALID_POINTER(e));
 	D_ASSERT(e->block_id != ID_VACANT);
 
-	spin_lock_irqsave(&mdev->ee_lock,flags);
+	spin_lock_irqsave(&mdev->req_lock,flags);
+	mdev->read_cnt += e->size >> 9;
 	list_del(&e->w.list);
 	if(list_empty(&mdev->read_ee)) wake_up(&mdev->ee_wait);
-	spin_unlock_irqrestore(&mdev->ee_lock,flags);
+	spin_unlock_irqrestore(&mdev->req_lock,flags);
 
 	drbd_chk_io_error(mdev,error);
 	drbd_queue_work(&mdev->data.work,&e->w);
@@ -100,7 +100,7 @@
 {
 	unsigned long flags=0;
 	struct Tl_epoch_entry *e=NULL;
-	struct Drbd_Conf* mdev;
+	drbd_dev *mdev;
 	int do_wake;
 	int is_syncer_req;
 
@@ -110,91 +110,59 @@
 	// see above
 	if (bio->bi_size) return 1;
 
-	PARANOIA_BUG_ON(!VALID_POINTER(e));
 	D_ASSERT(e->block_id != ID_VACANT);
 
-	spin_lock_irqsave(&mdev->ee_lock,flags);
+	spin_lock_irqsave(&mdev->req_lock,flags);
+	mdev->writ_cnt += e->size >> 9;
 	is_syncer_req = is_syncer_block_id(e->block_id);
-	list_del(&e->w.list);
+	list_del(&e->w.list); /* has been on active_ee or sync_ee */
 	list_add_tail(&e->w.list,&mdev->done_ee);
 
-	if(!is_syncer_req) mdev->epoch_size++;
+	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
+	 * neither did we wake possibly waiting conflicting requests.
+	 * done from "drbd_process_done_ee" or _drbd_clear_done_ee
+	 * within the appropriate w.cb (e_end_block) */
 
+	if(!is_syncer_req) mdev->epoch_size++;
+	
 	do_wake = is_syncer_req
 		? list_empty(&mdev->sync_ee)
 		: list_empty(&mdev->active_ee);
 
-	spin_unlock_irqrestore(&mdev->ee_lock,flags);
+	if (error) __drbd_chk_io_error(mdev);
+	spin_unlock_irqrestore(&mdev->req_lock,flags);
 
 	if (do_wake) wake_up(&mdev->ee_wait);
 
-	if( !hlist_unhashed(&e->colision) ) {
-		spin_lock_irqsave(&mdev->tl_lock,flags);
-		hlist_del_init(&e->colision);
-		spin_unlock_irqrestore(&mdev->tl_lock,flags);
-	}
-
-	drbd_chk_io_error(mdev,error);
 	wake_asender(mdev);
 	dec_local(mdev);
 	return 0;
 }
 
-/* writes on Primary comming from drbd_make_request
+/* read, readA or write requests on Primary comming from drbd_make_request
  */
-int drbd_endio_write_pri(struct bio *bio, unsigned int bytes_done, int error)
+int drbd_endio_pri(struct bio *bio, unsigned int bytes_done, int error)
 {
+	unsigned long flags;
 	drbd_request_t *req=bio->bi_private;
-	struct Drbd_Conf* mdev=req->mdev;
-	sector_t rsector;
+	drbd_dev *mdev = req->mdev;
+	drbd_req_event_t what;
 
 	// see above
 	if (bio->bi_size) return 1;
 
-	drbd_chk_io_error(mdev,error);
-	rsector = drbd_req_get_sector(req);
-        // the bi_sector of the bio gets modified somewhere in drbd_end_req()!
-	drbd_end_req(req, RQ_DRBD_LOCAL, (error == 0), rsector);
-	drbd_al_complete_io(mdev,rsector);
-	dec_local(mdev);
-	bio_put(bio);
+	/* to avoid recursion in _req_mod */
+	what = error
+	       ? (bio_data_dir(bio) == WRITE)
+	         ? write_completed_with_error
+	         : read_completed_with_error
+	       : completed_ok;
+	spin_lock_irqsave(&mdev->req_lock,flags);
+	_req_mod(req, what);
+	spin_unlock_irqrestore(&mdev->req_lock,flags);
 	return 0;
 }
 
-/* reads on Primary comming from drbd_make_request
- */
-int drbd_endio_read_pri(struct bio *bio, unsigned int bytes_done, int error)
-{
-	drbd_request_t *req=bio->bi_private;
-	struct Drbd_Conf* mdev=req->mdev;
-
-	if (bio->bi_size) return 1;
-
-	/* READAs may fail.
-	 * upper layers need to be able to handle that themselves */
-	if (bio_rw(bio) == READA) goto pass_on;
-	if (error) {
-		drbd_chk_io_error(mdev,error); // handle panic and detach.
-		if(mdev->bc->dc.on_io_error == PassOn) goto pass_on;
-		// ok, if we survived this, retry:
-		// FIXME sector ...
-		if (DRBD_ratelimit(5*HZ,5))
-			ERR("local read failed, retrying remotely\n");
-		req->w.cb = w_read_retry_remote;
-		drbd_queue_work(&mdev->data.work,&req->w);
-	} else {
-	pass_on:
-		bio_endio(req->master_bio,req->master_bio->bi_size,error);
-		dec_ap_bio(mdev);
-
-		drbd_req_free(req);
-	}
-
-	bio_put(bio);
-	dec_local(mdev);
-	return 0;
-}
-
 int w_io_error(drbd_dev* mdev, struct drbd_work* w,int cancel)
 {
 	drbd_request_t *req = (drbd_request_t*)w;
@@ -208,6 +176,8 @@
 	 */
 	D_ASSERT(mdev->bc->dc.on_io_error != PassOn);
 
+	/* the only way this callback is scheduled is from _req_may_be_done,
+	 * when it is done and had a local write error, see comments there */
 	drbd_req_free(req);
 
 	if(unlikely(cancel)) return 1;
@@ -220,36 +190,24 @@
 int w_read_retry_remote(drbd_dev* mdev, struct drbd_work* w,int cancel)
 {
 	drbd_request_t *req = (drbd_request_t*)w;
-	int ok;
 
-	smp_rmb();
+	spin_lock_irq(&mdev->req_lock);
 	if ( cancel ||
 	     mdev->state.conn < Connected ||
 	     mdev->state.pdsk <= Inconsistent ) {
-		drbd_khelper(mdev,"pri-on-incon-degr");
-		drbd_panic("WE ARE LOST. Local IO failure, no peer.\n");
-
-		// does not make much sense, but anyways...
-		drbd_bio_endio(req->master_bio,0);
-		dec_ap_bio(mdev);
-		drbd_req_free(req);
+		_req_mod(req, send_canceled); /* FIXME freeze? ... */
+		spin_unlock_irq(&mdev->req_lock);
+		drbd_khelper(mdev,"pri-on-incon-degr"); /* FIXME REALLY? */
+		ALERT("WE ARE LOST. Local IO failure, no peer.\n");
 		return 1;
 	}
+	spin_unlock_irq(&mdev->req_lock);
 
-	// FIXME: what if partner was SyncTarget, and is out of sync for
-	// this area ?? ... should be handled in the receiver.
-
-	ok = drbd_io_error(mdev);
-	if(unlikely(!ok)) ERR("Sending in w_read_retry_remote() failed\n");
-	
-	ok = drbd_read_remote(mdev,req);
-	if(unlikely(!ok)) {
-		ERR("drbd_read_remote() failed\n");
-		/* dec_ap_pending and bio_io_error are done in
-		 * drbd_fail_pending_reads
-		 */
-	}
-	return ok;
+	/* FIXME this is ugly. we should not detach for read io-error,
+	 * but try to WRITE the DataReply to the failed location,
+	 * to give the disk the chance to relocate that block */
+	drbd_io_error(mdev); /* tries to schedule a detach and notifies peer */
+	return w_send_read_req(mdev,w,0);
 }
 
 int w_resync_inactive(drbd_dev *mdev, struct drbd_work *w, int cancel)
@@ -259,13 +217,11 @@
 	return 0;
 }
 
-/* FIXME
- * not used any longer, they now use e_end_resync_block.
- * maybe remove again?
- */
-int w_is_resync_read(drbd_dev *mdev, struct drbd_work *w, int unused)
+/* for debug assertion only */
+int w_req_cancel_conflict(drbd_dev *mdev, struct drbd_work *w, int cancel)
 {
-	ERR("%s: Typecheck only, should never be called!\n", __FUNCTION__ );
+	ERR("w_req_cancel_conflict: this callback should never be called!\n");
+	if (cancel) return 1; /* does it matter? */
 	return 0;
 }
 
@@ -296,6 +252,7 @@
 	unsigned long bit;
 	sector_t sector;
 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
+	int max_segment_size = mdev->rq_queue->max_segment_size;
 	int number,i,size;
 
 	PARANOIA_BUG_ON(w != &mdev->resync_work);
@@ -353,21 +310,26 @@
 		 * or if it the request would cross a 32k boundary
 		 * (play more nicely with most raid devices).
 		 *
-		 * we don't care about the agreed-uppon q->max_segment_size
-		 * here, because we don't keep a reference on this side.
-		 * the sync source will split the requests approrpiately as
-		 * needed, and may send multiple RSDataReply packets.
+		 * we _do_ care about the agreed-upon q->max_segment_size
+		 * here, as splitting up the requests on the other side is more
+		 * difficult.  the consequence is, that on lvm and md and other
+		 * "indirect" devices, this is dead code, since
+		 * q->max_segment_size will be PAGE_SIZE.
 		 */
 		for (;;) {
-			if (size < DRBD_MAX_SEGMENT_SIZE)
+			if (size + BM_BLOCK_SIZE > max_segment_size)
 				break;
 			if ((sector & ~63ULL) + BM_BIT_TO_SECT(2) <= 64ULL)
 				break;
 			// do not cross extent boundaries
 			if (( (bit+1) & BM_BLOCKS_PER_BM_EXT_MASK ) == 0)
 				break;
-			// now, is it actually dirty, after all?
-			if ( drbd_bm_test_bit(mdev,bit+1) == 0 )
+			/* now, is it actually dirty, after all?
+			 * caution, drbd_bm_test_bit is tri-state for some
+			 * obscure reason; ( b == 0 ) would get the out-of-band
+			 * only accidentally right because of the "oddly sized"
+			 * adjustment below */
+			if ( drbd_bm_test_bit(mdev,bit+1) != 1 )
 				break;
 			bit++;
 			size += BM_BLOCK_SIZE;
@@ -454,7 +416,7 @@
 }
 
 /**
- * w_e_end_data_req: Send the answer (DataReply) to a DataRequest.
+ * w_e_end_data_req: Send the answer (DataReply) in response to a DataRequest.
  */
 int w_e_end_data_req(drbd_dev *mdev, struct drbd_work *w, int cancel)
 {
@@ -473,19 +435,22 @@
 		ok=drbd_send_ack(mdev,NegDReply,e);
 		if (DRBD_ratelimit(5*HZ,5))
 			ERR("Sending NegDReply. I guess it gets messy.\n");
+		/* FIXME we should not detach for read io-errors, in particular
+		 * not now: when the peer asked us for our data, we are likely
+		 * the only remaining disk... */
 		drbd_io_error(mdev);
 	}
 
 	dec_unacked(mdev);
 
-	spin_lock_irq(&mdev->ee_lock);
+	spin_lock_irq(&mdev->req_lock);
 	if( drbd_bio_has_active_page(e->private_bio) ) {
 		/* This might happen if sendpage() has not finished */
 		list_add_tail(&e->w.list,&mdev->net_ee);
 	} else {
 		drbd_free_ee(mdev,e);
 	}
-	spin_unlock_irq(&mdev->ee_lock);
+	spin_unlock_irq(&mdev->req_lock);
 
 	if(unlikely(!ok)) ERR("drbd_send_block() failed\n");
 	return ok;
@@ -505,7 +470,7 @@
 		return 1;
 	}
 
-	drbd_rs_complete_io(mdev,drbd_ee_get_sector(e));
+	drbd_rs_complete_io(mdev,e->sector);
 
 	if(likely(drbd_bio_uptodate(e->private_bio))) {
 		if (likely( mdev->state.pdsk >= Inconsistent )) {
@@ -525,14 +490,14 @@
 
 	dec_unacked(mdev);
 
-	spin_lock_irq(&mdev->ee_lock);
+	spin_lock_irq(&mdev->req_lock);
 	if( drbd_bio_has_active_page(e->private_bio) ) {
 		/* This might happen if sendpage() has not finished */
 		list_add_tail(&e->w.list,&mdev->net_ee);
 	} else {
 		drbd_free_ee(mdev,e);
 	}
-	spin_unlock_irq(&mdev->ee_lock);
+	spin_unlock_irq(&mdev->req_lock);
 
 	if(unlikely(!ok)) ERR("drbd_send_block() failed\n");
 	return ok;
@@ -541,14 +506,20 @@
 int w_send_barrier(drbd_dev *mdev, struct drbd_work *w, int cancel)
 {
 	struct drbd_barrier *b = (struct drbd_barrier *)w;
+	Drbd_Barrier_Packet *p = &mdev->data.sbuf.Barrier;
 	int ok=1;
 
 	if(unlikely(cancel)) return ok;
 
 	down(&mdev->data.mutex);
-	ok = _drbd_send_barrier(mdev,b);
+	p->barrier = b->br_number;
+	inc_ap_pending(mdev);
+	ok = _drbd_send_cmd(mdev,mdev->data.socket,Barrier,(Drbd_Header*)p,sizeof(*p),0);
 	up(&mdev->data.mutex);
 
+	/* pairing dec_ap_pending() happens in got_BarrierAck,
+	 * or (on connection loss) in tl_clear.  */
+
 	return ok;
 }
 
@@ -564,39 +535,15 @@
 int w_send_dblock(drbd_dev *mdev, struct drbd_work *w, int cancel)
 {
 	drbd_request_t *req = (drbd_request_t *)w;
-	sector_t sector;
-	unsigned int size;
 	int ok;
 
-	D_ASSERT( !(req->rq_status & RQ_DRBD_SENT) );
-
 	if (unlikely(cancel)) {
-		/* We clear it up here explicit, since we might be _after_ the
-		   run of tl_clear() */
-		sector = drbd_req_get_sector(req);
-		size   = drbd_req_get_size(req);
-
-		drbd_end_req(req,RQ_DRBD_SENT|RQ_DRBD_ON_WIRE,1, sector);
-		drbd_set_out_of_sync(mdev, sector, size);
-
+		req_mod(req, send_canceled);
 		return 1;
 	}
 
-	inc_ap_pending(mdev);
 	ok = drbd_send_dblock(mdev,req);
-	drbd_end_req(req,RQ_DRBD_ON_WIRE,1,drbd_req_get_sector(req));
-	if (ok) {
-		if(mdev->net_conf->wire_protocol == DRBD_PROT_A) {
-			dec_ap_pending(mdev);
-			drbd_end_req(req, RQ_DRBD_SENT, 1, 
-				     drbd_req_get_sector(req));
-		}
-	} else {
-		if (mdev->state.conn >= Connected) 
-			drbd_force_state(mdev,NS(conn,NetworkFailure));
-		drbd_thread_restart_nowait(&mdev->receiver);
-		/* The request gets cleared up by tl_clear() */
-	}
+	req_mod(req,ok ? handed_over_to_network : send_failed);
 
 	return ok;
 }
@@ -607,39 +554,44 @@
 int w_send_read_req(drbd_dev *mdev, struct drbd_work *w, int cancel)
 {
 	drbd_request_t *req = (drbd_request_t *)w;
-	struct bio *bio = req->master_bio;
 	int ok;
 
-	inc_ap_pending(mdev);
+	if (unlikely(cancel)) {
+		req_mod(req, send_canceled);
+		return 1;
+	}
 
-	if (unlikely(cancel)) return 1;
-
-	ok = drbd_send_drequest(mdev, DataRequest, bio->bi_sector, bio->bi_size,
+	ok = drbd_send_drequest(mdev, DataRequest, req->sector, req->size,
 				(unsigned long)req);
 
 	if (!ok) {
+		/* ?? we set Timeout or BrokenPipe in drbd_send() */
 		if (mdev->state.conn >= Connected) 
 			drbd_force_state(mdev,NS(conn,NetworkFailure));
 		drbd_thread_restart_nowait(&mdev->receiver);
+		/* req_mod(req, send_failed); we should not fail it here,
+		 * we might have to "freeze" on disconnect.
+		 * handled by req_mod(req, connection_lost_while_pending);
+		 * in drbd_fail_pending_reads soon enough. */
 	}
 
 	return ok;
 }
 
+/* FIXME how should freeze-io be handled? */
 STATIC void drbd_fail_pending_reads(drbd_dev *mdev)
 {
 	struct hlist_head *slot;
 	struct hlist_node *n;
 	drbd_request_t * req;
 	struct list_head *le;
-	struct bio *bio;
 	LIST_HEAD(workset);
 	int i;
 
 	/*
 	 * Application READ requests
 	 */
-	spin_lock(&mdev->pr_lock);
+	spin_lock_irq(&mdev->req_lock);
 	for(i=0;i<APP_R_HSIZE;i++) {
 		slot = mdev->app_reads_hash+i;
 		hlist_for_each_entry(req, n, slot, colision) {
@@ -647,21 +599,15 @@
 		}
 	}
 	memset(mdev->app_reads_hash,0,APP_R_HSIZE*sizeof(void*));
-	spin_unlock(&mdev->pr_lock);
 
 	while(!list_empty(&workset)) {
 		le = workset.next;
 		req = list_entry(le, drbd_request_t, w.list);
 		list_del(le);
 
-		bio = req->master_bio;
-
-		drbd_bio_IO_error(bio);
-		dec_ap_bio(mdev);
-		dec_ap_pending(mdev);
-
-		drbd_req_free(req);
+		_req_mod(req, connection_lost_while_pending);
 	}
+	spin_unlock_irq(&mdev->req_lock);
 }
 
 /**

Modified: trunk/drbd/linux/drbd.h
===================================================================
--- trunk/drbd/linux/drbd.h	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/linux/drbd.h	2006-09-09 23:04:18 UTC (rev 2394)
@@ -34,7 +34,7 @@
 #endif
 
 enum io_error_handler {
-	PassOn,
+	PassOn, /* FIXME should this better be named "Ignore"? */
 	Panic,
 	Detach
 };

Modified: trunk/drbd/linux/drbd_config.h
===================================================================
--- trunk/drbd/linux/drbd_config.h	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/drbd/linux/drbd_config.h	2006-09-09 23:04:18 UTC (rev 2394)
@@ -22,7 +22,7 @@
 
 extern const char * drbd_buildtag(void);
 
-#define REL_VERSION "8.0pre4"
+#define REL_VERSION "8.0pre5"
 #define API_VERSION 84
 #define PRO_VERSION 82
 

Modified: trunk/user/Makefile
===================================================================
--- trunk/user/Makefile	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/user/Makefile	2006-09-09 23:04:18 UTC (rev 2394)
@@ -19,8 +19,10 @@
 
 KDIR := /lib/modules/$(shell uname -r)/build
 
-CFLAGS = -g -O2 -c -W -Wall -I../drbd -I$(KDIR)/include
 CC = gcc
+CFLAGS = -g -O2 -c -W -Wall -I../drbd
+# for the netlink connector stuff <linux/connector.h>:
+drbdsetup: CFLAGS += -I$(KDIR)/include
 
 drbdadm-obj = drbdadm_scanner.o drbdadm_parser.o drbdadm_main.o \
 	      drbdadm_adjust.o drbdtool_common.o drbdadm_usage_cnt.o \

Modified: trunk/user/drbdadm_main.c
===================================================================
--- trunk/user/drbdadm_main.c	2006-09-09 10:42:16 UTC (rev 2393)
+++ trunk/user/drbdadm_main.c	2006-09-09 23:04:18 UTC (rev 2394)
@@ -122,6 +122,7 @@
 int config_valid=1;
 int no_tty;
 int dry_run;
+int verbose;
 int do_verify_ips;
 char* drbdsetup;
 char* drbdmeta;
@@ -178,6 +179,7 @@
 
 struct option admopt[] = {
   { "dry-run",      no_argument,      0, 'd' },
+  { "verbose",      no_argument,      0, 'v' },
   { "config-file",  required_argument,0, 'c' },
   { "drbdsetup",    required_argument,0, 's' },
   { "drbdmeta",     required_argument,0, 'm' },
@@ -525,12 +527,12 @@
   sigemptyset(&sa.sa_mask);
   sa.sa_flags=0;
 
-  if(dry_run) {
+  if(dry_run || verbose) {
     while(*cmdline) {
       fprintf(stdout,"%s ",*cmdline++);
     }
     fprintf(stdout,"\n");
-    return 0;
+    if (dry_run) return 0;
   }
 
   pid = fork();
@@ -1421,6 +1423,7 @@
   drbdsetup=NULL;
   drbdmeta=NULL;
   dry_run=0;
+  verbose=0;
   yyin=NULL;
   uname(&nodeinfo); /* FIXME maybe fold to lower case ? */
   no_tty = (!isatty(fileno(stdin)) || !isatty(fileno(stdout)));
@@ -1479,6 +1482,9 @@
       if(c == -1) break;
       switch(c)
 	{
+	case 'v':
+	  verbose++;
+	  break;
 	case 'd':
 	  dry_run++;
 	  break;
@@ -1644,6 +1650,7 @@
 
       if ( optind==argc || !strcmp(argv[optind],"all") ) {
         if (is_dump) {
+	  printf("# %s\n",config_file);
 	  dump_global_info();
 	  dump_common_info();
 	}



More information about the drbd-cvs mailing list