summary refs log tree commit diff
diff options
context:
space:
mode:
author Yehuda Sadeh <yehuda@hq.newdream.net> 2009-04-08 15:00:02 -0700
committer Yehuda Sadeh <yehuda@hq.newdream.net> 2009-04-08 15:00:02 -0700
commit 6dc18e08315eb3071f68cb7aeb9f99b2b80457a2 (patch)
tree 616c2052702ad084a3c70e9a73763925ec3b9bd4
parent 45f04b70bbb3e299ab33bbb57c30a0f4b9d44dda (diff)
download ceph-historic/msgr_zeropages.tar.gz
messenger: zero pages infrastructure (not working yet) historic/msgr_zeropages
-rw-r--r-- src/kernel/messenger.c 86
-rw-r--r-- src/kernel/messenger.h 3
-rw-r--r-- src/msg/SimpleMessenger.cc 13
3 files changed, 91 insertions, 11 deletions
diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c
index dc1bcd1a3d0..f39dbe2ef80 100644
--- a/src/kernel/messenger.c
+++ b/src/kernel/messenger.c
@@ -646,6 +646,12 @@ static void prepare_write_message(struct ceph_connection *con)
{
struct ceph_msg *m;
int v = 0;
+ int p;
+ void *zero_page_addr = page_address(con->msgr->zero_page);
+ int i = 0;
+ unsigned len;
+ unsigned dlen;
+ __le32 zero_map_len;
con->out_kvec_bytes = 0;
@@ -673,16 +679,56 @@ static void prepare_write_message(struct ceph_connection *con)
m->nr_pages);
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
+ if (m->nr_pages) {
+ m->zero_map = kmalloc(m->nr_pages, GFP_KERNEL);
+ dlen = le32_to_cpu(m->hdr.data_len);
+ for (p = 0; p < m->nr_pages;
+ p++) {
+ struct page *page = NULL;
+ void *kaddr = NULL;
+ char is_zero = 1;
+ len = min((unsigned)PAGE_SIZE, dlen);
+
+ mutex_lock(&m->page_mutex);
+ if (m->pages) {
+ page = m->pages[p];
+ kaddr = kmap(page);
+ is_zero = (memcmp(kaddr, zero_page_addr, len) == 0);
+ kunmap(page);
+ }
+
+ mutex_unlock(&m->page_mutex);
+
+ dout(0, "page %d is %s\n", i, (is_zero ? "zero" : "not zero"));
+
+ m->zero_map[i++] = is_zero;
+ dlen -= len;
+ }
+ }
+ zero_map_len = cpu_to_le32(m->nr_pages);
/* tag + hdr + front */
- con->out_kvec[v].iov_base = &tag_msg;
- con->out_kvec[v++].iov_len = 1;
- con->out_kvec[v].iov_base = &m->hdr;
- con->out_kvec[v++].iov_len = sizeof(m->hdr);
- con->out_kvec[v++] = m->front;
- con->out_kvec_left = v;
- con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len;
+
+#define REGISTER_DATA(base, len) \
+ con->out_kvec[v].iov_base = base; \
+ con->out_kvec[v++].iov_len = len; \
+ con->out_kvec_bytes += len;
+
+#define REGISTER_KVEC(kvec) \
+ con->out_kvec[v++] = kvec; \
+ con->out_kvec_bytes += kvec.iov_len;
+
+
+ REGISTER_DATA(&tag_msg, 1);
+ REGISTER_DATA(&m->hdr, sizeof(m->hdr));
+ REGISTER_KVEC(m->front);
+ REGISTER_DATA(&zero_map_len, sizeof(zero_map_len));
+ if (m->nr_pages)
+ REGISTER_DATA(m->zero_map, zero_map_len);
+
con->out_kvec_cur = con->out_kvec;
+ con->out_kvec_left = v;
+
/* fill in crc (except data pages), footer */
con->out_msg->hdr.crc =
cpu_to_le32(crc32c(0, (void *)&m->hdr,
@@ -893,6 +939,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
while (con->out_msg_pos.page < con->out_msg->nr_pages) {
struct page *page = NULL;
void *kaddr = NULL;
+ void *zero_page_addr = NULL;
/*
* if we are calculating the data crc (the default), we need
@@ -906,8 +953,9 @@ static int write_partial_msg_pages(struct ceph_connection *con)
kaddr = kmap(page);
} else {
page = con->msgr->zero_page;
- if (crc)
- kaddr = page_address(con->msgr->zero_page);
+ if (crc) {
+ kaddr = zero_page_addr;
+ }
}
len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
(int)(data_len - con->out_msg_pos.data_pos));
@@ -918,14 +966,28 @@ static int write_partial_msg_pages(struct ceph_connection *con)
con->out_msg->footer.data_crc =
cpu_to_le32(crc32c(tmpcrc, base, len));
con->out_msg_pos.did_page_crc = 1;
+
}
+#if 0
+ if (len == PAGE_SIZE && msg->pages) {
+ void *base;
+ if (!kaddr)
+ kaddr = kmap(page);
+ if (!zero_page_addr)
+ zero_page_addr = page_address(con->msgr->zero_page);
+ base = kaddr + con->out_msg_pos.page_pos;
+ if (memcmp(base, zero_page_addr, PAGE_SIZE) == 0) {
+ dout(0, "zero page\n");
+ }
+ }
+#endif
ret = kernel_sendpage(con->sock, page,
con->out_msg_pos.page_pos, len,
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);
- if (crc && msg->pages)
+ if (kaddr && kaddr != zero_page_addr)
kunmap(page);
mutex_unlock(&msg->page_mutex);
@@ -1419,6 +1481,7 @@ static int read_partial_message(struct ceph_connection *con)
unsigned front_len, data_len, data_off;
struct ceph_client *client = con->msgr->parent;
int datacrc = !(client->mount_args.flags & CEPH_MOUNT_NOCRC);
+ __le32 zero_map_len;
dout(20, "read_partial_message con %p msg %p\n", con, m);
@@ -1465,6 +1528,9 @@ static int read_partial_message(struct ceph_connection *con)
con->in_front_crc = crc32c(0, m->front.iov_base,
m->front.iov_len);
}
+ ret = ceph_tcp_recvmsg(con->sock, (char *)&zero_map_len, 4);
+ if (ret <= 0)
+ return ret;
/* (page) data */
data_len = le32_to_cpu(m->hdr.data_len);
diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h
index 7a7224a6693..4905bc20f27 100644
--- a/src/kernel/messenger.h
+++ b/src/kernel/messenger.h
@@ -118,6 +118,7 @@ struct ceph_msg {
atomic_t nref;
bool front_is_vmalloc;
bool more_to_follow;
+ char *zero_map;
};
struct ceph_msg_pos {
@@ -203,7 +204,7 @@ struct ceph_connection {
out_sent) */
struct ceph_msg_pos out_msg_pos;
- struct kvec out_kvec[6], /* sending header/footer data */
+ struct kvec out_kvec[7], /* sending header/footer data */
*out_kvec_cur;
int out_kvec_left; /* kvec's left in out_kvec */
int out_kvec_bytes; /* total bytes left */
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index 995b0bf5d2f..aab1aa4a5c1 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -1931,6 +1931,16 @@ Message *Rank::Pipe::read_message()
dout(20) << "reader got front " << front.length() << dendl;
}
+ int zero_map_len;
+ bufferptr zero_map_bp;
+ if (tcp_read( sd, (char *)&zero_map_len, 4 ) < 0)
+ return 0;
+ if (zero_map_len) {
+ zero_map_bp = buffer::create(zero_map_len);
+ if (tcp_read( sd, zero_map_bp.c_str(), zero_map_len ) < 0)
+ return 0;
+ }
+
// read data
bufferlist data;
unsigned data_len = le32_to_cpu(header.data_len);
@@ -2074,6 +2084,9 @@ int Rank::Pipe::write_message(Message *m)
m->calc_header_crc();
bufferlist blist = m->get_payload();
+
+ int zero_map_len = 0;
+ blist.append((char *)&zero_map_len, 4);
blist.append(m->get_data());
dout(20) << "write_message " << m << " to " << header.dst << dendl;