[DRBD-cvs] r1768 - in trunk: . drbd

www-data www-data at garcon.linbit.com
Tue Mar 15 21:51:34 CET 2005


Author: phil
Date: 2005-03-15 21:51:32 +0100 (Tue, 15 Mar 2005)
New Revision: 1768

Modified:
   trunk/ROADMAP
   trunk/drbd/drbd_compat_wrappers.h
   trunk/drbd/drbd_fs.c
   trunk/drbd/drbd_int.h
   trunk/drbd/drbd_main.c
   trunk/drbd/drbd_receiver.c
   trunk/drbd/drbd_req.c
   trunk/drbd/drbd_worker.c
Log:
Implemented item 15 of the ROADMAP !
 This means that DRBD now accepts BIOs with up to 32 KiB of data
  (= 8 pages on x86), so we now make real use of Linux 2.6's
 I/O subsystem.

Currently there is only one open issue:
  We hit a kernel bug in bio_split()
    
But apart from that it really works great, and makes some
things even simpler!
    
Oh, and by the way, this also finishes item 11 of the ROADMAP!    


Modified: trunk/ROADMAP
===================================================================
--- trunk/ROADMAP	2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/ROADMAP	2005-03-15 20:51:32 UTC (rev 1768)
@@ -354,7 +354,7 @@
   Probably a general high performance implementation for this
   issue is not necessary, since clusters of machines with 
   different PAGE_SIZE are of academic interest only.
-  0% DONE
+  100% DONE by item 15
 
 12 Introduce a "common" section in the config file. Option
   section (like handlers, startup, disk, net and syncer)
@@ -376,10 +376,10 @@
   /dev/mapper/control
   0% DONE
 
-15 Accept BIOs bigger than one page, probabely up to 64k (16 pages) 
-  would be a good choce. When this is done make the bits in the
-  bitmap to account for more then 4k e.g. 64k
-  0% DONE
+15 Accept BIOs bigger than one page, probably up to 32k (8 pages) 
+  currently. When this is done, make each bit in the bitmap account 
+  for more than 4k, e.g. 64k.
+  50% DONE we handle big BIOs now, a bitmap bit is still 4k.
 
 [ Item 16 is still unfinished, as described here, the algorithm has
   some loose ends... ]

Modified: trunk/drbd/drbd_compat_wrappers.h
===================================================================
--- trunk/drbd/drbd_compat_wrappers.h	2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_compat_wrappers.h	2005-03-15 20:51:32 UTC (rev 1768)
@@ -146,24 +146,6 @@
 }
 #endif
 
-static inline void drbd_ee_init(struct Tl_epoch_entry *e,struct page *page)
-{
-	struct bio * const bio = &e->private_bio;
-	struct bio_vec * const vec = &e->ee_bvec;
-
-	memset(e, 0, sizeof(*e));
-	bio_init(bio);
-
-	bio->bi_io_vec = vec;
-	bio->bi_destructor = NULL;
-	vec->bv_page = page;
-	bio->bi_size = vec->bv_len = PAGE_SIZE;
-	bio->bi_max_vecs = bio->bi_vcnt = 1;
-	vec->bv_offset = 0;
-
-	e->block_id = ID_VACANT;
-}
-
 static inline void drbd_bio_set_pages_dirty(struct bio *bio)
 {
 	bio_set_pages_dirty(bio);
@@ -175,44 +157,19 @@
 }
 
 static inline void
-drbd_ee_bio_prepare(drbd_dev *mdev, struct Tl_epoch_entry* e,
-		    sector_t sector, int size)
-{
-	struct bio * const bio = &e->private_bio;
-	struct bio_vec * const vec = &e->ee_bvec;
-	struct page * const page = vec->bv_page;
-	D_ASSERT(mdev->backing_bdev);
-
-	/* Clear plate. */
-	bio_init(bio);
-
-	bio->bi_io_vec = vec;
-	bio->bi_destructor = NULL;
-	vec->bv_page = page;
-	vec->bv_offset = 0;
-	bio->bi_max_vecs = bio->bi_vcnt = 1;
-
-	bio->bi_bdev = mdev->backing_bdev;
-	bio->bi_private = mdev;
-
-	e->ee_sector = bio->bi_sector = sector;
-	e->ee_size = bio->bi_size = bio->bi_io_vec->bv_len = size;
-}
-
-static inline void
 drbd_ee_prepare_write(drbd_dev *mdev, struct Tl_epoch_entry* e,
-		      sector_t sector, int size)
+		      sector_t sector)
 {
-	drbd_ee_bio_prepare(mdev,e,sector,size);
-	e->private_bio.bi_end_io = drbd_dio_end_sec;
+	e->ee_sector = e->private_bio->bi_sector = sector;
+	e->private_bio->bi_end_io = drbd_dio_end_sec;
 }
 
 static inline void
 drbd_ee_prepare_read(drbd_dev *mdev, struct Tl_epoch_entry* e,
-		     sector_t sector, int size)
+		     sector_t sector)
 {
-	drbd_ee_bio_prepare(mdev,e,sector,size);
-	e->private_bio.bi_end_io = enslaved_read_bi_end_io;
+	e->ee_sector = e->private_bio->bi_sector = sector;
+	e->private_bio->bi_end_io = enslaved_read_bi_end_io;
 }
 
 static inline void
@@ -245,10 +202,16 @@
 	req->mdev      = mdev;
 }
 
-static inline struct page* drbd_bio_get_page(struct bio *bio)
+static inline int drbd_bio_has_active_page(struct bio *bio)
 {
-	struct bio_vec *bvec = bio_iovec(bio);
-	return bvec->bv_page;
+	struct bio_vec *bvec;
+	int i;
+
+	__bio_for_each_segment(bvec, bio, i, 0) {
+		if (page_count(bvec->bv_page) > 1) return 1;
+	}
+
+	return 0;
 }
 
 /*
@@ -306,12 +269,6 @@
 	spin_unlock_irq(q->queue_lock);
 }
 
-static inline int _drbd_send_zc_bio(drbd_dev *mdev, struct bio *bio)
-{
-	struct bio_vec *bvec = bio_iovec_idx(bio, bio->bi_idx);
-	return _drbd_send_page(mdev,bvec->bv_page,bvec->bv_offset,bvec->bv_len);
-}
-
 static inline int _drbd_send_bio(drbd_dev *mdev, struct bio *bio)
 {
 	struct bio_vec *bvec = bio_iovec(bio);

Modified: trunk/drbd/drbd_fs.c
===================================================================
--- trunk/drbd/drbd_fs.c	2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_fs.c	2005-03-15 20:51:32 UTC (rev 1768)
@@ -375,10 +375,10 @@
 	request_queue_t * const q = mdev->rq_queue;
 	request_queue_t * const b = bdev->bd_disk->queue;
 
-	q->max_sectors = min_not_zero((unsigned short)(PAGE_SIZE >> 9), b->max_sectors);
-	q->max_phys_segments = 1;
-	q->max_hw_segments   = 1;
-	q->max_segment_size  = min((unsigned)PAGE_SIZE,b->max_segment_size);
+	q->max_sectors = min_not_zero((unsigned short)(DRBD_MAX_SEGMENT_SIZE >> 9), b->max_sectors);
+	q->max_phys_segments = DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE;
+	q->max_hw_segments   = DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE;
+	q->max_segment_size  = min_t(int,DRBD_MAX_SEGMENT_SIZE,b->max_segment_size);
 	q->hardsect_size     = max((unsigned short)512,b->hardsect_size);
 	q->seg_boundary_mask = PAGE_SIZE-1;
 	D_ASSERT(q->hardsect_size <= PAGE_SIZE); // or we are really screwed ;-)

Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h	2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_int.h	2005-03-15 20:51:32 UTC (rev 1768)
@@ -611,14 +611,13 @@
  */
 struct Tl_epoch_entry {
 	struct drbd_work    w;
-	struct bio private_bio; // private bio struct, NOT a pointer
+	struct bio *private_bio;
 	u64    block_id;
 	long magic;
 	unsigned int ee_size;
 	sector_t ee_sector;
 	struct hlist_node colision;
-	// THINK: maybe we rather want bio_alloc(GFP_*,1)
-	struct bio_vec ee_bvec;
+	drbd_dev *mdev;
 };
 
 /* flag bits */
@@ -746,7 +745,6 @@
 	u32 *p_gen_cnt;
 	atomic_t epoch_size;
 	spinlock_t ee_lock;
-	struct list_head free_ee;   // available
 	struct list_head active_ee; // IO in progress
 	struct list_head sync_ee;   // IO in progress
 	struct list_head done_ee;   // send ack
@@ -757,8 +755,7 @@
 	spinlock_t pr_lock;
 	struct list_head app_reads;
 	struct list_head resync_reads;
-	int ee_vacant;
-	int ee_in_use;
+	atomic_t pp_in_use;
 	wait_queue_head_t ee_wait;
 	struct list_head busy_blocks;
 	struct page *md_io_page;      // one page buffer for md_io
@@ -950,9 +947,10 @@
 #endif
 
 /* Sector shift value for hash functions for tl_hash table and ee_hash
-   table. A value of 3 makes all IOs in on 4K block to make to the same
+   table. A value of 6 makes all IOs in one 32K block map to the same
    slot of the hash table. */
-#define HT_SHIFT 3
+#define HT_SHIFT 6
+#define DRBD_MAX_SEGMENT_SIZE (1<<(9+HT_SHIFT)) 
 
 extern int  drbd_bm_init      (drbd_dev *mdev);
 extern int  drbd_bm_resize    (drbd_dev *mdev, sector_t sectors);
@@ -997,6 +995,11 @@
 extern kmem_cache_t *drbd_ee_cache;
 extern mempool_t *drbd_request_mempool;
 
+extern struct page* drbd_pp_pool; // drbd's page pool
+extern spinlock_t   drbd_pp_lock;
+extern int          drbd_pp_vacant;
+extern wait_queue_head_t drbd_pp_wait;
+
 // drbd_req
 #define ERF_NOTLD    2   /* do not call tl_dependence */
 extern void drbd_end_req(drbd_request_t *, int, int, sector_t);
@@ -1034,9 +1037,10 @@
 
 // drbd_receiver.c
 extern int drbd_release_ee(drbd_dev* mdev,struct list_head* list);
-extern int drbd_init_ee(drbd_dev* mdev);
-extern void drbd_put_ee(drbd_dev* mdev,struct Tl_epoch_entry *e);
-extern struct Tl_epoch_entry* drbd_get_ee(drbd_dev* mdev);
+extern struct Tl_epoch_entry* drbd_alloc_ee(drbd_dev *mdev, 
+					    unsigned int data_size,
+					    unsigned int gfp_mask);
+extern void drbd_free_ee(drbd_dev *mdev, struct Tl_epoch_entry* e);
 extern void drbd_wait_ee(drbd_dev *mdev,struct list_head *head);
 
 // drbd_proc.c

Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c	2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_main.c	2005-03-15 20:51:32 UTC (rev 1768)
@@ -77,9 +77,8 @@
 MODULE_AUTHOR("Philipp Reisner <phil at linbit.com>, Lars Ellenberg <lars at linbit.com>");
 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
 MODULE_LICENSE("GPL");
-MODULE_PARM_DESC(use_nbd_major, "DEPRECATED! use nbd device major nr (43) "
-		                "instead of the default " __stringify(LANANA_DRBD_MAJOR) );
 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
+MODULE_ALIAS_BLOCKDEV_MAJOR(LANANA_DRBD_MAJOR);
 
 #include <linux/moduleparam.h>
 /*
@@ -93,13 +92,11 @@
  */
 
 /* thanks to these macros, if compiled into the kernel (not-module),
- * these become boot parameters drbd.use_nbd_major and drbd.minor_count
+ * this becomes the boot parameter drbd.minor_count
  */
-module_param(use_nbd_major,   bool,0);
 module_param(minor_count,      int,0);
 
 // module parameter, defined
-int use_nbd_major = 0;
 int major_nr = LANANA_DRBD_MAJOR;
 #ifdef MODULE
 int minor_count = 2;
@@ -121,7 +118,20 @@
 kmem_cache_t *drbd_request_cache;
 kmem_cache_t *drbd_ee_cache;
 mempool_t *drbd_request_mempool;
+mempool_t *drbd_ee_mempool;
 
+/* I do not use a standard mempool, because:
+   1) I want to hand out the preallocated objects first.
+   2) I want to be able to interrupt sleeping allocation with a signal.
+   Note: This is a singly linked list; the next pointer is stored in the
+         private member of struct page.
+ */
+struct page* drbd_pp_pool;
+spinlock_t   drbd_pp_lock;
+int          drbd_pp_vacant;
+wait_queue_head_t drbd_pp_wait;
+
+
 STATIC struct block_device_operations drbd_ops = {
 	.owner =   THIS_MODULE,
 	.open =    drbd_open,
@@ -1177,33 +1187,9 @@
        return ret;
 }
 
-#ifdef DRBD_DISABLE_SENDPAGE
 int _drbd_send_page(drbd_dev *mdev, struct page *page,
 		    int offset, size_t size)
 {
-	int sent,ok;
-	int len   = size;
-
-	spin_lock(&mdev->send_task_lock);
-	mdev->send_task=current;
-	spin_unlock(&mdev->send_task_lock);
-
-	sent =  _drbd_no_send_page(mdev, page, offset, size);
-	if (likely(sent > 0)) len -= sent;
-
-	spin_lock(&mdev->send_task_lock);
-	mdev->send_task=NULL;
-	spin_unlock(&mdev->send_task_lock);
-
-	ok = (len == 0);
-	if (likely(ok))
-		mdev->send_cnt += size>>9;
-	return ok;
-}
-#else
-int _drbd_send_page(drbd_dev *mdev, struct page *page,
-		    int offset, size_t size)
-{
 	mm_segment_t oldfs = get_fs();
 	int sent,ok;
 	int len   = size;
@@ -1281,8 +1267,22 @@
 		mdev->send_cnt += size>>9;
 	return ok;
 }
-#endif
 
+STATIC int _drbd_send_zc_bio(drbd_dev *mdev, struct bio *bio)
+{
+	struct bio_vec *bvec;
+	int i;
+	
+	bio_for_each_segment(bvec, bio, i) {
+		if (! _drbd_send_page(mdev, bvec->bv_page, bvec->bv_offset,
+				      bvec->bv_len) ) {
+			return 0;
+		}
+	}
+
+	return 1;
+}
+
 // Used to send write requests: bh->b_rsector !!
 int drbd_send_dblock(drbd_dev *mdev, drbd_request_t *req)
 {
@@ -1399,7 +1399,7 @@
 
 	dump_packet(mdev,mdev->data.socket,0,(void*)&p, __FILE__, __LINE__);
 	ok = sizeof(p) == drbd_send(mdev,mdev->data.socket,&p,sizeof(p),MSG_MORE);
-	if (ok) ok = _drbd_send_zc_bio(mdev,&e->private_bio);
+	if (ok) ok = _drbd_send_zc_bio(mdev,e->private_bio);
 
 	spin_lock(&mdev->send_task_lock);
 	mdev->send_task=NULL;
@@ -1620,6 +1620,7 @@
 	atomic_set(&mdev->local_cnt,0);
 	atomic_set(&mdev->resync_locked,0);
 	atomic_set(&mdev->packet_seq,0);
+	atomic_set(&mdev->pp_in_use, 0);
 
 	init_MUTEX(&mdev->md_io_mutex);
 	init_MUTEX(&mdev->data.mutex);
@@ -1635,7 +1636,6 @@
 	spin_lock_init(&mdev->send_task_lock);
 	spin_lock_init(&mdev->peer_seq_lock);
 
-	INIT_LIST_HEAD(&mdev->free_ee);
 	INIT_LIST_HEAD(&mdev->active_ee);
 	INIT_LIST_HEAD(&mdev->sync_ee);
 	INIT_LIST_HEAD(&mdev->done_ee);
@@ -1714,11 +1714,8 @@
 
 	drbd_thread_stop(&mdev->worker);
 
-	if (   mdev->ee_in_use  !=  0
-	    || mdev->ee_vacant  != 32 /* EE_MININUM */
-	    || atomic_read(&mdev->epoch_size) !=  0)
-		ERR("ee_in_use:%d ee_vacant:%d epoch_size:%d\n",
-		    mdev->ee_in_use, mdev->ee_vacant, atomic_read(&mdev->epoch_size));
+	if ( atomic_read(&mdev->epoch_size) !=  0)
+		ERR("epoch_size:%d\n",atomic_read(&mdev->epoch_size));
 #define ZAP(x) memset(&x,0,sizeof(x))
 	ZAP(mdev->conf);
 	ZAP(mdev->sync_conf);
@@ -1772,6 +1769,19 @@
 
 void drbd_destroy_mempools(void)
 {
+	struct page *page;
+
+	while(drbd_pp_pool) {
+		page = drbd_pp_pool;
+		drbd_pp_pool = (struct page*)page->private;
+		__free_page(page);
+		drbd_pp_vacant--;
+	}
+
+	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
+
+	if (drbd_ee_mempool)
+		mempool_destroy(drbd_ee_mempool);
 	if (drbd_request_mempool)
 		mempool_destroy(drbd_request_mempool);
 	if (drbd_ee_cache && kmem_cache_destroy(drbd_ee_cache))
@@ -1782,6 +1792,7 @@
 		       ": kmem_cache_destroy(drbd_request_cache) FAILED\n");
 	// FIXME what can we do if we fail to destroy them?
 
+	drbd_ee_mempool      = NULL;
 	drbd_request_mempool = NULL;
 	drbd_ee_cache        = NULL;
 	drbd_request_cache   = NULL;
@@ -1791,32 +1802,53 @@
 
 int drbd_create_mempools(void)
 {
+	struct page *page;
+	const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
+	int i;
+
 	// prepare our caches and mempools
 	drbd_request_mempool = NULL;
 	drbd_ee_cache        = NULL;
 	drbd_request_cache   = NULL;
+	drbd_pp_pool         = NULL;
 
 	// caches
 	drbd_request_cache = kmem_cache_create(
 		"drbd_req_cache", sizeof(drbd_request_t),
-		0, SLAB_NO_REAP, NULL, NULL);
+		0, 0, NULL, NULL);
 	if (drbd_request_cache == NULL)
 		goto Enomem;
 
 	drbd_ee_cache = kmem_cache_create(
 		"drbd_ee_cache", sizeof(struct Tl_epoch_entry),
-		0, SLAB_NO_REAP, NULL, NULL);
+		0, 0, NULL, NULL);
 	if (drbd_ee_cache == NULL)
 		goto Enomem;
 
 	// mempools
-	drbd_request_mempool = mempool_create(16, //TODO; reasonable value
+	drbd_request_mempool = mempool_create( number,
 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
 	if (drbd_request_mempool == NULL)
 		goto Enomem;
 
-		return 0;
+	drbd_ee_mempool = mempool_create( number,
+		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
+	if (drbd_request_mempool == NULL)
+		goto Enomem;
 
+	// drbd's page pool
+	spin_lock_init(&drbd_pp_lock);
+
+	for (i=0;i< number;i++) {
+		page = alloc_page(GFP_KERNEL);
+		if(!page) goto Enomem;
+		page->private = (unsigned long)drbd_pp_pool;
+		drbd_pp_pool = page;
+	}
+	drbd_pp_vacant = number;
+
+	return 0;
+
   Enomem:
 	drbd_destroy_mempools(); // in case we allocated some
 	return -ENOMEM;
@@ -1868,12 +1900,6 @@
 			if (mdev->bitmap) drbd_bm_cleanup(mdev);
 			if (mdev->resync) lc_free(mdev->resync);
 
-			D_ASSERT(mdev->ee_in_use==0);
-
-			rr = drbd_release_ee(mdev,&mdev->free_ee);
-			// INFO("%d EEs in free list found.\n",rr);
-			// D_ASSERT(rr == 32);
-
 			rr = drbd_release_ee(mdev,&mdev->active_ee);
 			if(rr) ERR("%d EEs in active list found!\n",rr);
 
@@ -1895,7 +1921,6 @@
 					DUMPP(lp);
 				}
 			};
-			D_ASSERT(mdev->ee_vacant == 0);
 
 			if (mdev->md_io_page)
 				__free_page(mdev->md_io_page);
@@ -1984,10 +2009,6 @@
 		return -EINVAL;
 	}
 
-	if (use_nbd_major) {
-		major_nr = NBD_MAJOR;
-	}
-
 	if (1 > minor_count||minor_count > 255) {
 		printk(KERN_ERR DEVICE_NAME
 			": invalid minor_count (%d)\n",minor_count);
@@ -2013,6 +2034,8 @@
 	 */
 	err = -ENOMEM;
 
+	init_waitqueue_head(&drbd_pp_wait);
+
 	drbd_proc = NULL; // play safe for drbd_cleanup
 	drbd_conf = kmalloc(sizeof(drbd_dev)*minor_count,GFP_KERNEL);
 	if (likely(drbd_conf!=NULL))
@@ -2082,7 +2105,6 @@
 
 		init_MUTEX(&mdev->device_mutex);
 		if (!tl_init(mdev)) goto Enomem;
-		if (!drbd_init_ee(mdev)) goto Enomem;
 	}
 
 #if CONFIG_PROC_FS
@@ -2124,9 +2146,6 @@
 	       "Version: " REL_VERSION " (api:%d/proto:%d)\n",
 	       API_VERSION,PRO_VERSION);
 	printk(KERN_INFO DEVICE_NAME ": %s\n", drbd_buildtag());
-	if (use_nbd_major) {
-		printk(KERN_INFO DEVICE_NAME": hijacking NBD device major!\n");
-	}
 	printk(KERN_INFO DEVICE_NAME": registered as block device major %d\n", MAJOR_NR);
 
 	return 0; // Success!

Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c	2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_receiver.c	2005-03-15 20:51:32 UTC (rev 1768)
@@ -51,8 +51,6 @@
 #include <linux/drbd.h>
 #include "drbd_int.h"
 
-#define EE_MININUM 32    // @4k pages => 128 KByte
-
 #define is_syncer_blk(A,B) ((B)==ID_SYNCER)
 
 #ifdef __arch_um__
@@ -137,11 +135,94 @@
 }
 #endif //PARANOIA
 
+#define GFP_TRY	( __GFP_HIGHMEM | __GFP_NOWARN )
+
+STATIC int drbd_process_ee(drbd_dev *mdev, int be_sleepy);
+
+/**
+ * drbd_pp_alloc: Returns a page. Fails only if a signal comes in.
+ */
+STATIC struct page * drbd_pp_alloc(drbd_dev *mdev, unsigned int gfp_mask)
+{
+	struct page *page;
+	DEFINE_WAIT(wait);
+
+	if ( drbd_pp_vacant == 
+	     (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count/2 ) {
+		drbd_kick_lo(mdev);
+	}
+
+	spin_lock(&drbd_pp_lock);
+	if ( (page = drbd_pp_pool) ) {
+		drbd_pp_pool = (struct page*)page->private;
+		drbd_pp_vacant--;
+	}
+	spin_unlock(&drbd_pp_lock);
+	if ( page ) goto got_page;
+
+	drbd_process_ee(mdev,1);
+ 
+	spin_lock(&drbd_pp_lock);
+	if ( (page = drbd_pp_pool) ) {
+		drbd_pp_pool = (struct page*)page->private;
+		drbd_pp_vacant--;
+	}
+	spin_unlock(&drbd_pp_lock);
+	if ( page ) goto got_page;
+
+	for (;;) {
+		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
+
+		spin_lock(&drbd_pp_lock);
+		if ( (page = drbd_pp_pool) ) {
+			drbd_pp_pool = (struct page*)page->private;
+			drbd_pp_vacant--;
+		}
+		spin_unlock(&drbd_pp_lock);
+		if ( page ) break;
+
+		if ( atomic_read(&mdev->pp_in_use) < mdev->conf.max_buffers ) {
+			if( (page = alloc_page(GFP_TRY)) ) break;
+		}
+		drbd_kick_lo(mdev);
+		schedule();
+		finish_wait(&drbd_pp_wait, &wait);
+		if (signal_pending(current)) {
+			WARN("drbd_pp_alloc interrupted!\n");
+			return NULL;
+		}
+		// finish wait is inside, so that we are TASK_RUNNING 
+		// in _drbd_process_ee (which might sleep by itself.)
+		drbd_process_ee(mdev,1);
+	}
+	finish_wait(&drbd_pp_wait, &wait); 
+
+ got_page:
+	atomic_inc(&mdev->pp_in_use);
+
+	return page;
+}
+
+STATIC void drbd_pp_free(drbd_dev *mdev,struct page *page)
+{
+	atomic_dec(&mdev->pp_in_use);
+
+	spin_lock(&drbd_pp_lock);
+	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
+		__free_page(page);
+	} else {
+		page->private = (unsigned long)drbd_pp_pool;
+		drbd_pp_pool = page;
+		drbd_pp_vacant++;
+	}
+	spin_unlock(&drbd_pp_lock);
+
+	wake_up(&drbd_pp_wait);
+}
+
 /*
 You need to hold the ee_lock:
  drbd_free_ee()
- drbd_get_ee()
- drbd_put_ee()
  _drbd_process_ee()
 
 You must not have the ee_lock:
@@ -155,80 +236,79 @@
  drbd_wait_ee()
 */
 
-STATIC int _drbd_alloc_ee(drbd_dev *mdev,struct page* page,int mask)
+struct Tl_epoch_entry* drbd_alloc_ee(drbd_dev *mdev, 
+				     unsigned int data_size,
+				     unsigned int gfp_mask)
 {
 	struct Tl_epoch_entry* e;
+	struct bio_vec *bvec;
+	struct page *page;
+	struct bio *bio;
+	unsigned int ds;
+	int i;
 
-	e = kmem_cache_alloc(drbd_ee_cache, mask);
-	if( e == NULL ) return FALSE;
+	e = kmem_cache_alloc(drbd_ee_cache, gfp_mask);
+	if (!e) return NULL;
 
-	drbd_ee_init(e,page);
-	spin_lock_irq(&mdev->ee_lock);
-	list_add(&e->w.list,&mdev->free_ee);
-	mdev->ee_vacant++;
-	spin_unlock_irq(&mdev->ee_lock);
+	bio = bio_alloc(GFP_KERNEL, div_ceil(data_size,PAGE_SIZE));
+	if (!bio) goto fail1;
 
-	return TRUE;
-}
+	bio->bi_bdev = mdev->backing_bdev;
+	
+	ds = data_size;
+	while(ds) {
+		page = drbd_pp_alloc(mdev, gfp_mask);
+		if (!page) goto fail2;
+		bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0);
+		ds -= min_t(int, ds, PAGE_SIZE);
+	}
 
-/* bool */
-STATIC int drbd_alloc_ee(drbd_dev *mdev,int mask)
-{
-	struct page *page;
+	bio->bi_private = e;
+	e->mdev = mdev;
+	e->ee_size = bio->bi_size;
+	D_ASSERT( data_size == bio->bi_size);
+	e->private_bio = bio;
+	e->block_id = ID_VACANT;
+	INIT_HLIST_NODE(&e->colision);
 
-	page=alloc_page(mask);
-	if(!page) return FALSE;
-
-	if(!_drbd_alloc_ee(mdev,page,GFP_KERNEL)) {
-		__free_page(page);
-		return FALSE;
+	return e;
+ fail2:
+	__bio_for_each_segment(bvec, bio, i, 0) {
+		drbd_pp_free(mdev,bvec->bv_page);
 	}
-
-	return TRUE;
+	bio_put(bio);
+ fail1:
+	kmem_cache_free(drbd_ee_cache, e);
+	
+	return NULL;
 }
 
-STATIC struct page* drbd_free_ee(drbd_dev *mdev, struct list_head *list)
+void drbd_free_ee(drbd_dev *mdev, struct Tl_epoch_entry* e)
 {
-	struct list_head *le;
-	struct Tl_epoch_entry* e;
-	struct page* page;
+	struct bio *bio=e->private_bio;
+	struct bio_vec *bvec;
+	int i;
 
-	MUST_HOLD(&mdev->ee_lock);
+	__bio_for_each_segment(bvec, bio, i, 0) {
+		drbd_pp_free(mdev,bvec->bv_page);
+	}
 
-	D_ASSERT(!list_empty(list));
-	le = list->next;
-	e = list_entry(le, struct Tl_epoch_entry, w.list);
-	list_del(le);
+	bio_put(bio);
 
-	page = drbd_bio_get_page(&e->private_bio);
-
-	D_ASSERT(page == e->ee_bvec.bv_page);
-	page = e->ee_bvec.bv_page;
-
 	kmem_cache_free(drbd_ee_cache, e);
-	mdev->ee_vacant--;
-
-	return page;
 }
 
-int drbd_init_ee(drbd_dev *mdev)
-{
-	while(mdev->ee_vacant < EE_MININUM ) {
-		if(!drbd_alloc_ee(mdev,GFP_USER)) {
-			ERR("Failed to allocate %d EEs !\n",EE_MININUM);
-			return 0;
-		}
-	}
-	return 1;
-}
-
 int drbd_release_ee(drbd_dev *mdev,struct list_head* list)
 {
 	int count=0;
+	struct Tl_epoch_entry* e;
+	struct list_head *le;
 
 	spin_lock_irq(&mdev->ee_lock);
 	while(!list_empty(list)) {
-		__free_page(drbd_free_ee(mdev,list));
+		le = list->next;
+		e = list_entry(le, struct Tl_epoch_entry, w.list);
+		drbd_free_ee(mdev,e);
 		count++;
 	}
 	spin_unlock_irq(&mdev->ee_lock);
@@ -236,97 +316,6 @@
 	return count;
 }
 
-#define GFP_TRY	( __GFP_HIGHMEM | __GFP_NOWARN )
-
-STATIC int _drbd_process_ee(drbd_dev *mdev, int be_sleepy);
-
-/**
- * drbd_get_ee: Returns an Tl_epoch_entry; might sleep. Fails only if
- * a signal comes in.
- */
-struct Tl_epoch_entry* drbd_get_ee(drbd_dev *mdev)
-{
-	struct list_head *le;
-	struct Tl_epoch_entry* e;
-	DEFINE_WAIT(wait);
-
-	MUST_HOLD(&mdev->ee_lock);
-
-	if(mdev->ee_vacant == EE_MININUM / 2) {
-		spin_unlock_irq(&mdev->ee_lock);
-		drbd_kick_lo(mdev);
-		spin_lock_irq(&mdev->ee_lock);
-	}
-
-	if(list_empty(&mdev->free_ee)) _drbd_process_ee(mdev,1);
-
-	if(list_empty(&mdev->free_ee)) {
-		for (;;) {
-			prepare_to_wait(&mdev->ee_wait, &wait, 
-					TASK_INTERRUPTIBLE);
-			if(!list_empty(&mdev->free_ee)) break;
-			spin_unlock_irq(&mdev->ee_lock);
-			if( ( mdev->ee_vacant+mdev->ee_in_use) < 
-			      mdev->conf.max_buffers ) {
-				if(drbd_alloc_ee(mdev,GFP_TRY)) {
-					spin_lock_irq(&mdev->ee_lock);
-					break;
-				}
-			}
-			drbd_kick_lo(mdev);
-			schedule();
-			spin_lock_irq(&mdev->ee_lock);
-			finish_wait(&mdev->ee_wait, &wait);
-			if (signal_pending(current)) {
-				WARN("drbd_get_ee interrupted!\n");
-				return 0;
-			}
-			// finish wait is inside, so that we are TASK_RUNNING 
-			// in _drbd_process_ee (which might sleep by itself.)
-			_drbd_process_ee(mdev,1);
-		}
-		finish_wait(&mdev->ee_wait, &wait); 
-	}
-
-	le=mdev->free_ee.next;
-	list_del(le);
-	mdev->ee_vacant--;
-	mdev->ee_in_use++;
-	e=list_entry(le, struct Tl_epoch_entry, w.list);
-
-	D_ASSERT(e->private_bio.bi_idx == 0);
-	drbd_ee_init(e,e->ee_bvec.bv_page); // reinitialize
-
-	e->block_id = !ID_VACANT;
-	SET_MAGIC(e);
-	return e;
-}
-
-void drbd_put_ee(drbd_dev *mdev,struct Tl_epoch_entry *e)
-{
-	MUST_HOLD(&mdev->ee_lock);
-
-	D_ASSERT(page_count(drbd_bio_get_page(&e->private_bio)) == 1);
-
-	mdev->ee_in_use--;
-	mdev->ee_vacant++;
-	e->block_id = ID_VACANT;
-	INVALIDATE_MAGIC(e);
-	list_add_tail(&e->w.list,&mdev->free_ee);
-
-	if((mdev->ee_vacant * 2 > mdev->ee_in_use ) &&
-	   ( mdev->ee_vacant + mdev->ee_in_use > EE_MININUM) ) {
-		__free_page(drbd_free_ee(mdev,&mdev->free_ee));
-	}
-	if(mdev->ee_in_use == 0) {
-		while( mdev->ee_vacant > EE_MININUM ) {
-			__free_page(drbd_free_ee(mdev,&mdev->free_ee));
-		}
-	}
-
-	wake_up(&mdev->ee_wait);
-}
-
 STATIC void reclaim_net_ee(drbd_dev *mdev)
 {
 	struct Tl_epoch_entry *e;
@@ -339,9 +328,9 @@
 
 	list_for_each_safe(le, tle, &mdev->net_ee) {
 		e = list_entry(le, struct Tl_epoch_entry, w.list);
-		if( page_count(drbd_bio_get_page(&e->private_bio)) > 1 ) break;
+		if( drbd_bio_has_active_page(e->private_bio) ) break;
 		list_del(le);
-		drbd_put_ee(mdev,e);
+		drbd_free_ee(mdev,e);
 	}
 }
 
@@ -380,8 +369,8 @@
 		spin_unlock_irq(&mdev->ee_lock);
 		e = list_entry(le, struct Tl_epoch_entry, w.list);
 		ok = ok && e->w.cb(mdev,&e->w,0);
+		drbd_free_ee(mdev,e);
 		spin_lock_irq(&mdev->ee_lock);
-		drbd_put_ee(mdev,e);
 	}
 
 	clear_bit(PROCESS_EE_RUNNING,&mdev->flags);
@@ -417,7 +406,7 @@
 		   is_syncer_blk(mdev,e->block_id)) {
 			++n;
 		}
-		drbd_put_ee(mdev,e);
+		drbd_free_ee(mdev,e);
 	}
 
 	spin_unlock_irq(&mdev->ee_lock);
@@ -790,29 +779,29 @@
 read_in_block(drbd_dev *mdev, int data_size)
 {
 	struct Tl_epoch_entry *e;
+	struct bio_vec *bvec;
+	struct page *page;
 	struct bio *bio;
-	int rr;
+	int ds,i,rr;
 
-	spin_lock_irq(&mdev->ee_lock);
-	e=drbd_get_ee(mdev);
-	spin_unlock_irq(&mdev->ee_lock);
+	e = drbd_alloc_ee(mdev,data_size,GFP_KERNEL);
 	if(!e) return 0;
-
-	bio = &e->private_bio;
-
-	rr=drbd_recv(mdev, drbd_bio_kmap(bio), data_size);
-	drbd_bio_kunmap(bio);
-
-	if ( rr != data_size) {
-		spin_lock_irq(&mdev->ee_lock);
-		drbd_put_ee(mdev,e);
-		spin_unlock_irq(&mdev->ee_lock);
-		WARN("short read receiving data block: read %d expected %d\n",
-			rr, data_size);
-		return 0;
+	bio = e->private_bio;
+	ds = data_size;
+	bio_for_each_segment(bvec, bio, i) {
+		page = bvec->bv_page;
+		rr = drbd_recv(mdev,kmap(page),min_t(int,ds,PAGE_SIZE));
+		kunmap(page);
+		if( rr != min_t(int,ds,PAGE_SIZE) ) {
+			drbd_free_ee(mdev,e);
+			WARN("short read recev data: read %d expected %d\n",
+			     rr, min_t(int,ds,PAGE_SIZE));
+			return 0;
+		}
+		ds -= rr;
 	}
-	mdev->recv_cnt+=data_size>>9;
 
+	mdev->recv_cnt+=data_size>>9;
 	return e;
 }
 
@@ -834,20 +823,29 @@
 STATIC int recv_dless_read(drbd_dev *mdev, drbd_request_t *req,
 			   sector_t sector, int data_size)
 {
+	struct bio_vec *bvec;
 	struct bio *bio;
-	int ok,rr;
+	int rr,i,expect,ok=1;
 
 	bio = req->master_bio;
-
 	D_ASSERT( sector == drbd_req_get_sector(req) );
+	
+	bio_for_each_segment(bvec, bio, i) {
+		expect = min_t(int,data_size,bvec->bv_len);
+		rr=drbd_recv(mdev,
+			     kmap(bvec->bv_page)+bvec->bv_offset,
+			     expect);	
+		kunmap(bvec->bv_page);
+		if (rr != expect) {
+			ok = 0;
+			break;
+		}
+		data_size -= rr;
+	}
 
-	rr=drbd_recv(mdev,drbd_bio_kmap(bio),data_size);
-	drbd_bio_kunmap(bio);
-
-	ok=(rr==data_size);
+	D_ASSERT(data_size == 0 || !ok);
 	drbd_bio_endio(bio,ok);
 	dec_ap_bio(mdev);
-
 	dec_ap_pending(mdev);
 	return ok;
 }
@@ -859,7 +857,7 @@
 	int ok;
 
 	drbd_rs_complete_io(mdev,sector); // before set_in_sync() !
-	if (likely( drbd_bio_uptodate(&e->private_bio) )) {
+	if (likely( drbd_bio_uptodate(e->private_bio) )) {
 		ok = mdev->state.s.disk >= Inconsistent &&
 			mdev->state.s.pdsk >= Inconsistent;
 		if (likely( ok )) {
@@ -895,13 +893,11 @@
 		if (DRBD_ratelimit(5*HZ,5))
 			ERR("Can not write resync data to local disk.\n");
 		drbd_send_ack(mdev,NegAck,e);
-		spin_lock_irq(&mdev->ee_lock);
-		drbd_put_ee(mdev,e);
-		spin_unlock_irq(&mdev->ee_lock);
+		drbd_free_ee(mdev,e);
 		return TRUE;
 	}
 
-	drbd_ee_prepare_write(mdev,e,sector,data_size);
+	drbd_ee_prepare_write(mdev,e,sector);
 	e->w.cb     = e_end_resync_block;
 
 	spin_lock_irq(&mdev->ee_lock);
@@ -910,7 +906,7 @@
 
 	inc_unacked(mdev);
 
-	drbd_generic_make_request(WRITE,&e->private_bio);
+	drbd_generic_make_request(WRITE,e->private_bio);
 
 	receive_data_tail(mdev,data_size);
 	return TRUE;
@@ -990,7 +986,7 @@
 
 	atomic_inc(&mdev->epoch_size);
 	if(mdev->conf.wire_protocol == DRBD_PROT_C) {
-		if(likely(drbd_bio_uptodate(&e->private_bio))) {
+		if(likely(drbd_bio_uptodate(e->private_bio))) {
 			ok=drbd_send_ack(mdev,WriteAck,e);
 			if (ok && test_bit(SYNC_STARTED,&mdev->flags) )
 				drbd_set_in_sync(mdev,sector,drbd_ee_get_size(e));
@@ -1006,7 +1002,7 @@
 		return ok;
 	}
 
-	if(unlikely(!drbd_bio_uptodate(&e->private_bio))) {
+	if(unlikely(!drbd_bio_uptodate(e->private_bio))) {
 		ok = drbd_io_error(mdev);
 	}
 
@@ -1050,12 +1046,10 @@
 	header_size = sizeof(*p) - sizeof(*h);
 	data_size   = h->length  - header_size;
 
-	/* I expect a block to be a multiple of 512 byte, and
-	 * no more than 4K (PAGE_SIZE). is this too restrictive?
-	 */
+	if( data_size > 4096 ) INFO("data_size=%d\n",data_size);
 	ERR_IF(data_size == 0) return FALSE;
 	ERR_IF(data_size &  0x1ff) return FALSE;
-	ERR_IF(data_size >  PAGE_SIZE) return FALSE;
+	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return FALSE;
 
 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
 		return FALSE;
@@ -1073,7 +1067,7 @@
 	}
 
 	e->block_id = p->block_id; // no meaning on this side, e* on partner
-	drbd_ee_prepare_write(mdev, e, sector, data_size);
+	drbd_ee_prepare_write(mdev, e, sector);
 	e->w.cb     = e_end_block;
 
 	/* This wait_event is here to make sure that never ever an
@@ -1159,7 +1153,7 @@
 		break;
 	}
 
-	drbd_generic_make_request(WRITE,&e->private_bio);
+	drbd_generic_make_request(WRITE,e->private_bio);
 
 	receive_data_tail(mdev,data_size);
 	return TRUE;
@@ -1168,9 +1162,7 @@
 	atomic_inc(&mdev->epoch_size);
 	dec_local(mdev);
  out1:
-	spin_lock_irq(&mdev->ee_lock);
-	drbd_put_ee(mdev,e);
-	spin_unlock_irq(&mdev->ee_lock);
+	drbd_free_ee(mdev,e);
 	return rv;
 }
 
@@ -1198,7 +1190,7 @@
 		return FALSE;
 	*/
 
-	if (size <= 0 || (size & 0x1ff) != 0 || size > PAGE_SIZE) {
+	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
 		ERR("%s:%d: sector: %lu, size: %d\n", __FILE__, __LINE__,
 				(unsigned long)sector,size);
 		return FALSE;
@@ -1209,13 +1201,11 @@
 		return FALSE;
 	}
 
-	spin_lock_irq(&mdev->ee_lock);
-	e=drbd_get_ee(mdev);
-	if(!e) {
-		spin_unlock_irq(&mdev->ee_lock);
-		return FALSE;
-	}
+	e = drbd_alloc_ee(mdev,size,GFP_KERNEL);
+	if (!e) return FALSE;
+
 	e->block_id = p->block_id; // no meaning on this side, pr* on partner
+	spin_lock_irq(&mdev->ee_lock);
 	list_add(&e->w.list,&mdev->read_ee);
 	spin_unlock_irq(&mdev->ee_lock);
 
@@ -1223,13 +1213,11 @@
 		if (DRBD_ratelimit(5*HZ,5))
 			ERR("Can not satisfy peer's read request, no local data.\n");
 		drbd_send_ack(mdev,NegDReply,e);
-		spin_lock_irq(&mdev->ee_lock);
-		drbd_put_ee(mdev,e);
-		spin_unlock_irq(&mdev->ee_lock);
+		drbd_free_ee(mdev,e);
 		return TRUE;
 	}
 
-	drbd_ee_prepare_read(mdev,e,sector,size);
+	drbd_ee_prepare_read(mdev,e,sector);
 
 	switch (h->command) {
 	case DataRequest:
@@ -1245,7 +1233,7 @@
 		if (!drbd_rs_begin_io(mdev,sector)) {
 			// we have been interrupted, probably connection lost!
 			D_ASSERT(signal_pending(current));
-			drbd_put_ee(mdev,e);
+			drbd_free_ee(mdev,e);
 			return 0;
 		}
 		break;
@@ -1256,7 +1244,7 @@
 
 	mdev->read_cnt += size >> 9;
 	inc_unacked(mdev);
-	drbd_generic_make_request(READ,&e->private_bio);
+	drbd_generic_make_request(READ,e->private_bio);
 	if (atomic_read(&mdev->local_cnt) >= (mdev->conf.max_epoch_size>>4) ) {
 		drbd_kick_lo(mdev);
 	}
@@ -1837,7 +1825,7 @@
 	wait_event( mdev->cstate_wait, atomic_read(&mdev->ap_pending_cnt)==0 );
 	D_ASSERT(mdev->oldest_barrier->n_req == 0);
 
-	D_ASSERT(mdev->ee_in_use == 0);
+	D_ASSERT(atomic_read(&mdev->pp_in_use) == 0);
 	D_ASSERT(list_empty(&mdev->read_ee)); // done by termination of worker
 	D_ASSERT(list_empty(&mdev->active_ee)); // done here
 	D_ASSERT(list_empty(&mdev->sync_ee)); // done here

Modified: trunk/drbd/drbd_req.c
===================================================================
--- trunk/drbd/drbd_req.c	2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_req.c	2005-03-15 20:51:32 UTC (rev 1768)
@@ -356,7 +356,8 @@
 		else            mdev->read_cnt += size>>9;
 
 		// in 2.4.X, READA are submitted as READ.
-		drbd_generic_make_request(rw,drbd_req_private_bio(req));
+		req->private_bio->bi_rw = rw;
+		generic_make_request(req->private_bio);
 	}
 
 	// up_read(mdev->device_lock);
@@ -377,8 +378,7 @@
 	 */
 	D_ASSERT(bio->bi_size > 0);
 	D_ASSERT( (bio->bi_size & 0x1ff) == 0);
-	D_ASSERT(bio->bi_size <= PAGE_SIZE);
-	D_ASSERT(bio->bi_vcnt == 1);
+	D_ASSERT(bio->bi_size <= DRBD_MAX_SEGMENT_SIZE);
 	D_ASSERT(bio->bi_idx == 0);
 
 	s_enr = bio->bi_sector >> (AL_EXTENT_SIZE_B-9);

Modified: trunk/drbd/drbd_worker.c
===================================================================
--- trunk/drbd/drbd_worker.c	2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_worker.c	2005-03-15 20:51:32 UTC (rev 1768)
@@ -54,8 +54,7 @@
  */
 int drbd_md_io_complete(struct bio *bio, unsigned int bytes_done, int error)
 {
-	if (bio->bi_size)
-		return 1;
+	if (bio->bi_size) return 1;
 
 	complete((struct completion*)bio->bi_private);
 	return 0;
@@ -70,16 +69,15 @@
 	struct Tl_epoch_entry *e=NULL;
 	struct Drbd_Conf* mdev;
 
-	mdev=bio->bi_private;
-	PARANOIA_BUG_ON(!IS_VALID_MDEV(mdev));
+	e = bio->bi_private;
+	mdev = e->mdev;
 
-	/* we should be called via bio_endio, so this should never be the case
-	 * but "everyone else does it", and so do we ;)		-lge
+	/* We are called each time a part of the bio is finished, but
+	 * we are only interested when the whole bio is finished, therefore
+	 * return as long as bio->bi_size is positive.
 	 */
-	ERR_IF (bio->bi_size)
-		return 1;
+	if (bio->bi_size) return 1;
 
-	e = container_of(bio,struct Tl_epoch_entry,private_bio);
 	PARANOIA_BUG_ON(!VALID_POINTER(e));
 	D_ASSERT(e->block_id != ID_VACANT);
 
@@ -103,14 +101,12 @@
 	struct Tl_epoch_entry *e=NULL;
 	struct Drbd_Conf* mdev;
 
-	mdev=bio->bi_private;
-	PARANOIA_BUG_ON(!IS_VALID_MDEV(mdev));
+	e = bio->bi_private;
+	mdev = e->mdev;
 
 	// see above
-	ERR_IF (bio->bi_size)
-		return 1;
+	if (bio->bi_size) return 1;
 
-	e = container_of(bio,struct Tl_epoch_entry,private_bio);
 	PARANOIA_BUG_ON(!VALID_POINTER(e));
 	D_ASSERT(e->block_id != ID_VACANT);
 
@@ -146,8 +142,7 @@
 	sector_t rsector;
 
 	// see above
-	ERR_IF (bio->bi_size)
-		return 1;
+	if (bio->bi_size) return 1;
 
 	drbd_chk_io_error(mdev,error);
 	rsector = drbd_req_get_sector(req);
@@ -166,9 +161,7 @@
 	drbd_request_t *req=bio->bi_private;
 	struct Drbd_Conf* mdev=req->mdev;
 
-	// see above
-	ERR_IF (bio->bi_size)
-		return 1;
+	if (bio->bi_size) return 1;
 
 	/* READAs may fail.
 	 * upper layers need to be able to handle that themselves */
@@ -413,14 +406,12 @@
 	int ok;
 
 	if(unlikely(cancel)) {
-		spin_lock_irq(&mdev->ee_lock);
-		drbd_put_ee(mdev,e);
-		spin_unlock_irq(&mdev->ee_lock);
+		drbd_free_ee(mdev,e);
 		dec_unacked(mdev);
 		return 1;
 	}
 
-	if(likely(drbd_bio_uptodate(&e->private_bio))) {
+	if(likely(drbd_bio_uptodate(e->private_bio))) {
 		ok=drbd_send_block(mdev, DataReply, e);
 	} else {
 		ok=drbd_send_ack(mdev,NegDReply,e);
@@ -432,11 +423,11 @@
 	dec_unacked(mdev);
 
 	spin_lock_irq(&mdev->ee_lock);
-	if( page_count(drbd_bio_get_page(&e->private_bio)) > 1 ) {
+	if( drbd_bio_has_active_page(e->private_bio) ) {
 		/* This might happen if sendpage() has not finished */
 		list_add_tail(&e->w.list,&mdev->net_ee);
 	} else {
-		drbd_put_ee(mdev,e);
+		drbd_free_ee(mdev,e);
 	}
 	spin_unlock_irq(&mdev->ee_lock);
 
@@ -450,16 +441,14 @@
 	int ok;
 
 	if(unlikely(cancel)) {
-		spin_lock_irq(&mdev->ee_lock);
-		drbd_put_ee(mdev,e);
-		spin_unlock_irq(&mdev->ee_lock);
+		drbd_free_ee(mdev,e);
 		dec_unacked(mdev);
 		return 1;
 	}
 
 	drbd_rs_complete_io(mdev,drbd_ee_get_sector(e));
 
-	if(likely(drbd_bio_uptodate(&e->private_bio))) {
+	if(likely(drbd_bio_uptodate(e->private_bio))) {
 		if (likely( mdev->state.s.pdsk >= Inconsistent )) {
 			inc_rs_pending(mdev);
 			ok=drbd_send_block(mdev, RSDataReply, e);
@@ -478,11 +467,11 @@
 	dec_unacked(mdev);
 
 	spin_lock_irq(&mdev->ee_lock);
-	if( page_count(drbd_bio_get_page(&e->private_bio)) > 1 ) {
+	if( drbd_bio_has_active_page(e->private_bio) ) {
 		/* This might happen if sendpage() has not finished */
 		list_add_tail(&e->w.list,&mdev->net_ee);
 	} else {
-		drbd_put_ee(mdev,e);
+		drbd_free_ee(mdev,e);
 	}
 	spin_unlock_irq(&mdev->ee_lock);
 



More information about the drbd-cvs mailing list