[DRBD-cvs] DRBD CVS: drbd by phil from
drbd-user@lists.linbit.com
drbd-user@lists.linbit.com
Wed, 14 Jan 2004 17:05:36 +0100 (CET)
DRBD CVS committal
Author : phil
Host :
Module : drbd
Dir : drbd/drbd
Modified Files:
Tag: rel-0_7-branch
drbd_dsender.c drbd_int.h drbd_main.c drbd_receiver.c
Log Message:
By LGE
* New asender main loop
* New struct drbd_socket
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.45
retrieving revision 1.1.2.46
diff -u -3 -r1.1.2.45 -r1.1.2.46
--- drbd_dsender.c 14 Jan 2004 14:48:36 -0000 1.1.2.45
+++ drbd_dsender.c 14 Jan 2004 16:05:36 -0000 1.1.2.46
@@ -214,7 +214,7 @@
Drbd_Header h;
INVALIDATE_MAGIC(pr);
mempool_free(pr,drbd_pr_mempool);
- drbd_send_cmd(mdev,mdev->sock,WriteHint,&h,sizeof(h));
+ drbd_send_cmd(mdev,mdev->data.socket,WriteHint,&h,sizeof(h));
return FALSE;
}
@@ -325,7 +325,7 @@
}
if (!disable_io_hints) {
Drbd_Header h;
- drbd_send_cmd(mdev,mdev->sock,WriteHint,&h,
+ drbd_send_cmd(mdev,mdev->data.socket,WriteHint,&h,
sizeof(h));
}
}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.87
retrieving revision 1.58.2.88
diff -u -3 -r1.58.2.87 -r1.58.2.88
--- drbd_int.h 14 Jan 2004 14:48:36 -0000 1.58.2.87
+++ drbd_int.h 14 Jan 2004 16:05:36 -0000 1.58.2.88
@@ -392,12 +392,8 @@
u32 magic;
u16 command;
u16 length; // bytes of data after this header
- // char payload[];
+ char payload[0];
} Drbd_Header __attribute((packed));
-// (typesafe) hack for older gcc
-#define PAYLOAD_P(p) ({ \
- const Drbd_Header *_p = (p); \
- (char*)_p+sizeof(*_p); })
/*
* short commands, packets without payload, plain Drbd_Header:
@@ -482,6 +478,7 @@
} Drbd_Parameter_Packet __attribute((packed));
typedef union {
+ Drbd_Header head;
Drbd_Data_Packet Data;
Drbd_BlockAck_Packet BlockAck;
Drbd_Barrier_Packet Barrier;
@@ -640,6 +637,16 @@
// TODO sort members for performance
// MAYBE group them further
+/*
+ */
+struct drbd_socket {
+ struct semaphore mutex;
+ struct list_head send_q;
+ struct socket *socket;
+ Drbd_Polymorph_Packet sbuf; // this way we get our
+ Drbd_Polymorph_Packet rbuf; // send/receive buffers off the stack
+};
+
struct Drbd_Conf {
#ifdef PARANOIA
long magic;
@@ -647,12 +654,10 @@
struct net_config conf;
struct syncer_config sync_conf;
int do_panic;
- struct socket *sock; /* for data/barrier/cstate/parameter packets */
- struct socket *msock; /* for ping/ack (metadata) packets */
- volatile unsigned long last_received;
- struct semaphore sock_mutex;
- struct semaphore msock_mutex;
struct semaphore device_mutex;
+ struct drbd_socket data; // for data/barrier/cstate/parameter packets
+ struct drbd_socket meta; // for ping/ack (metadata) packets
+ volatile unsigned long last_received; // in jiffies, either socket
kdev_t lo_device; // backing device
struct file *lo_file;
kdev_t md_device; // device for meta-data.
@@ -988,19 +993,19 @@
static inline int drbd_send_short_cmd(drbd_dev *mdev, Drbd_Packet_Cmd cmd)
{
Drbd_Header h;
- return drbd_send_cmd(mdev,mdev->sock,cmd,&h,sizeof(h));
+ return drbd_send_cmd(mdev,mdev->data.socket,cmd,&h,sizeof(h));
}
static inline int drbd_send_ping(drbd_dev *mdev)
{
Drbd_Header h;
- return drbd_send_cmd(mdev,mdev->msock,Ping,&h,sizeof(h));
+ return drbd_send_cmd(mdev,mdev->meta.socket,Ping,&h,sizeof(h));
}
static inline int drbd_send_ping_ack(drbd_dev *mdev)
{
Drbd_Header h;
- return drbd_send_cmd(mdev,mdev->msock,PingAck,&h,sizeof(h));
+ return drbd_send_cmd(mdev,mdev->meta.socket,PingAck,&h,sizeof(h));
}
static inline void drbd_thread_stop(struct Drbd_thread *thi)
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.93
retrieving revision 1.73.2.94
diff -u -3 -r1.73.2.93 -r1.73.2.94
--- drbd_main.c 14 Jan 2004 13:36:01 -0000 1.73.2.93
+++ drbd_main.c 14 Jan 2004 16:05:36 -0000 1.73.2.94
@@ -475,7 +475,7 @@
cmdname(cmd), size, sent);
}
C_DBG(5,"on %s >>> %s l: %d\n",
- sock == mdev->msock ? "msock" : "sock",
+ sock == mdev->meta.socket ? "msock" : "sock",
cmdname(cmd), size-sizeof(Drbd_Header));
return ok;
}
@@ -484,23 +484,23 @@
Drbd_Packet_Cmd cmd, Drbd_Header* h, size_t size)
{
int ok;
- if (sock == mdev->sock) {
- down(&mdev->sock_mutex);
+ if (sock == mdev->data.socket) {
+ down(&mdev->data.mutex);
spin_lock(&mdev->send_task_lock);
mdev->send_task=current;
spin_unlock(&mdev->send_task_lock);
} else
- down(&mdev->msock_mutex);
+ down(&mdev->meta.mutex);
ok = _drbd_send_cmd(mdev,sock,cmd,h,size,0);
- if (sock == mdev->sock) {
- up(&mdev->sock_mutex);
+ if (sock == mdev->data.socket) {
+ up(&mdev->data.mutex);
spin_lock(&mdev->send_task_lock);
mdev->send_task=NULL;
spin_unlock(&mdev->send_task_lock);
} else
- up(&mdev->msock_mutex);
+ up(&mdev->meta.mutex);
return ok;
}
@@ -514,8 +514,8 @@
Drbd_Packet_Cmd cmd, Drbd_Header* h, size_t size)
{
int ok;
- struct semaphore *mutex = sock == mdev->msock ?
- &mdev->msock_mutex : &mdev->sock_mutex;
+ struct semaphore *mutex = sock == mdev->meta.socket ?
+ &mdev->meta.mutex : &mdev->data.mutex;
if (down_trylock(mutex)) return -EAGAIN;
ok = _drbd_send_cmd(mdev,sock,cmd,h,size, MSG_DONTWAIT);
up (mutex);
@@ -532,7 +532,7 @@
p.skip = cpu_to_be32(mdev->sync_conf.skip);
p.group = cpu_to_be32(mdev->sync_conf.group);
- ok = drbd_send_cmd(mdev,mdev->sock,SyncParam,(Drbd_Header*)&p,sizeof(p));
+ ok = drbd_send_cmd(mdev,mdev->data.socket,SyncParam,(Drbd_Header*)&p,sizeof(p));
if ( ok
&& (mdev->cstate == SkippedSyncS || mdev->cstate == SkippedSyncT)
&& !mdev->sync_conf.skip )
@@ -572,7 +572,7 @@
p.skip_sync = cpu_to_be32(mdev->sync_conf.skip);
p.sync_group = cpu_to_be32(mdev->sync_conf.group);
- ok = drbd_send_cmd(mdev,mdev->sock,ReportParams,(Drbd_Header*)&p,sizeof(p));
+ ok = drbd_send_cmd(mdev,mdev->data.socket,ReportParams,(Drbd_Header*)&p,sizeof(p));
return ok;
}
@@ -590,7 +590,7 @@
bm_words = mdev->mbds_id->size/sizeof(unsigned long);
bm = mdev->mbds_id->bm;
p = vmalloc(PAGE_SIZE); // sleeps. cannot fail.
- buffer = (unsigned long*)PAYLOAD_P(p);
+ buffer = (unsigned long*)p->payload;
/*
* maybe TODO use some simple compression scheme, nowadays there are
@@ -600,7 +600,7 @@
want=min_t(int,MBDS_PACKET_SIZE,(bm_words-bm_i)*sizeof(long));
for(buf_i=0;buf_i<want/sizeof(unsigned long);buf_i++)
buffer[buf_i] = cpu_to_lel(bm[bm_i++]);
- ok = drbd_send_cmd(mdev,mdev->sock,ReportBitMap,
+ ok = drbd_send_cmd(mdev,mdev->data.socket,ReportBitMap,
p, sizeof(*p) + want);
} while (ok && want);
vfree(p);
@@ -617,7 +617,7 @@
p.barrier=tl_add_barrier(mdev);
inc_pending(mdev);
- ok = _drbd_send_cmd(mdev,mdev->sock,Barrier,(Drbd_Header*)&p,sizeof(p),0);
+ ok = _drbd_send_cmd(mdev,mdev->data.socket,Barrier,(Drbd_Header*)&p,sizeof(p),0);
if (!ok) dec_pending(mdev,HERE);
return ok;
}
@@ -630,7 +630,7 @@
p.barrier = barrier_nr;
p.set_size = cpu_to_be32(set_size);
- ok = drbd_send_cmd(mdev,mdev->msock,BarrierAck,(Drbd_Header*)&p,sizeof(p));
+ ok = drbd_send_cmd(mdev,mdev->meta.socket,BarrierAck,(Drbd_Header*)&p,sizeof(p));
return ok;
}
@@ -650,8 +650,8 @@
return FALSE;
}
- if (!mdev->msock || mdev->cstate < Connected) return FALSE;
- ok = drbd_send_cmd(mdev,mdev->msock,cmd,(Drbd_Header*)&p,sizeof(p));
+ if (!mdev->meta.socket || mdev->cstate < Connected) return FALSE;
+ ok = drbd_send_cmd(mdev,mdev->meta.socket,cmd,(Drbd_Header*)&p,sizeof(p));
return ok;
}
@@ -665,7 +665,7 @@
p.block_id = block_id;
p.blksize = cpu_to_be32(size);
- ok = drbd_send_cmd(mdev,mdev->sock,cmd,(Drbd_Header*)&p,sizeof(p));
+ ok = drbd_send_cmd(mdev,mdev->data.socket,cmd,(Drbd_Header*)&p,sizeof(p));
return ok;
}
@@ -713,10 +713,10 @@
else
offset = (int)bh->b_data - (int)page_address(page);
do {
- sent = mdev->sock->ops->sendpage(mdev->sock, page, offset, size, MSG_NOSIGNAL);
+ sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page, offset, size, MSG_NOSIGNAL);
if (sent == -EINTR) {
// FIXME move "retry--" into drbd_retry_send()
- if (drbd_retry_send(mdev,mdev->sock) && retry--)
+ if (drbd_retry_send(mdev,mdev->data.socket) && retry--)
continue;
else
break;
@@ -774,7 +774,7 @@
*/
// SIGKILL: see comment in _drbd_send_cmd
old_blocked = block_sigs_but(SIGKILL);
- down(&mdev->sock_mutex);
+ down(&mdev->data.mutex);
spin_lock(&mdev->send_task_lock);
mdev->send_task=current;
spin_unlock(&mdev->send_task_lock);
@@ -782,13 +782,13 @@
if(test_and_clear_bit(ISSUE_BARRIER,&mdev->flags))
_drbd_send_barrier(mdev);
tl_add(mdev,req);
- ok = (drbd_send(mdev,mdev->sock,&p,sizeof(p),MSG_MORE) == sizeof(p))
+ ok = (drbd_send(mdev,mdev->data.socket,&p,sizeof(p),MSG_MORE) == sizeof(p))
&& _drbd_send_zc_bh(mdev,req->bh);
spin_lock(&mdev->send_task_lock);
mdev->send_task=NULL;
spin_unlock(&mdev->send_task_lock);
- up(&mdev->sock_mutex);
+ up(&mdev->data.mutex);
restore_old_sigset(old_blocked);
return ok;
}
@@ -816,18 +816,18 @@
* ioctl or module unload
*/
old_blocked = block_sigs_but(SIGTERM);
- down(&mdev->sock_mutex);
+ down(&mdev->data.mutex);
spin_lock(&mdev->send_task_lock);
mdev->send_task=current;
spin_unlock(&mdev->send_task_lock);
- ok = (drbd_send(mdev,mdev->sock,&p,sizeof(p),MSG_MORE) == sizeof(p))
+ ok = (drbd_send(mdev,mdev->data.socket,&p,sizeof(p),MSG_MORE) == sizeof(p))
&& _drbd_send_zc_bh(mdev,&e->pbh);
spin_lock(&mdev->send_task_lock);
mdev->send_task=NULL;
spin_unlock(&mdev->send_task_lock);
- up(&mdev->sock_mutex);
+ up(&mdev->data.mutex);
restore_old_sigset(old_blocked);
return ok;
}
@@ -905,7 +905,7 @@
if (rv <= 0) {
if (rv != -EINTR) {
ERR("%s_sendmsg returned %d\n",
- sock == mdev->msock ? "msock" : "sock",
+ sock == mdev->meta.socket ? "msock" : "sock",
rv);
set_cstate(mdev, BrokenPipe);
} else
@@ -989,7 +989,7 @@
}
// THINK: sock or msock ?
- if (drbd_send_cmd_dontwait(mdev,mdev->sock,WriteHint,&h,sizeof(h))==1){
+ if (drbd_send_cmd_dontwait(mdev,mdev->data.socket,WriteHint,&h,sizeof(h))==1){
clear_bit(WRITE_HINT_QUEUED, &mdev->flags);
} else {
if(mdev->cstate < Connected) {
@@ -1021,8 +1021,8 @@
atomic_set(&mdev->unacked_cnt,0);
init_MUTEX(&mdev->device_mutex);
- init_MUTEX(&mdev->sock_mutex);
- init_MUTEX(&mdev->msock_mutex);
+ init_MUTEX(&mdev->data.mutex);
+ init_MUTEX(&mdev->meta.mutex);
init_MUTEX(&mdev->md_io_mutex);
mdev->rs_lock = SPIN_LOCK_UNLOCKED;
@@ -1309,6 +1309,17 @@
int __init init_module(void)
{
+#if 0
+/* I am too lazy to calculate this by hand -lge
+ */
+#define SZO(x) printk(KERN_ERR "sizeof(" #x ") = %d\n", sizeof(x))
+ SZO(struct Drbd_Conf);
+ SZO(struct buffer_head);
+ SZO(Drbd_Polymorph_Packet);
+ SZO(struct drbd_socket);
+ return -EBUSY;
+#endif
+
if (1 > minor_count||minor_count > 255) {
printk(KERN_ERR DEVICE_NAME
": invalid minor_count (%d)\n",minor_count);
@@ -1377,13 +1388,13 @@
void drbd_free_sock(drbd_dev *mdev)
{
- if (mdev->sock) {
- sock_release(mdev->sock);
- mdev->sock = 0;
- }
- if (mdev->msock) {
- sock_release(mdev->msock);
- mdev->msock = 0;
+ if (mdev->data.socket) {
+ sock_release(mdev->data.socket);
+ mdev->data.socket = 0;
+ }
+ if (mdev->meta.socket) {
+ sock_release(mdev->meta.socket);
+ mdev->meta.socket = 0;
}
}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.78
retrieving revision 1.97.2.79
diff -u -3 -r1.97.2.78 -r1.97.2.79
--- drbd_receiver.c 14 Jan 2004 14:48:36 -0000 1.97.2.78
+++ drbd_receiver.c 14 Jan 2004 16:05:36 -0000 1.97.2.79
@@ -231,7 +231,7 @@
page=alloc_page(mask);
if(!page) return FALSE;
- if(!_drbd_alloc_ee(mdev,page,mask)) {
+ if(!_drbd_alloc_ee(mdev,page,GFP_KERNEL)) {
__free_page(page);
return FALSE;
}
@@ -462,13 +462,41 @@
return 0;
}
+STATIC int drbd_recv_short(drbd_dev *mdev, struct socket* sock,
+ void *buf, size_t size)
+{
+ mm_segment_t oldfs;
+ struct iovec iov;
+ struct msghdr msg;
+ int rv;
+
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+ msg.msg_iovlen = 1;
+ msg.msg_iov = &iov;
+ iov.iov_len = size;
+ iov.iov_base = buf;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_flags = MSG_WAITALL | MSG_NOSIGNAL;
+
+ oldfs = get_fs();
+ set_fs(KERNEL_DS);
+
+ rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
+
+ set_fs(oldfs);
+
+ return rv;
+}
+
int drbd_recv(drbd_dev *mdev, struct socket* sock,
void *buf, size_t size)
{
mm_segment_t oldfs;
struct iovec iov;
struct msghdr msg;
- char * sockname = (sock == mdev->msock ? "msock" : "sock");
+ char * sockname = (sock == mdev->meta.socket ? "msock" : "sock");
int rv;
msg.msg_control = NULL;
@@ -495,11 +523,11 @@
* EAGAIN (on msock) rcvtimeo expired
*/
if (rv == -EAGAIN) {
- D_ASSERT(sock == mdev->msock);
+ D_ASSERT(sock == mdev->meta.socket);
D_ASSERT(current == mdev->asender.task);
// FIXME decide this more elegantly
- if ( mdev->msock->sk->rcvtimeo == mdev->conf.ping_int*HZ) {
+ if ( mdev->meta.socket->sk->rcvtimeo == mdev->conf.ping_int*HZ) {
C_DBG(0,"recv_header timed out, sending ping\n");
// goto do_ping;
} else {
@@ -509,7 +537,7 @@
} else if (rv == -EINTR) {
unsigned long flags = 0;
- D_ASSERT(sock == mdev->msock);
+ D_ASSERT(sock == mdev->meta.socket);
D_ASSERT(current == mdev->asender.task);
LOCK_SIGMASK(current,flags);
@@ -541,7 +569,7 @@
if (!drbd_send_ping(mdev))
break;
// full ack timeout
- mdev->msock->sk->rcvtimeo = mdev->conf.timeout*HZ/10;
+ mdev->meta.socket->sk->rcvtimeo = mdev->conf.timeout*HZ/10;
};
@@ -613,7 +641,7 @@
if (mdev->cstate==Unconfigured) return 0;
- if (mdev->sock) {
+ if (mdev->data.socket) {
ERR("There is already a socket!!\n");
return 0;
}
@@ -670,8 +698,8 @@
msock->sk->sndtimeo = mdev->conf.timeout*HZ/20;
msock->sk->rcvtimeo = mdev->conf.ping_int*HZ;
- mdev->sock = sock;
- mdev->msock = msock;
+ mdev->data.socket = sock;
+ mdev->meta.socket = msock;
mdev->last_received = jiffies;
set_cstate(mdev,WFReportParams);
@@ -712,7 +740,7 @@
}
r = drbd_recv(mdev,sock,h,sizeof(*h));
- if (r == -EINTR && sock == mdev->msock) {
+ if (r == -EINTR && sock == mdev->meta.socket) {
h->command = WakeAsender;
h->length = 0;
return TRUE;
@@ -720,7 +748,7 @@
if (unlikely( r != sizeof(*h) )) {
ERR("short read expecting header on %s: r=%d\n",
- sock == mdev->msock ? "msock" : "sock",
+ sock == mdev->meta.socket ? "msock" : "sock",
r);
return FALSE;
};
@@ -733,27 +761,21 @@
return FALSE;
}
mdev->last_received = jiffies;
- if (sock == mdev->msock) {
+ if (sock == mdev->meta.socket) {
// restore idle timeout
- mdev->msock->sk->rcvtimeo = mdev->conf.ping_int*HZ;
+ mdev->meta.socket->sk->rcvtimeo = mdev->conf.ping_int*HZ;
}
C_DBG(5,"on %s <<< %s l: %d\n",
- sock == mdev->msock ? "msock" : "sock",
+ sock == mdev->meta.socket ? "msock" : "sock",
cmdname(h->command), h->length);
return TRUE;
}
-STATIC int receive_BlockAck(drbd_dev *mdev, Drbd_Header* h)
+STATIC int process_BlockAck(drbd_dev *mdev, Drbd_Header* h)
{
- int rv;
drbd_request_t *req;
Drbd_BlockAck_Packet *p = (Drbd_BlockAck_Packet*)h;
- ERR_IF (h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-
- rv = drbd_recv(mdev, mdev->msock, PAYLOAD_P(h), h->length);
- ERR_IF (rv != h->length) return FALSE;
-
if( is_syncer_blk(mdev,p->block_id)) {
drbd_set_in_sync(mdev,
be64_to_cpu(p->sector),
@@ -789,7 +811,7 @@
ERR_IF(mdev->state != Secondary) return FALSE;
ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
- rv = drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), h->length);
+ rv = drbd_recv(mdev, mdev->data.socket, h->payload, h->length);
ERR_IF(rv != h->length) return FALSE;
inc_unacked(mdev);
@@ -816,20 +838,12 @@
return TRUE;
}
-STATIC int receive_BarrierAck(drbd_dev *mdev, Drbd_Header* h)
+STATIC void process_BarrierAck(drbd_dev *mdev, Drbd_Header* h)
{
- int rv;
Drbd_BarrierAck_Packet *p = (Drbd_BarrierAck_Packet*)h;
- ERR_IF(mdev->state != Primary) return FALSE;
- ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
- rv = drbd_recv(mdev, mdev->msock, PAYLOAD_P(h), h->length);
- ERR_IF(rv != h->length) return FALSE;
-
tl_release(mdev,p->barrier,be32_to_cpu(p->set_size));
dec_pending(mdev,HERE);
-
- return TRUE;
}
STATIC struct Tl_epoch_entry *
@@ -844,7 +858,7 @@
spin_unlock_irq(&mdev->ee_lock);
bh=&e->pbh;
- rr=drbd_recv(mdev,mdev->sock,bh_kmap(bh),data_size);
+ rr=drbd_recv(mdev,mdev->data.socket,bh_kmap(bh),data_size);
bh_kunmap(bh);
if ( rr != data_size) {
@@ -899,7 +913,7 @@
D_ASSERT( sector == APP_BH_SECTOR(bh) );
- rr=drbd_recv(mdev,mdev->sock,bh_kmap(bh),data_size);
+ rr=drbd_recv(mdev,mdev->data.socket,bh_kmap(bh),data_size);
bh_kunmap(bh);
ok=(rr==data_size);
@@ -1046,7 +1060,7 @@
ERR_IF(data_size & 0xff) return FALSE;
ERR_IF(data_size > PAGE_SIZE) return FALSE;
- if (drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), header_size) != header_size)
+ if (drbd_recv(mdev, mdev->data.socket, h->payload, header_size) != header_size)
return FALSE;
sector = be64_to_cpu(p->sector);
@@ -1113,7 +1127,7 @@
ERR_IF(data_size & 0xff) return FALSE;
ERR_IF(data_size > PAGE_SIZE) return FALSE;
- if (drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), header_size) != header_size)
+ if (drbd_recv(mdev, mdev->data.socket, h->payload, header_size) != header_size)
return FALSE;
sector = be64_to_cpu(p->sector);
@@ -1176,7 +1190,7 @@
ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
- if (drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), h->length) != h->length)
+ if (drbd_recv(mdev, mdev->data.socket, h->payload, h->length) != h->length)
return FALSE;
sector = be64_to_cpu(p->sector);
@@ -1223,7 +1237,7 @@
// FIXME move into helper
ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
- if (drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), h->length) != h->length)
+ if (drbd_recv(mdev, mdev->data.socket, h->payload, h->length) != h->length)
return FALSE;
// XXX harmless race with ioctl ...
@@ -1252,7 +1266,7 @@
ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
- if (drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), h->length) != h->length)
+ if (drbd_recv(mdev, mdev->data.socket, h->payload, h->length) != h->length)
return FALSE;
if(be32_to_cpu(p->state) == Primary && mdev->state == Primary ) {
@@ -1390,14 +1404,14 @@
want=min_t(int,MBDS_PACKET_SIZE,(bm_words-bm_i)*sizeof(word));
ERR_IF(want != h->length) goto out;
if (want==0) break;
- if (drbd_recv(mdev, mdev->sock, buffer, want) != want)
+ if (drbd_recv(mdev, mdev->data.socket, buffer, want) != want)
goto out;
for(buf_i=0;buf_i<want/sizeof(unsigned long);buf_i++) {
word = lel_to_cpu(buffer[buf_i]) | bm[bm_i];
bits += hweight_long(word);
bm[bm_i++] = word;
}
- if (!drbd_recv_header(mdev,mdev->sock,h))
+ if (!drbd_recv_header(mdev,mdev->data.socket,h))
goto out;
D_ASSERT(h->command == ReportBitMap);
}
@@ -1480,7 +1494,7 @@
size = h->length;
while (size > 0) {
want = min_t(int,size,sizeof(sink));
- r = drbd_recv(mdev,mdev->sock,sink,want);
+ r = drbd_recv(mdev,mdev->data.socket,sink,want);
D_ASSERT(r >= 0);
if (r < 0) break;
size -= r;
@@ -1576,7 +1590,7 @@
for (;;) {
drbd_collect_zombies(mdev); // in case a syncer exited.
- if (!drbd_recv_header(mdev,mdev->sock,header))
+ if (!drbd_recv_header(mdev,mdev->data.socket,header))
break;
if (header->command < MAX_CMD)
@@ -1607,14 +1621,14 @@
drbd_thread_stop_nowait(&mdev->dsender);
drbd_thread_stop(&mdev->asender);
- while(down_trylock(&mdev->sock_mutex))
+ while(down_trylock(&mdev->data.mutex))
{
struct task_struct *task;
spin_lock(&mdev->send_task_lock);
if((task=mdev->send_task)) {
drbd_queue_signal(DRBD_SIG, task);
spin_unlock(&mdev->send_task_lock);
- down(&mdev->sock_mutex);
+ down(&mdev->data.mutex);
break;
} else {
spin_unlock(&mdev->send_task_lock);
@@ -1624,7 +1638,7 @@
/* By grabbing the sock_mutex we make sure that no one
uses the socket right now. */
drbd_free_sock(mdev);
- up(&mdev->sock_mutex);
+ up(&mdev->data.mutex);
drbd_thread_stop(&mdev->dsender);
drbd_collect_zombies(mdev);
@@ -1707,69 +1721,104 @@
STATIC int drbd_try_send_barrier(drbd_dev *mdev)
{
int rv=TRUE;
- if(down_trylock(&mdev->sock_mutex)==0) {
+ if(down_trylock(&mdev->data.mutex)==0) {
if(test_and_clear_bit(ISSUE_BARRIER,&mdev->flags)) {
if(! _drbd_send_barrier(mdev)) rv=FALSE;
}
- up(&mdev->sock_mutex);
+ up(&mdev->data.mutex);
}
return rv;
}
int drbd_asender(struct Drbd_thread *thi)
{
- int ok;
- unsigned long flags = 0;
+ // shortcuts
drbd_dev *mdev = thi->mdev;
- Drbd_Polymorph_Packet p; // XXX BarrierAck_Packet should be enough ...
- Drbd_Header *header = (Drbd_Header*)&p;
+ Drbd_Header *h = &mdev->meta.rbuf.head;
+
+ unsigned long flags = 0;
+ int rv;
+ void *buf = h;
+ int received = 0;
+ int expect = sizeof(Drbd_Header);
+
sprintf(current->comm, "drbd%d_asender", (int)(mdev-drbd_conf));
current->policy = SCHED_RR; /* Make this a realtime task! */
current->rt_priority = 2; /* more important than all other tasks */
- while(thi->t_state == Running) {
+ while (thi->t_state == Running) {
if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
ERR_IF(!drbd_send_ping(mdev)) goto err;
// half ack timeout only,
// since sendmsg waited the other half already
- mdev->msock->sk->rcvtimeo =
+ mdev->meta.socket->sk->rcvtimeo =
mdev->conf.timeout*HZ/20;
}
- /* going to wait for input ... maybe we need to output
- * something before input is ready, so allow for wakeup
- * signal...
- */
- LOCK_SIGMASK(current,flags);
- sigdelset(&current->blocked,DRBD_SIG);
- RECALC_SIGPENDING(current);
- UNLOCK_SIGMASK(current,flags);
+ if (mdev->state == Primary) {
+ if(!drbd_try_send_barrier(mdev)) goto err;
+ }
+
+ if (!drbd_process_ee(mdev,&mdev->done_ee)) goto err;
- ok = drbd_recv_header(mdev,mdev->msock,header);
- ERR_IF(!ok)
- goto err;
-
- /* we don't want to be "woken up" by DRBD_SIG while
- * receiving payload data, nor while sending out pings
- * and acks! SIGTERM is unaffected...
+ rv = drbd_recv_short(mdev,mdev->meta.socket,buf,expect);
+
+ /* Note:
+ * -EINTR (on meta) we got a signal
+ * -EAGAIN (on meta) rcvtimeo expired
+ * -ECONNRESET other side closed the connection
+ * -ERESTARTSYS (on data) we got a signal
+ * rv < 0 other than above: unexpected error!
+ * rv == expected: full header or command
+ * rv < expected: "woken" by signal during receive
+ * rv == 0 : "connection shut down by peer"
*/
- LOCK_SIGMASK(current,flags);
- if (sigismember(&current->pending.signal, DRBD_SIG)) {
- sigdelset(&current->pending.signal, DRBD_SIG);
- sigaddset(&current->blocked,DRBD_SIG);
+ if (rv == -EAGAIN) {
+ set_bit(SEND_PING,&mdev->flags);
+ continue;
+ } else if (rv == -EINTR || rv < expect) {
+ LOCK_SIGMASK(current,flags);
+ sigemptyset(&current->pending.signal);
RECALC_SIGPENDING(current);
- }
- UNLOCK_SIGMASK(current,flags);
+ UNLOCK_SIGMASK(current,flags);
+ if (rv == -EINTR) rv=0;
+ } else if (rv < 0) {
+ // if (rv != -ECONNRESET)
+ ERR("sock_recvmsg returned %d\n", rv);
+ break;
+ } /* else if (rv > expect) {
+ BUG();
+ } */
+ received += rv;
+ expect -= rv;
+ buf += rv;
+ if (expect)
+ continue;
// MAYBE use jump table
- switch (header->command) {
- case WakeAsender:
- // we were just woken up
- break;
+ if (received == sizeof(Drbd_Header)) {
+ h->command = be16_to_cpu(h->command);
+ h->length = be16_to_cpu(h->length);
+ if (unlikely( h->magic != BE_DRBD_MAGIC )) {
+ ERR("magic?? m: 0x%lx c: %d l: %d\n",
+ (long)be32_to_cpu(h->magic),
+ h->command, h->length);
+ goto err;
+ }
+ }
+
+ /*
+ * If packet command numbers were ordered by packet size,
+ * we could just say something like
+ * if (h->command > [last command without payload data])
+ * { expect = whatever; continue; }
+ */
+
+ switch (h->command) {
case Ping:
ERR_IF(!drbd_send_ping_ack(mdev))
goto err;
@@ -1778,30 +1827,35 @@
break;
case PingAck:
// restore idle timeout
- mdev->msock->sk->rcvtimeo =
+ mdev->meta.socket->sk->rcvtimeo =
mdev->conf.ping_int*HZ;
break;
case RecvAck:
case WriteAck:
- ERR_IF(!receive_BlockAck(mdev,header))
- goto err;
+ if (received != sizeof(Drbd_BlockAck_Packet)) {
+ expect = sizeof(Drbd_BlockAck_Packet)
+ - received;
+ ERR_IF (h->length != expect) goto err;
+ continue;
+ }
+ ERR_IF (!process_BlockAck(mdev,h)) goto err;
break;
case BarrierAck:
- ERR_IF(!receive_BarrierAck(mdev, header))
- goto err;
+ if (received != sizeof(Drbd_Barrier_Packet)) {
+ expect = sizeof(Drbd_Barrier_Packet)
+ - received;
+ ERR_IF (h->length != expect) goto err;
+ continue;
+ }
+ ERR_IF (mdev->state != Primary) goto err;
+ process_BarrierAck(mdev,h);
break;
default:
D_ASSERT(0);
}
-
-
- if( mdev->state == Primary ) {
- ERR_IF(!drbd_try_send_barrier(mdev))
- goto err;
- }
-
- ERR_IF(!drbd_process_ee(mdev,&mdev->done_ee))
- goto err;
+ buf = h;
+ received = 0;
+ expect = sizeof(Drbd_Header);
} //while
if(0) {
@@ -1813,4 +1867,3 @@
return 0;
}
-