[DRBD-cvs] svn commit by lars - r2449 - trunk/drbd - rearranged
cleanup upon connection loss. again. :) fin
drbd-cvs at lists.linbit.com
drbd-cvs at lists.linbit.com
Fri Sep 22 21:54:02 CEST 2006
Author: lars
Date: 2006-09-22 21:54:01 +0200 (Fri, 22 Sep 2006)
New Revision: 2449
Modified:
trunk/drbd/drbd_int.h
trunk/drbd/drbd_main.c
trunk/drbd/drbd_nl.c
trunk/drbd/drbd_receiver.c
trunk/drbd/drbd_req.c
trunk/drbd/drbd_worker.c
Log:
rearranged cleanup upon connection loss. again. :)
finally got the ap_bio_cnt and ap_pending_cnt correctly balanced. (?)
even fixed a deadlock.
Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h 2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_int.h 2006-09-22 19:54:01 UTC (rev 2449)
@@ -1614,7 +1614,13 @@
ERR_IF_CNT_IS_NEGATIVE(unacked_cnt)
-// warning LGE "FIXME inherently racy. this is buggy by design :("
+static inline void dec_net(drbd_dev* mdev)
+{
+ if(atomic_dec_and_test(&mdev->net_cnt)) {
+ wake_up(&mdev->cstate_wait);
+ }
+}
+
/**
* inc_net: Returns TRUE when it is ok to access mdev->net_conf. You
* should call dec_net() when finished looking at mdev->net_conf.
@@ -1625,17 +1631,10 @@
atomic_inc(&mdev->net_cnt);
have_net_conf = mdev->state.conn >= Unconnected;
- if(!have_net_conf) atomic_dec(&mdev->net_cnt);
+ if(!have_net_conf) dec_net(mdev);
return have_net_conf;
}
-static inline void dec_net(drbd_dev* mdev)
-{
- if(atomic_dec_and_test(&mdev->net_cnt)) {
- wake_up(&mdev->cstate_wait);
- }
-}
-
/* strictly speaking,
* these would have to hold the req_lock while looking at
* the disk state. But since we cannot submit within a spinlock,
@@ -1676,35 +1675,29 @@
/* this throttles on-the-fly application requests
* according to max_buffers settings;
* maybe re-implement using semaphores? */
-static inline void inc_ap_bio(drbd_dev* mdev)
+static inline int drbd_get_max_buffers(drbd_dev* mdev)
{
int mxb = 1000000; /* arbitrary limit on open requests */
-
if(inc_net(mdev)) {
mxb = mdev->net_conf->max_buffers;
dec_net(mdev);
- /* decrease here already, so you can reconfigure
- * the max buffer setting even under load.
- * alternative: use rw_semaphore. I'd like that.
- */
}
+ return mxb;
+}
+static inline void inc_ap_bio(drbd_dev* mdev)
+{
+ int mxb = drbd_get_max_buffers(mdev);
wait_event( mdev->rq_wait,atomic_add_unless(&mdev->ap_bio_cnt,1,mxb) );
}
static inline void dec_ap_bio(drbd_dev* mdev)
{
+ int mxb = drbd_get_max_buffers(mdev);
int ap_bio = atomic_dec_return(&mdev->ap_bio_cnt);
- if(ap_bio == 0) wake_up(&mdev->cstate_wait);
-
- if(inc_net(mdev)) {
- if(ap_bio < mdev->net_conf->max_buffers)
- wake_up(&mdev->rq_wait);
- dec_net(mdev);
- }
-
- D_ASSERT(atomic_read(&mdev->ap_bio_cnt)>=0);
+ D_ASSERT(ap_bio>=0);
+ if (ap_bio < mxb) wake_up(&mdev->rq_wait);
}
/* FIXME does not handle wrap around yet */
Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c 2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_main.c 2006-09-22 19:54:01 UTC (rev 2449)
@@ -91,16 +91,6 @@
#include <linux/moduleparam.h>
MODULE_PARM_DESC(disable_bd_claim, "DONT USE! disables block device claiming" );
-/*
- * please somebody explain to me what the "perm" of the module_param
- * macro is good for (yes, permission for it in the "driverfs", but what
- * do we need to do for them to show up, to begin with?)
- * once I understand this, and the rest of the sysfs stuff, I probably
- * be able to understand how we can move from our ioctl interface to a
- * proper sysfs based one.
- * -- lge
- */
-
/* thanks to these macros, if compiled into the kernel (not-module),
* this becomes the boot parameter drbd.minor_count
*/
@@ -173,6 +163,7 @@
b=kmalloc(sizeof(struct drbd_barrier),GFP_KERNEL);
if(!b) return 0;
INIT_LIST_HEAD(&b->requests);
+ INIT_LIST_HEAD(&b->w.list);
b->next=0;
b->br_number=4711;
b->n_req=0;
@@ -206,10 +197,10 @@
struct drbd_barrier *newest_before;
INIT_LIST_HEAD(&new->requests);
+ INIT_LIST_HEAD(&new->w.list);
new->next=0;
new->n_req=0;
- /* mdev->newest_barrier == NULL "cannot happen". but anyways... */
newest_before = mdev->newest_barrier;
/* never send a barrier number == 0, because that is special-cased
* when using TCQ for our write ordering code */
@@ -265,7 +256,7 @@
kfree(b);
}
-/* FIXME called by whom? worker only? */
+/* called by drbd_disconnect (exiting receiver only) */
void tl_clear(drbd_dev *mdev)
{
struct drbd_barrier *b, *tmp;
@@ -277,17 +268,21 @@
while ( b ) {
struct list_head *le, *tle;
struct drbd_request *r;
+
+ ERR_IF(!list_empty(&b->w.list)) {
+ DUMPI(b->br_number);
+ }
+
list_for_each_safe(le, tle, &b->requests) {
r = list_entry(le, struct drbd_request,tl_requests);
_req_mod(r, connection_lost_while_pending);
}
tmp = b->next;
- /* FIXME can there still be requests on that ring list now?
- * funny race conditions ... */
- if (!list_empty(&b->requests)) {
- WARN("FIXME explain this race...");
- list_del(&b->requests);
- }
+
+ /* there could still be requests on that ring list,
+ * in case local io is still pending */
+ list_del(&b->requests);
+
if (b == mdev->newest_barrier) {
D_ASSERT(tmp == NULL);
b->br_number=4711;
@@ -876,6 +871,8 @@
}
// Do not change the order of the if above and below...
if (os.conn != WFBitMapS && ns.conn == WFBitMapS) {
+ /* compare with drbd_make_request_common,
+ * wait_event and inc_ap_bio */
wait_event(mdev->cstate_wait,!atomic_read(&mdev->ap_bio_cnt));
drbd_bm_lock(mdev); // {
drbd_send_bitmap(mdev);
Modified: trunk/drbd/drbd_nl.c
===================================================================
--- trunk/drbd/drbd_nl.c 2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_nl.c 2006-09-22 19:54:01 UTC (rev 2449)
@@ -1031,11 +1031,6 @@
memset(new_ee_hash, 0, ns*sizeof(void*));
}
- /* IMPROVE:
- We should warn the user if the LL_DEV is
- used already. E.g. some FS mounted on it.
- */
-
((char*)new_conf->shared_secret)[SHARED_SECRET_MAX-1]=0;
#if 0
Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c 2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_receiver.c 2006-09-22 19:54:01 UTC (rev 2449)
@@ -2445,14 +2445,49 @@
}
}
+/* FIXME how should freeze-io be handled? */
+STATIC void drbd_fail_pending_reads(drbd_dev *mdev)
+{
+ struct hlist_head *slot;
+ struct hlist_node *n;
+ drbd_request_t * req;
+ struct list_head *le;
+ LIST_HEAD(workset);
+ int i;
+
+ /*
+ * Application READ requests
+ */
+ spin_lock_irq(&mdev->req_lock);
+ for(i=0;i<APP_R_HSIZE;i++) {
+ slot = mdev->app_reads_hash+i;
+ hlist_for_each_entry(req, n, slot, colision) {
+ list_add(&req->w.list, &workset);
+ }
+ }
+ memset(mdev->app_reads_hash,0,APP_R_HSIZE*sizeof(void*));
+
+ while(!list_empty(&workset)) {
+ le = workset.next;
+ req = list_entry(le, drbd_request_t, w.list);
+ list_del(le);
+
+ _req_mod(req, connection_lost_while_pending);
+ }
+ spin_unlock_irq(&mdev->req_lock);
+}
+
STATIC void drbd_disconnect(drbd_dev *mdev)
{
enum fencing_policy fp;
- struct drbd_work *disconnect_work;
-
D_ASSERT(mdev->state.conn < Connected);
+ /* FIXME verify that:
+ * the state change magic prevents us from becoming >= Connected again
+ * while we are still cleaning up.
+ */
+ /* asender cleans up active_ee, sync_ee and done_ee */
drbd_thread_stop(&mdev->asender);
fp = DontCare;
@@ -2461,29 +2496,74 @@
dec_local(mdev);
}
- // Receiving side (may be primary, in case we had two primaries)
spin_lock_irq(&mdev->req_lock);
_drbd_wait_ee_list_empty(mdev,&mdev->read_ee);
- _drbd_wait_ee_list_empty(mdev,&mdev->active_ee);
- _drbd_wait_ee_list_empty(mdev,&mdev->sync_ee);
- _drbd_clear_done_ee(mdev);
- mdev->epoch_size = 0;
+ reclaim_net_ee(mdev);
spin_unlock_irq(&mdev->req_lock);
- // Needs to happen before we schedule the disconnect work callback,
- // Since they might have something for the worker's queue as well.
- disconnect_work = kmalloc(sizeof(struct drbd_work),GFP_KERNEL);
- if(disconnect_work) {
- disconnect_work->cb = w_disconnect;
- drbd_queue_work(&mdev->data.work,disconnect_work);
- } else {
- WARN("kmalloc failed, taking messy shortcut.\n");
- w_disconnect(mdev,NULL,1);
+ /* would be nice to be able to wait for all ee and req objects
+ * currently on the worker queue to be "canceled",
+ * but that's not really necessary. */
+
+ down(&mdev->data.mutex);
+ drbd_free_sock(mdev);
+ up(&mdev->data.mutex);
+
+ /* FIXME were is the correct place for these? tl_clear? */
+ clear_bit(ISSUE_BARRIER,&mdev->flags);
+ mdev->epoch_size = 0;
+
+ /* FIXME: fail pending reads?
+ * when we are configured for freeze io,
+ * we could retry them once we un-freeze. */
+ drbd_fail_pending_reads(mdev);
+
+ /* FIXME maybe the cleanup of the resync should go
+ * into w_make_resync_request or w_resume_next_sg
+ * or something like that. */
+
+ /* We do not have data structures that would allow us to
+ get the rs_pending_cnt down to 0 again.
+ * On SyncTarget we do not have any data structures describing
+ the pending RSDataRequest's we have sent.
+ * On SyncSource there is no data structure that tracks
+ the RSDataReply blocks that we sent to the SyncTarget.
+ And no, it is not the sum of the reference counts in the
+ resync_LRU. The resync_LRU tracks the whole operation including
+ the disk-IO, while the rs_pending_cnt only tracks the blocks
+ on the fly. */
+ drbd_rs_cancel_all(mdev);
+ mdev->rs_total=0;
+ atomic_set(&mdev->rs_pending_cnt,0);
+ wake_up(&mdev->cstate_wait);
+
+ if(!mdev->state.susp) {
+ /* FIXME should go into some "after_state_change" handler */
+ tl_clear(mdev);
}
+ D_ASSERT(atomic_read(&mdev->pp_in_use) == 0);
+ D_ASSERT(list_empty(&mdev->read_ee));
+ D_ASSERT(list_empty(&mdev->active_ee));
+ D_ASSERT(list_empty(&mdev->sync_ee));
+ D_ASSERT(list_empty(&mdev->done_ee));
+
+ if ( mdev->p_uuid ) {
+ kfree(mdev->p_uuid);
+ mdev->p_uuid = NULL;
+ }
+
+ INFO("Connection closed\n");
+
+ /* FIXME I'm not sure, there may still be a race? */
+ if(mdev->state.conn == StandAlone && mdev->net_conf ) {
+ kfree(mdev->net_conf);
+ mdev->net_conf = NULL;
+ }
+
drbd_md_sync(mdev);
- if ( mdev->state.role == Primary ) {
+ if ( mdev->state.role == Primary ) {
if( fp >= Resource &&
mdev->state.pdsk >= DUnknown ) {
drbd_disks_t nps = drbd_try_outdate_peer(mdev);
@@ -3100,6 +3180,14 @@
drbd_thread_restart_nowait(&mdev->receiver);
}
+ INFO("asender starting cleanup\n");
+
+ spin_lock_irq(&mdev->req_lock);
+ _drbd_wait_ee_list_empty(mdev,&mdev->active_ee);
+ _drbd_wait_ee_list_empty(mdev,&mdev->sync_ee);
+ _drbd_clear_done_ee(mdev);
+ spin_unlock_irq(&mdev->req_lock);
+
INFO("asender terminated\n");
return 0;
Modified: trunk/drbd/drbd_req.c
===================================================================
--- trunk/drbd/drbd_req.c 2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_req.c 2006-09-22 19:54:01 UTC (rev 2449)
@@ -716,9 +716,24 @@
drbd_request_t *req;
int local, remote;
+ /* we wait here
+ * as long as the device is suspended
+ * until the bitmap is no longer on the fly during connection handshake
+ */
+ wait_event( mdev->cstate_wait,
+ (volatile int)((mdev->state.conn < WFBitMapS ||
+ mdev->state.conn > WFBitMapT) &&
+ !mdev->state.susp ) );
+
+ /* FIXME RACE here in case of freeze io.
+ * maybe put the inc_ap_bio within the condition of the wait_event? */
+
+ /* compare with after_state_ch,
+ * os.conn != WFBitMapS && ns.conn == WFBitMapS */
+ inc_ap_bio(mdev);
+
/* allocate outside of all locks; get a "reference count" (ap_bio_cnt)
* to avoid races with the disconnect/reconnect code. */
- inc_ap_bio(mdev);
req = drbd_req_new(mdev,bio);
if (!req) {
dec_ap_bio(mdev);
@@ -729,15 +744,6 @@
return 0;
}
- /* we wait here
- * as long as the device is suspended
- * until the bitmap is no longer on the fly during connection handshake
- */
- wait_event( mdev->cstate_wait,
- (volatile int)((mdev->state.conn < WFBitMapS ||
- mdev->state.conn > WFBitMapT) &&
- !mdev->state.susp ) );
-
local = inc_local(mdev);
if (!local) {
bio_put(req->private_bio); /* or we get a bio leak */
@@ -836,6 +842,21 @@
/* GOOD, everything prepared, grab the spin_lock */
spin_lock_irq(&mdev->req_lock);
+ /* FIXME race with drbd_disconnect and tl_clear? */
+ if (remote) {
+ remote = (mdev->state.pdsk == UpToDate ||
+ ( mdev->state.pdsk == Inconsistent &&
+ mdev->state.conn >= Connected ) );
+ if (!remote) {
+ WARN("lost connection while grabbing the req_lock!\n");
+ }
+ if (!(local || remote)) {
+ ERR("IO ERROR: neither local nor remote disk\n");
+ spin_unlock_irq(&mdev->req_lock);
+ goto fail_and_free_req;
+ }
+ }
+
if (b && mdev->unused_spare_barrier == NULL) {
mdev->unused_spare_barrier = b;
b = NULL;
@@ -849,6 +870,7 @@
goto allocate_barrier;
}
+
/* _maybe_start_new_epoch(mdev);
* If we need to generate a write barrier packet, we have to add the
* new epoch (barrier) object, and queue the barrier packet for sending,
@@ -865,6 +887,11 @@
b = _tl_add_barrier(mdev,b);
mdev->unused_spare_barrier = NULL;
b->w.cb = w_send_barrier;
+ /* inc_ap_pending done here, so we won't
+ * get imbalanced on connection loss.
+ * dec_ap_pending will be done in got_BarrierAck
+ * or (on connection loss) in tl_clear. */
+ inc_ap_pending(mdev);
drbd_queue_work(&mdev->data.work, &b->w);
} else {
D_ASSERT(!(remote && rw == WRITE &&
Modified: trunk/drbd/drbd_worker.c
===================================================================
--- trunk/drbd/drbd_worker.c 2006-09-22 13:03:15 UTC (rev 2448)
+++ trunk/drbd/drbd_worker.c 2006-09-22 19:54:01 UTC (rev 2449)
@@ -389,8 +389,6 @@
INFO("Resync done (total %lu sec; paused %lu sec; %lu K/sec)\n",
dt + mdev->rs_paused, mdev->rs_paused, dbdt);
- // assert that all bit-map parts are cleared.
- D_ASSERT(list_empty(&mdev->resync->lru));
D_ASSERT(drbd_bm_total_weight(mdev) == 0);
mdev->rs_total = 0;
mdev->rs_paused = 0;
@@ -533,13 +531,12 @@
if (!drbd_get_data_sock(mdev))
return 0;
p->barrier = b->br_number;
- inc_ap_pending(mdev);
+ /* inc_ap_pending was done where this was queued.
+ * dec_ap_pending will be done in got_BarrierAck
+ * or (on connection loss) in tl_clear. */
ok = _drbd_send_cmd(mdev,mdev->data.socket,Barrier,(Drbd_Header*)p,sizeof(*p),0);
drbd_put_data_sock(mdev);
- /* pairing dec_ap_pending() happens in got_BarrierAck,
- * or (on connection loss) in tl_clear. */
-
return ok;
}
@@ -600,111 +597,6 @@
return ok;
}
-/* FIXME how should freeze-io be handled? */
-STATIC void drbd_fail_pending_reads(drbd_dev *mdev)
-{
- struct hlist_head *slot;
- struct hlist_node *n;
- drbd_request_t * req;
- struct list_head *le;
- LIST_HEAD(workset);
- int i;
-
- /*
- * Application READ requests
- */
- spin_lock_irq(&mdev->req_lock);
- for(i=0;i<APP_R_HSIZE;i++) {
- slot = mdev->app_reads_hash+i;
- hlist_for_each_entry(req, n, slot, colision) {
- list_add(&req->w.list, &workset);
- }
- }
- memset(mdev->app_reads_hash,0,APP_R_HSIZE*sizeof(void*));
-
- while(!list_empty(&workset)) {
- le = workset.next;
- req = list_entry(le, drbd_request_t, w.list);
- list_del(le);
-
- _req_mod(req, connection_lost_while_pending);
- }
- spin_unlock_irq(&mdev->req_lock);
-}
-
-/**
- * w_disconnect clean up everything, and restart the receiver.
- */
-int w_disconnect(drbd_dev *mdev, struct drbd_work *w, int cancel)
-{
- D_ASSERT(cancel);
-
- down(&mdev->data.mutex);
- /* By grabbing the sock_mutex we make sure that no one
- uses the socket right now. */
- drbd_free_sock(mdev);
- up(&mdev->data.mutex);
-
- drbd_fail_pending_reads(mdev);
- drbd_rs_cancel_all(mdev);
-
- // primary
- clear_bit(ISSUE_BARRIER,&mdev->flags);
-
- if(!mdev->state.susp) {
- tl_clear(mdev);
- D_ASSERT(mdev->oldest_barrier->n_req == 0);
-
- if(atomic_read(&mdev->ap_pending_cnt)) {
- ERR("ap_pending_cnt = %d\n",
- atomic_read(&mdev->ap_pending_cnt));
- atomic_set(&mdev->ap_pending_cnt,0);
- }
- }
-
- D_ASSERT(atomic_read(&mdev->pp_in_use) == 0);
- D_ASSERT(list_empty(&mdev->read_ee));
- D_ASSERT(list_empty(&mdev->active_ee)); // done here
- D_ASSERT(list_empty(&mdev->sync_ee)); // done here
- D_ASSERT(list_empty(&mdev->done_ee)); // done here
-
- mdev->rs_total=0;
-
- if(atomic_read(&mdev->unacked_cnt)) {
- ERR("unacked_cnt = %d\n",atomic_read(&mdev->unacked_cnt));
- atomic_set(&mdev->unacked_cnt,0);
- }
-
- /* We do not have data structures that would allow us to
- get the rs_pending_cnt down to 0 again.
- * On SyncTarget we do not have any data structures describing
- the pending RSDataRequest's we have sent.
- * On SyncSource there is no data structure that tracks
- the RSDataReply blocks that we sent to the SyncTarget.
- And no, it is not the sum of the reference counts in the
- resync_LRU. The resync_LRU tracks the whole operation including
- the disk-IO, while the rs_pending_cnt only tracks the blocks
- on the fly. */
- atomic_set(&mdev->rs_pending_cnt,0);
- wake_up(&mdev->cstate_wait);
-
- if ( mdev->p_uuid ) {
- kfree(mdev->p_uuid);
- mdev->p_uuid = NULL;
- }
-
- INFO("Connection closed\n");
-
- if(w) kfree(w);
-
- if(mdev->state.conn == StandAlone && mdev->net_conf ) {
- kfree(mdev->net_conf);
- mdev->net_conf = NULL;
- }
-
- return 1;
-}
-
STATIC void drbd_global_lock(void)
{
drbd_dev *mdev;
More information about the drbd-cvs
mailing list