[DRBD-user] DRBD and Direct Block IO

Ross S. W. Walker rwalker at medallion.com
Fri Jan 12 01:47:38 CET 2007

Note: "permalinks" may not be as permanent as we would like,
direct links of old sources may well be a few messages off.


I am having some serious performance issues with a kernel module for
iSCSI Enterprise Target and drbd 0.7.21 on CentOS 4.4.

The kernel module (source code listed below) basically takes a data
request issued over iSCSI translates it to a bio request that is then
carried out synchronously to the device below.

When run against an MD/LVM/SD device I don't have any problems, I get
performance to be expected, but when run against a drbd 0.7.21 device it
chokes down hard. For example when doing seq 64K block write direct to
device I can get 112 MB/s sustained, when I have drbd in the middle that
throughput drops to 10 MB/s.

Can anybody help explain this or point out a serious flaw in the code
below that would cause this. I would rather solve the problem (if it
can) then try to run version 8 beta in production (if it will even solve
my problem).

Thanks,

---------------- block-io.c
/*
 * Target device block I/O.
 *
 * Based on file I/O driver from FUJITA Tomonori
 * (C) 2004 - 2005 FUJITA Tomonori <tomof at acm.org>
 * (C) 2006 Andre Brinkmann <brinkman at hni.upb.de>
 * This code is licenced under the GPL.
 */

#include <linux/blkdev.h>
#include <linux/writeback.h>
#include <linux/parser.h>

#include <linux/blkdev.h>
#include <linux/buffer_head.h>
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/proc_fs.h>
#include <linux/genhd.h>
#include <linux/fs.h>
#include <linux/stat.h>
#include <linux/ctype.h>
#include <linux/delay.h>

#include "iscsi.h"
#include "iscsi_dbg.h"
#include "iotype.h"

struct blockio_data
{
	char *path;
	struct block_device *device;
	struct file *filp;
       unsigned long old_ra_pages;
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,12))
	unsigned int old_capabilities;
#else
	int old_memory_backed;
#endif
};

static void
blockio_bio_endio (struct bio *bio, unsigned int bytes_done, int error)
{

	struct completion *wait = bio->bi_private;

	/* check if all bytes have been written */
	if (bio->bi_size)
		eprintk ("I/O error %d  Not all bytes written for
bio\n", 
		bio->bi_size);

	if (error)
		eprintk ("I/O error: Error %d occured \n", error);

	if (bio->bi_private) {
		wait = (struct completion *) bio->bi_private;
		complete (wait);
	}

	/* release bio structure */
	bio_put (bio);

	return;
}

/**
 * blockio_make_request(): The function translates an iscsi-request into

 * a number of requests to the corresponding block device. 
 **/
int
blockio_make_request (struct iet_volume *lu, struct tio *tio, int rw)
{
	struct blockio_data *p;
	struct block_device *target_device;
	struct request_queue *target_queue;
	struct bio *target_bio;
	int max_sectors;
	int pg_number;
	int page_count;
	int counter;
	struct page *page;
	mm_segment_t oldfs;

	u32 offset, size;
	u32 len;

	loff_t ppos;
	int i;
	ssize_t ret;
	DECLARE_COMPLETION (work);

	p = (struct blockio_data *) lu->private;
	assert (p);

	target_device = p->device;
	assert (target_device);

	size = tio->size;
	offset = tio->offset;

	ppos = (loff_t) tio->idx << PAGE_SHIFT;
	ppos += offset;

	/* All IO is to be synchronous */
	rw |= (1 << BIO_RW_SYNC);

	/* Get maximum number of sectors / pages that could be sent to
target 
	 * block device within a single bio-structure */

	target_queue = target_device->bd_disk->queue;
	if (target_queue) {
		max_sectors = target_queue->max_sectors;
		if (max_sectors > 0) {
			pg_number = (max_sectors << SECTOR_SIZE_BITS) >>
PAGE_SHIFT;
			if (pg_number > tio->pg_cnt)
				pg_number = tio->pg_cnt;
		}
		else
			pg_number = tio->pg_cnt;
	}
	else {
		max_sectors = 0;
		pg_number = tio->pg_cnt;
	}

	page_count = 0;
	counter = tio->pg_cnt;

	while (counter > 0) {
		/* get new bio-structure */
		target_bio = bio_alloc (GFP_NOIO, pg_number);
		if (!target_bio) {
			eprintk ("I/O error:  %d\n", page_count);
			return -ENOMEM;
		}

		/* Initialize bio */
		target_bio->bi_sector = ppos >> SECTOR_SIZE_BITS;
		target_bio->bi_bdev = target_device;
		target_bio->bi_rw = rw;
		target_bio->bi_end_io = (bio_end_io_t *)
blockio_bio_endio;
		target_bio->bi_private = &work;

		for (i = 0; i < pg_number; i++) {
			page = tio->pvec[page_count];
			assert (page);

			/* calc access length for this page */
			len = PAGE_SIZE;
			if (offset)
				len -= offset;
			if (size < len)
				len = size;

			/* bio_add_page returns len if successful */
			ret = bio_add_page (target_bio, page, len,
offset);
			if (!ret) {
				eprintk ("I/O error:  %ld\n", (long)
ret);
				return -EIO;
			}
			/* offset valid only once */
			offset = 0;
			size -= len;
			page_count++;
		}

		counter -= pg_number;
		ppos += (pg_number << PAGE_SHIFT);

		if (pg_number > counter)
			pg_number = counter;

		oldfs = get_fs ();
		set_fs (get_ds ());

		/* send bio to generic_make_request */
		submit_bio (rw, target_bio);

		wait_for_completion (&work);

		set_fs (oldfs);

	}
	assert (!size);

	return 0;
}

static int
open_path (struct iet_volume *volume, const char *path)
{
	int err = 0;
	struct blockio_data *info = (struct blockio_data *)
volume->private;
	struct file *filp;
	mm_segment_t oldfs;
	int flags;

	info->path = kmalloc (strlen (path) + 1, GFP_KERNEL);
	if (!info->path)
		return -ENOMEM;
	strcpy (info->path, path);
	info->path[strlen (path)] = '\0';

	oldfs = get_fs ();
	set_fs (get_ds ());
	flags = (LUReadonly (volume) ? O_RDONLY : O_RDWR) | O_LARGEFILE
| O_SYNC | O_DIRECT;
	filp = filp_open (path, flags, 0);
	set_fs (oldfs);

	if (IS_ERR (filp)) {
		err = PTR_ERR (filp);
		eprintk ("Can't open %s %d\n", path, err);
		info->filp = NULL;
	}
	else
		info->filp = filp;

	return err;
}

static int
set_scsiid (struct iet_volume *volume, const char *id)
{
	size_t len;

	if ((len = strlen (id)) > SCSI_ID_LEN - VENDOR_ID_LEN) {
		eprintk ("too long SCSI ID %lu\n", (unsigned long) len);
		return -EINVAL;
	}

	len = min (sizeof (volume->scsi_id) - VENDOR_ID_LEN, len);
	memcpy (volume->scsi_id + VENDOR_ID_LEN, id, len);

	return 0;
}

static void
gen_scsiid (struct iet_volume *volume, struct inode *inode)
{
	int i;
	u32 *p;

	strlcpy (volume->scsi_id, VENDOR_ID, VENDOR_ID_LEN);

	for (i = VENDOR_ID_LEN; i < SCSI_ID_LEN; i++)
		if (volume->scsi_id[i])
			return;

	p = (u32 *) (volume->scsi_id + VENDOR_ID_LEN);
	*(p + 0) = volume->target->trgt_param.target_type;
	*(p + 1) = volume->target->tid;
	*(p + 2) = (unsigned int) inode->i_ino;
	*(p + 3) = (unsigned int) inode->i_sb->s_dev;
}

static int
set_scsisn(struct iet_volume *volume, const char *sn)
{
	size_t len;

	if ((len = strlen(sn)) > SCSI_SN_LEN) {
		eprintk("too long SCSI SN %lu\n", (unsigned long) len);
		return -EINVAL;
	}
	memcpy(volume->scsi_sn, sn, len);
	return 0;
}

enum
{
	Opt_scsiid, Opt_scsisn, Opt_path, Opt_ignore, Opt_err,
};

static match_table_t tokens = {
	{Opt_scsiid, "ScsiId=%s"},
	{Opt_scsisn, "ScsiSN=%s"},
	{Opt_path, "Path=%s"},
	{Opt_ignore, "Type=%s"},
	{Opt_ignore, "IOMode=%s"},
	{Opt_err, NULL},
};

static int
parse_blockio_params (struct iet_volume *volume, char *params)
{
	int err = 0;
	char *p, *q;

	while ((p = strsep (&params, ",")) != NULL) {
		substring_t args[MAX_OPT_ARGS];
		int token;
		if (!*p)
			continue;
		token = match_token (p, tokens, args);
		switch (token) {
		case Opt_scsiid:
			if (!(q = match_strdup (&args[0]))) {
				err = -ENOMEM;
				goto out;
			}
			err = set_scsiid (volume, q);
			kfree (q);
			if (err < 0)
				goto out;
			break;
		case Opt_scsisn:
			if (!(q = match_strdup(&args[0]))) {
				err = -ENOMEM;
				goto out;
			}
			err = set_scsisn(volume, q);
			kfree(q);
			if (err < 0)
				goto out;
			break;
		case Opt_path:
			if (!(q = match_strdup (&args[0]))) {
				err = -ENOMEM;
				goto out;
			}
			err = open_path (volume, q);
			kfree (q);
			if (err < 0)
				goto out;
			break;
		case Opt_ignore:
			break;
		default:
			eprintk ("Unknown %s\n", p);
			return -EINVAL;
		}
	}

  out:
	return err;
}

static void
blockio_detach (struct iet_volume *lu)
{
	struct inode *inode;
	struct blockio_data *p = (struct blockio_data *) lu->private;

	inode = p->device->bd_inode;

	inode->i_mapping->backing_dev_info->ra_pages = p->old_ra_pages;
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,12))
	inode->i_mapping->backing_dev_info->capabilities =
p->old_capabilities;
#else
	inode->i_mapping->backing_dev_info->memory_backed =
p->old_memory_backed;
#endif

	kfree (p->path);
	if (p->filp)
		filp_close (p->filp, NULL);
	kfree (p);
	lu->private = NULL;
}

static int
blockio_attach (struct iet_volume *lu, char *args)
{
	int err = 0;
	struct blockio_data *p;
	struct inode *inode;

	if (lu->private) {
		printk ("already attached ? %d\n", lu->lun);
		return -EBUSY;
	}

	if (!(p = kmalloc (sizeof (*p), GFP_KERNEL)))
		return -ENOMEM;
	memset (p, 0, sizeof (*p));
	lu->private = p;

	if ((err = parse_blockio_params (lu, args)) < 0) {
		eprintk ("%d\n", err);
		goto out;
	}
	inode = p->filp->f_dentry->d_inode;

	gen_scsiid (lu, inode);

	/* Only block devices are allowed here */
	if (S_ISBLK (inode->i_mode)) {
		inode = inode->i_bdev->bd_inode;
		p->device = inode->i_bdev;
		printk (KERN_INFO "Max queue length: %d \n",
				p->device->bd_disk->queue->max_sectors);
	}
	else {
		err = -EINVAL;
		goto out;
	}

	p->old_ra_pages = inode->i_mapping->backing_dev_info->ra_pages;
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,12))
	p->old_capabilities =
inode->i_mapping->backing_dev_info->capabilities;
#else
	p->old_memory_backed =
inode->i_mapping->backing_dev_info->memory_backed;
#endif

	inode->i_mapping->backing_dev_info->ra_pages = 0;
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,12))
	inode->i_mapping->backing_dev_info->capabilities =
BDI_CAP_NO_ACCT_DIRTY | BDI_CAP_NO_WRITEBACK;
#else
	inode->i_mapping->backing_dev_info->memory_backed = 1;
#endif

	/* get sector size of the block device */
	lu->blk_shift = SECTOR_SIZE_BITS;
	lu->blk_cnt = inode->i_size >> lu->blk_shift;

  out:
	if (err < 0)
		blockio_detach (lu);
	return err;
}

void
blockio_show (struct iet_volume *lu, struct seq_file *seq)
{
	struct blockio_data *p = (struct blockio_data *) lu->private;
	seq_printf (seq, " path:%s\n", p->path);
}

struct iotype blockio = {
	.name = "blockio",
	.attach = blockio_attach,
	.make_request = blockio_make_request,
	.detach = blockio_detach,
	.show = blockio_show,
};

______________________________________________________________________
This e-mail, and any attachments thereto, is intended only for use by
the addressee(s) named herein and may contain legally privileged
and/or confidential information. If you are not the intended recipient
of this e-mail, you are hereby notified that any dissemination,
distribution or copying of this e-mail, and any attachments thereto,
is strictly prohibited. If you have received this e-mail in error,
please immediately notify the sender and permanently delete the
original and any copy or printout thereof.




More information about the drbd-user mailing list