[DRBD-cvs] r1768 - in trunk: . drbd
www-data
www-data at garcon.linbit.com
Tue Mar 15 21:51:34 CET 2005
Author: phil
Date: 2005-03-15 21:51:32 +0100 (Tue, 15 Mar 2005)
New Revision: 1768
Modified:
trunk/ROADMAP
trunk/drbd/drbd_compat_wrappers.h
trunk/drbd/drbd_fs.c
trunk/drbd/drbd_int.h
trunk/drbd/drbd_main.c
trunk/drbd/drbd_receiver.c
trunk/drbd/drbd_req.c
trunk/drbd/drbd_worker.c
Log:
Implemented item 15 of the ROADMAP !
This means that DRBD now accepts BIOs with up to 32kb of data
= 8 pages on x86. -> We now make real use of Linux-2.6's
IO subsystem.
Currently there is only one open issue:
We hit a kernel bug in bio_split()
But apart from that it really works great, and makes some
things even simpler!
Oh, and by the way, this also finishes item 11 of the ROADMAP!
Modified: trunk/ROADMAP
===================================================================
--- trunk/ROADMAP 2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/ROADMAP 2005-03-15 20:51:32 UTC (rev 1768)
@@ -354,7 +354,7 @@
Probably a general high performance implementation for this
issue is not necessary, since clusters of machines with
different PAGE_SIZE are of academic interest only.
- 0% DONE
+ 100% DONE by item 15
12 Introduce a "common" section in the config file. Option
section (like handlers, startup, disk, net and syncer)
@@ -376,10 +376,10 @@
/dev/mapper/control
0% DONE
-15 Accept BIOs bigger than one page, probabely up to 64k (16 pages)
- would be a good choce. When this is done make the bits in the
- bitmap to account for more then 4k e.g. 64k
- 0% DONE
+15 Accept BIOs bigger than one page, probabely up to 32k (8 pages)
+ currently. When this is done make the bits in the bitmap to account
+ for more then 4k e.g. 64k
+ 50% DONE we handle big BIOs now, a bitmap bit is still 4k.
[ Item 16 is still unfinished, as described here, the algorithm has
some loose ends... ]
Modified: trunk/drbd/drbd_compat_wrappers.h
===================================================================
--- trunk/drbd/drbd_compat_wrappers.h 2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_compat_wrappers.h 2005-03-15 20:51:32 UTC (rev 1768)
@@ -146,24 +146,6 @@
}
#endif
-static inline void drbd_ee_init(struct Tl_epoch_entry *e,struct page *page)
-{
- struct bio * const bio = &e->private_bio;
- struct bio_vec * const vec = &e->ee_bvec;
-
- memset(e, 0, sizeof(*e));
- bio_init(bio);
-
- bio->bi_io_vec = vec;
- bio->bi_destructor = NULL;
- vec->bv_page = page;
- bio->bi_size = vec->bv_len = PAGE_SIZE;
- bio->bi_max_vecs = bio->bi_vcnt = 1;
- vec->bv_offset = 0;
-
- e->block_id = ID_VACANT;
-}
-
static inline void drbd_bio_set_pages_dirty(struct bio *bio)
{
bio_set_pages_dirty(bio);
@@ -175,44 +157,19 @@
}
static inline void
-drbd_ee_bio_prepare(drbd_dev *mdev, struct Tl_epoch_entry* e,
- sector_t sector, int size)
-{
- struct bio * const bio = &e->private_bio;
- struct bio_vec * const vec = &e->ee_bvec;
- struct page * const page = vec->bv_page;
- D_ASSERT(mdev->backing_bdev);
-
- /* Clear plate. */
- bio_init(bio);
-
- bio->bi_io_vec = vec;
- bio->bi_destructor = NULL;
- vec->bv_page = page;
- vec->bv_offset = 0;
- bio->bi_max_vecs = bio->bi_vcnt = 1;
-
- bio->bi_bdev = mdev->backing_bdev;
- bio->bi_private = mdev;
-
- e->ee_sector = bio->bi_sector = sector;
- e->ee_size = bio->bi_size = bio->bi_io_vec->bv_len = size;
-}
-
-static inline void
drbd_ee_prepare_write(drbd_dev *mdev, struct Tl_epoch_entry* e,
- sector_t sector, int size)
+ sector_t sector)
{
- drbd_ee_bio_prepare(mdev,e,sector,size);
- e->private_bio.bi_end_io = drbd_dio_end_sec;
+ e->ee_sector = e->private_bio->bi_sector = sector;
+ e->private_bio->bi_end_io = drbd_dio_end_sec;
}
static inline void
drbd_ee_prepare_read(drbd_dev *mdev, struct Tl_epoch_entry* e,
- sector_t sector, int size)
+ sector_t sector)
{
- drbd_ee_bio_prepare(mdev,e,sector,size);
- e->private_bio.bi_end_io = enslaved_read_bi_end_io;
+ e->ee_sector = e->private_bio->bi_sector = sector;
+ e->private_bio->bi_end_io = enslaved_read_bi_end_io;
}
static inline void
@@ -245,10 +202,16 @@
req->mdev = mdev;
}
-static inline struct page* drbd_bio_get_page(struct bio *bio)
+static inline int drbd_bio_has_active_page(struct bio *bio)
{
- struct bio_vec *bvec = bio_iovec(bio);
- return bvec->bv_page;
+ struct bio_vec *bvec;
+ int i;
+
+ __bio_for_each_segment(bvec, bio, i, 0) {
+ if (page_count(bvec->bv_page) > 1) return 1;
+ }
+
+ return 0;
}
/*
@@ -306,12 +269,6 @@
spin_unlock_irq(q->queue_lock);
}
-static inline int _drbd_send_zc_bio(drbd_dev *mdev, struct bio *bio)
-{
- struct bio_vec *bvec = bio_iovec_idx(bio, bio->bi_idx);
- return _drbd_send_page(mdev,bvec->bv_page,bvec->bv_offset,bvec->bv_len);
-}
-
static inline int _drbd_send_bio(drbd_dev *mdev, struct bio *bio)
{
struct bio_vec *bvec = bio_iovec(bio);
Modified: trunk/drbd/drbd_fs.c
===================================================================
--- trunk/drbd/drbd_fs.c 2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_fs.c 2005-03-15 20:51:32 UTC (rev 1768)
@@ -375,10 +375,10 @@
request_queue_t * const q = mdev->rq_queue;
request_queue_t * const b = bdev->bd_disk->queue;
- q->max_sectors = min_not_zero((unsigned short)(PAGE_SIZE >> 9), b->max_sectors);
- q->max_phys_segments = 1;
- q->max_hw_segments = 1;
- q->max_segment_size = min((unsigned)PAGE_SIZE,b->max_segment_size);
+ q->max_sectors = min_not_zero((unsigned short)(DRBD_MAX_SEGMENT_SIZE >> 9), b->max_sectors);
+ q->max_phys_segments = DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE;
+ q->max_hw_segments = DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE;
+ q->max_segment_size = min_t(int,DRBD_MAX_SEGMENT_SIZE,b->max_segment_size);
q->hardsect_size = max((unsigned short)512,b->hardsect_size);
q->seg_boundary_mask = PAGE_SIZE-1;
D_ASSERT(q->hardsect_size <= PAGE_SIZE); // or we are really screwed ;-)
Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h 2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_int.h 2005-03-15 20:51:32 UTC (rev 1768)
@@ -611,14 +611,13 @@
*/
struct Tl_epoch_entry {
struct drbd_work w;
- struct bio private_bio; // private bio struct, NOT a pointer
+ struct bio *private_bio;
u64 block_id;
long magic;
unsigned int ee_size;
sector_t ee_sector;
struct hlist_node colision;
- // THINK: maybe we rather want bio_alloc(GFP_*,1)
- struct bio_vec ee_bvec;
+ drbd_dev *mdev;
};
/* flag bits */
@@ -746,7 +745,6 @@
u32 *p_gen_cnt;
atomic_t epoch_size;
spinlock_t ee_lock;
- struct list_head free_ee; // available
struct list_head active_ee; // IO in progress
struct list_head sync_ee; // IO in progress
struct list_head done_ee; // send ack
@@ -757,8 +755,7 @@
spinlock_t pr_lock;
struct list_head app_reads;
struct list_head resync_reads;
- int ee_vacant;
- int ee_in_use;
+ atomic_t pp_in_use;
wait_queue_head_t ee_wait;
struct list_head busy_blocks;
struct page *md_io_page; // one page buffer for md_io
@@ -950,9 +947,10 @@
#endif
/* Sector shift value for hash functions for tl_hash table and ee_hash
- table. A value of 3 makes all IOs in on 4K block to make to the same
+ table. A value of 6 makes all IOs in on 32K block to make to the same
slot of the hash table. */
-#define HT_SHIFT 3
+#define HT_SHIFT 6
+#define DRBD_MAX_SEGMENT_SIZE (1<<(9+HT_SHIFT))
extern int drbd_bm_init (drbd_dev *mdev);
extern int drbd_bm_resize (drbd_dev *mdev, sector_t sectors);
@@ -997,6 +995,11 @@
extern kmem_cache_t *drbd_ee_cache;
extern mempool_t *drbd_request_mempool;
+extern struct page* drbd_pp_pool; // drbd's page pool
+extern spinlock_t drbd_pp_lock;
+extern int drbd_pp_vacant;
+extern wait_queue_head_t drbd_pp_wait;
+
// drbd_req
#define ERF_NOTLD 2 /* do not call tl_dependence */
extern void drbd_end_req(drbd_request_t *, int, int, sector_t);
@@ -1034,9 +1037,10 @@
// drbd_receiver.c
extern int drbd_release_ee(drbd_dev* mdev,struct list_head* list);
-extern int drbd_init_ee(drbd_dev* mdev);
-extern void drbd_put_ee(drbd_dev* mdev,struct Tl_epoch_entry *e);
-extern struct Tl_epoch_entry* drbd_get_ee(drbd_dev* mdev);
+extern struct Tl_epoch_entry* drbd_alloc_ee(drbd_dev *mdev,
+ unsigned int data_size,
+ unsigned int gfp_mask);
+extern void drbd_free_ee(drbd_dev *mdev, struct Tl_epoch_entry* e);
extern void drbd_wait_ee(drbd_dev *mdev,struct list_head *head);
// drbd_proc.c
Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c 2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_main.c 2005-03-15 20:51:32 UTC (rev 1768)
@@ -77,9 +77,8 @@
MODULE_AUTHOR("Philipp Reisner <phil at linbit.com>, Lars Ellenberg <lars at linbit.com>");
MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
MODULE_LICENSE("GPL");
-MODULE_PARM_DESC(use_nbd_major, "DEPRECATED! use nbd device major nr (43) "
- "instead of the default " __stringify(LANANA_DRBD_MAJOR) );
MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
+MODULE_ALIAS_BLOCKDEV_MAJOR(LANANA_DRBD_MAJOR);
#include <linux/moduleparam.h>
/*
@@ -93,13 +92,11 @@
*/
/* thanks to these macros, if compiled into the kernel (not-module),
- * these become boot parameters drbd.use_nbd_major and drbd.minor_count
+ * this becomes the boot parameter drbd.minor_count
*/
-module_param(use_nbd_major, bool,0);
module_param(minor_count, int,0);
// module parameter, defined
-int use_nbd_major = 0;
int major_nr = LANANA_DRBD_MAJOR;
#ifdef MODULE
int minor_count = 2;
@@ -121,7 +118,20 @@
kmem_cache_t *drbd_request_cache;
kmem_cache_t *drbd_ee_cache;
mempool_t *drbd_request_mempool;
+mempool_t *drbd_ee_mempool;
+/* I do not use a standard mempool, because:
+ 1) I want to hand out the preallocated objects first.
+ 2) I want to be able to interrupt sleeping allocation with a signal.
+ Note: This is a single linked list, the next pointer is the private
+ member of struct page.
+ */
+struct page* drbd_pp_pool;
+spinlock_t drbd_pp_lock;
+int drbd_pp_vacant;
+wait_queue_head_t drbd_pp_wait;
+
+
STATIC struct block_device_operations drbd_ops = {
.owner = THIS_MODULE,
.open = drbd_open,
@@ -1177,33 +1187,9 @@
return ret;
}
-#ifdef DRBD_DISABLE_SENDPAGE
int _drbd_send_page(drbd_dev *mdev, struct page *page,
int offset, size_t size)
{
- int sent,ok;
- int len = size;
-
- spin_lock(&mdev->send_task_lock);
- mdev->send_task=current;
- spin_unlock(&mdev->send_task_lock);
-
- sent = _drbd_no_send_page(mdev, page, offset, size);
- if (likely(sent > 0)) len -= sent;
-
- spin_lock(&mdev->send_task_lock);
- mdev->send_task=NULL;
- spin_unlock(&mdev->send_task_lock);
-
- ok = (len == 0);
- if (likely(ok))
- mdev->send_cnt += size>>9;
- return ok;
-}
-#else
-int _drbd_send_page(drbd_dev *mdev, struct page *page,
- int offset, size_t size)
-{
mm_segment_t oldfs = get_fs();
int sent,ok;
int len = size;
@@ -1281,8 +1267,22 @@
mdev->send_cnt += size>>9;
return ok;
}
-#endif
+STATIC int _drbd_send_zc_bio(drbd_dev *mdev, struct bio *bio)
+{
+ struct bio_vec *bvec;
+ int i;
+
+ bio_for_each_segment(bvec, bio, i) {
+ if (! _drbd_send_page(mdev, bvec->bv_page, bvec->bv_offset,
+ bvec->bv_len) ) {
+ return 0;
+ }
+ }
+
+ return 1;
+}
+
// Used to send write requests: bh->b_rsector !!
int drbd_send_dblock(drbd_dev *mdev, drbd_request_t *req)
{
@@ -1399,7 +1399,7 @@
dump_packet(mdev,mdev->data.socket,0,(void*)&p, __FILE__, __LINE__);
ok = sizeof(p) == drbd_send(mdev,mdev->data.socket,&p,sizeof(p),MSG_MORE);
- if (ok) ok = _drbd_send_zc_bio(mdev,&e->private_bio);
+ if (ok) ok = _drbd_send_zc_bio(mdev,e->private_bio);
spin_lock(&mdev->send_task_lock);
mdev->send_task=NULL;
@@ -1620,6 +1620,7 @@
atomic_set(&mdev->local_cnt,0);
atomic_set(&mdev->resync_locked,0);
atomic_set(&mdev->packet_seq,0);
+ atomic_set(&mdev->pp_in_use, 0);
init_MUTEX(&mdev->md_io_mutex);
init_MUTEX(&mdev->data.mutex);
@@ -1635,7 +1636,6 @@
spin_lock_init(&mdev->send_task_lock);
spin_lock_init(&mdev->peer_seq_lock);
- INIT_LIST_HEAD(&mdev->free_ee);
INIT_LIST_HEAD(&mdev->active_ee);
INIT_LIST_HEAD(&mdev->sync_ee);
INIT_LIST_HEAD(&mdev->done_ee);
@@ -1714,11 +1714,8 @@
drbd_thread_stop(&mdev->worker);
- if ( mdev->ee_in_use != 0
- || mdev->ee_vacant != 32 /* EE_MININUM */
- || atomic_read(&mdev->epoch_size) != 0)
- ERR("ee_in_use:%d ee_vacant:%d epoch_size:%d\n",
- mdev->ee_in_use, mdev->ee_vacant, atomic_read(&mdev->epoch_size));
+ if ( atomic_read(&mdev->epoch_size) != 0)
+ ERR("epoch_size:%d\n",atomic_read(&mdev->epoch_size));
#define ZAP(x) memset(&x,0,sizeof(x))
ZAP(mdev->conf);
ZAP(mdev->sync_conf);
@@ -1772,6 +1769,19 @@
void drbd_destroy_mempools(void)
{
+ struct page *page;
+
+ while(drbd_pp_pool) {
+ page = drbd_pp_pool;
+ drbd_pp_pool = (struct page*)page->private;
+ __free_page(page);
+ drbd_pp_vacant--;
+ }
+
+ /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
+
+ if (drbd_ee_mempool)
+ mempool_destroy(drbd_ee_mempool);
if (drbd_request_mempool)
mempool_destroy(drbd_request_mempool);
if (drbd_ee_cache && kmem_cache_destroy(drbd_ee_cache))
@@ -1782,6 +1792,7 @@
": kmem_cache_destroy(drbd_request_cache) FAILED\n");
// FIXME what can we do if we fail to destroy them?
+ drbd_ee_mempool = NULL;
drbd_request_mempool = NULL;
drbd_ee_cache = NULL;
drbd_request_cache = NULL;
@@ -1791,32 +1802,53 @@
int drbd_create_mempools(void)
{
+ struct page *page;
+ const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
+ int i;
+
// prepare our caches and mempools
drbd_request_mempool = NULL;
drbd_ee_cache = NULL;
drbd_request_cache = NULL;
+ drbd_pp_pool = NULL;
// caches
drbd_request_cache = kmem_cache_create(
"drbd_req_cache", sizeof(drbd_request_t),
- 0, SLAB_NO_REAP, NULL, NULL);
+ 0, 0, NULL, NULL);
if (drbd_request_cache == NULL)
goto Enomem;
drbd_ee_cache = kmem_cache_create(
"drbd_ee_cache", sizeof(struct Tl_epoch_entry),
- 0, SLAB_NO_REAP, NULL, NULL);
+ 0, 0, NULL, NULL);
if (drbd_ee_cache == NULL)
goto Enomem;
// mempools
- drbd_request_mempool = mempool_create(16, //TODO; reasonable value
+ drbd_request_mempool = mempool_create( number,
mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
if (drbd_request_mempool == NULL)
goto Enomem;
- return 0;
+ drbd_ee_mempool = mempool_create( number,
+ mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
+ if (drbd_request_mempool == NULL)
+ goto Enomem;
+ // drbd's page pool
+ spin_lock_init(&drbd_pp_lock);
+
+ for (i=0;i< number;i++) {
+ page = alloc_page(GFP_KERNEL);
+ if(!page) goto Enomem;
+ page->private = (unsigned long)drbd_pp_pool;
+ drbd_pp_pool = page;
+ }
+ drbd_pp_vacant = number;
+
+ return 0;
+
Enomem:
drbd_destroy_mempools(); // in case we allocated some
return -ENOMEM;
@@ -1868,12 +1900,6 @@
if (mdev->bitmap) drbd_bm_cleanup(mdev);
if (mdev->resync) lc_free(mdev->resync);
- D_ASSERT(mdev->ee_in_use==0);
-
- rr = drbd_release_ee(mdev,&mdev->free_ee);
- // INFO("%d EEs in free list found.\n",rr);
- // D_ASSERT(rr == 32);
-
rr = drbd_release_ee(mdev,&mdev->active_ee);
if(rr) ERR("%d EEs in active list found!\n",rr);
@@ -1895,7 +1921,6 @@
DUMPP(lp);
}
};
- D_ASSERT(mdev->ee_vacant == 0);
if (mdev->md_io_page)
__free_page(mdev->md_io_page);
@@ -1984,10 +2009,6 @@
return -EINVAL;
}
- if (use_nbd_major) {
- major_nr = NBD_MAJOR;
- }
-
if (1 > minor_count||minor_count > 255) {
printk(KERN_ERR DEVICE_NAME
": invalid minor_count (%d)\n",minor_count);
@@ -2013,6 +2034,8 @@
*/
err = -ENOMEM;
+ init_waitqueue_head(&drbd_pp_wait);
+
drbd_proc = NULL; // play safe for drbd_cleanup
drbd_conf = kmalloc(sizeof(drbd_dev)*minor_count,GFP_KERNEL);
if (likely(drbd_conf!=NULL))
@@ -2082,7 +2105,6 @@
init_MUTEX(&mdev->device_mutex);
if (!tl_init(mdev)) goto Enomem;
- if (!drbd_init_ee(mdev)) goto Enomem;
}
#if CONFIG_PROC_FS
@@ -2124,9 +2146,6 @@
"Version: " REL_VERSION " (api:%d/proto:%d)\n",
API_VERSION,PRO_VERSION);
printk(KERN_INFO DEVICE_NAME ": %s\n", drbd_buildtag());
- if (use_nbd_major) {
- printk(KERN_INFO DEVICE_NAME": hijacking NBD device major!\n");
- }
printk(KERN_INFO DEVICE_NAME": registered as block device major %d\n", MAJOR_NR);
return 0; // Success!
Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c 2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_receiver.c 2005-03-15 20:51:32 UTC (rev 1768)
@@ -51,8 +51,6 @@
#include <linux/drbd.h>
#include "drbd_int.h"
-#define EE_MININUM 32 // @4k pages => 128 KByte
-
#define is_syncer_blk(A,B) ((B)==ID_SYNCER)
#ifdef __arch_um__
@@ -137,11 +135,94 @@
}
#endif //PARANOIA
+#define GFP_TRY ( __GFP_HIGHMEM | __GFP_NOWARN )
+
+STATIC int drbd_process_ee(drbd_dev *mdev, int be_sleepy);
+
+/**
+ * drbd_bp_alloc: Returns a page. Fails only if a signal comes in.
+ */
+STATIC struct page * drbd_pp_alloc(drbd_dev *mdev, unsigned int gfp_mask)
+{
+ struct page *page;
+ DEFINE_WAIT(wait);
+
+ if ( drbd_pp_vacant ==
+ (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count/2 ) {
+ drbd_kick_lo(mdev);
+ }
+
+ spin_lock(&drbd_pp_lock);
+ if ( (page = drbd_pp_pool) ) {
+ drbd_pp_pool = (struct page*)page->private;
+ drbd_pp_vacant--;
+ }
+ spin_unlock(&drbd_pp_lock);
+ if ( page ) goto got_page;
+
+ drbd_process_ee(mdev,1);
+
+ spin_lock(&drbd_pp_lock);
+ if ( (page = drbd_pp_pool) ) {
+ drbd_pp_pool = (struct page*)page->private;
+ drbd_pp_vacant--;
+ }
+ spin_unlock(&drbd_pp_lock);
+ if ( page ) goto got_page;
+
+ for (;;) {
+ prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
+
+ spin_lock(&drbd_pp_lock);
+ if ( (page = drbd_pp_pool) ) {
+ drbd_pp_pool = (struct page*)page->private;
+ drbd_pp_vacant--;
+ }
+ spin_unlock(&drbd_pp_lock);
+ if ( page ) break;
+
+ if ( atomic_read(&mdev->pp_in_use) < mdev->conf.max_buffers ) {
+ if( (page = alloc_page(GFP_TRY)) ) break;
+ }
+ drbd_kick_lo(mdev);
+ schedule();
+ finish_wait(&drbd_pp_wait, &wait);
+ if (signal_pending(current)) {
+ WARN("drbd_pp_alloc interrupted!\n");
+ return NULL;
+ }
+ // finish wait is inside, so that we are TASK_RUNNING
+ // in _drbd_process_ee (which might sleep by itself.)
+ drbd_process_ee(mdev,1);
+ }
+ finish_wait(&drbd_pp_wait, &wait);
+
+ got_page:
+ atomic_inc(&mdev->pp_in_use);
+
+ return page;
+}
+
+STATIC void drbd_pp_free(drbd_dev *mdev,struct page *page)
+{
+ atomic_dec(&mdev->pp_in_use);
+
+ spin_lock(&drbd_pp_lock);
+ if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
+ __free_page(page);
+ } else {
+ page->private = (unsigned long)drbd_pp_pool;
+ drbd_pp_pool = page;
+ drbd_pp_vacant++;
+ }
+ spin_unlock(&drbd_pp_lock);
+
+ wake_up(&drbd_pp_wait);
+}
+
/*
You need to hold the ee_lock:
drbd_free_ee()
- drbd_get_ee()
- drbd_put_ee()
_drbd_process_ee()
You must not have the ee_lock:
@@ -155,80 +236,79 @@
drbd_wait_ee()
*/
-STATIC int _drbd_alloc_ee(drbd_dev *mdev,struct page* page,int mask)
+struct Tl_epoch_entry* drbd_alloc_ee(drbd_dev *mdev,
+ unsigned int data_size,
+ unsigned int gfp_mask)
{
struct Tl_epoch_entry* e;
+ struct bio_vec *bvec;
+ struct page *page;
+ struct bio *bio;
+ unsigned int ds;
+ int i;
- e = kmem_cache_alloc(drbd_ee_cache, mask);
- if( e == NULL ) return FALSE;
+ e = kmem_cache_alloc(drbd_ee_cache, gfp_mask);
+ if (!e) return NULL;
- drbd_ee_init(e,page);
- spin_lock_irq(&mdev->ee_lock);
- list_add(&e->w.list,&mdev->free_ee);
- mdev->ee_vacant++;
- spin_unlock_irq(&mdev->ee_lock);
+ bio = bio_alloc(GFP_KERNEL, div_ceil(data_size,PAGE_SIZE));
+ if (!bio) goto fail1;
- return TRUE;
-}
+ bio->bi_bdev = mdev->backing_bdev;
+
+ ds = data_size;
+ while(ds) {
+ page = drbd_pp_alloc(mdev, gfp_mask);
+ if (!page) goto fail2;
+ bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0);
+ ds -= min_t(int, ds, PAGE_SIZE);
+ }
-/* bool */
-STATIC int drbd_alloc_ee(drbd_dev *mdev,int mask)
-{
- struct page *page;
+ bio->bi_private = e;
+ e->mdev = mdev;
+ e->ee_size = bio->bi_size;
+ D_ASSERT( data_size == bio->bi_size);
+ e->private_bio = bio;
+ e->block_id = ID_VACANT;
+ INIT_HLIST_NODE(&e->colision);
- page=alloc_page(mask);
- if(!page) return FALSE;
-
- if(!_drbd_alloc_ee(mdev,page,GFP_KERNEL)) {
- __free_page(page);
- return FALSE;
+ return e;
+ fail2:
+ __bio_for_each_segment(bvec, bio, i, 0) {
+ drbd_pp_free(mdev,bvec->bv_page);
}
-
- return TRUE;
+ bio_put(bio);
+ fail1:
+ kmem_cache_free(drbd_ee_cache, e);
+
+ return NULL;
}
-STATIC struct page* drbd_free_ee(drbd_dev *mdev, struct list_head *list)
+void drbd_free_ee(drbd_dev *mdev, struct Tl_epoch_entry* e)
{
- struct list_head *le;
- struct Tl_epoch_entry* e;
- struct page* page;
+ struct bio *bio=e->private_bio;
+ struct bio_vec *bvec;
+ int i;
- MUST_HOLD(&mdev->ee_lock);
+ __bio_for_each_segment(bvec, bio, i, 0) {
+ drbd_pp_free(mdev,bvec->bv_page);
+ }
- D_ASSERT(!list_empty(list));
- le = list->next;
- e = list_entry(le, struct Tl_epoch_entry, w.list);
- list_del(le);
+ bio_put(bio);
- page = drbd_bio_get_page(&e->private_bio);
-
- D_ASSERT(page == e->ee_bvec.bv_page);
- page = e->ee_bvec.bv_page;
-
kmem_cache_free(drbd_ee_cache, e);
- mdev->ee_vacant--;
-
- return page;
}
-int drbd_init_ee(drbd_dev *mdev)
-{
- while(mdev->ee_vacant < EE_MININUM ) {
- if(!drbd_alloc_ee(mdev,GFP_USER)) {
- ERR("Failed to allocate %d EEs !\n",EE_MININUM);
- return 0;
- }
- }
- return 1;
-}
-
int drbd_release_ee(drbd_dev *mdev,struct list_head* list)
{
int count=0;
+ struct Tl_epoch_entry* e;
+ struct list_head *le;
spin_lock_irq(&mdev->ee_lock);
while(!list_empty(list)) {
- __free_page(drbd_free_ee(mdev,list));
+ le = list->next;
+ e = list_entry(le, struct Tl_epoch_entry, w.list);
+ drbd_free_ee(mdev,e);
count++;
}
spin_unlock_irq(&mdev->ee_lock);
@@ -236,97 +316,6 @@
return count;
}
-#define GFP_TRY ( __GFP_HIGHMEM | __GFP_NOWARN )
-
-STATIC int _drbd_process_ee(drbd_dev *mdev, int be_sleepy);
-
-/**
- * drbd_get_ee: Returns an Tl_epoch_entry; might sleep. Fails only if
- * a signal comes in.
- */
-struct Tl_epoch_entry* drbd_get_ee(drbd_dev *mdev)
-{
- struct list_head *le;
- struct Tl_epoch_entry* e;
- DEFINE_WAIT(wait);
-
- MUST_HOLD(&mdev->ee_lock);
-
- if(mdev->ee_vacant == EE_MININUM / 2) {
- spin_unlock_irq(&mdev->ee_lock);
- drbd_kick_lo(mdev);
- spin_lock_irq(&mdev->ee_lock);
- }
-
- if(list_empty(&mdev->free_ee)) _drbd_process_ee(mdev,1);
-
- if(list_empty(&mdev->free_ee)) {
- for (;;) {
- prepare_to_wait(&mdev->ee_wait, &wait,
- TASK_INTERRUPTIBLE);
- if(!list_empty(&mdev->free_ee)) break;
- spin_unlock_irq(&mdev->ee_lock);
- if( ( mdev->ee_vacant+mdev->ee_in_use) <
- mdev->conf.max_buffers ) {
- if(drbd_alloc_ee(mdev,GFP_TRY)) {
- spin_lock_irq(&mdev->ee_lock);
- break;
- }
- }
- drbd_kick_lo(mdev);
- schedule();
- spin_lock_irq(&mdev->ee_lock);
- finish_wait(&mdev->ee_wait, &wait);
- if (signal_pending(current)) {
- WARN("drbd_get_ee interrupted!\n");
- return 0;
- }
- // finish wait is inside, so that we are TASK_RUNNING
- // in _drbd_process_ee (which might sleep by itself.)
- _drbd_process_ee(mdev,1);
- }
- finish_wait(&mdev->ee_wait, &wait);
- }
-
- le=mdev->free_ee.next;
- list_del(le);
- mdev->ee_vacant--;
- mdev->ee_in_use++;
- e=list_entry(le, struct Tl_epoch_entry, w.list);
-
- D_ASSERT(e->private_bio.bi_idx == 0);
- drbd_ee_init(e,e->ee_bvec.bv_page); // reinitialize
-
- e->block_id = !ID_VACANT;
- SET_MAGIC(e);
- return e;
-}
-
-void drbd_put_ee(drbd_dev *mdev,struct Tl_epoch_entry *e)
-{
- MUST_HOLD(&mdev->ee_lock);
-
- D_ASSERT(page_count(drbd_bio_get_page(&e->private_bio)) == 1);
-
- mdev->ee_in_use--;
- mdev->ee_vacant++;
- e->block_id = ID_VACANT;
- INVALIDATE_MAGIC(e);
- list_add_tail(&e->w.list,&mdev->free_ee);
-
- if((mdev->ee_vacant * 2 > mdev->ee_in_use ) &&
- ( mdev->ee_vacant + mdev->ee_in_use > EE_MININUM) ) {
- __free_page(drbd_free_ee(mdev,&mdev->free_ee));
- }
- if(mdev->ee_in_use == 0) {
- while( mdev->ee_vacant > EE_MININUM ) {
- __free_page(drbd_free_ee(mdev,&mdev->free_ee));
- }
- }
-
- wake_up(&mdev->ee_wait);
-}
-
STATIC void reclaim_net_ee(drbd_dev *mdev)
{
struct Tl_epoch_entry *e;
@@ -339,9 +328,9 @@
list_for_each_safe(le, tle, &mdev->net_ee) {
e = list_entry(le, struct Tl_epoch_entry, w.list);
- if( page_count(drbd_bio_get_page(&e->private_bio)) > 1 ) break;
+ if( drbd_bio_has_active_page(e->private_bio) ) break;
list_del(le);
- drbd_put_ee(mdev,e);
+ drbd_free_ee(mdev,e);
}
}
@@ -380,8 +369,8 @@
spin_unlock_irq(&mdev->ee_lock);
e = list_entry(le, struct Tl_epoch_entry, w.list);
ok = ok && e->w.cb(mdev,&e->w,0);
+ drbd_free_ee(mdev,e);
spin_lock_irq(&mdev->ee_lock);
- drbd_put_ee(mdev,e);
}
clear_bit(PROCESS_EE_RUNNING,&mdev->flags);
@@ -417,7 +406,7 @@
is_syncer_blk(mdev,e->block_id)) {
++n;
}
- drbd_put_ee(mdev,e);
+ drbd_free_ee(mdev,e);
}
spin_unlock_irq(&mdev->ee_lock);
@@ -790,29 +779,29 @@
read_in_block(drbd_dev *mdev, int data_size)
{
struct Tl_epoch_entry *e;
+ struct bio_vec *bvec;
+ struct page *page;
struct bio *bio;
- int rr;
+ int ds,i,rr;
- spin_lock_irq(&mdev->ee_lock);
- e=drbd_get_ee(mdev);
- spin_unlock_irq(&mdev->ee_lock);
+ e = drbd_alloc_ee(mdev,data_size,GFP_KERNEL);
if(!e) return 0;
-
- bio = &e->private_bio;
-
- rr=drbd_recv(mdev, drbd_bio_kmap(bio), data_size);
- drbd_bio_kunmap(bio);
-
- if ( rr != data_size) {
- spin_lock_irq(&mdev->ee_lock);
- drbd_put_ee(mdev,e);
- spin_unlock_irq(&mdev->ee_lock);
- WARN("short read receiving data block: read %d expected %d\n",
- rr, data_size);
- return 0;
+ bio = e->private_bio;
+ ds = data_size;
+ bio_for_each_segment(bvec, bio, i) {
+ page = bvec->bv_page;
+ rr = drbd_recv(mdev,kmap(page),min_t(int,ds,PAGE_SIZE));
+ kunmap(page);
+ if( rr != min_t(int,ds,PAGE_SIZE) ) {
+ drbd_free_ee(mdev,e);
+ WARN("short read recev data: read %d expected %d\n",
+ rr, min_t(int,ds,PAGE_SIZE));
+ return 0;
+ }
+ ds -= rr;
}
- mdev->recv_cnt+=data_size>>9;
+ mdev->recv_cnt+=data_size>>9;
return e;
}
@@ -834,20 +823,29 @@
STATIC int recv_dless_read(drbd_dev *mdev, drbd_request_t *req,
sector_t sector, int data_size)
{
+ struct bio_vec *bvec;
struct bio *bio;
- int ok,rr;
+ int rr,i,expect,ok=1;
bio = req->master_bio;
-
D_ASSERT( sector == drbd_req_get_sector(req) );
+
+ bio_for_each_segment(bvec, bio, i) {
+ expect = min_t(int,data_size,bvec->bv_len);
+ rr=drbd_recv(mdev,
+ kmap(bvec->bv_page)+bvec->bv_offset,
+ expect);
+ kunmap(bvec->bv_page);
+ if (rr != expect) {
+ ok = 0;
+ break;
+ }
+ data_size -= rr;
+ }
- rr=drbd_recv(mdev,drbd_bio_kmap(bio),data_size);
- drbd_bio_kunmap(bio);
-
- ok=(rr==data_size);
+ D_ASSERT(data_size == 0 || !ok);
drbd_bio_endio(bio,ok);
dec_ap_bio(mdev);
-
dec_ap_pending(mdev);
return ok;
}
@@ -859,7 +857,7 @@
int ok;
drbd_rs_complete_io(mdev,sector); // before set_in_sync() !
- if (likely( drbd_bio_uptodate(&e->private_bio) )) {
+ if (likely( drbd_bio_uptodate(e->private_bio) )) {
ok = mdev->state.s.disk >= Inconsistent &&
mdev->state.s.pdsk >= Inconsistent;
if (likely( ok )) {
@@ -895,13 +893,11 @@
if (DRBD_ratelimit(5*HZ,5))
ERR("Can not write resync data to local disk.\n");
drbd_send_ack(mdev,NegAck,e);
- spin_lock_irq(&mdev->ee_lock);
- drbd_put_ee(mdev,e);
- spin_unlock_irq(&mdev->ee_lock);
+ drbd_free_ee(mdev,e);
return TRUE;
}
- drbd_ee_prepare_write(mdev,e,sector,data_size);
+ drbd_ee_prepare_write(mdev,e,sector);
e->w.cb = e_end_resync_block;
spin_lock_irq(&mdev->ee_lock);
@@ -910,7 +906,7 @@
inc_unacked(mdev);
- drbd_generic_make_request(WRITE,&e->private_bio);
+ drbd_generic_make_request(WRITE,e->private_bio);
receive_data_tail(mdev,data_size);
return TRUE;
@@ -990,7 +986,7 @@
atomic_inc(&mdev->epoch_size);
if(mdev->conf.wire_protocol == DRBD_PROT_C) {
- if(likely(drbd_bio_uptodate(&e->private_bio))) {
+ if(likely(drbd_bio_uptodate(e->private_bio))) {
ok=drbd_send_ack(mdev,WriteAck,e);
if (ok && test_bit(SYNC_STARTED,&mdev->flags) )
drbd_set_in_sync(mdev,sector,drbd_ee_get_size(e));
@@ -1006,7 +1002,7 @@
return ok;
}
- if(unlikely(!drbd_bio_uptodate(&e->private_bio))) {
+ if(unlikely(!drbd_bio_uptodate(e->private_bio))) {
ok = drbd_io_error(mdev);
}
@@ -1050,12 +1046,10 @@
header_size = sizeof(*p) - sizeof(*h);
data_size = h->length - header_size;
- /* I expect a block to be a multiple of 512 byte, and
- * no more than 4K (PAGE_SIZE). is this too restrictive?
- */
+ if( data_size > 4096 ) INFO("data_size=%d\n",data_size);
ERR_IF(data_size == 0) return FALSE;
ERR_IF(data_size & 0x1ff) return FALSE;
- ERR_IF(data_size > PAGE_SIZE) return FALSE;
+ ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return FALSE;
if (drbd_recv(mdev, h->payload, header_size) != header_size)
return FALSE;
@@ -1073,7 +1067,7 @@
}
e->block_id = p->block_id; // no meaning on this side, e* on partner
- drbd_ee_prepare_write(mdev, e, sector, data_size);
+ drbd_ee_prepare_write(mdev, e, sector);
e->w.cb = e_end_block;
/* This wait_event is here to make sure that never ever an
@@ -1159,7 +1153,7 @@
break;
}
- drbd_generic_make_request(WRITE,&e->private_bio);
+ drbd_generic_make_request(WRITE,e->private_bio);
receive_data_tail(mdev,data_size);
return TRUE;
@@ -1168,9 +1162,7 @@
atomic_inc(&mdev->epoch_size);
dec_local(mdev);
out1:
- spin_lock_irq(&mdev->ee_lock);
- drbd_put_ee(mdev,e);
- spin_unlock_irq(&mdev->ee_lock);
+ drbd_free_ee(mdev,e);
return rv;
}
@@ -1198,7 +1190,7 @@
return FALSE;
*/
- if (size <= 0 || (size & 0x1ff) != 0 || size > PAGE_SIZE) {
+ if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
ERR("%s:%d: sector: %lu, size: %d\n", __FILE__, __LINE__,
(unsigned long)sector,size);
return FALSE;
@@ -1209,13 +1201,11 @@
return FALSE;
}
- spin_lock_irq(&mdev->ee_lock);
- e=drbd_get_ee(mdev);
- if(!e) {
- spin_unlock_irq(&mdev->ee_lock);
- return FALSE;
- }
+ e = drbd_alloc_ee(mdev,size,GFP_KERNEL);
+ if (!e) return FALSE;
+
e->block_id = p->block_id; // no meaning on this side, pr* on partner
+ spin_lock_irq(&mdev->ee_lock);
list_add(&e->w.list,&mdev->read_ee);
spin_unlock_irq(&mdev->ee_lock);
@@ -1223,13 +1213,11 @@
if (DRBD_ratelimit(5*HZ,5))
ERR("Can not satisfy peer's read request, no local data.\n");
drbd_send_ack(mdev,NegDReply,e);
- spin_lock_irq(&mdev->ee_lock);
- drbd_put_ee(mdev,e);
- spin_unlock_irq(&mdev->ee_lock);
+ drbd_free_ee(mdev,e);
return TRUE;
}
- drbd_ee_prepare_read(mdev,e,sector,size);
+ drbd_ee_prepare_read(mdev,e,sector);
switch (h->command) {
case DataRequest:
@@ -1245,7 +1233,7 @@
if (!drbd_rs_begin_io(mdev,sector)) {
// we have been interrupted, probably connection lost!
D_ASSERT(signal_pending(current));
- drbd_put_ee(mdev,e);
+ drbd_free_ee(mdev,e);
return 0;
}
break;
@@ -1256,7 +1244,7 @@
mdev->read_cnt += size >> 9;
inc_unacked(mdev);
- drbd_generic_make_request(READ,&e->private_bio);
+ drbd_generic_make_request(READ,e->private_bio);
if (atomic_read(&mdev->local_cnt) >= (mdev->conf.max_epoch_size>>4) ) {
drbd_kick_lo(mdev);
}
@@ -1837,7 +1825,7 @@
wait_event( mdev->cstate_wait, atomic_read(&mdev->ap_pending_cnt)==0 );
D_ASSERT(mdev->oldest_barrier->n_req == 0);
- D_ASSERT(mdev->ee_in_use == 0);
+ D_ASSERT(atomic_read(&mdev->pp_in_use) == 0);
D_ASSERT(list_empty(&mdev->read_ee)); // done by termination of worker
D_ASSERT(list_empty(&mdev->active_ee)); // done here
D_ASSERT(list_empty(&mdev->sync_ee)); // done here
Modified: trunk/drbd/drbd_req.c
===================================================================
--- trunk/drbd/drbd_req.c 2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_req.c 2005-03-15 20:51:32 UTC (rev 1768)
@@ -356,7 +356,8 @@
else mdev->read_cnt += size>>9;
// in 2.4.X, READA are submitted as READ.
- drbd_generic_make_request(rw,drbd_req_private_bio(req));
+ req->private_bio->bi_rw = rw;
+ generic_make_request(req->private_bio);
}
// up_read(mdev->device_lock);
@@ -377,8 +378,7 @@
*/
D_ASSERT(bio->bi_size > 0);
D_ASSERT( (bio->bi_size & 0x1ff) == 0);
- D_ASSERT(bio->bi_size <= PAGE_SIZE);
- D_ASSERT(bio->bi_vcnt == 1);
+ D_ASSERT(bio->bi_size <= DRBD_MAX_SEGMENT_SIZE);
D_ASSERT(bio->bi_idx == 0);
s_enr = bio->bi_sector >> (AL_EXTENT_SIZE_B-9);
Modified: trunk/drbd/drbd_worker.c
===================================================================
--- trunk/drbd/drbd_worker.c 2005-03-11 12:51:42 UTC (rev 1767)
+++ trunk/drbd/drbd_worker.c 2005-03-15 20:51:32 UTC (rev 1768)
@@ -54,8 +54,7 @@
*/
int drbd_md_io_complete(struct bio *bio, unsigned int bytes_done, int error)
{
- if (bio->bi_size)
- return 1;
+ if (bio->bi_size) return 1;
complete((struct completion*)bio->bi_private);
return 0;
@@ -70,16 +69,15 @@
struct Tl_epoch_entry *e=NULL;
struct Drbd_Conf* mdev;
- mdev=bio->bi_private;
- PARANOIA_BUG_ON(!IS_VALID_MDEV(mdev));
+ e = bio->bi_private;
+ mdev = e->mdev;
- /* we should be called via bio_endio, so this should never be the case
- * but "everyone else does it", and so do we ;) -lge
+ /* We are called each time a part of the bio is finished, but
+ * we are only interested when the whole bio is finished, therefore
+ * return as long as bio->bio_size is positive.
*/
- ERR_IF (bio->bi_size)
- return 1;
+ if (bio->bi_size) return 1;
- e = container_of(bio,struct Tl_epoch_entry,private_bio);
PARANOIA_BUG_ON(!VALID_POINTER(e));
D_ASSERT(e->block_id != ID_VACANT);
@@ -103,14 +101,12 @@
struct Tl_epoch_entry *e=NULL;
struct Drbd_Conf* mdev;
- mdev=bio->bi_private;
- PARANOIA_BUG_ON(!IS_VALID_MDEV(mdev));
+ e = bio->bi_private;
+ mdev = e->mdev;
// see above
- ERR_IF (bio->bi_size)
- return 1;
+ if (bio->bi_size) return 1;
- e = container_of(bio,struct Tl_epoch_entry,private_bio);
PARANOIA_BUG_ON(!VALID_POINTER(e));
D_ASSERT(e->block_id != ID_VACANT);
@@ -146,8 +142,7 @@
sector_t rsector;
// see above
- ERR_IF (bio->bi_size)
- return 1;
+ if (bio->bi_size) return 1;
drbd_chk_io_error(mdev,error);
rsector = drbd_req_get_sector(req);
@@ -166,9 +161,7 @@
drbd_request_t *req=bio->bi_private;
struct Drbd_Conf* mdev=req->mdev;
- // see above
- ERR_IF (bio->bi_size)
- return 1;
+ if (bio->bi_size) return 1;
/* READAs may fail.
* upper layers need to be able to handle that themselves */
@@ -413,14 +406,12 @@
int ok;
if(unlikely(cancel)) {
- spin_lock_irq(&mdev->ee_lock);
- drbd_put_ee(mdev,e);
- spin_unlock_irq(&mdev->ee_lock);
+ drbd_free_ee(mdev,e);
dec_unacked(mdev);
return 1;
}
- if(likely(drbd_bio_uptodate(&e->private_bio))) {
+ if(likely(drbd_bio_uptodate(e->private_bio))) {
ok=drbd_send_block(mdev, DataReply, e);
} else {
ok=drbd_send_ack(mdev,NegDReply,e);
@@ -432,11 +423,11 @@
dec_unacked(mdev);
spin_lock_irq(&mdev->ee_lock);
- if( page_count(drbd_bio_get_page(&e->private_bio)) > 1 ) {
+ if( drbd_bio_has_active_page(e->private_bio) ) {
/* This might happen if sendpage() has not finished */
list_add_tail(&e->w.list,&mdev->net_ee);
} else {
- drbd_put_ee(mdev,e);
+ drbd_free_ee(mdev,e);
}
spin_unlock_irq(&mdev->ee_lock);
@@ -450,16 +441,14 @@
int ok;
if(unlikely(cancel)) {
- spin_lock_irq(&mdev->ee_lock);
- drbd_put_ee(mdev,e);
- spin_unlock_irq(&mdev->ee_lock);
+ drbd_free_ee(mdev,e);
dec_unacked(mdev);
return 1;
}
drbd_rs_complete_io(mdev,drbd_ee_get_sector(e));
- if(likely(drbd_bio_uptodate(&e->private_bio))) {
+ if(likely(drbd_bio_uptodate(e->private_bio))) {
if (likely( mdev->state.s.pdsk >= Inconsistent )) {
inc_rs_pending(mdev);
ok=drbd_send_block(mdev, RSDataReply, e);
@@ -478,11 +467,11 @@
dec_unacked(mdev);
spin_lock_irq(&mdev->ee_lock);
- if( page_count(drbd_bio_get_page(&e->private_bio)) > 1 ) {
+ if( drbd_bio_has_active_page(e->private_bio) ) {
/* This might happen if sendpage() has not finished */
list_add_tail(&e->w.list,&mdev->net_ee);
} else {
- drbd_put_ee(mdev,e);
+ drbd_free_ee(mdev,e);
}
spin_unlock_irq(&mdev->ee_lock);
More information about the drbd-cvs
mailing list