summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Yehuda Sadeh <yehuda@inktank.com> 2013-06-20 21:29:05 -0700
committer: Yehuda Sadeh <yehuda@inktank.com> 2013-06-20 21:57:40 -0700
commit: 24e59b4377ed03d18e935c5877ffcd2b6be32f27 (patch)
tree: e6caa1ad43a78e58f40b4c7dbddb52e0480fd002
parent: 02de43ad731137179e750a318edf857eb67941e8 (diff)
download: ceph-24e59b4377ed03d18e935c5877ffcd2b6be32f27.tar.gz
rgw: buffer atomic put handler
Since we tied the atomic put handler to libcurl output data, which uses much smaller chunks, we need to buffer data, otherwise we'd end up with a huge number of small writes. Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- src/rgw/rgw_rados.cc 43
-rw-r--r-- src/rgw/rgw_rados.h 4
2 files changed, 38 insertions, 9 deletions
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 9115c6b5e88..1c1194fd22d 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -612,13 +612,21 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle)
return 0;
}
+int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle)
+{
+ if (ofs >= next_part_ofs)
+ prepare_next_part(ofs);
+
+ return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle);
+}
+
int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle) {
+ *phandle = NULL;
if (extra_data_len) {
size_t extra_len = bl.length();
if (extra_len > extra_data_len)
extra_len = extra_data_len;
- /* is there a better way to split a bl into two bufferlists? */
bufferlist extra;
bl.splice(0, extra_len, &extra);
extra_data_bl.append(extra);
@@ -629,19 +637,23 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha
}
ofs = extra_data_bl.length();
}
- off_t actual_ofs = ofs - extra_data_bl.length();
- if (!actual_ofs && !immutable_head()) {
+
+ pending_data_bl.claim_append(bl);
+ if (pending_data_bl.length() < RGW_MAX_CHUNK_SIZE)
+ return 0;
+
+ pending_data_bl.splice(0, RGW_MAX_CHUNK_SIZE, &bl);
+
+ if (!data_ofs && !immutable_head()) {
first_chunk.claim(bl);
- *phandle = NULL;
obj_len = (uint64_t)first_chunk.length();
prepare_next_part(first_chunk.length());
+ data_ofs = obj_len;
return 0;
}
- if (actual_ofs >= next_part_ofs)
- prepare_next_part(actual_ofs);
- int r = RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, actual_ofs - cur_part_ofs, actual_ofs, phandle);
-
- return r;
+ off_t write_ofs = data_ofs;
+ data_ofs = write_ofs + bl.length();
+ return write_data(bl, write_ofs, phandle);
}
int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx)
@@ -700,6 +712,19 @@ void RGWPutObjProcessor_Atomic::complete_parts()
int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs)
{
+ if (pending_data_bl.length()) {
+ void *handle;
+ int r = write_data(pending_data_bl, data_ofs, &handle);
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
+ return r;
+ }
+ r = throttle_data(handle);
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
+ return r;
+ }
+ }
complete_parts();
store->set_atomic(obj_ctx, head_obj);
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 866736fc816..1a78c6d6a9f 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -259,9 +259,11 @@ class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
off_t cur_part_ofs;
off_t next_part_ofs;
int cur_part_id;
+ off_t data_ofs;
uint64_t extra_data_len;
bufferlist extra_data_bl;
+ bufferlist pending_data_bl;
protected:
rgw_bucket bucket;
string obj_str;
@@ -275,6 +277,7 @@ protected:
virtual bool immutable_head() { return false; }
+ int write_data(bufferlist& bl, off_t ofs, void **phandle);
virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
void prepare_next_part(off_t ofs);
@@ -286,6 +289,7 @@ public:
cur_part_ofs(0),
next_part_ofs(_p),
cur_part_id(0),
+ data_ofs(0),
extra_data_len(0),
bucket(_b),
obj_str(_o),