summaryrefslogtreecommitdiff
path: root/src/kernel/messenger.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/kernel/messenger.c')
-rw-r--r--src/kernel/messenger.c86
1 files changed, 76 insertions, 10 deletions
diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c
index dc1bcd1a3d0..f39dbe2ef80 100644
--- a/src/kernel/messenger.c
+++ b/src/kernel/messenger.c
@@ -646,6 +646,12 @@ static void prepare_write_message(struct ceph_connection *con)
{
struct ceph_msg *m;
int v = 0;
+ int p;
+ void *zero_page_addr = page_address(con->msgr->zero_page);
+ int i = 0;
+ unsigned len;
+ unsigned dlen;
+ __le32 zero_map_len;
con->out_kvec_bytes = 0;
@@ -673,16 +679,56 @@ static void prepare_write_message(struct ceph_connection *con)
m->nr_pages);
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
+ if (m->nr_pages) {
+ m->zero_map = kmalloc(m->nr_pages, GFP_KERNEL);
+ dlen = le32_to_cpu(m->hdr.data_len);
+ for (p = 0; p < m->nr_pages;
+ p++) {
+ struct page *page = NULL;
+ void *kaddr = NULL;
+ char is_zero = 1;
+ len = min((unsigned)PAGE_SIZE, dlen);
+
+ mutex_lock(&m->page_mutex);
+ if (m->pages) {
+ page = m->pages[p];
+ kaddr = kmap(page);
+ is_zero = (memcmp(kaddr, zero_page_addr, len) == 0);
+ kunmap(page);
+ }
+
+ mutex_unlock(&m->page_mutex);
+
+ dout(0, "page %d is %s\n", i, (is_zero ? "zero" : "not zero"));
+
+ m->zero_map[i++] = is_zero;
+ dlen -= len;
+ }
+ }
+ zero_map_len = cpu_to_le32(m->nr_pages);
/* tag + hdr + front */
- con->out_kvec[v].iov_base = &tag_msg;
- con->out_kvec[v++].iov_len = 1;
- con->out_kvec[v].iov_base = &m->hdr;
- con->out_kvec[v++].iov_len = sizeof(m->hdr);
- con->out_kvec[v++] = m->front;
- con->out_kvec_left = v;
- con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len;
+
+#define REGISTER_DATA(base, len) \
+ con->out_kvec[v].iov_base = base; \
+ con->out_kvec[v++].iov_len = len; \
+ con->out_kvec_bytes += len;
+
+#define REGISTER_KVEC(kvec) \
+ con->out_kvec[v++] = kvec; \
+ con->out_kvec_bytes += kvec.iov_len;
+
+
+ REGISTER_DATA(&tag_msg, 1);
+ REGISTER_DATA(&m->hdr, sizeof(m->hdr));
+ REGISTER_KVEC(m->front);
+ REGISTER_DATA(&zero_map_len, sizeof(zero_map_len));
+ if (m->nr_pages)
+ REGISTER_DATA(m->zero_map, zero_map_len);
+
con->out_kvec_cur = con->out_kvec;
+ con->out_kvec_left = v;
+
/* fill in crc (except data pages), footer */
con->out_msg->hdr.crc =
cpu_to_le32(crc32c(0, (void *)&m->hdr,
@@ -893,6 +939,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
while (con->out_msg_pos.page < con->out_msg->nr_pages) {
struct page *page = NULL;
void *kaddr = NULL;
+ void *zero_page_addr = NULL;
/*
* if we are calculating the data crc (the default), we need
@@ -906,8 +953,9 @@ static int write_partial_msg_pages(struct ceph_connection *con)
kaddr = kmap(page);
} else {
page = con->msgr->zero_page;
- if (crc)
- kaddr = page_address(con->msgr->zero_page);
+ if (crc) {
+ kaddr = zero_page_addr;
+ }
}
len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
(int)(data_len - con->out_msg_pos.data_pos));
@@ -918,14 +966,28 @@ static int write_partial_msg_pages(struct ceph_connection *con)
con->out_msg->footer.data_crc =
cpu_to_le32(crc32c(tmpcrc, base, len));
con->out_msg_pos.did_page_crc = 1;
+
}
+#if 0
+ if (len == PAGE_SIZE && msg->pages) {
+ void *base;
+ if (!kaddr)
+ kaddr = kmap(page);
+ if (!zero_page_addr)
+ zero_page_addr = page_address(con->msgr->zero_page);
+ base = kaddr + con->out_msg_pos.page_pos;
+ if (memcmp(base, zero_page_addr, PAGE_SIZE) == 0) {
+ dout(0, "zero page\n");
+ }
+ }
+#endif
ret = kernel_sendpage(con->sock, page,
con->out_msg_pos.page_pos, len,
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);
- if (crc && msg->pages)
+ if (kaddr && kaddr != zero_page_addr)
kunmap(page);
mutex_unlock(&msg->page_mutex);
@@ -1419,6 +1481,7 @@ static int read_partial_message(struct ceph_connection *con)
unsigned front_len, data_len, data_off;
struct ceph_client *client = con->msgr->parent;
int datacrc = !(client->mount_args.flags & CEPH_MOUNT_NOCRC);
+ __le32 zero_map_len;
dout(20, "read_partial_message con %p msg %p\n", con, m);
@@ -1465,6 +1528,9 @@ static int read_partial_message(struct ceph_connection *con)
con->in_front_crc = crc32c(0, m->front.iov_base,
m->front.iov_len);
}
+ ret = ceph_tcp_recvmsg(con->sock, (char *)&zero_map_len, 4);
+ if (ret <= 0)
+ return ret;
/* (page) data */
data_len = le32_to_cpu(m->hdr.data_len);