[DRBD-cvs] svn commit by lars - r2553 - trunk/drbd - when Primary and sync target, the worker could "deadlock"
drbd-cvs at lists.linbit.com
Fri Oct 20 20:07:36 CEST 2006
Author: lars
Date: 2006-10-20 20:07:34 +0200 (Fri, 20 Oct 2006)
New Revision: 2553
Modified:
trunk/drbd/drbd_actlog.c
trunk/drbd/drbd_int.h
trunk/drbd/drbd_main.c
trunk/drbd/drbd_receiver.c
trunk/drbd/drbd_req.c
trunk/drbd/drbd_worker.c
trunk/drbd/lru_cache.c
trunk/drbd/lru_cache.h
Log:
when Primary and sync target, the worker could "deadlock" in
w_make_resync_request (drbd_rs_begin_io), waiting for application
requests to complete which are still to be sent by the worker. sic.
this implementation may not yet be optimal, but it seems to work.
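
In essence, w_make_resync_request() now calls the new non-blocking
drbd_try_rs_begin_io() and requeues itself on -EAGAIN, instead of
sleeping in drbd_rs_begin_io() while the application writes it waits
for are still queued behind it on the same worker. A condensed sketch
of the new caller pattern (simplified from the drbd_worker.c hunk
below; the surrounding loop is not shown):

    sector = BM_BIT_TO_SECT(bit);
    if (drbd_try_rs_begin_io(mdev, sector)) {
        /* -EAGAIN: the extent is still busy, e.g. because application
         * writes this worker has yet to send hold the AL extent.
         * remember the position and requeue the work item, rather
         * than blocking the worker and deadlocking. */
        drbd_bm_set_find(mdev, bit);
        goto requeue;
    }
    /* extent is BME_LOCKED now, safe to issue the resync request */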
Modified: trunk/drbd/drbd_actlog.c
===================================================================
--- trunk/drbd/drbd_actlog.c 2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_actlog.c 2006-10-20 18:07:34 UTC (rev 2553)
@@ -696,7 +696,7 @@
}
} else {
ERR("lc_get() failed! locked=%d/%d flags=%lu\n",
- atomic_read(&mdev->resync_locked),
+ mdev->resync_locked,
mdev->resync->nr_elements,
mdev->resync->flags);
}
@@ -841,28 +841,28 @@
int wakeup = 0;
unsigned long rs_flags;
- if(atomic_read(&mdev->resync_locked) > mdev->resync->nr_elements-3 ) {
+ spin_lock_irq(&mdev->al_lock);
+ if (mdev->resync_locked > mdev->resync->nr_elements-3) {
//WARN("bme_get() does not lock all elements\n");
- return 0;
+ spin_unlock_irq(&mdev->al_lock);
+ return NULL;
}
-
- spin_lock_irq(&mdev->al_lock);
bm_ext = (struct bm_extent*) lc_get(mdev->resync,enr);
if (bm_ext) {
- if(bm_ext->lce.lc_number != enr) {
+ if (bm_ext->lce.lc_number != enr) {
bm_ext->rs_left = drbd_bm_e_weight(mdev,enr);
bm_ext->rs_failed = 0;
lc_changed(mdev->resync,(struct lc_element*)bm_ext);
wakeup = 1;
}
- if(bm_ext->lce.refcnt == 1) atomic_inc(&mdev->resync_locked);
- set_bit(BME_NO_WRITES,&bm_ext->flags); // within the lock
+ if (bm_ext->lce.refcnt == 1) mdev->resync_locked++;
+ set_bit(BME_NO_WRITES,&bm_ext->flags);
}
rs_flags=mdev->resync->flags;
spin_unlock_irq(&mdev->al_lock);
if (wakeup) wake_up(&mdev->al_wait);
- if(!bm_ext) {
+ if (!bm_ext) {
if (rs_flags & LC_STARVING) {
WARN("Have to wait for element"
" (resync LRU too small?)\n");
@@ -903,6 +903,10 @@
* to BME_LOCKED.
*
* @sector: The sector number
+ *
+ * sleeps on al_wait.
+ * returns 1 if successful.
+ * returns 0 if interrupted.
*/
int drbd_rs_begin_io(drbd_dev* mdev, sector_t sector)
{
@@ -928,7 +932,7 @@
spin_lock_irq(&mdev->al_lock);
if( lc_put(mdev->resync,&bm_ext->lce) == 0 ) {
clear_bit(BME_NO_WRITES,&bm_ext->flags);
- atomic_dec(&mdev->resync_locked);
+ mdev->resync_locked--;
wake_up(&mdev->al_wait);
}
spin_unlock_irq(&mdev->al_lock);
@@ -941,6 +945,112 @@
return 1;
}
+/**
+ * drbd_try_rs_begin_io: Gets an extent in the resync LRU cache, sets it
+ * to BME_NO_WRITES, then tries to set it to BME_LOCKED.
+ *
+ * @sector: The sector number
+ *
+ * does not sleep.
+ * returns zero if we could set BME_LOCKED and can proceed,
+ * -EAGAIN if we need to try again.
+ */
+int drbd_try_rs_begin_io(drbd_dev* mdev, sector_t sector)
+{
+ unsigned int enr = BM_SECT_TO_EXT(sector);
+ const unsigned int al_enr = enr*AL_EXT_PER_BM_SECT;
+ struct bm_extent* bm_ext;
+ int i;
+
+ spin_lock_irq(&mdev->al_lock);
+ if (mdev->resync_wenr != LC_FREE) {
+ /* in case you have very heavy scattered io, it may
+ * stall the syncer indefinitely if we give up the ref count
+ * when we try again and requeue.
+ *
+ * if we don't give up the refcount, but the next time
+ * we are scheduled this extent has been "synced" by new
+ * application writes, we'd miss the lc_put on the
+ * extent we kept the refcount on.
+ * so we remember which extent we had to try again, and
+ * if the next requested one is something else, we do
+ * the lc_put here...
+ * we also have to wake_up
+ */
+ bm_ext = (struct bm_extent*)lc_find(mdev->resync,mdev->resync_wenr);
+ if (bm_ext) {
+ D_ASSERT(!test_bit(BME_LOCKED,&bm_ext->flags));
+ D_ASSERT(test_bit(BME_NO_WRITES,&bm_ext->flags));
+ clear_bit(BME_NO_WRITES,&bm_ext->flags);
+ mdev->resync_wenr = LC_FREE;
+ lc_put(mdev->resync,&bm_ext->lce);
+ wake_up(&mdev->al_wait);
+ } else {
+ ALERT("LOGIC BUG\n");
+ }
+ }
+ bm_ext = (struct bm_extent*)lc_try_get(mdev->resync,enr);
+ if (bm_ext) {
+ if (test_bit(BME_LOCKED,&bm_ext->flags)) {
+ goto proceed;
+ }
+ if (!test_and_set_bit(BME_NO_WRITES,&bm_ext->flags)) {
+ mdev->resync_locked++;
+ } else {
+ /* we did set the BME_NO_WRITES,
+ * but then could not set BME_LOCKED,
+ * so we tried again.
+ * drop the extra reference. */
+ bm_ext->lce.refcnt--;
+ D_ASSERT(bm_ext->lce.refcnt > 0);
+ }
+ goto check_al;
+ } else {
+ if (mdev->resync_locked > mdev->resync->nr_elements-3)
+ goto try_again;
+ bm_ext = (struct bm_extent*)lc_get(mdev->resync,enr);
+ if (!bm_ext) {
+ const unsigned long rs_flags = mdev->resync->flags;
+ if (rs_flags & LC_STARVING) {
+ WARN("Have to wait for element"
+ " (resync LRU too small?)\n");
+ }
+ if (rs_flags & LC_DIRTY) {
+ BUG(); // WARN("Ongoing RS update (???)\n");
+ }
+ goto try_again;
+ }
+ if (bm_ext->lce.lc_number != enr) {
+ bm_ext->rs_left = drbd_bm_e_weight(mdev,enr);
+ bm_ext->rs_failed = 0;
+ lc_changed(mdev->resync,(struct lc_element*)bm_ext);
+ wake_up(&mdev->al_wait);
+ D_ASSERT(test_bit(BME_LOCKED,&bm_ext->flags) == 0);
+ }
+ set_bit(BME_NO_WRITES,&bm_ext->flags);
+ D_ASSERT(bm_ext->lce.refcnt == 1);
+ mdev->resync_locked++;
+ goto check_al;
+ }
+ check_al:
+ for (i=0;i<AL_EXT_PER_BM_SECT;i++) {
+ if (unlikely(al_enr+i == mdev->act_log->new_number))
+ goto try_again;
+ if (lc_is_used(mdev->act_log,al_enr+i))
+ goto try_again;
+ }
+ set_bit(BME_LOCKED,&bm_ext->flags);
+ proceed:
+ mdev->resync_wenr = LC_FREE;
+ spin_unlock_irq(&mdev->al_lock);
+ return 0;
+
+ try_again:
+ if (bm_ext) mdev->resync_wenr = enr;
+ spin_unlock_irq(&mdev->al_lock);
+ return -EAGAIN;
+}
+
void drbd_rs_complete_io(drbd_dev* mdev, sector_t sector)
{
unsigned int enr = BM_SECT_TO_EXT(sector);
@@ -963,7 +1073,7 @@
if( lc_put(mdev->resync,(struct lc_element *)bm_ext) == 0 ) {
clear_bit(BME_LOCKED,&bm_ext->flags);
clear_bit(BME_NO_WRITES,&bm_ext->flags);
- atomic_dec(&mdev->resync_locked);
+ mdev->resync_locked--;
wake_up(&mdev->al_wait);
}
@@ -998,7 +1108,8 @@
mdev->resync->used=0;
dec_local(mdev);
}
- atomic_set(&mdev->resync_locked,0);
+ mdev->resync_locked = 0;
+ mdev->resync_wenr = LC_FREE;
spin_unlock_irq(&mdev->al_lock);
wake_up(&mdev->al_wait);
}
Modified: trunk/drbd/drbd_int.h
===================================================================
--- trunk/drbd/drbd_int.h 2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_int.h 2006-10-20 18:07:34 UTC (rev 2553)
@@ -809,14 +809,12 @@
struct Drbd_thread asender;
struct drbd_bitmap* bitmap;
struct lru_cache* resync; // Used to track operations of resync...
- atomic_t resync_locked; // Number of locked elements in resync LRU
+ unsigned int resync_locked; // Number of locked elements in resync LRU
+ unsigned int resync_wenr; // resync extent number waiting for application requests
int open_cnt;
u64 *p_uuid;
- /* no more ee_lock
- * we had to grab both req_lock _and_ ee_lock in almost every place we
- * needed one of them. so why bother having too spinlocks?
- * FIXME clean comments, restructure so it is more obvious which
- * members areprotected by what */
+ /* FIXME clean comments, restructure so it is more obvious which
+ * members are protected by what */
unsigned int epoch_size;
struct list_head active_ee; // IO in progress
struct list_head sync_ee; // IO in progress
@@ -1319,6 +1317,7 @@
extern void drbd_al_complete_io(struct Drbd_Conf *mdev, sector_t sector);
extern void drbd_rs_complete_io(struct Drbd_Conf *mdev, sector_t sector);
extern int drbd_rs_begin_io(struct Drbd_Conf *mdev, sector_t sector);
+extern int drbd_try_rs_begin_io(struct Drbd_Conf *mdev, sector_t sector);
extern void drbd_rs_cancel_all(drbd_dev* mdev);
extern void drbd_rs_del_all(drbd_dev* mdev);
extern void drbd_rs_failed_io(drbd_dev* mdev, sector_t sector, int size);
Modified: trunk/drbd/drbd_main.c
===================================================================
--- trunk/drbd/drbd_main.c 2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_main.c 2006-10-20 18:07:34 UTC (rev 2553)
@@ -1031,7 +1031,12 @@
}
}
- /* it feels better to have the module_put last ... */
+ /* FIXME what about Primary, Diskless, and then losing
+ * the connection? since we survive that "somehow",
+ * maybe we may not stop the worker yet,
+ * since that would call drbd_mdev_cleanup.
+ * after which we probably won't survive the next
+ * request from the upper layers ... BOOM again :( */
if ( (os.disk > Diskless || os.conn > StandAlone) &&
ns.disk == Diskless && ns.conn == StandAlone ) {
drbd_thread_stop_nowait(&mdev->worker);
@@ -1925,7 +1930,6 @@
atomic_set(&mdev->unacked_cnt,0);
atomic_set(&mdev->local_cnt,0);
atomic_set(&mdev->net_cnt,0);
- atomic_set(&mdev->resync_locked,0);
atomic_set(&mdev->packet_seq,0);
atomic_set(&mdev->pp_in_use, 0);
Modified: trunk/drbd/drbd_receiver.c
===================================================================
--- trunk/drbd/drbd_receiver.c 2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_receiver.c 2006-10-20 18:07:34 UTC (rev 2553)
@@ -2477,9 +2477,7 @@
drbd_cmd_handler_f handler;
Drbd_Header *header = &mdev->data.rbuf.head;
- /* FIXME test for thread state == RUNNING here,
- * in case we missed some signal! */
- for (;;) {
+ while (get_t_state(&mdev->receiver) == Running) {
if (!drbd_recv_header(mdev,header))
break;
Modified: trunk/drbd/drbd_req.c
===================================================================
--- trunk/drbd/drbd_req.c 2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_req.c 2006-10-20 18:07:34 UTC (rev 2553)
@@ -243,6 +243,16 @@
drbd_set_in_sync(mdev,req->sector,req->size);
}
+ /* one might be tempted to move the drbd_al_complete_io
+ * to the local io completion callback drbd_endio_pri.
+ * but, if this was a mirror write, we may only
+ * drbd_al_complete_io after this is RQ_NET_DONE,
+ * otherwise the extent could be dropped from the al
+ * before it has actually been written on the peer.
+ * if we crash before our peer knows about the request,
+ * but after the extent has been dropped from the al,
+ * we would forget to resync the corresponding extent.
+ */
if (s & RQ_LOCAL_MASK) {
drbd_al_complete_io(mdev, req->sector);
}
Modified: trunk/drbd/drbd_worker.c
===================================================================
--- trunk/drbd/drbd_worker.c 2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/drbd_worker.c 2006-10-20 18:07:34 UTC (rev 2553)
@@ -305,13 +305,12 @@
sector = BM_BIT_TO_SECT(bit);
- if(!drbd_rs_begin_io(mdev,sector)) {
- // we have been interrupted, probably connection lost!
- D_ASSERT(signal_pending(current));
- return 0;
+ if (drbd_try_rs_begin_io(mdev, sector)) {
+ drbd_bm_set_find(mdev,bit);
+ goto requeue;
}
- if(unlikely( drbd_bm_test_bit(mdev,bit) == 0 )) {
+ if (unlikely(drbd_bm_test_bit(mdev,bit) == 0 )) {
//INFO("Block got synced while in drbd_rs_begin_io()\n");
drbd_rs_complete_io(mdev,sector);
goto next_sector;
@@ -367,7 +366,7 @@
sector,size,ID_SYNCER)) {
ERR("drbd_send_drequest() failed, aborting...\n");
dec_rs_pending(mdev);
- return 0; // FAILED. worker will abort!
+ return 0;
}
}
@@ -855,7 +854,7 @@
sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
- for (;;) {
+ while (get_t_state(thi) == Running) {
intr = down_interruptible(&mdev->data.work.s);
if (intr) {
@@ -921,6 +920,11 @@
* So don't do that.
*/
spin_unlock_irq(&mdev->data.work.q_lock);
+ /* FIXME verify that there absolutely can not be any more work
+ * on the queue now...
+ * if so, the comment above is no longer true, but historic
+ * from the times when the worker did not live as long as the
+ * device.. */
D_ASSERT( mdev->state.disk == Diskless && mdev->state.conn == StandAlone );
drbd_mdev_cleanup(mdev);
Modified: trunk/drbd/lru_cache.c
===================================================================
--- trunk/drbd/lru_cache.c 2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/lru_cache.c 2006-10-20 18:07:34 UTC (rev 2553)
@@ -260,6 +260,32 @@
RETURN(e);
}
+/* similar to lc_get,
+ * but only gets a new reference on an existing element.
+ * you either get the requested element, or NULL.
+ */
+struct lc_element* lc_try_get(struct lru_cache* lc, unsigned int enr)
+{
+ struct lc_element *e;
+
+ BUG_ON(!lc);
+ BUG_ON(!lc->nr_elements);
+
+ PARANOIA_ENTRY();
+ if ( lc->flags & LC_STARVING ) {
+ ++lc->starving;
+ RETURN(NULL);
+ }
+
+ e = lc_find(lc, enr);
+ if (e) {
+ ++lc->hits;
+ if( e->refcnt++ == 0) lc->used++;
+ list_move(&e->list,&lc->in_use); // Not evictable...
+ }
+ RETURN(e);
+}
+
void lc_changed(struct lru_cache* lc, struct lc_element* e)
{
PARANOIA_ENTRY();
Modified: trunk/drbd/lru_cache.h
===================================================================
--- trunk/drbd/lru_cache.h 2006-10-19 16:28:12 UTC (rev 2552)
+++ trunk/drbd/lru_cache.h 2006-10-20 18:07:34 UTC (rev 2553)
@@ -104,6 +104,7 @@
extern void lc_set (struct lru_cache* lc, unsigned int enr, int index);
extern void lc_del (struct lru_cache* lc, struct lc_element *element);
+extern struct lc_element* lc_try_get(struct lru_cache* lc, unsigned int enr);
extern struct lc_element* lc_find(struct lru_cache* lc, unsigned int enr);
extern struct lc_element* lc_get (struct lru_cache* lc, unsigned int enr);
extern unsigned int lc_put (struct lru_cache* lc, struct lc_element* e);
@@ -130,8 +131,14 @@
smp_mb__after_clear_bit();
}
-#define LC_FREE (-1)
+static inline int lc_is_used(struct lru_cache* lc, unsigned int enr)
+{
+ struct lc_element* e = lc_find(lc,enr);
+ return (e && e->refcnt);
+}
+#define LC_FREE (-1U)
+
#define lc_e_base(lc) ((char*) ( (lc)->slot + (lc)->nr_elements ) )
#define lc_entry(lc,i) ((struct lc_element*) \
(lc_e_base(lc) + (i)*(lc)->element_size))
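
For reference, a hypothetical caller of the two new lru_cache helpers;
the function name can_resync_extent and the variables resync, act_log,
enr and al_enr are made up for this sketch and are not part of the
commit (in drbd proper this would run under mdev->al_lock):

    /* illustration only: check, without sleeping, whether resync
     * extent 'enr' can be worked on right now */
    static int can_resync_extent(struct lru_cache *resync,
                                 struct lru_cache *act_log,
                                 unsigned int enr, unsigned int al_enr)
    {
        /* lc_try_get() only succeeds for an element that is already
         * cached; it takes a reference and moves the element to the
         * in_use list, so it cannot be evicted until lc_put(). */
        struct lc_element *e = lc_try_get(resync, enr);
        if (!e)
            return 0; /* not cached (or LC_STARVING): retry later */

        /* lc_is_used() reports whether any reference is still held
         * on the activity log extent, i.e. application writes to it
         * are still pending */
        if (lc_is_used(act_log, al_enr)) {
            lc_put(resync, e); /* drop our reference again */
            return 0;
        }
        return 1;
    }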