[DRBD-cvs] DRBD CVS: drbd by phil from

drbd-user@lists.linbit.com drbd-user@lists.linbit.com
Wed, 14 Jan 2004 17:05:36 +0100 (CET)


DRBD CVS committal

Author  : phil
Host    : 
Module  : drbd

Dir     : drbd/drbd


Modified Files:
      Tag: rel-0_7-branch
	drbd_dsender.c drbd_int.h drbd_main.c drbd_receiver.c 


Log Message:
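By LGE

* New asender main loop: receives packets on the meta socket
  incrementally (header first, then payload) via the new
  drbd_recv_short()
* New struct drbd_socket, bundling each socket with its mutex, send
  queue and send/receive buffers; replaces sock/msock and
  sock_mutex/msock_mutex in struct Drbd_Conf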

===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.45
retrieving revision 1.1.2.46
diff -u -3 -r1.1.2.45 -r1.1.2.46
--- drbd_dsender.c	14 Jan 2004 14:48:36 -0000	1.1.2.45
+++ drbd_dsender.c	14 Jan 2004 16:05:36 -0000	1.1.2.46
@@ -214,7 +214,7 @@
 			Drbd_Header h;
 			INVALIDATE_MAGIC(pr);
 			mempool_free(pr,drbd_pr_mempool);
-			drbd_send_cmd(mdev,mdev->sock,WriteHint,&h,sizeof(h));
+			drbd_send_cmd(mdev,mdev->data.socket,WriteHint,&h,sizeof(h));
 			return FALSE;
 		}
 
@@ -325,7 +325,7 @@
 			}
 			if (!disable_io_hints) {
 				Drbd_Header h;
-				drbd_send_cmd(mdev,mdev->sock,WriteHint,&h,
+				drbd_send_cmd(mdev,mdev->data.socket,WriteHint,&h,
 					      sizeof(h));
 			}
 		}
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.87
retrieving revision 1.58.2.88
diff -u -3 -r1.58.2.87 -r1.58.2.88
--- drbd_int.h	14 Jan 2004 14:48:36 -0000	1.58.2.87
+++ drbd_int.h	14 Jan 2004 16:05:36 -0000	1.58.2.88
@@ -392,12 +392,8 @@
 	u32       magic;
 	u16       command;
 	u16       length;	// bytes of data after this header
-	// char        payload[];
+	char      payload[0];
 } Drbd_Header __attribute((packed));
-// (typesafe) hack for older gcc
-#define PAYLOAD_P(p) ({ \
-	const Drbd_Header *_p = (p); \
-	(char*)_p+sizeof(*_p); })
 
 /*
  * short commands, packets without payload, plain Drbd_Header:
@@ -482,6 +478,7 @@
 } Drbd_Parameter_Packet  __attribute((packed));
 
 typedef union {
+	Drbd_Header              head;
 	Drbd_Data_Packet         Data;
 	Drbd_BlockAck_Packet     BlockAck;
 	Drbd_Barrier_Packet      Barrier;
@@ -640,6 +637,16 @@
 // TODO sort members for performance
 // MAYBE group them further
 
+/*
+ */
+struct drbd_socket {
+	struct semaphore  mutex;
+	struct list_head  send_q;
+	struct socket    *socket;
+	Drbd_Polymorph_Packet sbuf;  // this way we get our
+	Drbd_Polymorph_Packet rbuf;  // send/receive buffers off the stack
+};
+
 struct Drbd_Conf {
 #ifdef PARANOIA
 	long magic;
@@ -647,12 +654,10 @@
 	struct net_config conf;
 	struct syncer_config sync_conf;
 	int do_panic;
-	struct socket *sock;  /* for data/barrier/cstate/parameter packets */
-	struct socket *msock; /* for ping/ack (metadata) packets */
-	volatile unsigned long last_received;
-	struct semaphore sock_mutex;
-	struct semaphore msock_mutex;
 	struct semaphore device_mutex;
+	struct drbd_socket data; // for data/barrier/cstate/parameter packets
+	struct drbd_socket meta; // for ping/ack (metadata) packets
+	volatile unsigned long last_received; // in jiffies, either socket
 	kdev_t lo_device;         // backing device
 	struct file *lo_file;
 	kdev_t md_device;         // device for meta-data.
@@ -988,19 +993,19 @@
 static inline int drbd_send_short_cmd(drbd_dev *mdev, Drbd_Packet_Cmd cmd)
 {
 	Drbd_Header h;
-	return drbd_send_cmd(mdev,mdev->sock,cmd,&h,sizeof(h));
+	return drbd_send_cmd(mdev,mdev->data.socket,cmd,&h,sizeof(h));
 }
 
 static inline int drbd_send_ping(drbd_dev *mdev)
 {
 	Drbd_Header h;
-	return drbd_send_cmd(mdev,mdev->msock,Ping,&h,sizeof(h));
+	return drbd_send_cmd(mdev,mdev->meta.socket,Ping,&h,sizeof(h));
 }
 
 static inline int drbd_send_ping_ack(drbd_dev *mdev)
 {
 	Drbd_Header h;
-	return drbd_send_cmd(mdev,mdev->msock,PingAck,&h,sizeof(h));
+	return drbd_send_cmd(mdev,mdev->meta.socket,PingAck,&h,sizeof(h));
 }
 
 static inline void drbd_thread_stop(struct Drbd_thread *thi)
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.93
retrieving revision 1.73.2.94
diff -u -3 -r1.73.2.93 -r1.73.2.94
--- drbd_main.c	14 Jan 2004 13:36:01 -0000	1.73.2.93
+++ drbd_main.c	14 Jan 2004 16:05:36 -0000	1.73.2.94
@@ -475,7 +475,7 @@
 		    cmdname(cmd), size, sent);
 	}
 	C_DBG(5,"on %s >>> %s l: %d\n",
-	    sock == mdev->msock ? "msock" : "sock",
+	    sock == mdev->meta.socket ? "msock" : "sock",
 	    cmdname(cmd), size-sizeof(Drbd_Header));
 	return ok;
 }
@@ -484,23 +484,23 @@
 		  Drbd_Packet_Cmd cmd, Drbd_Header* h, size_t size)
 {
 	int ok;
-	if (sock == mdev->sock) {
-		down(&mdev->sock_mutex);
+	if (sock == mdev->data.socket) {
+		down(&mdev->data.mutex);
 		spin_lock(&mdev->send_task_lock);
 		mdev->send_task=current;
 		spin_unlock(&mdev->send_task_lock);
 	} else
-		down(&mdev->msock_mutex);
+		down(&mdev->meta.mutex);
 
 	ok = _drbd_send_cmd(mdev,sock,cmd,h,size,0);
 
-	if (sock == mdev->sock) {
-		up(&mdev->sock_mutex);
+	if (sock == mdev->data.socket) {
+		up(&mdev->data.mutex);
 		spin_lock(&mdev->send_task_lock);
 		mdev->send_task=NULL;
 		spin_unlock(&mdev->send_task_lock);
 	} else
-		up(&mdev->msock_mutex);
+		up(&mdev->meta.mutex);
 	return ok;
 }
 
@@ -514,8 +514,8 @@
 		  Drbd_Packet_Cmd cmd, Drbd_Header* h, size_t size)
 {
 	int ok;
-	struct semaphore *mutex = sock == mdev->msock ?
-		&mdev->msock_mutex : &mdev->sock_mutex;
+	struct semaphore *mutex = sock == mdev->meta.socket ?
+		&mdev->meta.mutex : &mdev->data.mutex;
 	if (down_trylock(mutex)) return -EAGAIN;
 	ok = _drbd_send_cmd(mdev,sock,cmd,h,size, MSG_DONTWAIT);
 	up  (mutex);
@@ -532,7 +532,7 @@
 	p.skip      = cpu_to_be32(mdev->sync_conf.skip);
 	p.group     = cpu_to_be32(mdev->sync_conf.group);
 
-	ok = drbd_send_cmd(mdev,mdev->sock,SyncParam,(Drbd_Header*)&p,sizeof(p));
+	ok = drbd_send_cmd(mdev,mdev->data.socket,SyncParam,(Drbd_Header*)&p,sizeof(p));
 	if ( ok
 	    && (mdev->cstate == SkippedSyncS || mdev->cstate == SkippedSyncT)
 	    && !mdev->sync_conf.skip )
@@ -572,7 +572,7 @@
 	p.skip_sync      = cpu_to_be32(mdev->sync_conf.skip);
 	p.sync_group     = cpu_to_be32(mdev->sync_conf.group);
 
-	ok = drbd_send_cmd(mdev,mdev->sock,ReportParams,(Drbd_Header*)&p,sizeof(p));
+	ok = drbd_send_cmd(mdev,mdev->data.socket,ReportParams,(Drbd_Header*)&p,sizeof(p));
 	return ok;
 }
 
@@ -590,7 +590,7 @@
 	bm_words = mdev->mbds_id->size/sizeof(unsigned long);
 	bm = mdev->mbds_id->bm;
 	p  = vmalloc(PAGE_SIZE); // sleeps. cannot fail.
-	buffer = (unsigned long*)PAYLOAD_P(p);
+	buffer = (unsigned long*)p->payload;
 
 	/*
 	 * maybe TODO use some simple compression scheme, nowadays there are
@@ -600,7 +600,7 @@
 		want=min_t(int,MBDS_PACKET_SIZE,(bm_words-bm_i)*sizeof(long));
 		for(buf_i=0;buf_i<want/sizeof(unsigned long);buf_i++)
 			buffer[buf_i] = cpu_to_lel(bm[bm_i++]);
-		ok = drbd_send_cmd(mdev,mdev->sock,ReportBitMap,
+		ok = drbd_send_cmd(mdev,mdev->data.socket,ReportBitMap,
 				   p, sizeof(*p) + want);
 	} while (ok && want);
 	vfree(p);
@@ -617,7 +617,7 @@
 	p.barrier=tl_add_barrier(mdev);
 
 	inc_pending(mdev);
-	ok = _drbd_send_cmd(mdev,mdev->sock,Barrier,(Drbd_Header*)&p,sizeof(p),0);
+	ok = _drbd_send_cmd(mdev,mdev->data.socket,Barrier,(Drbd_Header*)&p,sizeof(p),0);
 	if (!ok) dec_pending(mdev,HERE);
 	return ok;
 }
@@ -630,7 +630,7 @@
 	p.barrier  = barrier_nr;
 	p.set_size = cpu_to_be32(set_size);
 
-	ok = drbd_send_cmd(mdev,mdev->msock,BarrierAck,(Drbd_Header*)&p,sizeof(p));
+	ok = drbd_send_cmd(mdev,mdev->meta.socket,BarrierAck,(Drbd_Header*)&p,sizeof(p));
 	return ok;
 }
 
@@ -650,8 +650,8 @@
 		return FALSE;
 	}
 
-	if (!mdev->msock || mdev->cstate < Connected) return FALSE;
-	ok = drbd_send_cmd(mdev,mdev->msock,cmd,(Drbd_Header*)&p,sizeof(p));
+	if (!mdev->meta.socket || mdev->cstate < Connected) return FALSE;
+	ok = drbd_send_cmd(mdev,mdev->meta.socket,cmd,(Drbd_Header*)&p,sizeof(p));
 	return ok;
 }
 
@@ -665,7 +665,7 @@
 	p.block_id = block_id;
 	p.blksize  = cpu_to_be32(size);
 
-	ok = drbd_send_cmd(mdev,mdev->sock,cmd,(Drbd_Header*)&p,sizeof(p));
+	ok = drbd_send_cmd(mdev,mdev->data.socket,cmd,(Drbd_Header*)&p,sizeof(p));
 	return ok;
 }
 
@@ -713,10 +713,10 @@
 	else
 		offset = (int)bh->b_data - (int)page_address(page);
 	do {
-		sent = mdev->sock->ops->sendpage(mdev->sock, page, offset, size, MSG_NOSIGNAL);
+		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page, offset, size, MSG_NOSIGNAL);
 		if (sent == -EINTR) {
 			// FIXME move "retry--" into drbd_retry_send()
-			if (drbd_retry_send(mdev,mdev->sock) && retry--)
+			if (drbd_retry_send(mdev,mdev->data.socket) && retry--)
 				continue;
 			else
 				break;
@@ -774,7 +774,7 @@
 	*/
 	// SIGKILL: see comment in _drbd_send_cmd
 	old_blocked = block_sigs_but(SIGKILL);
-	down(&mdev->sock_mutex);
+	down(&mdev->data.mutex);
 	spin_lock(&mdev->send_task_lock);
 	mdev->send_task=current;
 	spin_unlock(&mdev->send_task_lock);
@@ -782,13 +782,13 @@
 	if(test_and_clear_bit(ISSUE_BARRIER,&mdev->flags))
 		_drbd_send_barrier(mdev);
 	tl_add(mdev,req);
-	ok =  (drbd_send(mdev,mdev->sock,&p,sizeof(p),MSG_MORE) == sizeof(p))
+	ok =  (drbd_send(mdev,mdev->data.socket,&p,sizeof(p),MSG_MORE) == sizeof(p))
 	   && _drbd_send_zc_bh(mdev,req->bh);
 
 	spin_lock(&mdev->send_task_lock);
 	mdev->send_task=NULL;
 	spin_unlock(&mdev->send_task_lock);
-	up(&mdev->sock_mutex);
+	up(&mdev->data.mutex);
 	restore_old_sigset(old_blocked);
 	return ok;
 }
@@ -816,18 +816,18 @@
 	 * ioctl or module unload
 	 */
 	old_blocked = block_sigs_but(SIGTERM);
-	down(&mdev->sock_mutex);
+	down(&mdev->data.mutex);
 	spin_lock(&mdev->send_task_lock);
 	mdev->send_task=current;
 	spin_unlock(&mdev->send_task_lock);
 
-	ok =  (drbd_send(mdev,mdev->sock,&p,sizeof(p),MSG_MORE) == sizeof(p))
+	ok =  (drbd_send(mdev,mdev->data.socket,&p,sizeof(p),MSG_MORE) == sizeof(p))
 	   && _drbd_send_zc_bh(mdev,&e->pbh);
 
 	spin_lock(&mdev->send_task_lock);
 	mdev->send_task=NULL;
 	spin_unlock(&mdev->send_task_lock);
-	up(&mdev->sock_mutex);
+	up(&mdev->data.mutex);
 	restore_old_sigset(old_blocked);
 	return ok;
 }
@@ -905,7 +905,7 @@
 	if (rv <= 0) {
 		if (rv != -EINTR) {
 			ERR("%s_sendmsg returned %d\n",
-			    sock == mdev->msock ? "msock" : "sock",
+			    sock == mdev->meta.socket ? "msock" : "sock",
 			    rv);
 			set_cstate(mdev, BrokenPipe);
 		} else
@@ -989,7 +989,7 @@
 	}
 
 	// THINK: sock or msock ?
-	if (drbd_send_cmd_dontwait(mdev,mdev->sock,WriteHint,&h,sizeof(h))==1){
+	if (drbd_send_cmd_dontwait(mdev,mdev->data.socket,WriteHint,&h,sizeof(h))==1){
 		clear_bit(WRITE_HINT_QUEUED, &mdev->flags);
 	} else {
 		if(mdev->cstate < Connected) {
@@ -1021,8 +1021,8 @@
 	atomic_set(&mdev->unacked_cnt,0);
 
 	init_MUTEX(&mdev->device_mutex);
-	init_MUTEX(&mdev->sock_mutex);
-	init_MUTEX(&mdev->msock_mutex);
+	init_MUTEX(&mdev->data.mutex);
+	init_MUTEX(&mdev->meta.mutex);
 	init_MUTEX(&mdev->md_io_mutex);
 
 	mdev->rs_lock        = SPIN_LOCK_UNLOCKED;
@@ -1309,6 +1309,17 @@
 
 int __init init_module(void)
 {
+#if 0
+/* I am too lazy to calculate this by hand	-lge
+ */
+#define SZO(x) printk(KERN_ERR "sizeof(" #x ") = %d\n", sizeof(x))
+	SZO(struct Drbd_Conf);
+	SZO(struct buffer_head);
+	SZO(Drbd_Polymorph_Packet);
+	SZO(struct drbd_socket);
+	return -EBUSY;
+#endif
+
 	if (1 > minor_count||minor_count > 255) {
 		printk(KERN_ERR DEVICE_NAME
 			": invalid minor_count (%d)\n",minor_count);
@@ -1377,13 +1388,13 @@
 
 void drbd_free_sock(drbd_dev *mdev)
 {
-	if (mdev->sock) {
-		sock_release(mdev->sock);
-		mdev->sock = 0;
-	}
-	if (mdev->msock) {
-		sock_release(mdev->msock);
-		mdev->msock = 0;
+	if (mdev->data.socket) {
+		sock_release(mdev->data.socket);
+		mdev->data.socket = 0;
+	}
+	if (mdev->meta.socket) {
+		sock_release(mdev->meta.socket);
+		mdev->meta.socket = 0;
 	}
 }
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.78
retrieving revision 1.97.2.79
diff -u -3 -r1.97.2.78 -r1.97.2.79
--- drbd_receiver.c	14 Jan 2004 14:48:36 -0000	1.97.2.78
+++ drbd_receiver.c	14 Jan 2004 16:05:36 -0000	1.97.2.79
@@ -231,7 +231,7 @@
 	page=alloc_page(mask);
 	if(!page) return FALSE;
 
-	if(!_drbd_alloc_ee(mdev,page,mask)) {
+	if(!_drbd_alloc_ee(mdev,page,GFP_KERNEL)) {
 		__free_page(page);
 		return FALSE;
 	}
@@ -462,13 +462,41 @@
 	return 0;
 }
 
+STATIC int drbd_recv_short(drbd_dev *mdev, struct socket* sock,
+		    void *buf, size_t size)
+{
+	mm_segment_t oldfs;
+	struct iovec iov;
+	struct msghdr msg;
+	int rv;
+
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_iovlen = 1;
+	msg.msg_iov = &iov;
+	iov.iov_len = size;
+	iov.iov_base = buf;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_flags = MSG_WAITALL | MSG_NOSIGNAL;
+
+	oldfs = get_fs();
+	set_fs(KERNEL_DS);
+
+	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
+
+	set_fs(oldfs);
+
+	return rv;
+}
+
 int drbd_recv(drbd_dev *mdev, struct socket* sock,
 	      void *buf, size_t size)
 {
 	mm_segment_t oldfs;
 	struct iovec iov;
 	struct msghdr msg;
-	char * sockname = (sock == mdev->msock ? "msock" : "sock");
+	char * sockname = (sock == mdev->meta.socket ? "msock" : "sock");
 	int rv;
 
 	msg.msg_control = NULL;
@@ -495,11 +523,11 @@
 		 * EAGAIN       (on msock) rcvtimeo expired
 		 */
 		if (rv == -EAGAIN) {
-			D_ASSERT(sock == mdev->msock);
+			D_ASSERT(sock == mdev->meta.socket);
 			D_ASSERT(current == mdev->asender.task);
 
 			// FIXME decide this more elegantly
-			if ( mdev->msock->sk->rcvtimeo == mdev->conf.ping_int*HZ) {
+			if ( mdev->meta.socket->sk->rcvtimeo == mdev->conf.ping_int*HZ) {
 				C_DBG(0,"recv_header timed out, sending ping\n");
 				// goto do_ping;
 			} else {
@@ -509,7 +537,7 @@
 		} else if (rv == -EINTR) {
 			unsigned long flags = 0;
 
-			D_ASSERT(sock == mdev->msock);
+			D_ASSERT(sock == mdev->meta.socket);
 			D_ASSERT(current == mdev->asender.task);
 
 			LOCK_SIGMASK(current,flags);
@@ -541,7 +569,7 @@
 		if (!drbd_send_ping(mdev))
 			break;
 		// full ack timeout
-		mdev->msock->sk->rcvtimeo = mdev->conf.timeout*HZ/10;
+		mdev->meta.socket->sk->rcvtimeo = mdev->conf.timeout*HZ/10;
 
 	};
 
@@ -613,7 +641,7 @@
 
 	if (mdev->cstate==Unconfigured) return 0;
 
-	if (mdev->sock) {
+	if (mdev->data.socket) {
 		ERR("There is already a socket!!\n");
 		return 0;
 	}
@@ -670,8 +698,8 @@
 	msock->sk->sndtimeo = mdev->conf.timeout*HZ/20;
 	msock->sk->rcvtimeo = mdev->conf.ping_int*HZ;
 
-	mdev->sock = sock;
-	mdev->msock = msock;
+	mdev->data.socket = sock;
+	mdev->meta.socket = msock;
 	mdev->last_received = jiffies;
 
 	set_cstate(mdev,WFReportParams);
@@ -712,7 +740,7 @@
 	}
 
 	r = drbd_recv(mdev,sock,h,sizeof(*h));
-	if (r == -EINTR && sock == mdev->msock) {
+	if (r == -EINTR && sock == mdev->meta.socket) {
 		h->command = WakeAsender;
 		h->length  = 0;
 		return TRUE;
@@ -720,7 +748,7 @@
 
 	if (unlikely( r != sizeof(*h) )) {
 		ERR("short read expecting header on %s: r=%d\n",
-		    sock == mdev->msock ? "msock" : "sock",
+		    sock == mdev->meta.socket ? "msock" : "sock",
 		    r);
 		return FALSE;
 	};
@@ -733,27 +761,21 @@
 		return FALSE;
 	}
 	mdev->last_received = jiffies;
-	if (sock == mdev->msock) {
+	if (sock == mdev->meta.socket) {
 		// restore idle timeout
-		mdev->msock->sk->rcvtimeo = mdev->conf.ping_int*HZ;
+		mdev->meta.socket->sk->rcvtimeo = mdev->conf.ping_int*HZ;
 	}
 	C_DBG(5,"on %s <<< %s l: %d\n",
-	    sock == mdev->msock ? "msock" : "sock",
+	    sock == mdev->meta.socket ? "msock" : "sock",
 	    cmdname(h->command), h->length);
 	return TRUE;
 }
 
-STATIC int receive_BlockAck(drbd_dev *mdev, Drbd_Header* h)
+STATIC int process_BlockAck(drbd_dev *mdev, Drbd_Header* h)
 {
-	int rv;
 	drbd_request_t *req;
 	Drbd_BlockAck_Packet *p = (Drbd_BlockAck_Packet*)h;
 
-	ERR_IF (h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-
-	rv = drbd_recv(mdev, mdev->msock, PAYLOAD_P(h), h->length);
-	ERR_IF (rv != h->length) return FALSE;
-
 	if( is_syncer_blk(mdev,p->block_id)) {
 		drbd_set_in_sync(mdev,
 				 be64_to_cpu(p->sector),
@@ -789,7 +811,7 @@
 	ERR_IF(mdev->state != Secondary) return FALSE;
 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
 
-	rv = drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), h->length);
+	rv = drbd_recv(mdev, mdev->data.socket, h->payload, h->length);
 	ERR_IF(rv != h->length) return FALSE;
 
 	inc_unacked(mdev);
@@ -816,20 +838,12 @@
 	return TRUE;
 }
 
-STATIC int receive_BarrierAck(drbd_dev *mdev, Drbd_Header* h)
+STATIC void process_BarrierAck(drbd_dev *mdev, Drbd_Header* h)
 {
-	int rv;
 	Drbd_BarrierAck_Packet *p = (Drbd_BarrierAck_Packet*)h;
 
-	ERR_IF(mdev->state != Primary) return FALSE;
-	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-	rv = drbd_recv(mdev, mdev->msock, PAYLOAD_P(h), h->length);
-	ERR_IF(rv != h->length) return FALSE;
-
 	tl_release(mdev,p->barrier,be32_to_cpu(p->set_size));
 	dec_pending(mdev,HERE);
-
-	return TRUE;
 }
 
 STATIC struct Tl_epoch_entry *
@@ -844,7 +858,7 @@
 	spin_unlock_irq(&mdev->ee_lock);
 	bh=&e->pbh;
 
-	rr=drbd_recv(mdev,mdev->sock,bh_kmap(bh),data_size);
+	rr=drbd_recv(mdev,mdev->data.socket,bh_kmap(bh),data_size);
 	bh_kunmap(bh);
 
 	if ( rr != data_size) {
@@ -899,7 +913,7 @@
 
 	D_ASSERT( sector == APP_BH_SECTOR(bh) );
 
-	rr=drbd_recv(mdev,mdev->sock,bh_kmap(bh),data_size);
+	rr=drbd_recv(mdev,mdev->data.socket,bh_kmap(bh),data_size);
 	bh_kunmap(bh);
 
 	ok=(rr==data_size);
@@ -1046,7 +1060,7 @@
 	ERR_IF(data_size &  0xff) return FALSE;
 	ERR_IF(data_size >  PAGE_SIZE) return FALSE;
 
-	if (drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), header_size) != header_size)
+	if (drbd_recv(mdev, mdev->data.socket, h->payload, header_size) != header_size)
 		return FALSE;
 
 	sector = be64_to_cpu(p->sector);
@@ -1113,7 +1127,7 @@
 	ERR_IF(data_size &  0xff) return FALSE;
 	ERR_IF(data_size >  PAGE_SIZE) return FALSE;
 
-	if (drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), header_size) != header_size)
+	if (drbd_recv(mdev, mdev->data.socket, h->payload, header_size) != header_size)
 		return FALSE;
 
 	sector = be64_to_cpu(p->sector);
@@ -1176,7 +1190,7 @@
 
 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
 
-	if (drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), h->length) != h->length)
+	if (drbd_recv(mdev, mdev->data.socket, h->payload, h->length) != h->length)
 		return FALSE;
 
 	sector    = be64_to_cpu(p->sector);
@@ -1223,7 +1237,7 @@
 	// FIXME move into helper
 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
 
-	if (drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), h->length) != h->length)
+	if (drbd_recv(mdev, mdev->data.socket, h->payload, h->length) != h->length)
 		return FALSE;
 
 	// XXX harmless race with ioctl ...
@@ -1252,7 +1266,7 @@
 
 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
 
-	if (drbd_recv(mdev, mdev->sock, PAYLOAD_P(h), h->length) != h->length)
+	if (drbd_recv(mdev, mdev->data.socket, h->payload, h->length) != h->length)
 		return FALSE;
 
 	if(be32_to_cpu(p->state) == Primary && mdev->state == Primary ) {
@@ -1390,14 +1404,14 @@
 		want=min_t(int,MBDS_PACKET_SIZE,(bm_words-bm_i)*sizeof(word));
 		ERR_IF(want != h->length) goto out;
 		if (want==0) break;
-		if (drbd_recv(mdev, mdev->sock, buffer, want) != want)
+		if (drbd_recv(mdev, mdev->data.socket, buffer, want) != want)
 			goto out;
 		for(buf_i=0;buf_i<want/sizeof(unsigned long);buf_i++) {
 			word = lel_to_cpu(buffer[buf_i]) | bm[bm_i];
 			bits += hweight_long(word);
 			bm[bm_i++] = word;
 		}
-		if (!drbd_recv_header(mdev,mdev->sock,h))
+		if (!drbd_recv_header(mdev,mdev->data.socket,h))
 			goto out;
 		D_ASSERT(h->command == ReportBitMap);
 	}
@@ -1480,7 +1494,7 @@
 	size = h->length;
 	while (size > 0) {
 		want = min_t(int,size,sizeof(sink));
-		r = drbd_recv(mdev,mdev->sock,sink,want);
+		r = drbd_recv(mdev,mdev->data.socket,sink,want);
 		D_ASSERT(r >= 0);
 		if (r < 0) break;
 		size -= r;
@@ -1576,7 +1590,7 @@
 
 	for (;;) {
 		drbd_collect_zombies(mdev); // in case a syncer exited.
-		if (!drbd_recv_header(mdev,mdev->sock,header))
+		if (!drbd_recv_header(mdev,mdev->data.socket,header))
 			break;
 
 		if (header->command < MAX_CMD)
@@ -1607,14 +1621,14 @@
 	drbd_thread_stop_nowait(&mdev->dsender);
 	drbd_thread_stop(&mdev->asender);
 
-	while(down_trylock(&mdev->sock_mutex))
+	while(down_trylock(&mdev->data.mutex))
 	{
 		struct task_struct *task;
 		spin_lock(&mdev->send_task_lock);
 		if((task=mdev->send_task)) {
 			drbd_queue_signal(DRBD_SIG, task);
 			spin_unlock(&mdev->send_task_lock);
-			down(&mdev->sock_mutex);
+			down(&mdev->data.mutex);
 			break;
 		} else {
 			spin_unlock(&mdev->send_task_lock);
@@ -1624,7 +1638,7 @@
 	/* By grabbing the sock_mutex we make sure that no one
 	   uses the socket right now. */
 	drbd_free_sock(mdev);
-	up(&mdev->sock_mutex);
+	up(&mdev->data.mutex);
 
 	drbd_thread_stop(&mdev->dsender);
 	drbd_collect_zombies(mdev);
@@ -1707,69 +1721,104 @@
 STATIC int drbd_try_send_barrier(drbd_dev *mdev)
 {
 	int rv=TRUE;
-	if(down_trylock(&mdev->sock_mutex)==0) {
+	if(down_trylock(&mdev->data.mutex)==0) {
 		if(test_and_clear_bit(ISSUE_BARRIER,&mdev->flags)) {
 			if(! _drbd_send_barrier(mdev)) rv=FALSE;
 		}
-		up(&mdev->sock_mutex);
+		up(&mdev->data.mutex);
 	}
 	return rv;
 }
 
 int drbd_asender(struct Drbd_thread *thi)
 {
-	int ok;
-	unsigned long flags = 0;
+	// shortcuts
 	drbd_dev *mdev = thi->mdev;
-	Drbd_Polymorph_Packet p; // XXX BarrierAck_Packet should be enough ...
-	Drbd_Header *header = (Drbd_Header*)&p;
+	Drbd_Header *h = &mdev->meta.rbuf.head;
+
+	unsigned long flags = 0;
+	int rv;
+	void *buf    = h;
+	int received = 0;
+	int expect   = sizeof(Drbd_Header);
+
 
 	sprintf(current->comm, "drbd%d_asender", (int)(mdev-drbd_conf));
 
 	current->policy = SCHED_RR;  /* Make this a realtime task! */
 	current->rt_priority = 2;    /* more important than all other tasks */
 
-	while(thi->t_state == Running) {
+	while (thi->t_state == Running) {
 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
 			ERR_IF(!drbd_send_ping(mdev)) goto err;
 			// half ack timeout only,
 			// since sendmsg waited the other half already
-			mdev->msock->sk->rcvtimeo =
+			mdev->meta.socket->sk->rcvtimeo =
 				mdev->conf.timeout*HZ/20;
 		}
 
-		/* going to wait for input ...  maybe we need to output
-		 * something before input is ready, so allow for wakeup
-		 * signal...
-		 */
-		LOCK_SIGMASK(current,flags);
-		sigdelset(&current->blocked,DRBD_SIG);
-		RECALC_SIGPENDING(current);
-		UNLOCK_SIGMASK(current,flags);
+		if (mdev->state == Primary) {
+			if(!drbd_try_send_barrier(mdev)) goto err;
+		}
+
+		if (!drbd_process_ee(mdev,&mdev->done_ee)) goto err;
 
-		ok = drbd_recv_header(mdev,mdev->msock,header);
-		ERR_IF(!ok)
-			goto err;
-
-		/* we don't want to be "woken up" by DRBD_SIG while
-		 * receiving payload data, nor while sending out pings
-		 * and acks!  SIGTERM is unaffected...
+		rv = drbd_recv_short(mdev,mdev->meta.socket,buf,expect);
+
+		/* Note:
+		 * -EINTR        (on meta) we got a signal
+		 * -EAGAIN       (on meta) rcvtimeo expired
+		 * -ECONNRESET   other side closed the connection
+		 * -ERESTARTSYS  (on data) we got a signal
+		 * rv <  0       other than above: unexpected error!
+		 * rv == expected: full header or command
+		 * rv <  expected: "woken" by signal during receive
+		 * rv == 0       : "connection shut down by peer"
 		 */
-		LOCK_SIGMASK(current,flags);
-		if (sigismember(&current->pending.signal, DRBD_SIG)) {
-			sigdelset(&current->pending.signal, DRBD_SIG);
-			sigaddset(&current->blocked,DRBD_SIG);
+		if (rv == -EAGAIN) {
+			set_bit(SEND_PING,&mdev->flags);
+			continue;
+		} else if (rv == -EINTR || rv < expect) {
+			LOCK_SIGMASK(current,flags);
+			sigemptyset(&current->pending.signal);
 			RECALC_SIGPENDING(current);
-		}
-		UNLOCK_SIGMASK(current,flags);
+			UNLOCK_SIGMASK(current,flags);
+			if (rv == -EINTR) rv=0;
+		} else if (rv < 0) {
+			// if (rv != -ECONNRESET)
+				ERR("sock_recvmsg returned %d\n", rv);
+			break;
+		} /* else if (rv > expect) {
+			BUG();
+		} */
+		received += rv;
+		expect   -= rv;
+		buf      += rv;
 
+		if (expect)
+			continue;
 
 		// MAYBE use jump table
 
-		switch (header->command) {
-		case WakeAsender:
-			// we were just woken up
-			break;
+		if (received == sizeof(Drbd_Header)) {
+			h->command = be16_to_cpu(h->command);
+			h->length  = be16_to_cpu(h->length);
+			if (unlikely( h->magic != BE_DRBD_MAGIC )) {
+				ERR("magic?? m: 0x%lx c: %d l: %d\n",
+				    (long)be32_to_cpu(h->magic),
+				    h->command, h->length);
+				goto err;
+			}
+		}
+
+		/*
+		 * If packet command numbers were ordered by packet size,
+		 * we could just say something like
+		 * if (h->command > [last command without payload data])
+		 *	{ expect = whatever; continue; }
+		 */
+
+		switch (h->command) {
 		case Ping:
 			ERR_IF(!drbd_send_ping_ack(mdev))
 				goto err;
@@ -1778,30 +1827,35 @@
 			break;
 		case PingAck:
 			// restore idle timeout
-			mdev->msock->sk->rcvtimeo =
+			mdev->meta.socket->sk->rcvtimeo =
 				mdev->conf.ping_int*HZ;
 			break;
 		case RecvAck:
 		case WriteAck:
-			ERR_IF(!receive_BlockAck(mdev,header))
-				goto err;
+			if (received != sizeof(Drbd_BlockAck_Packet)) {
+				expect = sizeof(Drbd_BlockAck_Packet)
+					- received;
+				ERR_IF (h->length != expect) goto err;
+				continue;
+			}
+			ERR_IF (!process_BlockAck(mdev,h)) goto err;
 			break;
 		case BarrierAck:
-			ERR_IF(!receive_BarrierAck(mdev, header))
-				goto err;
+			if (received != sizeof(Drbd_Barrier_Packet)) {
+				expect = sizeof(Drbd_Barrier_Packet)
+					- received;
+				ERR_IF (h->length != expect) goto err;
+				continue;
+			}
+			ERR_IF (mdev->state != Primary) goto err;
+			process_BarrierAck(mdev,h);
 			break;
 		default:
 			D_ASSERT(0);
 		}
-
-
-		if( mdev->state == Primary ) {
-			ERR_IF(!drbd_try_send_barrier(mdev))
-				goto err;
-		}
-
-		ERR_IF(!drbd_process_ee(mdev,&mdev->done_ee))
-			goto err;
+		buf      = h;
+		received = 0;
+		expect   = sizeof(Drbd_Header);
 	} //while
 
 	if(0) {
@@ -1813,4 +1867,3 @@
 
 	return 0;
 }
-