[DRBD-cvs] drbd by phil; * drbd_threads are thread safe now :) * ...
drbd-user@lists.linbit.com
drbd-user@lists.linbit.com
Wed, 5 May 2004 19:07:07 +0200 (CEST)
DRBD CVS committal
Author : phil
Module : drbd
Dir : drbd/drbd
Modified Files:
Tag: rel-0_7-branch
drbd_actlog.c drbd_dsender.c drbd_int.h drbd_main.c
drbd_proc.c drbd_receiver.c
Log Message:
* drbd_threads are thread safe now :)
* drbd_rs_begin_io is now INTERRUPTIBLE, to avoid a deadlock in drbd_disconnect()
* worker's cleanup improved
* Fixed a painful bug in drbd_cleanup_ee()
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_actlog.c,v
retrieving revision 1.1.2.97
retrieving revision 1.1.2.98
diff -u -3 -r1.1.2.97 -r1.1.2.98
--- drbd_actlog.c 4 May 2004 08:44:15 -0000 1.1.2.97
+++ drbd_actlog.c 5 May 2004 17:07:02 -0000 1.1.2.98
@@ -773,21 +773,34 @@
*
* @sector: The sector number
*/
-void drbd_rs_begin_io(drbd_dev* mdev, sector_t sector)
+int drbd_rs_begin_io(drbd_dev* mdev, sector_t sector)
{
unsigned int enr = (sector >> (BM_EXTENT_SIZE_B-9));
struct bm_extent* bm_ext;
int i;
- wait_event(mdev->al_wait, (bm_ext = _bme_get(mdev,enr)) );
+ if( wait_event_interruptible(mdev->al_wait,
+ (bm_ext = _bme_get(mdev,enr)) ) ) {
+ return 0;
+ }
- if(test_bit(BME_LOCKED,&bm_ext->flags)) return;
+ if(test_bit(BME_LOCKED,&bm_ext->flags)) return 1;
for(i=0;i<SM;i++) {
- wait_event(mdev->al_wait, !_is_in_al(mdev,enr*SM+i) );
+ if( wait_event_interruptible(mdev->al_wait,
+ !_is_in_al(mdev,enr*SM+i) ) ) {
+ if( lc_put(mdev->resync,&bm_ext->lce) == 0 ) {
+ clear_bit(BME_NO_WRITES,&bm_ext->flags);
+ atomic_dec(&mdev->resync_locked);
+ wake_up(&mdev->al_wait);
+ return 0;
+ }
+ }
}
set_bit(BME_LOCKED,&bm_ext->flags);
+
+ return 1;
}
void drbd_rs_complete_io(drbd_dev* mdev, sector_t sector)
@@ -828,13 +841,13 @@
for(i=0;i<mdev->resync->nr_elements;i++) {
bm_ext = (struct bm_extent*) lc_entry(mdev->resync,i);
if(bm_ext->lce.lc_number == LC_FREE) continue;
+ atomic_sub(bm_ext->lce.refcnt,&mdev->rs_pending_cnt);
bm_ext->lce.refcnt = 0; // Rude but ok.
bm_ext->rs_left = 0;
clear_bit(BME_LOCKED,&bm_ext->flags);
clear_bit(BME_NO_WRITES,&bm_ext->flags);
lc_del(mdev->resync,&bm_ext->lce);
}
-
- wake_up(&mdev->al_wait);
spin_unlock_irq(&mdev->al_lock);
+ wake_up(&mdev->al_wait);
}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.103
retrieving revision 1.1.2.104
diff -u -3 -r1.1.2.103 -r1.1.2.104
--- drbd_dsender.c 5 May 2004 13:25:15 -0000 1.1.2.103
+++ drbd_dsender.c 5 May 2004 17:07:02 -0000 1.1.2.104
@@ -479,7 +479,8 @@
return 1;
}
- drbd_rs_begin_io(mdev,sector);
+ if(!drbd_rs_begin_io(mdev,sector)) return 0;
+
if(unlikely(!bm_get_bit(mdev->mbds_id,sector,BM_BLOCK_SIZE))) {
//INFO("Block got synced while in drbd_rs_begin_io()\n");
drbd_rs_complete_io(mdev,sector);
@@ -541,6 +542,7 @@
spin_lock_irq(&mdev->ee_lock);
drbd_put_ee(mdev,e);
spin_unlock_irq(&mdev->ee_lock);
+ dec_unacked(mdev,HERE);
return 1;
}
@@ -571,6 +573,7 @@
spin_lock_irq(&mdev->ee_lock);
drbd_put_ee(mdev,e);
spin_unlock_irq(&mdev->ee_lock);
+ dec_unacked(mdev,HERE);
return 1;
}
@@ -834,7 +837,8 @@
{
drbd_dev *mdev = thi->mdev;
struct drbd_work *w = 0;
- int intr;
+ LIST_HEAD(work_list);
+ int intr,i;
sprintf(current->comm, "drbd%d_worker", (int)(mdev-drbd_conf));
@@ -859,7 +863,7 @@
ERR_IF (get_t_state(thi) != Running)
break;
-
+
// if (need_resched()) schedule();
w = 0;
@@ -881,17 +885,22 @@
drbd_wait_ee(mdev,&mdev->read_ee);
-
- while(!down_trylock(&mdev->data.work.s)) {
- spin_lock_irq(&mdev->req_lock);
- if (!list_empty(&mdev->data.work.q)) {
- w = list_entry(mdev->data.work.q.next,
- struct drbd_work,list);
- list_del_init(&w->list);
- }
- spin_unlock_irq(&mdev->req_lock);
+ spin_lock_irq(&mdev->req_lock);
+ list_splice_init(&mdev->data.work.q,&work_list);
+ spin_unlock_irq(&mdev->req_lock);
+
+ i = 0;
+ while(!list_empty(&work_list)) {
+ w = list_entry(work_list.next, struct drbd_work,list);
+ list_del_init(&w->list);
w->cb(mdev,w,1);
+ i++;
}
+
+ spin_lock_irq(&mdev->req_lock);
+ D_ASSERT(list_empty(&mdev->data.work.q));
+ sema_init(&mdev->data.work.s,0);
+ spin_unlock_irq(&mdev->req_lock);
INFO("worker terminated\n");
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.157
retrieving revision 1.58.2.158
diff -u -3 -r1.58.2.157 -r1.58.2.158
--- drbd_int.h 5 May 2004 13:25:15 -0000 1.58.2.157
+++ drbd_int.h 5 May 2004 17:07:02 -0000 1.58.2.158
@@ -894,12 +894,14 @@
// drbd_proc.c
extern struct proc_dir_entry *drbd_proc;
extern int drbd_proc_get_info(char *, char **, off_t, int, int *, void *);
+extern const char* cstate_to_name(Drbd_CState s);
+extern const char* nodestate_to_name(Drbd_State s);
// drbd_actlog.c
extern void drbd_al_begin_io(struct Drbd_Conf *mdev, sector_t sector);
extern void drbd_al_complete_io(struct Drbd_Conf *mdev, sector_t sector);
extern void drbd_rs_complete_io(struct Drbd_Conf *mdev, sector_t sector);
-extern void drbd_rs_begin_io(struct Drbd_Conf *mdev, sector_t sector);
+extern int drbd_rs_begin_io(struct Drbd_Conf *mdev, sector_t sector);
extern void drbd_rs_cancel_all(drbd_dev* mdev);
extern void drbd_al_read_log(struct Drbd_Conf *mdev);
extern void drbd_set_in_sync(drbd_dev* mdev, sector_t sector,int blk_size);
@@ -1196,16 +1198,21 @@
bm_set_bit(mdev, sector, blk_size, SS_OUT_OF_SYNC);
}
-#if 0
+#ifdef DUMP_ALL_PACKETS
/*
* enable to dump information about every packet exchange.
*/
+#define INFOP(fmt, args...) \
+ INFO("%s:%d: %s [%d] %s %s " fmt , \
+ file, line, current->comm, current->pid, \
+ sockname, recv?"<<<":">>>" \
+ , ## args )
static inline void
dump_packet(drbd_dev *mdev, struct socket *sock,
- int recv, Drbd_Polymorph_Packet *p)
+ int recv, Drbd_Polymorph_Packet *p, char* file, int line)
{
char *sockname = sock == mdev->meta.socket ? "meta" : "data";
- int cmd = be16_to_cpu(p->head.command);
+ int cmd = (recv == 2) ? p->head.command : be16_to_cpu(p->head.command);
switch (cmd) {
case Ping:
case PingAck:
@@ -1215,8 +1222,10 @@
case SyncParam:
case ReportParams:
- INFO(" %s [%d] %s %s %s\n", current->comm, current->pid,
- sockname, recv?"<<<":">>>", cmdname(cmd));
+ INFOP("%s\n", cmdname(cmd));
+ break;
+
+ case ReportBitMap: /* don't report this */
break;
case Data:
@@ -1229,31 +1238,23 @@
case DataRequest:
case RSDataRequest:
- INFO(" %s [%d] %s %s %s (%lu,%lx)\n", current->comm, current->pid,
- sockname, recv?"<<<":">>>", cmdname(cmd),
- (long)be64_to_cpu(p->Data.sector), (long)p->Data.block_id
+ INFOP("%s (%lu,%llx)\n", cmdname(cmd),
+ (long)be64_to_cpu(p->Data.sector), (long long)p->Data.block_id
);
break;
case Barrier:
case BarrierAck:
- INFO(" %s [%d] %s %s %s (%u)\n", current->comm, current->pid,
- sockname, recv?"<<<":">>>", cmdname(cmd),
- p->Barrier.barrier
- );
+ INFOP("%s (%u)\n", cmdname(cmd), p->Barrier.barrier);
break;
default:
- INFO(" %s [%d] %s %s %s (%u)\n", current->comm, current->pid,
- sockname, recv?"<<<":">>>", cmdname(cmd), cmd
- );
+ INFOP("%s (%u)\n",cmdname(cmd), cmd);
break;
}
}
#else
-static inline void
-dump_packet(drbd_dev *mdev, struct socket *sock,
- int recv, Drbd_Polymorph_Packet *p) { /* DO NOTHING */ }
+#define dump_packet(ignored...) ((void)0)
#endif
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.161
retrieving revision 1.73.2.162
diff -u -3 -r1.73.2.161 -r1.73.2.162
--- drbd_main.c 5 May 2004 13:25:15 -0000 1.73.2.161
+++ drbd_main.c 5 May 2004 17:07:02 -0000 1.73.2.162
@@ -388,6 +388,10 @@
Drbd_CState os;
os = mdev->cstate;
+
+ INFO("%s [%d]: cstate %s --> %s\n", current->comm, current->pid,
+ cstate_to_name(os), cstate_to_name(ns) );
+
mdev->cstate = ns;
smp_mb();
wake_up(&mdev->cstate_wait);
@@ -409,9 +413,10 @@
int retval;
drbd_daemonize();
- D_ASSERT(get_t_state(thi) == Running);
+ D_ASSERT(get_t_state(thi) == None);
D_ASSERT(thi->task == NULL);
thi->task = current;
+ thi->t_state = Running;
smp_mb();
complete(&thi->startstop); // notify: thi->task is set.
@@ -419,7 +424,8 @@
spin_lock(&thi->t_lock);
thi->task = 0;
- D_ASSERT(thi->t_state == Exiting);
+ thi->t_state = Exiting;
+ smp_mb();
spin_unlock(&thi->t_lock);
// THINK maybe two different completions?
@@ -446,10 +452,17 @@
drbd_dev *mdev = thi->mdev;
spin_lock(&thi->t_lock);
+
+ /* INFO("%s [%d]: %s %d -> Running\n",
+ current->comm, current->pid,
+ thi == &mdev->receiver ? "receiver" :
+ thi == &mdev->asender ? "asender" :
+ thi == &mdev->worker ? "worker" : "NONSENSE",
+ thi->t_state); */
+
if (thi->t_state == None) {
D_ASSERT(thi->task == NULL);
// XXX D_ASSERT( thi->startstop something ? )
- thi->t_state = Running;
spin_unlock(&thi->t_lock);
pid = kernel_thread(drbd_thread_setup, (void *) thi, CLONE_FS);
@@ -458,18 +471,27 @@
return;
}
wait_for_completion(&thi->startstop); // waits until thi->task is set
+ D_ASSERT(thi->task);
+ D_ASSERT(get_t_state(thi) == Running);
} else {
spin_unlock(&thi->t_lock);
}
+
}
void _drbd_thread_stop(struct Drbd_thread *thi, int restart,int wait)
{
drbd_dev *mdev = thi->mdev;
-
Drbd_thread_state ns = restart ? Restarting : Exiting;
+
spin_lock(&thi->t_lock);
+
+ /* INFO("%s [%d]: %s %d -> %d; %d\n",
+ current->comm, current->pid,
+ thi->task ? thi->task->comm : "NULL", thi->t_state, ns, wait); */
+
+
if (thi->t_state == None) {
spin_unlock(&thi->t_lock);
return;
@@ -502,6 +524,7 @@
wait_for_completion(&thi->startstop);
spin_lock(&thi->t_lock);
thi->t_state = None;
+ smp_mb();
D_ASSERT(thi->task == NULL);
spin_unlock(&thi->t_lock);
}
@@ -543,6 +566,7 @@
h->command = cpu_to_be16(cmd);
h->length = cpu_to_be16(size-sizeof(Drbd_Header));
+ dump_packet(mdev,sock,0,(void*)h, __FILE__, __LINE__);
sent = drbd_send(mdev,sock,h,size,msg_flags);
ok = ( sent == size );
@@ -550,7 +574,6 @@
ERR("short sent %s size=%d sent=%d\n",
cmdname(cmd), (int)size, sent);
}
- dump_packet(mdev,sock,0,(void*)h);
return ok;
}
@@ -882,6 +905,7 @@
mdev->send_task=current;
spin_unlock(&mdev->send_task_lock);
+ dump_packet(mdev,mdev->data.socket,0,(void*)&p, __FILE__, __LINE__);
ok = (drbd_send(mdev,mdev->data.socket,&p,sizeof(p),MSG_MORE) == sizeof(p))
&& _drbd_send_zc_bio(mdev,&e->private_bio);
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_proc.c,v
retrieving revision 1.8.2.23
retrieving revision 1.8.2.24
diff -u -3 -r1.8.2.23 -r1.8.2.24
--- drbd_proc.c 1 Apr 2004 06:47:38 -0000 1.8.2.23
+++ drbd_proc.c 5 May 2004 17:07:02 -0000 1.8.2.24
@@ -124,18 +124,14 @@
return sz;
}
-int drbd_proc_get_info(char *buf, char **start, off_t offset,
- int len, int *unused, void *data)
-{
- int rlen, i;
- const char *sn;
-
+const char* cstate_to_name(Drbd_CState s) {
static const char *cstate_names[] = {
[Unconfigured] = "Unconfigured",
[StandAlone] = "StandAlone",
[Unconnected] = "Unconnected",
[Timeout] = "Timeout",
[BrokenPipe] = "BrokenPipe",
+ [NetworkFailure] = "NetworkFailure",
[WFConnection] = "WFConnection",
[WFReportParams] = "WFReportParams",
[Connected] = "Connected",
@@ -148,12 +144,29 @@
[PausedSyncS] = "PausedSyncS",
[PausedSyncT] = "PausedSyncT",
};
+
+ return s < Unconfigured ? "TO_SMALL" :
+ s > PausedSyncS ? "TO_LARGE"
+ : cstate_names[s];
+}
+
+const char* nodestate_to_name(Drbd_State s) {
static const char *state_names[] = {
[Primary] = "Primary",
[Secondary] = "Secondary",
[Unknown] = "Unknown"
};
+ return s < Unknown ? "TO_SMALL" :
+ s > Secondary ? "TO_LARGE"
+ : state_names[s];
+}
+
+int drbd_proc_get_info(char *buf, char **start, off_t offset,
+ int len, int *unused, void *data)
+{
+ int rlen, i;
+ const char *sn;
rlen = sprintf(buf, "version: " REL_VERSION " (api:%d/proto:%d)\n\n",
API_VERSION,PRO_VERSION);
@@ -172,7 +185,7 @@
*/
for (i = 0; i < minor_count; i++) {
- sn = cstate_names[drbd_conf[i].cstate];
+ sn = cstate_to_name(drbd_conf[i].cstate);
if(drbd_conf[i].cstate == Connected) {
if(test_bit(DISKLESS,&drbd_conf[i].flags))
sn = "DiskLessClient";
@@ -188,8 +201,8 @@
" ns:%u nr:%u dw:%u dr:%u al:%u bm:%u "
"lo:%d pe:%d ua:%d\n",
i, sn,
- state_names[drbd_conf[i].state],
- state_names[drbd_conf[i].o_state],
+ nodestate_to_name(drbd_conf[i].state),
+ nodestate_to_name(drbd_conf[i].o_state),
(drbd_conf[i].gen_cnt[Flags]
& MDF_Consistent) ? "Consistent" : "Inconsistent",
drbd_conf[i].send_cnt/2,
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.142
retrieving revision 1.97.2.143
diff -u -3 -r1.97.2.142 -r1.97.2.143
--- drbd_receiver.c 5 May 2004 13:25:15 -0000 1.97.2.142
+++ drbd_receiver.c 5 May 2004 17:07:02 -0000 1.97.2.143
@@ -372,12 +372,11 @@
le = mdev->done_ee.next;
list_del(le);
e = list_entry(le, struct Tl_epoch_entry, w.list);
- drbd_put_ee(mdev,e);
if(mdev->conf.wire_protocol == DRBD_PROT_C ||
is_syncer_blk(mdev,e->block_id)) {
dec_unacked(mdev,HERE);
}
-
+ drbd_put_ee(mdev,e);
}
spin_unlock_irq(&mdev->ee_lock);
@@ -509,9 +508,9 @@
} else if (rv == 0) {
INFO("sock was shut down by peer\n");
break;
- } else {
- // if data comes in bytewise, this might trigger ...
- ERR("logic error: sock_recvmsg returned %d\n",rv);
+ } else {
+ /* signal came in after we read a partial message */
+ D_ASSERT(signal_pending(current));
break;
}
};
@@ -683,7 +682,6 @@
ERR("short read expecting header on sock: r=%d\n",r);
return FALSE;
};
- dump_packet(mdev,mdev->data.socket,1,(void*)h);
h->command = be16_to_cpu(h->command);
h->length = be16_to_cpu(h->length);
if (unlikely( h->magic != BE_DRBD_MAGIC )) {
@@ -1317,17 +1315,15 @@
STATIC void drbd_fail_pending_reads(drbd_dev *mdev)
{
- struct list_head workset,*le;
+ struct list_head *le;
drbd_bio_t *bio;
+ LIST_HEAD(workset);
/*
* Application READ requests
*/
spin_lock(&mdev->pr_lock);
- // FIXME use list_splice_init
- list_add(&workset,&mdev->app_reads);
- list_del(&mdev->app_reads);
- INIT_LIST_HEAD(&mdev->app_reads);
+ list_splice_init(&mdev->app_reads,&workset);
spin_unlock(&mdev->pr_lock);
while(!list_empty(&workset)) {
@@ -1444,6 +1440,7 @@
cmdname(header->command), header->length);
break;
}
+ dump_packet(mdev,mdev->data.socket,2,&mdev->data.rbuf, __FILE__, __LINE__);
}
}
@@ -1493,16 +1490,17 @@
mdev->epoch_size=0;
if(atomic_read(&mdev->unacked_cnt)) {
- ERR("unacked_cnt!=0\n");
+ ERR("unacked_cnt = %d\n",atomic_read(&mdev->unacked_cnt));
atomic_set(&mdev->unacked_cnt,0);
}
- /* Since syncer's blocks are also counted, there is no hope that
- pending_cnt is zero. */
+ if(atomic_read(&mdev->rs_pending_cnt)) {
+ ERR("rs_pending_cnt = %d\n",atomic_read(&mdev->rs_pending_cnt));
+ atomic_set(&mdev->rs_pending_cnt,0);
+ }
+
ERR_IF(atomic_read(&mdev->ap_pending_cnt))
atomic_set(&mdev->ap_pending_cnt,0);
- ERR_IF(atomic_read(&mdev->rs_pending_cnt))
- atomic_set(&mdev->rs_pending_cnt,0);
wake_up_interruptible(&mdev->cstate_wait);
@@ -1786,13 +1784,13 @@
}
expect = asender_tbl[cmd].pkt_size;
ERR_IF(len != expect-sizeof(Drbd_Header)) {
- dump_packet(mdev,mdev->meta.socket,1,(void*)h);
+ dump_packet(mdev,mdev->meta.socket,1,(void*)h, __FILE__, __LINE__);
DUMPI(expect);
}
}
if(received == expect) {
D_ASSERT(cmd != -1);
- dump_packet(mdev,mdev->meta.socket,1,(void*)h);
+ dump_packet(mdev,mdev->meta.socket,1,(void*)h, __FILE__, __LINE__);
if(!asender_tbl[cmd].process(mdev,h)) goto err;
buf = h;