summaryrefslogtreecommitdiff
path: root/src/librados
diff options
context:
space:
mode:
authorJosh Durgin <josh.durgin@dreamhost.com>2012-03-01 12:08:33 -0800
committerJosh Durgin <josh.durgin@dreamhost.com>2012-03-13 11:46:02 -0700
commit5f92f338be3e0a7afff1957edc0d7027b23e0378 (patch)
tree6550de46a8ff419b6d45fe96ede4afecd61ddd1c /src/librados
parent8f278647f4aac364c543059a039610d8a28a6b4d (diff)
downloadceph-5f92f338be3e0a7afff1957edc0d7027b23e0378.tar.gz
librados: move methods that require an IoCtx to IoCtxImpl
RadosClient still does a few different things, but at least it no longer does all the work of an IoCtx. Signed-off-by: Josh Durgin <josh.durgin@dreamhost.com>
Diffstat (limited to 'src/librados')
-rw-r--r--src/librados/IoCtxImpl.cc1534
-rw-r--r--src/librados/IoCtxImpl.h134
-rw-r--r--src/librados/PoolAsyncCompletionImpl.h150
-rw-r--r--src/librados/RadosClient.cc1644
-rw-r--r--src/librados/RadosClient.h152
-rw-r--r--src/librados/librados.cc235
6 files changed, 1914 insertions, 1935 deletions
diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc
index f32556d8a6d..d22aebc0f81 100644
--- a/src/librados/IoCtxImpl.cc
+++ b/src/librados/IoCtxImpl.cc
@@ -15,8 +15,10 @@
#include "IoCtxImpl.h"
#include "librados/AioCompletionImpl.h"
+#include "librados/PoolAsyncCompletionImpl.h"
#include "librados/RadosClient.h"
+
#define DOUT_SUBSYS rados
#undef dout_prefix
#define dout_prefix *_dout << "librados: "
@@ -26,11 +28,14 @@ librados::IoCtxImpl::IoCtxImpl()
{
}
-librados::IoCtxImpl::IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s)
- : ref_cnt(0), client(c), poolid(pid),
- pool_name(pool_name_), snap_seq(s), assert_ver(0),
- notify_timeout(c->cct->_conf->client_notify_timeout), oloc(pid),
- aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), aio_write_seq(0)
+librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter,
+ Mutex *client_lock, int poolid,
+ const char *pool_name, snapid_t s)
+ : ref_cnt(0), client(c), poolid(poolid), pool_name(pool_name), snap_seq(s),
+ assert_ver(0), notify_timeout(c->cct->_conf->client_notify_timeout),
+ oloc(poolid),
+ aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
+ aio_write_seq(0), lock(client_lock), objecter(objecter)
{
}
@@ -45,7 +50,8 @@ void librados::IoCtxImpl::set_snap_read(snapid_t s)
int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector<snapid_t>& snaps)
{
::SnapContext n;
- ldout(client->cct, 10) << "set snap write context: seq = " << seq << " and snaps = " << snaps << dendl;
+ ldout(client->cct, 10) << "set snap write context: seq = " << seq
+ << " and snaps = " << snaps << dendl;
n.seq = seq;
n.snaps = snaps;
if (!n.is_valid())
@@ -85,3 +91,1519 @@ void librados::IoCtxImpl::flush_aio_writes()
aio_write_cond.Wait(aio_write_list_lock);
aio_write_list_lock.Unlock();
}
+
+// SNAPS
+
+int librados::IoCtxImpl::snap_create(const char *snapName)
+{
+ int reply;
+ string sName(snapName);
+
+ Mutex mylock ("IoCtxImpl::snap_create::mylock");
+ Cond cond;
+ bool done;
+ lock->Lock();
+ objecter->create_pool_snap(poolid,
+ sName,
+ new C_SafeCond(&mylock, &cond, &done, &reply));
+ lock->Unlock();
+
+ mylock.Lock();
+ while(!done) cond.Wait(mylock);
+ mylock.Unlock();
+ return reply;
+}
+
+int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid)
+{
+ int reply;
+
+ Mutex mylock("IoCtxImpl::selfmanaged_snap_create::mylock");
+ Cond cond;
+ bool done;
+ lock->Lock();
+ snapid_t snapid;
+ objecter->allocate_selfmanaged_snap(poolid, &snapid,
+ new C_SafeCond(&mylock, &cond, &done, &reply));
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done) cond.Wait(mylock);
+ mylock.Unlock();
+ if (reply == 0)
+ *psnapid = snapid;
+ return reply;
+}
+
+int librados::IoCtxImpl::snap_remove(const char *snapName)
+{
+ int reply;
+ string sName(snapName);
+
+ Mutex mylock ("IoCtxImpl::snap_remove::mylock");
+ Cond cond;
+ bool done;
+ lock->Lock();
+ objecter->delete_pool_snap(poolid,
+ sName,
+ new C_SafeCond(&mylock, &cond, &done, &reply));
+ lock->Unlock();
+
+ mylock.Lock();
+ while(!done) cond.Wait(mylock);
+ mylock.Unlock();
+ return reply;
+}
+
+int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid,
+ ::SnapContext& snapc,
+ uint64_t snapid)
+{
+ int reply;
+
+ Mutex mylock("IoCtxImpl::snap_rollback::mylock");
+ Cond cond;
+ bool done;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply);
+
+ lock->Lock();
+ objecter->rollback_object(oid, oloc, snapc, snapid,
+ ceph_clock_now(client->cct), onack, NULL);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done) cond.Wait(mylock);
+ mylock.Unlock();
+ return reply;
+}
+
+int librados::IoCtxImpl::rollback(const object_t& oid, const char *snapName)
+{
+ string sName(snapName);
+
+ lock->Lock();
+ snapid_t snap;
+ const map<int64_t, pg_pool_t>& pools = objecter->osdmap->get_pools();
+ const pg_pool_t& pg_pool = pools.find(poolid)->second;
+ map<snapid_t, pool_snap_info_t>::const_iterator p;
+ for (p = pg_pool.snaps.begin();
+ p != pg_pool.snaps.end();
+ ++p) {
+ if (p->second.name == snapName) {
+ snap = p->first;
+ break;
+ }
+ }
+ if (p == pg_pool.snaps.end()) {
+ lock->Unlock();
+ return -ENOENT;
+ }
+ lock->Unlock();
+
+ return selfmanaged_snap_rollback_object(oid, snapc, snap);
+}
+
+int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid)
+{
+ int reply;
+
+ Mutex mylock("IoCtxImpl::selfmanaged_snap_remove::mylock");
+ Cond cond;
+ bool done;
+ lock->Lock();
+ objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid),
+ new C_SafeCond(&mylock, &cond, &done, &reply));
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done) cond.Wait(mylock);
+ mylock.Unlock();
+ return (int)reply;
+}
+
+int librados::IoCtxImpl::pool_change_auid(unsigned long long auid)
+{
+ int reply;
+
+ Mutex mylock("IoCtxImpl::pool_change_auid::mylock");
+ Cond cond;
+ bool done;
+ lock->Lock();
+ objecter->change_pool_auid(poolid,
+ new C_SafeCond(&mylock, &cond, &done, &reply),
+ auid);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done) cond.Wait(mylock);
+ mylock.Unlock();
+ return reply;
+}
+
+int librados::IoCtxImpl::pool_change_auid_async(unsigned long long auid,
+ PoolAsyncCompletionImpl *c)
+{
+ Mutex::Locker l(*lock);
+ objecter->change_pool_auid(poolid,
+ new C_PoolAsync_Safe(c),
+ auid);
+ return 0;
+}
+
+int librados::IoCtxImpl::snap_list(vector<uint64_t> *snaps)
+{
+ Mutex::Locker l(*lock);
+ const pg_pool_t *pi = objecter->osdmap->get_pg_pool(poolid);
+ for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
+ p != pi->snaps.end();
+ p++)
+ snaps->push_back(p->first);
+ return 0;
+}
+
+int librados::IoCtxImpl::snap_lookup(const char *name, uint64_t *snapid)
+{
+ Mutex::Locker l(*lock);
+ const pg_pool_t *pi = objecter->osdmap->get_pg_pool(poolid);
+ for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
+ p != pi->snaps.end();
+ p++) {
+ if (p->second.name == name) {
+ *snapid = p->first;
+ return 0;
+ }
+ }
+ return -ENOENT;
+}
+
+int librados::IoCtxImpl::snap_get_name(uint64_t snapid, std::string *s)
+{
+ Mutex::Locker l(*lock);
+ const pg_pool_t *pi = objecter->osdmap->get_pg_pool(poolid);
+ map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.find(snapid);
+ if (p == pi->snaps.end())
+ return -ENOENT;
+ *s = p->second.name.c_str();
+ return 0;
+}
+
+int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid, time_t *t)
+{
+ Mutex::Locker l(*lock);
+ const pg_pool_t *pi = objecter->osdmap->get_pg_pool(poolid);
+ map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.find(snapid);
+ if (p == pi->snaps.end())
+ return -ENOENT;
+ *t = p->second.stamp.sec();
+ return 0;
+}
+
+
+// IO
+
+int librados::IoCtxImpl::list(Objecter::ListContext *context, int max_entries)
+{
+ Cond cond;
+ bool done;
+ int r = 0;
+ object_t oid;
+ Mutex mylock("IoCtxImpl::list::mylock");
+
+ if (context->at_end)
+ return 0;
+
+ context->max_entries = max_entries;
+
+ lock->Lock();
+ objecter->list_objects(context, new C_SafeCond(&mylock, &cond, &done, &r));
+ lock->Unlock();
+
+ mylock.Lock();
+ while(!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ return r;
+}
+
+int librados::IoCtxImpl::create(const object_t& oid, bool exclusive)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::create::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ eversion_t ver;
+
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+
+ lock->Lock();
+ objecter->create(oid, oloc,
+ snapc, ut, 0, (exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0),
+ onack, NULL, &ver);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::create(const object_t& oid, bool exclusive,
+ const std::string& category)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::create::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ eversion_t ver;
+
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+
+ ::ObjectOperation o;
+ o.create(exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0, category);
+
+ lock->Lock();
+ objecter->mutate(oid, oloc, o, snapc, ut, 0, onack, NULL, &ver);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+/*
+ * add any version assert operations that are appropriate given the
+ * stat in the IoCtx, either the target version assert or any src
+ * object asserts. these affect a single ioctx operation, so clear
+ * the ioctx state when we're doing.
+ *
+ * return a pointer to the ObjectOperation if we added any events;
+ * this is convenient for passing the extra_ops argument into Objecter
+ * methods.
+ */
+::ObjectOperation *librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation *op)
+{
+ ::ObjectOperation *pop = NULL;
+ if (assert_ver) {
+ op->assert_version(assert_ver);
+ assert_ver = 0;
+ pop = op;
+ }
+ while (!assert_src_version.empty()) {
+ map<object_t,uint64_t>::iterator p = assert_src_version.begin();
+ op->assert_src_version(p->first, CEPH_NOSNAP, p->second);
+ assert_src_version.erase(p);
+ pop = op;
+ }
+ return pop;
+}
+
+int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
+ size_t len, uint64_t off)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::write::mylock");
+ Cond cond;
+ bool done;
+ int r;
+
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ // extra ops?
+ ::ObjectOperation op;
+ ::ObjectOperation *pop = prepare_assert_ops(&op);
+
+ lock->Lock();
+ objecter->write(oid, oloc,
+ off, len, snapc, bl, ut, 0,
+ onack, NULL, &ver, pop);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ if (r < 0)
+ return r;
+
+ return len;
+}
+
+int librados::IoCtxImpl::append(const object_t& oid, bufferlist& bl, size_t len)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::append::mylock");
+ Cond cond;
+ bool done;
+ int r;
+
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ ::ObjectOperation op;
+ ::ObjectOperation *pop = prepare_assert_ops(&op);
+
+ lock->Lock();
+ objecter->append(oid, oloc,
+ len, snapc, bl, ut, 0,
+ onack, NULL, &ver, pop);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ if (r < 0)
+ return r;
+
+ return len;
+}
+
+int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::write_full::mylock");
+ Cond cond;
+ bool done;
+ int r;
+
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+
+ eversion_t ver;
+
+ ::ObjectOperation op;
+ ::ObjectOperation *pop = prepare_assert_ops(&op);
+
+ lock->Lock();
+ objecter->write_full(oid, oloc,
+ snapc, bl, ut, 0,
+ onack, NULL, &ver, pop);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::clone_range(const object_t& dst_oid,
+ uint64_t dst_offset,
+ const object_t& src_oid,
+ uint64_t src_offset,
+ uint64_t len)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::clone_range::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ bufferlist outbl;
+
+ lock->Lock();
+ ::ObjectOperation wr;
+ prepare_assert_ops(&wr);
+ wr.clone_range(src_oid, src_offset, len, dst_offset);
+ objecter->mutate(dst_oid, oloc, wr, snapc, ut, 0, onack, NULL, &ver);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
+ time_t *pmtime)
+{
+ utime_t ut;
+ if (pmtime) {
+ ut = utime_t(*pmtime, 0);
+ } else {
+ ut = ceph_clock_now(client->cct);
+ }
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ if (!o->size())
+ return 0;
+
+ Mutex mylock("IoCtxImpl::mutate::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ eversion_t ver;
+
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+
+ lock->Lock();
+ objecter->mutate(oid, oloc,
+ *o, snapc, ut, 0,
+ onack, NULL, &ver);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::operate_read(const object_t& oid,
+ ::ObjectOperation *o, bufferlist *pbl)
+{
+ if (!o->size())
+ return 0;
+
+ Mutex mylock("IoCtxImpl::mutate::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ eversion_t ver;
+
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+
+ lock->Lock();
+ objecter->read(oid, oloc,
+ *o, snap_seq, pbl, 0,
+ onack, &ver);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::aio_operate_read(const object_t &oid,
+ ::ObjectOperation *o,
+ AioCompletionImpl *c, bufferlist *pbl)
+{
+ Context *onack = new C_aio_Ack(c);
+
+ c->pbl = pbl;
+
+ Mutex::Locker l(*lock);
+ objecter->read(oid, oloc,
+ *o, snap_seq, pbl, 0,
+ onack, 0);
+ return 0;
+}
+
+int librados::IoCtxImpl::aio_operate(const object_t& oid,
+ ::ObjectOperation *o, AioCompletionImpl *c)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Context *onack = new C_aio_Ack(c);
+ Context *oncommit = new C_aio_Safe(c);
+
+ queue_aio_write(c);
+
+ Mutex::Locker l(*lock);
+ objecter->mutate(oid, oloc, *o, snapc, ut, 0, onack, oncommit, &c->objver);
+
+ return 0;
+}
+
+int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
+ bufferlist *pbl, size_t len, uint64_t off)
+{
+
+ Context *onack = new C_aio_Ack(c);
+ eversion_t ver;
+
+ c->pbl = pbl;
+
+ Mutex::Locker l(*lock);
+ objecter->read(oid, oloc,
+ off, len, snap_seq, &c->bl, 0,
+ onack, &c->objver);
+ return 0;
+}
+
+int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
+ char *buf, size_t len, uint64_t off)
+{
+ Context *onack = new C_aio_Ack(c);
+
+ c->buf = buf;
+ c->maxlen = len;
+
+ Mutex::Locker l(*lock);
+ objecter->read(oid, oloc,
+ off, len, snap_seq, &c->bl, 0,
+ onack, &c->objver);
+
+ return 0;
+}
+
+int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
+ AioCompletionImpl *c,
+ std::map<uint64_t,uint64_t> *m,
+ bufferlist *data_bl, size_t len,
+ uint64_t off)
+{
+
+ C_aio_sparse_read_Ack *onack = new C_aio_sparse_read_Ack(c);
+ onack->m = m;
+ onack->data_bl = data_bl;
+ eversion_t ver;
+
+ c->pbl = NULL;
+
+ Mutex::Locker l(*lock);
+ objecter->sparse_read(oid, oloc,
+ off, len, snap_seq, &c->bl, 0,
+ onack);
+ return 0;
+}
+
+int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
+ const bufferlist& bl, size_t len,
+ uint64_t off)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ queue_aio_write(c);
+
+ Context *onack = new C_aio_Ack(c);
+ Context *onsafe = new C_aio_Safe(c);
+
+ Mutex::Locker l(*lock);
+ objecter->write(oid, oloc,
+ off, len, snapc, bl, ut, 0,
+ onack, onsafe, &c->objver);
+
+ return 0;
+}
+
+int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c,
+ const bufferlist& bl, size_t len)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ queue_aio_write(c);
+
+ Context *onack = new C_aio_Ack(c);
+ Context *onsafe = new C_aio_Safe(c);
+
+ Mutex::Locker l(*lock);
+ objecter->append(oid, oloc,
+ len, snapc, bl, ut, 0,
+ onack, onsafe, &c->objver);
+
+ return 0;
+}
+
+int librados::IoCtxImpl::aio_write_full(const object_t &oid,
+ AioCompletionImpl *c,
+ const bufferlist& bl)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ queue_aio_write(c);
+
+ Context *onack = new C_aio_Ack(c);
+ Context *onsafe = new C_aio_Safe(c);
+
+ Mutex::Locker l(*lock);
+ objecter->write_full(oid, oloc,
+ snapc, bl, ut, 0,
+ onack, onsafe, &c->objver);
+
+ return 0;
+}
+
+int librados::IoCtxImpl::remove(const object_t& oid)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::remove::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ ::ObjectOperation op;
+ ::ObjectOperation *pop = prepare_assert_ops(&op);
+
+ lock->Lock();
+ objecter->remove(oid, oloc,
+ snapc, ut, 0,
+ onack, NULL, &ver, pop);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::trunc(const object_t& oid, uint64_t size)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::write_full::mylock");
+ Cond cond;
+ bool done;
+ int r;
+
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ ::ObjectOperation op;
+ ::ObjectOperation *pop = prepare_assert_ops(&op);
+
+ lock->Lock();
+ objecter->trunc(oid, oloc,
+ snapc, ut, 0,
+ size, 0,
+ onack, NULL, &ver, pop);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::tmap_update(const object_t& oid, bufferlist& cmdbl)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::tmap_update::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ bufferlist outbl;
+
+ lock->Lock();
+ ::ObjectOperation wr;
+ prepare_assert_ops(&wr);
+ wr.tmap_update(cmdbl);
+ objecter->mutate(oid, oloc, wr, snapc, ut, 0, onack, NULL, &ver);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::tmap_put(const object_t& oid, bufferlist& bl)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::tmap_put::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ bufferlist outbl;
+
+ lock->Lock();
+ ::ObjectOperation wr;
+ prepare_assert_ops(&wr);
+ wr.tmap_put(bl);
+ objecter->mutate(oid, oloc, wr, snapc, ut, 0, onack, NULL, &ver);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::tmap_get(const object_t& oid, bufferlist& bl)
+{
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::tmap_put::mylock");
+ Cond cond;
+ bool done;
+ int r = 0;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ bufferlist outbl;
+
+ lock->Lock();
+ ::ObjectOperation rd;
+ prepare_assert_ops(&rd);
+ rd.tmap_get(&bl, NULL);
+ objecter->read(oid, oloc, rd, snap_seq, 0, 0, onack, &ver);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+
+int librados::IoCtxImpl::exec(const object_t& oid,
+ const char *cls, const char *method,
+ bufferlist& inbl, bufferlist& outbl)
+{
+ Mutex mylock("IoCtxImpl::exec::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+
+ lock->Lock();
+ ::ObjectOperation rd;
+ prepare_assert_ops(&rd);
+ rd.call(cls, method, inbl);
+ objecter->read(oid, oloc, rd, snap_seq, &outbl, 0, onack, &ver);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
+ const char *cls, const char *method,
+ bufferlist& inbl, bufferlist *outbl)
+{
+ Context *onack = new C_aio_Ack(c);
+
+ Mutex::Locker l(*lock);
+ ::ObjectOperation rd;
+ prepare_assert_ops(&rd);
+ rd.call(cls, method, inbl);
+ objecter->read(oid, oloc, rd, snap_seq, outbl, 0, onack, &c->objver);
+
+ return 0;
+}
+
+int librados::IoCtxImpl::read(const object_t& oid,
+ bufferlist& bl, size_t len, uint64_t off)
+{
+ Mutex mylock("IoCtxImpl::read::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ ::ObjectOperation op;
+ ::ObjectOperation *pop = prepare_assert_ops(&op);
+
+ lock->Lock();
+ objecter->read(oid, oloc,
+ off, len, snap_seq, &bl, 0,
+ onack, &ver, pop);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+ ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl;
+
+ set_sync_op_version(ver);
+
+ if (r < 0)
+ return r;
+
+ if (bl.length() < len) {
+ ldout(client->cct, 10) << "Returned length " << bl.length()
+ << " less than original length "<< len << dendl;
+ }
+
+ return bl.length();
+}
+
+int librados::IoCtxImpl::mapext(const object_t& oid,
+ uint64_t off, size_t len,
+ std::map<uint64_t,uint64_t>& m)
+{
+ bufferlist bl;
+
+ Mutex mylock("IoCtxImpl::read::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+
+ lock->Lock();
+ objecter->mapext(oid, oloc,
+ off, len, snap_seq, &bl, 0,
+ onack);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+ ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl;
+
+ if (r < 0)
+ return r;
+
+ bufferlist::iterator iter = bl.begin();
+ ::decode(m, iter);
+
+ return m.size();
+}
+
+int librados::IoCtxImpl::sparse_read(const object_t& oid,
+ std::map<uint64_t,uint64_t>& m,
+ bufferlist& data_bl, size_t len,
+ uint64_t off)
+{
+ bufferlist bl;
+
+ Mutex mylock("IoCtxImpl::read::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+
+ lock->Lock();
+ objecter->sparse_read(oid, oloc,
+ off, len, snap_seq, &bl, 0,
+ onack);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+ ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl;
+
+ if (r < 0)
+ return r;
+
+ bufferlist::iterator iter = bl.begin();
+ ::decode(m, iter);
+ ::decode(data_bl, iter);
+
+ return m.size();
+}
+
+int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime)
+{
+ Mutex mylock("IoCtxImpl::stat::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ uint64_t size;
+ utime_t mtime;
+ eversion_t ver;
+
+ if (!psize)
+ psize = &size;
+
+ ::ObjectOperation op;
+ ::ObjectOperation *pop = prepare_assert_ops(&op);
+
+ lock->Lock();
+ objecter->stat(oid, oloc,
+ snap_seq, psize, &mtime, 0,
+ onack, &ver, pop);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+ ldout(client->cct, 10) << "Objecter returned from stat" << dendl;
+
+ if (r >= 0 && pmtime) {
+ *pmtime = mtime.sec();
+ }
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::getxattr(const object_t& oid,
+ const char *name, bufferlist& bl)
+{
+ Mutex mylock("IoCtxImpl::getxattr::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ ::ObjectOperation op;
+ ::ObjectOperation *pop = prepare_assert_ops(&op);
+
+ lock->Lock();
+ objecter->getxattr(oid, oloc,
+ name, snap_seq, &bl, 0,
+ onack, &ver, pop);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+ ldout(client->cct, 10) << "Objecter returned from getxattr" << dendl;
+
+ set_sync_op_version(ver);
+
+ if (r < 0)
+ return r;
+
+ return bl.length();
+}
+
+int librados::IoCtxImpl::rmxattr(const object_t& oid, const char *name)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::rmxattr::mylock");
+ Cond cond;
+ bool done;
+ int r;
+
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ ::ObjectOperation op;
+ ::ObjectOperation *pop = prepare_assert_ops(&op);
+
+ lock->Lock();
+ objecter->removexattr(oid, oloc, name,
+ snapc, ut, 0,
+ onack, NULL, &ver, pop);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
+int librados::IoCtxImpl::setxattr(const object_t& oid,
+ const char *name, bufferlist& bl)
+{
+ utime_t ut = ceph_clock_now(client->cct);
+
+ /* can't write to a snapshot */
+ if (snap_seq != CEPH_NOSNAP)
+ return -EROFS;
+
+ Mutex mylock("IoCtxImpl::setxattr::mylock");
+ Cond cond;
+ bool done;
+ int r;
+
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+
+ ::ObjectOperation op;
+ ::ObjectOperation *pop = prepare_assert_ops(&op);
+
+ lock->Lock();
+ objecter->setxattr(oid, oloc, name,
+ snapc, bl, ut, 0,
+ onack, NULL, &ver, pop);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
+int librados::IoCtxImpl::getxattrs(const object_t& oid,
+ map<std::string, bufferlist>& attrset)
+{
+ Mutex mylock("IoCtxImpl::getexattrs::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ eversion_t ver;
+
+ ::ObjectOperation op;
+ ::ObjectOperation *pop = prepare_assert_ops(&op);
+
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+
+ lock->Lock();
+ map<string, bufferlist> aset;
+ objecter->getxattrs(oid, oloc, snap_seq,
+ aset,
+ 0, onack, &ver, pop);
+ lock->Unlock();
+
+ attrset.clear();
+
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); p++) {
+ ldout(client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
+ attrset[p->first.c_str()] = p->second;
+ }
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+void librados::IoCtxImpl::set_sync_op_version(eversion_t& ver)
+{
+ last_objver = ver;
+}
+
+int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
+ uint64_t *cookie, librados::WatchCtx *ctx)
+{
+ ::ObjectOperation rd;
+ Mutex mylock("IoCtxImpl::watch::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t objver;
+
+ lock->Lock();
+
+ WatchContext *wc = new WatchContext(this, oid, ctx);
+ client->register_watcher(wc, oid, ctx, cookie);
+ prepare_assert_ops(&rd);
+ rd.watch(*cookie, ver, 1);
+ bufferlist bl;
+ wc->linger_id = objecter->linger(oid, oloc, rd, snap_seq, bl, NULL, 0,
+ NULL, onfinish, &objver);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(objver);
+
+ if (r < 0) {
+ lock->Lock();
+ client->unregister_watcher(*cookie);
+ lock->Unlock();
+ }
+
+ return r;
+}
+
+
+/* this is called with IoCtxImpl::lock held */
+int librados::IoCtxImpl::_notify_ack(const object_t& oid,
+ uint64_t notify_id, uint64_t ver)
+{
+ Mutex mylock("IoCtxImpl::watch::mylock");
+ Cond cond;
+ eversion_t objver;
+
+ ::ObjectOperation rd;
+ prepare_assert_ops(&rd);
+ rd.notify_ack(notify_id, ver);
+ objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
+
+ return 0;
+}
+
+int librados::IoCtxImpl::unwatch(const object_t& oid, uint64_t cookie)
+{
+ bufferlist inbl, outbl;
+
+ Mutex mylock("IoCtxImpl::watch::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t ver;
+ lock->Lock();
+
+ client->unregister_watcher(cookie);
+
+ ::ObjectOperation rd;
+ prepare_assert_ops(&rd);
+ rd.watch(cookie, 0, 0);
+ objecter->read(oid, oloc, rd, snap_seq, &outbl, 0, onack, &ver);
+ lock->Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ set_sync_op_version(ver);
+
+ return r;
+}
+
+int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& bl)
+{
+ bufferlist inbl, outbl;
+
+ Mutex mylock("IoCtxImpl::notify::mylock");
+ Mutex mylock_all("IoCtxImpl::notify::mylock_all");
+ Cond cond, cond_all;
+ bool done, done_all;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ eversion_t objver;
+ uint64_t cookie;
+ C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all);
+
+ ::ObjectOperation rd;
+ prepare_assert_ops(&rd);
+
+ lock->Lock();
+ WatchContext *wc = new WatchContext(this, oid, ctx);
+ client->register_watcher(wc, oid, ctx, &cookie);
+ uint32_t prot_ver = 1;
+ uint32_t timeout = notify_timeout;
+ ::encode(prot_ver, inbl);
+ ::encode(timeout, inbl);
+ ::encode(bl, inbl);
+ rd.notify(cookie, ver, inbl);
+ wc->linger_id = objecter->linger(oid, oloc, rd, snap_seq, inbl, NULL,
+ 0, onack, NULL, &objver);
+ lock->Unlock();
+
+ mylock_all.Lock();
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ if (r == 0) {
+ while (!done_all)
+ cond_all.Wait(mylock_all);
+ }
+
+ mylock_all.Unlock();
+
+ lock->Lock();
+ client->unregister_watcher(cookie);
+ lock->Unlock();
+
+ set_sync_op_version(objver);
+ delete ctx;
+
+ return r;
+}
+
+eversion_t librados::IoCtxImpl::last_version()
+{
+ return last_objver;
+}
+
+void librados::IoCtxImpl::set_assert_version(uint64_t ver)
+{
+ assert_ver = ver;
+}
+void librados::IoCtxImpl::set_assert_src_version(const object_t& oid,
+ uint64_t ver)
+{
+ assert_src_version[oid] = ver;
+}
+
+void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout)
+{
+ notify_timeout = timeout;
+}
+
+///////////////////////////// C_aio_Ack ////////////////////////////////
+
+librados::IoCtxImpl::C_aio_Ack::C_aio_Ack(AioCompletionImpl *_c) : c(_c)
+{
+ c->get();
+}
+
+void librados::IoCtxImpl::C_aio_Ack::finish(int r)
+{
+ c->lock.Lock();
+ c->rval = r;
+ c->ack = true;
+ c->cond.Signal();
+
+ if (c->buf && c->bl.length() > 0) {
+ unsigned l = MIN(c->bl.length(), c->maxlen);
+ c->bl.copy(0, l, c->buf);
+ c->rval = c->bl.length();
+ }
+ if (c->pbl) {
+ *c->pbl = c->bl;
+ }
+
+ if (c->callback_complete) {
+ rados_callback_t cb = c->callback_complete;
+ void *cb_arg = c->callback_arg;
+ c->lock.Unlock();
+ cb(c, cb_arg);
+ c->lock.Lock();
+ }
+
+ c->put_unlock();
+}
+
+/////////////////////// C_aio_sparse_read_Ack //////////////////////////
+
+librados::IoCtxImpl::C_aio_sparse_read_Ack::C_aio_sparse_read_Ack(AioCompletionImpl *_c)
+ : c(_c)
+{
+ c->get();
+}
+
+void librados::IoCtxImpl::C_aio_sparse_read_Ack::finish(int r)
+{
+ c->lock.Lock();
+ c->rval = r;
+ c->ack = true;
+ c->cond.Signal();
+
+ bufferlist::iterator iter = c->bl.begin();
+ if (r >= 0) {
+ ::decode(*m, iter);
+ ::decode(*data_bl, iter);
+ }
+
+ if (c->callback_complete) {
+ rados_callback_t cb = c->callback_complete;
+ void *cb_arg = c->callback_arg;
+ c->lock.Unlock();
+ cb(c, cb_arg);
+ c->lock.Lock();
+ }
+
+ c->put_unlock();
+}
+
+//////////////////////////// C_aio_Safe ////////////////////////////////
+
+librados::IoCtxImpl::C_aio_Safe::C_aio_Safe(AioCompletionImpl *_c) : c(_c)
+{
+ c->get();
+}
+
+void librados::IoCtxImpl::C_aio_Safe::finish(int r)
+{
+ c->lock.Lock();
+ if (!c->ack) {
+ c->rval = r;
+ c->ack = true;
+ }
+ c->safe = true;
+ c->cond.Signal();
+
+ if (c->callback_safe) {
+ rados_callback_t cb = c->callback_safe;
+ void *cb_arg = c->callback_arg;
+ c->lock.Unlock();
+ cb(c, cb_arg);
+ c->lock.Lock();
+ }
+
+ c->io->complete_aio_write(c);
+
+ c->put_unlock();
+}
+
+///////////////////////// C_NotifyComplete /////////////////////////////
+
+librados::IoCtxImpl::C_NotifyComplete::C_NotifyComplete(Mutex *_l,
+ Cond *_c,
+ bool *_d)
+ : lock(_l), cond(_c), done(_d)
+{
+ *done = false;
+}
+
+void librados::IoCtxImpl::C_NotifyComplete::notify(uint8_t opcode,
+ uint64_t ver,
+ bufferlist& bl)
+{
+ *done = true;
+ cond->Signal();
+}
+
+/////////////////////////// WatchContext ///////////////////////////////
+
+librados::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_,
+ const object_t& _oc,
+ librados::WatchCtx *_ctx)
+ : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0)
+{
+ io_ctx_impl->get();
+}
+
+librados::WatchContext::~WatchContext()
+{
+ io_ctx_impl->put();
+}
+
+void librados::WatchContext::notify(uint8_t opcode,
+ uint64_t ver,
+ uint64_t notify_id,
+ bufferlist& payload)
+{
+ ctx->notify(opcode, ver, payload);
+ if (opcode != WATCH_NOTIFY_COMPLETE) {
+ io_ctx_impl->_notify_ack(oid, notify_id, ver);
+ }
+}
diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h
index 950e165e2eb..d0e99d2a71b 100644
--- a/src/librados/IoCtxImpl.h
+++ b/src/librados/IoCtxImpl.h
@@ -25,6 +25,7 @@
#include "include/types.h"
#include "include/xlist.h"
#include "osd/osd_types.h"
+#include "osdc/Objecter.h"
class RadosClient;
@@ -46,8 +47,12 @@ struct librados::IoCtxImpl {
Cond aio_write_cond;
xlist<AioCompletionImpl*> aio_write_list;
+ Mutex *lock;
+ Objecter *objecter;
+
IoCtxImpl();
- IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s);
+ IoCtxImpl(RadosClient *c, Objecter *objecter, Mutex *client_lock,
+ int poolid, const char *pool_name, snapid_t s);
void dup(const IoCtxImpl& rhs) {
// Copy everything except the ref count
@@ -61,6 +66,8 @@ struct librados::IoCtxImpl {
last_objver = rhs.last_objver;
notify_timeout = rhs.notify_timeout;
oloc = rhs.oloc;
+ lock = rhs.lock;
+ objecter = rhs.objecter;
}
void set_snap_read(snapid_t s);
@@ -82,6 +89,131 @@ struct librados::IoCtxImpl {
int64_t get_id() {
return poolid;
}
+
+ ::ObjectOperation *prepare_assert_ops(::ObjectOperation *op);
+
+ // snaps
+ int snap_list(vector<uint64_t> *snaps);
+ int snap_lookup(const char *name, uint64_t *snapid);
+ int snap_get_name(uint64_t snapid, std::string *s);
+ int snap_get_stamp(uint64_t snapid, time_t *t);
+ int snap_create(const char* snapname);
+ int selfmanaged_snap_create(uint64_t *snapid);
+ int snap_remove(const char* snapname);
+ int rollback(const object_t& oid, const char *snapName);
+ int selfmanaged_snap_remove(uint64_t snapid);
+ int selfmanaged_snap_rollback_object(const object_t& oid,
+ ::SnapContext& snapc, uint64_t snapid);
+
+ // io
+ int list(Objecter::ListContext *context, int max_entries);
+ int create(const object_t& oid, bool exclusive);
+ int create(const object_t& oid, bool exclusive, const std::string& category);
+ int write(const object_t& oid, bufferlist& bl, size_t len, uint64_t off);
+ int append(const object_t& oid, bufferlist& bl, size_t len);
+ int write_full(const object_t& oid, bufferlist& bl);
+ int clone_range(const object_t& dst_oid, uint64_t dst_offset,
+ const object_t& src_oid, uint64_t src_offset, uint64_t len);
+ int read(const object_t& oid, bufferlist& bl, size_t len, uint64_t off);
+ int mapext(const object_t& oid, uint64_t off, size_t len,
+ std::map<uint64_t,uint64_t>& m);
+ int sparse_read(const object_t& oid, std::map<uint64_t,uint64_t>& m,
+ bufferlist& bl, size_t len, uint64_t off);
+ int remove(const object_t& oid);
+ int stat(const object_t& oid, uint64_t *psize, time_t *pmtime);
+ int trunc(const object_t& oid, uint64_t size);
+
+ int tmap_update(const object_t& oid, bufferlist& cmdbl);
+ int tmap_put(const object_t& oid, bufferlist& bl);
+ int tmap_get(const object_t& oid, bufferlist& bl);
+
+ int exec(const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
+
+ int getxattr(const object_t& oid, const char *name, bufferlist& bl);
+ int setxattr(const object_t& oid, const char *name, bufferlist& bl);
+ int getxattrs(const object_t& oid, map<string, bufferlist>& attrset);
+ int rmxattr(const object_t& oid, const char *name);
+
+ int operate(const object_t& oid, ::ObjectOperation *o, time_t *pmtime);
+ int operate_read(const object_t& oid, ::ObjectOperation *o, bufferlist *pbl);
+ int aio_operate(const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c);
+ int aio_operate_read(const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c, bufferlist *pbl);
+
+ struct C_aio_Ack : public Context {
+ librados::AioCompletionImpl *c;
+ C_aio_Ack(AioCompletionImpl *_c);
+ void finish(int r);
+ };
+
+ struct C_aio_sparse_read_Ack : public Context {
+ AioCompletionImpl *c;
+ bufferlist *data_bl;
+ std::map<uint64_t,uint64_t> *m;
+ C_aio_sparse_read_Ack(AioCompletionImpl *_c);
+ void finish(int r);
+ };
+
+ struct C_aio_Safe : public Context {
+ AioCompletionImpl *c;
+ C_aio_Safe(AioCompletionImpl *_c);
+ void finish(int r);
+ };
+
+ int aio_read(const object_t oid, AioCompletionImpl *c,
+ bufferlist *pbl, size_t len, uint64_t off);
+ int aio_read(object_t oid, AioCompletionImpl *c,
+ char *buf, size_t len, uint64_t off);
+ int aio_sparse_read(const object_t oid, AioCompletionImpl *c,
+ std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
+ size_t len, uint64_t off);
+ int aio_write(const object_t &oid, AioCompletionImpl *c,
+ const bufferlist& bl, size_t len, uint64_t off);
+ int aio_append(const object_t &oid, AioCompletionImpl *c,
+ const bufferlist& bl, size_t len);
+ int aio_write_full(const object_t &oid, AioCompletionImpl *c,
+ const bufferlist& bl);
+ int aio_exec(const object_t& oid, AioCompletionImpl *c, const char *cls,
+ const char *method, bufferlist& inbl, bufferlist *outbl);
+
+ int pool_change_auid(unsigned long long auid);
+ int pool_change_auid_async(unsigned long long auid, PoolAsyncCompletionImpl *c);
+
+ void set_sync_op_version(eversion_t& ver);
+ int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
+ int unwatch(const object_t& oid, uint64_t cookie);
+ int notify(const object_t& oid, uint64_t ver, bufferlist& bl);
+ int _notify_ack(const object_t& oid, uint64_t notify_id, uint64_t ver);
+
+ eversion_t last_version();
+ void set_assert_version(uint64_t ver);
+ void set_assert_src_version(const object_t& oid, uint64_t ver);
+ void set_notify_timeout(uint32_t timeout);
+
+ struct C_NotifyComplete : public librados::WatchCtx {
+ Mutex *lock;
+ Cond *cond;
+ bool *done;
+
+ C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d);
+ void notify(uint8_t opcode, uint64_t ver, bufferlist& bl);
+ };
};
+namespace librados {
+struct WatchContext {
+ IoCtxImpl *io_ctx_impl;
+ const object_t oid;
+ uint64_t cookie;
+ uint64_t ver;
+ librados::WatchCtx *ctx;
+ uint64_t linger_id;
+
+ WatchContext(IoCtxImpl *io_ctx_impl_,
+ const object_t& _oc,
+ librados::WatchCtx *_ctx);
+ ~WatchContext();
+ void notify(uint8_t opcode, uint64_t ver, uint64_t notify_id,
+ bufferlist& payload);
+};
+}
#endif
diff --git a/src/librados/PoolAsyncCompletionImpl.h b/src/librados/PoolAsyncCompletionImpl.h
index e204aaf2a53..efb89641466 100644
--- a/src/librados/PoolAsyncCompletionImpl.h
+++ b/src/librados/PoolAsyncCompletionImpl.h
@@ -17,72 +17,100 @@
#include "common/Cond.h"
#include "common/Mutex.h"
+#include "include/Context.h"
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
-struct librados::PoolAsyncCompletionImpl {
- Mutex lock;
- Cond cond;
- int ref, rval;
- bool released;
- bool done;
+namespace librados {
+ struct PoolAsyncCompletionImpl {
+ Mutex lock;
+ Cond cond;
+ int ref, rval;
+ bool released;
+ bool done;
- rados_callback_t callback;
- void *callback_arg;
+ rados_callback_t callback;
+ void *callback_arg;
- PoolAsyncCompletionImpl() : lock("PoolAsyncCompletionImpl lock"),
- ref(1), rval(0), released(false), done(false),
- callback(0), callback_arg(0) {}
+ PoolAsyncCompletionImpl() : lock("PoolAsyncCompletionImpl lock"),
+ ref(1), rval(0), released(false), done(false),
+ callback(0), callback_arg(0) {}
- int set_callback(void *cb_arg, rados_callback_t cb) {
- lock.Lock();
- callback = cb;
- callback_arg = cb_arg;
- lock.Unlock();
- return 0;
- }
- int wait() {
- lock.Lock();
- while (!done)
- cond.Wait(lock);
- lock.Unlock();
- return 0;
- }
- int is_complete() {
- lock.Lock();
- int r = done;
- lock.Unlock();
- return r;
- }
- int get_return_value() {
- lock.Lock();
- int r = rval;
- lock.Unlock();
- return r;
- }
- void get() {
- lock.Lock();
- assert(ref > 0);
- ref++;
- lock.Unlock();
- }
- void release() {
- lock.Lock();
- assert(!released);
- released = true;
- put_unlock();
- }
- void put() {
- lock.Lock();
- put_unlock();
- }
- void put_unlock() {
- assert(ref > 0);
- int n = --ref;
- lock.Unlock();
- if (!n)
- delete this;
- }
-};
+ int set_callback(void *cb_arg, rados_callback_t cb) {
+ lock.Lock();
+ callback = cb;
+ callback_arg = cb_arg;
+ lock.Unlock();
+ return 0;
+ }
+ int wait() {
+ lock.Lock();
+ while (!done)
+ cond.Wait(lock);
+ lock.Unlock();
+ return 0;
+ }
+ int is_complete() {
+ lock.Lock();
+ int r = done;
+ lock.Unlock();
+ return r;
+ }
+ int get_return_value() {
+ lock.Lock();
+ int r = rval;
+ lock.Unlock();
+ return r;
+ }
+ void get() {
+ lock.Lock();
+ assert(ref > 0);
+ ref++;
+ lock.Unlock();
+ }
+ void release() {
+ lock.Lock();
+ assert(!released);
+ released = true;
+ put_unlock();
+ }
+ void put() {
+ lock.Lock();
+ put_unlock();
+ }
+ void put_unlock() {
+ assert(ref > 0);
+ int n = --ref;
+ lock.Unlock();
+ if (!n)
+ delete this;
+ }
+ };
+ class C_PoolAsync_Safe : public Context {
+ PoolAsyncCompletionImpl *c;
+
+ public:
+ C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c) : c(_c) {
+ c->get();
+ }
+
+ void finish(int r) {
+ c->lock.Lock();
+ c->rval = r;
+ c->done = true;
+ c->cond.Signal();
+
+ if (c->callback) {
+ rados_callback_t cb = c->callback;
+ void *cb_arg = c->callback_arg;
+ c->lock.Unlock();
+ cb(c, cb_arg);
+ c->lock.Lock();
+ }
+
+ c->put_unlock();
+ }
+ };
+}
#endif
diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc
index b760534a234..2c0fc086834 100644
--- a/src/librados/RadosClient.cc
+++ b/src/librados/RadosClient.cc
@@ -42,36 +42,6 @@
static atomic_t rados_instance;
-librados::PoolAsyncCompletionImpl *librados::RadosClient::pool_async_create_completion()
-{
- return new librados::PoolAsyncCompletionImpl;
-}
-
-librados::PoolAsyncCompletionImpl *librados::RadosClient::pool_async_create_completion(void *cb_arg, rados_callback_t cb)
-{
- librados::PoolAsyncCompletionImpl *c = new librados::PoolAsyncCompletionImpl;
- if (cb)
- c->set_callback(cb_arg, cb);
- return c;
-}
-
-librados::AioCompletionImpl *librados::RadosClient::aio_create_completion()
-{
- return new librados::AioCompletionImpl;
-}
-
-librados::AioCompletionImpl *librados::RadosClient::aio_create_completion(void *cb_arg,
- rados_callback_t cb_complete,
- rados_callback_t cb_safe)
-{
- librados::AioCompletionImpl *c = new librados::AioCompletionImpl;
- if (cb_complete)
- c->set_complete_callback(cb_arg, cb_complete);
- if (cb_safe)
- c->set_safe_callback(cb_arg, cb_safe);
- return c;
-}
-
bool librados::RadosClient::ms_get_authorizer(int dest_type,
AuthAuthorizer **authorizer,
bool force_new) {
@@ -103,9 +73,19 @@ int64_t librados::RadosClient::lookup_pool(const char *name) {
return ret;
}
-const char *librados::RadosClient::get_pool_name(int64_t poolid_)
+const char *librados::RadosClient::get_pool_name(int64_t pool_id)
+{
+ return osdmap.get_pool_name(pool_id);
+}
+
+int librados::RadosClient::pool_get_auid(uint64_t pool_id, unsigned long long *auid)
{
- return osdmap.get_pool_name(poolid_);
+ Mutex::Locker l(lock);
+ const pg_pool_t *pg = osdmap.get_pg_pool(pool_id);
+ if (!pg)
+ return -ENOENT;
+ *auid = pg->auid;
+ return 0;
}
int librados::RadosClient::connect()
@@ -226,6 +206,16 @@ librados::RadosClient::~RadosClient()
cct = NULL;
}
+int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
+{
+ int64_t poolid = lookup_pool(name);
+ if (poolid < 0)
+ return (int)poolid;
+
+ *io = new librados::IoCtxImpl(this, objecter, &lock, poolid, name,
+ CEPH_NOSNAP);
+ return 0;
+}
bool librados::RadosClient::ms_dispatch(Message *m)
{
@@ -344,144 +334,6 @@ int librados::RadosClient::get_fs_stats(ceph_statfs& stats)
return 0;
}
-
-// SNAPS
-
-int librados::RadosClient::snap_create(rados_ioctx_t io, const char *snapName)
-{
- int reply;
- int64_t poolID = ((IoCtxImpl *)io)->poolid;
- string sName(snapName);
-
- Mutex mylock ("RadosClient::snap_create::mylock");
- Cond cond;
- bool done;
- lock.Lock();
- objecter->create_pool_snap(poolID,
- sName,
- new C_SafeCond(&mylock, &cond, &done, &reply));
- lock.Unlock();
-
- mylock.Lock();
- while(!done) cond.Wait(mylock);
- mylock.Unlock();
- return reply;
-}
-
-int librados::RadosClient::selfmanaged_snap_create(rados_ioctx_t io, uint64_t *psnapid)
-{
- int reply;
- int64_t poolID = ((IoCtxImpl *)io)->poolid;
-
- Mutex mylock("RadosClient::selfmanaged_snap_create::mylock");
- Cond cond;
- bool done;
- lock.Lock();
- snapid_t snapid;
- objecter->allocate_selfmanaged_snap(poolID, &snapid,
- new C_SafeCond(&mylock, &cond, &done, &reply));
- lock.Unlock();
-
- mylock.Lock();
- while (!done) cond.Wait(mylock);
- mylock.Unlock();
- if (reply == 0)
- *psnapid = snapid;
- return reply;
-}
-
-int librados::RadosClient::snap_remove(rados_ioctx_t io, const char *snapName)
-{
- int reply;
- int64_t poolID = ((IoCtxImpl *)io)->poolid;
- string sName(snapName);
-
- Mutex mylock ("RadosClient::snap_remove::mylock");
- Cond cond;
- bool done;
- lock.Lock();
- objecter->delete_pool_snap(poolID,
- sName,
- new C_SafeCond(&mylock, &cond, &done, &reply));
- lock.Unlock();
-
- mylock.Lock();
- while(!done) cond.Wait(mylock);
- mylock.Unlock();
- return reply;
-}
-
-int librados::RadosClient::selfmanaged_snap_rollback_object(rados_ioctx_t io,
- const object_t& oid,
- ::SnapContext& snapc,
- uint64_t snapid)
-{
- int reply;
- IoCtxImpl* ctx = (IoCtxImpl *) io;
-
- Mutex mylock("RadosClient::snap_rollback::mylock");
- Cond cond;
- bool done;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply);
-
- lock.Lock();
- objecter->rollback_object(oid, ctx->oloc, snapc, snapid,
- ceph_clock_now(cct), onack, NULL);
- lock.Unlock();
-
- mylock.Lock();
- while (!done) cond.Wait(mylock);
- mylock.Unlock();
- return reply;
-}
-
-int librados::RadosClient::rollback(rados_ioctx_t io_, const object_t& oid,
- const char *snapName)
-{
- IoCtxImpl* io = (IoCtxImpl *) io_;
- string sName(snapName);
-
- lock.Lock();
- snapid_t snap;
- const map<int64_t, pg_pool_t>& pools = objecter->osdmap->get_pools();
- const pg_pool_t& pg_pool = pools.find(io->poolid)->second;
- map<snapid_t, pool_snap_info_t>::const_iterator p;
- for (p = pg_pool.snaps.begin();
- p != pg_pool.snaps.end();
- ++p) {
- if (p->second.name == snapName) {
- snap = p->first;
- break;
- }
- }
- if (p == pg_pool.snaps.end()) {
- lock.Unlock();
- return -ENOENT;
- }
- lock.Unlock();
-
- return selfmanaged_snap_rollback_object(io_, oid, io->snapc, snap);
-}
-
-int librados::RadosClient::selfmanaged_snap_remove(rados_ioctx_t io, uint64_t snapid)
-{
- int reply;
- int64_t poolID = ((IoCtxImpl *)io)->poolid;
-
- Mutex mylock("RadosClient::selfmanaged_snap_remove::mylock");
- Cond cond;
- bool done;
- lock.Lock();
- objecter->delete_selfmanaged_snap(poolID, snapid_t(snapid),
- new C_SafeCond(&mylock, &cond, &done, &reply));
- lock.Unlock();
-
- mylock.Lock();
- while (!done) cond.Wait(mylock);
- mylock.Unlock();
- return (int)reply;
-}
-
int librados::RadosClient::pool_create(string& name, unsigned long long auid,
__u8 crush_rule)
{
@@ -546,1273 +398,14 @@ int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompleti
return 0;
}
-int librados::RadosClient::pool_change_auid(rados_ioctx_t io, unsigned long long auid)
-{
- int reply;
-
- int64_t poolID = ((IoCtxImpl *)io)->poolid;
-
- Mutex mylock("RadosClient::pool_change_auid::mylock");
- Cond cond;
- bool done;
- lock.Lock();
- objecter->change_pool_auid(poolID,
- new C_SafeCond(&mylock, &cond, &done, &reply),
- auid);
- lock.Unlock();
-
- mylock.Lock();
- while (!done) cond.Wait(mylock);
- mylock.Unlock();
- return reply;
-}
-
-int librados::RadosClient::pool_change_auid_async(rados_ioctx_t io, unsigned long long auid,
- PoolAsyncCompletionImpl *c)
-{
- int64_t poolID = ((IoCtxImpl *)io)->poolid;
-
- Mutex::Locker l(lock);
- objecter->change_pool_auid(poolID,
- new C_PoolAsync_Safe(c),
- auid);
- return 0;
-}
-
-int librados::RadosClient::pool_get_auid(rados_ioctx_t io, unsigned long long *auid)
-{
- Mutex::Locker l(lock);
- int64_t pool_id = ((IoCtxImpl *)io)->poolid;
- const pg_pool_t *pg = osdmap.get_pg_pool(pool_id);
- if (!pg)
- return -ENOENT;
- *auid = pg->auid;
- return 0;
-}
-
-int librados::RadosClient::snap_list(IoCtxImpl *io, vector<uint64_t> *snaps)
-{
- Mutex::Locker l(lock);
- const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid);
- for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
- p != pi->snaps.end();
- p++)
- snaps->push_back(p->first);
- return 0;
-}
-
-int librados::RadosClient::snap_lookup(IoCtxImpl *io, const char *name, uint64_t *snapid)
-{
- Mutex::Locker l(lock);
- const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid);
- for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
- p != pi->snaps.end();
- p++) {
- if (p->second.name == name) {
- *snapid = p->first;
- return 0;
- }
- }
- return -ENOENT;
-}
-
-int librados::RadosClient::snap_get_name(IoCtxImpl *io, uint64_t snapid, std::string *s)
-{
- Mutex::Locker l(lock);
- const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid);
- map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.find(snapid);
- if (p == pi->snaps.end())
- return -ENOENT;
- *s = p->second.name.c_str();
- return 0;
-}
-
-int librados::RadosClient::snap_get_stamp(IoCtxImpl *io, uint64_t snapid, time_t *t)
-{
- Mutex::Locker l(lock);
- const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid);
- map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.find(snapid);
- if (p == pi->snaps.end())
- return -ENOENT;
- *t = p->second.stamp.sec();
- return 0;
-}
-
-
-// IO
-
-int librados::RadosClient::list(Objecter::ListContext *context, int max_entries)
-{
- Cond cond;
- bool done;
- int r = 0;
- object_t oid;
- Mutex mylock("RadosClient::list::mylock");
-
- if (context->at_end)
- return 0;
-
- context->max_entries = max_entries;
-
- lock.Lock();
- objecter->list_objects(context, new C_SafeCond(&mylock, &cond, &done, &r));
- lock.Unlock();
-
- mylock.Lock();
- while(!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- return r;
-}
-
-int librados::RadosClient::create(IoCtxImpl& io, const object_t& oid, bool exclusive)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::create::mylock");
- Cond cond;
- bool done;
- int r;
- eversion_t ver;
-
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
-
- lock.Lock();
- objecter->create(oid, io.oloc,
- io.snapc, ut, 0, (exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0),
- onack, NULL, &ver);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::create(IoCtxImpl& io, const object_t& oid, bool exclusive,
- const std::string& category)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::create::mylock");
- Cond cond;
- bool done;
- int r;
- eversion_t ver;
-
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
-
- ::ObjectOperation o;
- o.create(exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0, category);
-
- lock.Lock();
- objecter->mutate(oid, io.oloc, o, io.snapc, ut, 0, onack, NULL, &ver);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-/*
- * add any version assert operations that are appropriate given the
- * stat in the IoCtx, either the target version assert or any src
- * object asserts. these affect a single ioctx operation, so clear
- * the ioctx state when we're doing.
- *
- * return a pointer to the ObjectOperation if we added any events;
- * this is convenient for passing the extra_ops argument into Objecter
- * methods.
- */
-::ObjectOperation *librados::RadosClient::prepare_assert_ops(IoCtxImpl *io, ::ObjectOperation *op)
-{
- ::ObjectOperation *pop = NULL;
- if (io->assert_ver) {
- op->assert_version(io->assert_ver);
- io->assert_ver = 0;
- pop = op;
- }
- while (!io->assert_src_version.empty()) {
- map<object_t,uint64_t>::iterator p = io->assert_src_version.begin();
- op->assert_src_version(p->first, CEPH_NOSNAP, p->second);
- io->assert_src_version.erase(p);
- pop = op;
- }
- return pop;
-}
-
-int librados::RadosClient::write(IoCtxImpl& io, const object_t& oid, bufferlist& bl,
- size_t len, uint64_t off)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::write::mylock");
- Cond cond;
- bool done;
- int r;
-
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- // extra ops?
- ::ObjectOperation op;
- ::ObjectOperation *pop = prepare_assert_ops(&io, &op);
-
- lock.Lock();
- objecter->write(oid, io.oloc,
- off, len, io.snapc, bl, ut, 0,
- onack, NULL, &ver, pop);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- if (r < 0)
- return r;
-
- return len;
-}
-
-int librados::RadosClient::append(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::append::mylock");
- Cond cond;
- bool done;
- int r;
-
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- ::ObjectOperation op;
- ::ObjectOperation *pop = prepare_assert_ops(&io, &op);
-
- lock.Lock();
- objecter->append(oid, io.oloc,
- len, io.snapc, bl, ut, 0,
- onack, NULL, &ver, pop);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- if (r < 0)
- return r;
-
- return len;
-}
-
-int librados::RadosClient::write_full(IoCtxImpl& io, const object_t& oid, bufferlist& bl)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::write_full::mylock");
- Cond cond;
- bool done;
- int r;
-
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
-
- eversion_t ver;
-
- ::ObjectOperation op;
- ::ObjectOperation *pop = prepare_assert_ops(&io, &op);
-
- lock.Lock();
- objecter->write_full(oid, io.oloc,
- io.snapc, bl, ut, 0,
- onack, NULL, &ver, pop);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::clone_range(IoCtxImpl& io,
- const object_t& dst_oid, uint64_t dst_offset,
- const object_t& src_oid, uint64_t src_offset,
- uint64_t len)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::clone_range::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- bufferlist outbl;
-
- lock.Lock();
- ::ObjectOperation wr;
- prepare_assert_ops(&io, &wr);
- wr.clone_range(src_oid, src_offset, len, dst_offset);
- objecter->mutate(dst_oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::operate(IoCtxImpl& io, const object_t& oid,
- ::ObjectOperation *o, time_t *pmtime)
-{
- utime_t ut;
- if (pmtime) {
- ut = utime_t(*pmtime, 0);
- } else {
- ut = ceph_clock_now(cct);
- }
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- if (!o->size())
- return 0;
-
- Mutex mylock("RadosClient::mutate::mylock");
- Cond cond;
- bool done;
- int r;
- eversion_t ver;
-
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
-
- lock.Lock();
- objecter->mutate(oid, io.oloc,
- *o, io.snapc, ut, 0,
- onack, NULL, &ver);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::operate_read(IoCtxImpl& io, const object_t& oid,
- ::ObjectOperation *o, bufferlist *pbl)
-{
- if (!o->size())
- return 0;
-
- Mutex mylock("RadosClient::mutate::mylock");
- Cond cond;
- bool done;
- int r;
- eversion_t ver;
-
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
-
- lock.Lock();
- objecter->read(oid, io.oloc,
- *o, io.snap_seq, pbl, 0,
- onack, &ver);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::aio_operate_read(IoCtxImpl& io, const object_t &oid,
- ::ObjectOperation *o,
- AioCompletionImpl *c, bufferlist *pbl)
-{
- Context *onack = new C_aio_Ack(c);
-
- c->pbl = pbl;
-
- Mutex::Locker l(lock);
- objecter->read(oid, io.oloc,
- *o, io.snap_seq, pbl, 0,
- onack, 0);
- return 0;
-}
-
-int librados::RadosClient::aio_operate(IoCtxImpl& io, const object_t& oid,
- ::ObjectOperation *o, AioCompletionImpl *c)
-{
- utime_t ut = ceph_clock_now(cct);
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Context *onack = new C_aio_Ack(c);
- Context *oncommit = new C_aio_Safe(c);
-
- io.queue_aio_write(c);
-
- Mutex::Locker l(lock);
- objecter->mutate(oid, io.oloc, *o, io.snapc, ut, 0, onack, oncommit, &c->objver);
-
- return 0;
-}
-
-int librados::RadosClient::aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c,
- bufferlist *pbl, size_t len, uint64_t off)
-{
-
- Context *onack = new C_aio_Ack(c);
- eversion_t ver;
-
- c->pbl = pbl;
-
- Mutex::Locker l(lock);
- objecter->read(oid, io.oloc,
- off, len, io.snap_seq, &c->bl, 0,
- onack, &c->objver);
- return 0;
-}
-
-int librados::RadosClient::aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c,
- char *buf, size_t len, uint64_t off)
-{
- Context *onack = new C_aio_Ack(c);
-
- c->buf = buf;
- c->maxlen = len;
-
- Mutex::Locker l(lock);
- objecter->read(oid, io.oloc,
- off, len, io.snap_seq, &c->bl, 0,
- onack, &c->objver);
-
- return 0;
-}
-
-int librados::RadosClient::aio_sparse_read(IoCtxImpl& io, const object_t oid,
- AioCompletionImpl *c, std::map<uint64_t,uint64_t> *m,
- bufferlist *data_bl, size_t len, uint64_t off)
-{
-
- C_aio_sparse_read_Ack *onack = new C_aio_sparse_read_Ack(c);
- onack->m = m;
- onack->data_bl = data_bl;
- eversion_t ver;
-
- c->pbl = NULL;
-
- Mutex::Locker l(lock);
- objecter->sparse_read(oid, io.oloc,
- off, len, io.snap_seq, &c->bl, 0,
- onack);
- return 0;
-}
-
-int librados::RadosClient::aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c,
- const bufferlist& bl, size_t len, uint64_t off)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- io.queue_aio_write(c);
-
- Context *onack = new C_aio_Ack(c);
- Context *onsafe = new C_aio_Safe(c);
-
- Mutex::Locker l(lock);
- objecter->write(oid, io.oloc,
- off, len, io.snapc, bl, ut, 0,
- onack, onsafe, &c->objver);
-
- return 0;
-}
-
-int librados::RadosClient::aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c,
- const bufferlist& bl, size_t len)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- io.queue_aio_write(c);
-
- Context *onack = new C_aio_Ack(c);
- Context *onsafe = new C_aio_Safe(c);
-
- Mutex::Locker l(lock);
- objecter->append(oid, io.oloc,
- len, io.snapc, bl, ut, 0,
- onack, onsafe, &c->objver);
-
- return 0;
-}
-
-int librados::RadosClient::aio_write_full(IoCtxImpl& io, const object_t &oid,
- AioCompletionImpl *c, const bufferlist& bl)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- io.queue_aio_write(c);
-
- Context *onack = new C_aio_Ack(c);
- Context *onsafe = new C_aio_Safe(c);
-
- Mutex::Locker l(lock);
- objecter->write_full(oid, io.oloc,
- io.snapc, bl, ut, 0,
- onack, onsafe, &c->objver);
-
- return 0;
-}
-
-int librados::RadosClient::remove(IoCtxImpl& io, const object_t& oid)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::remove::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- ::ObjectOperation op;
- ::ObjectOperation *pop = prepare_assert_ops(&io, &op);
-
- lock.Lock();
- objecter->remove(oid, io.oloc,
- io.snapc, ut, 0,
- onack, NULL, &ver, pop);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::trunc(IoCtxImpl& io, const object_t& oid, uint64_t size)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::write_full::mylock");
- Cond cond;
- bool done;
- int r;
-
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- ::ObjectOperation op;
- ::ObjectOperation *pop = prepare_assert_ops(&io, &op);
-
- lock.Lock();
- objecter->trunc(oid, io.oloc,
- io.snapc, ut, 0,
- size, 0,
- onack, NULL, &ver, pop);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::tmap_update(IoCtxImpl& io, const object_t& oid, bufferlist& cmdbl)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::tmap_update::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- bufferlist outbl;
-
- lock.Lock();
- ::ObjectOperation wr;
- prepare_assert_ops(&io, &wr);
- wr.tmap_update(cmdbl);
- objecter->mutate(oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::tmap_put(IoCtxImpl& io, const object_t& oid, bufferlist& bl)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::tmap_put::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- bufferlist outbl;
-
- lock.Lock();
- ::ObjectOperation wr;
- prepare_assert_ops(&io, &wr);
- wr.tmap_put(bl);
- objecter->mutate(oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::tmap_get(IoCtxImpl& io, const object_t& oid, bufferlist& bl)
-{
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::tmap_put::mylock");
- Cond cond;
- bool done;
- int r = 0;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- bufferlist outbl;
-
- lock.Lock();
- ::ObjectOperation rd;
- prepare_assert_ops(&io, &rd);
- rd.tmap_get(&bl, NULL);
- objecter->read(oid, io.oloc, rd, io.snap_seq, 0, 0, onack, &ver);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::exec(IoCtxImpl& io, const object_t& oid,
- const char *cls, const char *method,
- bufferlist& inbl, bufferlist& outbl)
-{
- Mutex mylock("RadosClient::exec::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
-
- lock.Lock();
- ::ObjectOperation rd;
- prepare_assert_ops(&io, &rd);
- rd.call(cls, method, inbl);
- objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::aio_exec(IoCtxImpl& io, const object_t& oid, AioCompletionImpl *c,
- const char *cls, const char *method,
- bufferlist& inbl, bufferlist *outbl)
-{
- Context *onack = new C_aio_Ack(c);
-
- Mutex::Locker l(lock);
- ::ObjectOperation rd;
- prepare_assert_ops(&io, &rd);
- rd.call(cls, method, inbl);
- objecter->read(oid, io.oloc, rd, io.snap_seq, outbl, 0, onack, &c->objver);
-
- return 0;
-}
-
-int librados::RadosClient::read(IoCtxImpl& io, const object_t& oid,
- bufferlist& bl, size_t len, uint64_t off)
-{
- CephContext *cct = io.client->cct;
- Mutex mylock("RadosClient::read::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- ::ObjectOperation op;
- ::ObjectOperation *pop = prepare_assert_ops(&io, &op);
-
- lock.Lock();
- objecter->read(oid, io.oloc,
- off, len, io.snap_seq, &bl, 0,
- onack, &ver, pop);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
- ldout(cct, 10) << "Objecter returned from read r=" << r << dendl;
-
- set_sync_op_version(io, ver);
-
- if (r < 0)
- return r;
-
- if (bl.length() < len) {
- ldout(cct, 10) << "Returned length " << bl.length()
- << " less than original length "<< len << dendl;
- }
-
- return bl.length();
-}
-
-int librados::RadosClient::mapext(IoCtxImpl& io, const object_t& oid,
- uint64_t off, size_t len, std::map<uint64_t,uint64_t>& m)
-{
- CephContext *cct = io.client->cct;
- bufferlist bl;
-
- Mutex mylock("RadosClient::read::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
-
- lock.Lock();
- objecter->mapext(oid, io.oloc,
- off, len, io.snap_seq, &bl, 0,
- onack);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
- ldout(cct, 10) << "Objecter returned from read r=" << r << dendl;
-
- if (r < 0)
- return r;
-
- bufferlist::iterator iter = bl.begin();
- ::decode(m, iter);
-
- return m.size();
-}
-
-int librados::RadosClient::sparse_read(IoCtxImpl& io, const object_t& oid,
- std::map<uint64_t,uint64_t>& m,
- bufferlist& data_bl, size_t len, uint64_t off)
-{
- CephContext *cct = io.client->cct;
- bufferlist bl;
-
- Mutex mylock("RadosClient::read::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
-
- lock.Lock();
- objecter->sparse_read(oid, io.oloc,
- off, len, io.snap_seq, &bl, 0,
- onack);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
- ldout(cct, 10) << "Objecter returned from read r=" << r << dendl;
-
- if (r < 0)
- return r;
-
- bufferlist::iterator iter = bl.begin();
- ::decode(m, iter);
- ::decode(data_bl, iter);
-
- return m.size();
-}
-
-int librados::RadosClient::stat(IoCtxImpl& io, const object_t& oid, uint64_t *psize, time_t *pmtime)
-{
- CephContext *cct = io.client->cct;
- Mutex mylock("RadosClient::stat::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- uint64_t size;
- utime_t mtime;
- eversion_t ver;
-
- if (!psize)
- psize = &size;
-
- ::ObjectOperation op;
- ::ObjectOperation *pop = prepare_assert_ops(&io, &op);
-
- lock.Lock();
- objecter->stat(oid, io.oloc,
- io.snap_seq, psize, &mtime, 0,
- onack, &ver, pop);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
- ldout(cct, 10) << "Objecter returned from stat" << dendl;
-
- if (r >= 0 && pmtime) {
- *pmtime = mtime.sec();
- }
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::getxattr(IoCtxImpl& io, const object_t& oid,
- const char *name, bufferlist& bl)
-{
- CephContext *cct = io.client->cct;
- Mutex mylock("RadosClient::getxattr::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- ::ObjectOperation op;
- ::ObjectOperation *pop = prepare_assert_ops(&io, &op);
-
- lock.Lock();
- objecter->getxattr(oid, io.oloc,
- name, io.snap_seq, &bl, 0,
- onack, &ver, pop);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
- ldout(cct, 10) << "Objecter returned from getxattr" << dendl;
-
- set_sync_op_version(io, ver);
-
- if (r < 0)
- return r;
-
- return bl.length();
-}
-
-int librados::RadosClient::rmxattr(IoCtxImpl& io, const object_t& oid, const char *name)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::rmxattr::mylock");
- Cond cond;
- bool done;
- int r;
-
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- ::ObjectOperation op;
- ::ObjectOperation *pop = prepare_assert_ops(&io, &op);
-
- lock.Lock();
- objecter->removexattr(oid, io.oloc, name,
- io.snapc, ut, 0,
- onack, NULL, &ver, pop);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- if (r < 0)
- return r;
-
- return 0;
-}
-
-int librados::RadosClient::setxattr(IoCtxImpl& io, const object_t& oid,
- const char *name, bufferlist& bl)
-{
- utime_t ut = ceph_clock_now(cct);
-
- /* can't write to a snapshot */
- if (io.snap_seq != CEPH_NOSNAP)
- return -EROFS;
-
- Mutex mylock("RadosClient::setxattr::mylock");
- Cond cond;
- bool done;
- int r;
-
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
-
- ::ObjectOperation op;
- ::ObjectOperation *pop = prepare_assert_ops(&io, &op);
-
- lock.Lock();
- objecter->setxattr(oid, io.oloc, name,
- io.snapc, bl, ut, 0,
- onack, NULL, &ver, pop);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- if (r < 0)
- return r;
-
- return 0;
-}
-
-int librados::RadosClient::getxattrs(IoCtxImpl& io, const object_t& oid,
- map<std::string, bufferlist>& attrset)
-{
- CephContext *cct = io.client->cct;
- Mutex mylock("RadosClient::getexattrs::mylock");
- Cond cond;
- bool done;
- int r;
- eversion_t ver;
-
- ::ObjectOperation op;
- ::ObjectOperation *pop = prepare_assert_ops(&io, &op);
-
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
-
- lock.Lock();
- map<string, bufferlist> aset;
- objecter->getxattrs(oid, io.oloc, io.snap_seq,
- aset,
- 0, onack, &ver, pop);
- lock.Unlock();
-
- attrset.clear();
-
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); p++) {
- ldout(cct, 10) << "RadosClient::getxattrs: xattr=" << p->first << dendl;
- attrset[p->first.c_str()] = p->second;
- }
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-void librados::RadosClient::watch_notify(MWatchNotify *m)
-{
- assert(lock.is_locked());
- WatchContext *wc = NULL;
- map<uint64_t, WatchContext *>::iterator iter = watchers.find(m->cookie);
- if (iter != watchers.end())
- wc = iter->second;
-
- if (!wc)
- return;
-
- wc->notify(this, m);
-
- m->put();
-}
-
-int librados::RadosClient::watch(IoCtxImpl& io, const object_t& oid, uint64_t ver,
- uint64_t *cookie, librados::WatchCtx *ctx)
-{
- ::ObjectOperation rd;
- Mutex mylock("RadosClient::watch::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t objver;
-
- lock.Lock();
-
- WatchContext *wc;
- register_watcher(io, oid, ctx, cookie, &wc);
- prepare_assert_ops(&io, &rd);
- rd.watch(*cookie, ver, 1);
- bufferlist bl;
- wc->linger_id = objecter->linger(oid, io.oloc, rd, io.snap_seq, bl, NULL, 0, NULL, onfinish, &objver);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, objver);
-
- if (r < 0) {
- lock.Lock();
- unregister_watcher(*cookie);
- lock.Unlock();
- }
-
- return r;
-}
-
-
-/* this is called with RadosClient::lock held */
-int librados::RadosClient::_notify_ack(IoCtxImpl& io, const object_t& oid,
- uint64_t notify_id, uint64_t ver)
-{
- Mutex mylock("RadosClient::watch::mylock");
- Cond cond;
- eversion_t objver;
-
- ::ObjectOperation rd;
- prepare_assert_ops(&io, &rd);
- rd.notify_ack(notify_id, ver);
- objecter->read(oid, io.oloc, rd, io.snap_seq, (bufferlist*)NULL, 0, 0, 0);
-
- return 0;
-}
-
-int librados::RadosClient::unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie)
-{
- bufferlist inbl, outbl;
-
- Mutex mylock("RadosClient::watch::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
- lock.Lock();
-
- unregister_watcher(cookie);
-
- ::ObjectOperation rd;
- prepare_assert_ops(&io, &rd);
- rd.watch(cookie, 0, 0);
- objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver);
- lock.Unlock();
-
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- set_sync_op_version(io, ver);
-
- return r;
-}
-
-int librados::RadosClient::notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl)
-{
- bufferlist inbl, outbl;
-
- Mutex mylock("RadosClient::notify::mylock");
- Mutex mylock_all("RadosClient::notify::mylock_all");
- Cond cond, cond_all;
- bool done, done_all;
- int r;
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t objver;
- uint64_t cookie;
- C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all);
-
- ::ObjectOperation rd;
- prepare_assert_ops(&io, &rd);
-
- lock.Lock();
- WatchContext *wc;
- register_watcher(io, oid, ctx, &cookie, &wc);
- uint32_t prot_ver = 1;
- uint32_t timeout = io.notify_timeout;
- ::encode(prot_ver, inbl);
- ::encode(timeout, inbl);
- ::encode(bl, inbl);
- rd.notify(cookie, ver, inbl);
- wc->linger_id = objecter->linger(oid, io.oloc, rd, io.snap_seq, inbl, NULL,
- 0, onack, NULL, &objver);
- lock.Unlock();
-
- mylock_all.Lock();
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- if (r == 0) {
- while (!done_all)
- cond_all.Wait(mylock_all);
- }
-
- mylock_all.Unlock();
-
- lock.Lock();
- unregister_watcher(cookie);
- lock.Unlock();
-
- set_sync_op_version(io, objver);
- delete ctx;
-
- return r;
-}
-
-void librados::RadosClient::set_sync_op_version(IoCtxImpl& io, eversion_t& ver)
-{
- io.last_objver = ver;
-}
-
-void librados::RadosClient::register_watcher(IoCtxImpl& io,
+void librados::RadosClient::register_watcher(WatchContext *wc,
const object_t& oid,
librados::WatchCtx *ctx,
- uint64_t *cookie,
- WatchContext **pwc)
+ uint64_t *cookie)
{
assert(lock.is_locked());
- WatchContext *wc = new WatchContext(&io, oid, ctx);
*cookie = ++max_watch_cookie;
watchers[*cookie] = wc;
- if (pwc)
- *pwc = wc;
}
void librados::RadosClient::unregister_watcher(uint64_t cookie)
@@ -1828,187 +421,18 @@ void librados::RadosClient::unregister_watcher(uint64_t cookie)
}
}
-eversion_t librados::RadosClient::last_version(IoCtxImpl& io)
-{
- return io.last_objver;
-}
-
-void librados::RadosClient::set_assert_version(IoCtxImpl& io, uint64_t ver)
-{
- io.assert_ver = ver;
-}
-void librados::RadosClient::set_assert_src_version(IoCtxImpl& io,
- const object_t& oid,
- uint64_t ver)
-{
- io.assert_src_version[oid] = ver;
-}
-
-void librados::RadosClient::set_notify_timeout(IoCtxImpl& io, uint32_t timeout)
-{
- io.notify_timeout = timeout;
-}
-
-///////////////////////////// C_aio_Ack ////////////////////////////////
-
-librados::RadosClient::C_aio_Ack::C_aio_Ack(AioCompletionImpl *_c) : c(_c)
-{
- c->get();
-}
-
-void librados::RadosClient::C_aio_Ack::finish(int r)
-{
- c->lock.Lock();
- c->rval = r;
- c->ack = true;
- c->cond.Signal();
-
- if (c->buf && c->bl.length() > 0) {
- unsigned l = MIN(c->bl.length(), c->maxlen);
- c->bl.copy(0, l, c->buf);
- c->rval = c->bl.length();
- }
- if (c->pbl) {
- *c->pbl = c->bl;
- }
-
- if (c->callback_complete) {
- rados_callback_t cb = c->callback_complete;
- void *cb_arg = c->callback_arg;
- c->lock.Unlock();
- cb(c, cb_arg);
- c->lock.Lock();
- }
-
- c->put_unlock();
-}
-
-/////////////////////// C_aio_sparse_read_Ack //////////////////////////
-
-librados::RadosClient::C_aio_sparse_read_Ack::C_aio_sparse_read_Ack(AioCompletionImpl *_c)
- : c(_c)
-{
- c->get();
-}
-
-void librados::RadosClient::C_aio_sparse_read_Ack::finish(int r)
-{
- c->lock.Lock();
- c->rval = r;
- c->ack = true;
- c->cond.Signal();
-
- bufferlist::iterator iter = c->bl.begin();
- if (r >= 0) {
- ::decode(*m, iter);
- ::decode(*data_bl, iter);
- }
-
- if (c->callback_complete) {
- rados_callback_t cb = c->callback_complete;
- void *cb_arg = c->callback_arg;
- c->lock.Unlock();
- cb(c, cb_arg);
- c->lock.Lock();
- }
-
- c->put_unlock();
-}
-
-//////////////////////////// C_aio_Safe ////////////////////////////////
-
-librados::RadosClient::C_aio_Safe::C_aio_Safe(AioCompletionImpl *_c) : c(_c)
-{
- c->get();
-}
-
-void librados::RadosClient::C_aio_Safe::finish(int r)
-{
- c->lock.Lock();
- if (!c->ack) {
- c->rval = r;
- c->ack = true;
- }
- c->safe = true;
- c->cond.Signal();
-
- if (c->callback_safe) {
- rados_callback_t cb = c->callback_safe;
- void *cb_arg = c->callback_arg;
- c->lock.Unlock();
- cb(c, cb_arg);
- c->lock.Lock();
- }
-
- c->io->complete_aio_write(c);
-
- c->put_unlock();
-}
-
-///////////////////////// C_PoolAsync_Safe /////////////////////////////
-
-librados::RadosClient::C_PoolAsync_Safe::C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c)
- : c(_c)
-{
- c->get();
-}
-
-void librados::RadosClient::C_PoolAsync_Safe::finish(int r)
-{
- c->lock.Lock();
- c->rval = r;
- c->done = true;
- c->cond.Signal();
-
- if (c->callback) {
- rados_callback_t cb = c->callback;
- void *cb_arg = c->callback_arg;
- c->lock.Unlock();
- cb(c, cb_arg);
- c->lock.Lock();
- }
-
- c->put_unlock();
-}
-
-///////////////////////// C_NotifyComplete /////////////////////////////
-
-librados::RadosClient::C_NotifyComplete::C_NotifyComplete(Mutex *_l,
- Cond *_c,
- bool *_d)
- : lock(_l), cond(_c), done(_d)
-{
- *done = false;
-}
-
-void librados::RadosClient::C_NotifyComplete::notify(uint8_t opcode,
- uint64_t ver,
- bufferlist& bl)
+void librados::RadosClient::watch_notify(MWatchNotify *m)
{
- *done = true;
- cond->Signal();
-}
-
-/////////////////////////// WatchContext ///////////////////////////////
+ assert(lock.is_locked());
+ WatchContext *wc = NULL;
+ map<uint64_t, WatchContext *>::iterator iter = watchers.find(m->cookie);
+ if (iter != watchers.end())
+ wc = iter->second;
-librados::RadosClient::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_,
- const object_t& _oc,
- librados::WatchCtx *_ctx)
- : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0)
-{
- io_ctx_impl->get();
-}
+ if (!wc)
+ return;
-librados::RadosClient::WatchContext::~WatchContext()
-{
- io_ctx_impl->put();
-}
+ wc->notify(m->opcode, m->ver, m->notify_id, m->bl);
-void librados::RadosClient::WatchContext::notify(RadosClient *client,
- MWatchNotify *m)
-{
- ctx->notify(m->opcode, m->ver, m->bl);
- if (m->opcode != WATCH_NOTIFY_COMPLETE) {
- client->_notify_ack(*io_ctx_impl, oid, m->notify_id, m->ver);
- }
+ m->put();
}
diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h
index 98082df30b7..4ef76d1576f 100644
--- a/src/librados/RadosClient.h
+++ b/src/librados/RadosClient.h
@@ -22,7 +22,8 @@
#include "mon/MonClient.h"
#include "msg/Dispatcher.h"
#include "osd/OSDMap.h"
-#include "osdc/Objecter.h"
+
+#include "IoCtxImpl.h"
class AuthAuthorizer;
class CephContext;
@@ -69,50 +70,11 @@ public:
int connect();
void shutdown();
+ int create_ioctx(const char *name, IoCtxImpl **io);
+
int64_t lookup_pool(const char *name);
- const char *get_pool_name(int64_t poolid_);
- ::ObjectOperation *prepare_assert_ops(IoCtxImpl *io, ::ObjectOperation *op);
-
- // snaps
- int snap_list(IoCtxImpl *io, vector<uint64_t> *snaps);
- int snap_lookup(IoCtxImpl *io, const char *name, uint64_t *snapid);
- int snap_get_name(IoCtxImpl *io, uint64_t snapid, std::string *s);
- int snap_get_stamp(IoCtxImpl *io, uint64_t snapid, time_t *t);
- int snap_create(rados_ioctx_t io, const char* snapname);
- int selfmanaged_snap_create(rados_ioctx_t io, uint64_t *snapid);
- int snap_remove(rados_ioctx_t io, const char* snapname);
- int rollback(rados_ioctx_t io_, const object_t& oid, const char *snapName);
- int selfmanaged_snap_remove(rados_ioctx_t io, uint64_t snapid);
- int selfmanaged_snap_rollback_object(rados_ioctx_t io, const object_t& oid,
- ::SnapContext& snapc, uint64_t snapid);
-
- // io
- int create(IoCtxImpl& io, const object_t& oid, bool exclusive);
- int create(IoCtxImpl& io, const object_t& oid, bool exclusive, const std::string& category);
- int write(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len, uint64_t off);
- int append(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len);
- int write_full(IoCtxImpl& io, const object_t& oid, bufferlist& bl);
- int clone_range(IoCtxImpl& io, const object_t& dst_oid, uint64_t dst_offset,
- const object_t& src_oid, uint64_t src_offset, uint64_t len);
- int read(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len, uint64_t off);
- int mapext(IoCtxImpl& io, const object_t& oid, uint64_t off, size_t len,
- std::map<uint64_t,uint64_t>& m);
- int sparse_read(IoCtxImpl& io, const object_t& oid, std::map<uint64_t,uint64_t>& m,
- bufferlist& bl, size_t len, uint64_t off);
- int remove(IoCtxImpl& io, const object_t& oid);
- int stat(IoCtxImpl& io, const object_t& oid, uint64_t *psize, time_t *pmtime);
- int trunc(IoCtxImpl& io, const object_t& oid, uint64_t size);
-
- int tmap_update(IoCtxImpl& io, const object_t& oid, bufferlist& cmdbl);
- int tmap_put(IoCtxImpl& io, const object_t& oid, bufferlist& bl);
- int tmap_get(IoCtxImpl& io, const object_t& oid, bufferlist& bl);
-
- int exec(IoCtxImpl& io, const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
-
- int getxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl);
- int setxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl);
- int getxattrs(IoCtxImpl& io, const object_t& oid, map<string, bufferlist>& attrset);
- int rmxattr(IoCtxImpl& io, const object_t& oid, const char *name);
+ const char *get_pool_name(int64_t pool_id);
+ int pool_get_auid(uint64_t pool_id, unsigned long long *auid);
int pool_list(std::list<string>& ls);
int get_pool_stats(std::list<string>& ls, map<string,::pool_stat_t>& result);
@@ -122,113 +84,17 @@ public:
int pool_create_async(string& name, PoolAsyncCompletionImpl *c, unsigned long long auid=0,
__u8 crush_rule=0);
int pool_delete(const char *name);
- int pool_change_auid(rados_ioctx_t io, unsigned long long auid);
- int pool_get_auid(rados_ioctx_t io, unsigned long long *auid);
int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c);
- int pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, PoolAsyncCompletionImpl *c);
-
- int list(Objecter::ListContext *context, int max_entries);
-
- int operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, time_t *pmtime);
- int operate_read(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, bufferlist *pbl);
- int aio_operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c);
- int aio_operate_read(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c, bufferlist *pbl);
-
- struct C_aio_Ack : public Context {
- librados::AioCompletionImpl *c;
- C_aio_Ack(AioCompletionImpl *_c);
- void finish(int r);
- };
-
- struct C_aio_sparse_read_Ack : public Context {
- AioCompletionImpl *c;
- bufferlist *data_bl;
- std::map<uint64_t,uint64_t> *m;
- C_aio_sparse_read_Ack(AioCompletionImpl *_c);
- void finish(int r);
- };
-
- struct C_aio_Safe : public Context {
- AioCompletionImpl *c;
- C_aio_Safe(AioCompletionImpl *_c);
- void finish(int r);
- };
-
- int aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c,
- bufferlist *pbl, size_t len, uint64_t off);
- int aio_read(IoCtxImpl& io, object_t oid, AioCompletionImpl *c,
- char *buf, size_t len, uint64_t off);
- int aio_sparse_read(IoCtxImpl& io, const object_t oid,
- AioCompletionImpl *c, std::map<uint64_t,uint64_t> *m,
- bufferlist *data_bl, size_t len, uint64_t off);
- int aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c,
- const bufferlist& bl, size_t len, uint64_t off);
- int aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c,
- const bufferlist& bl, size_t len);
- int aio_write_full(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c,
- const bufferlist& bl);
- int aio_exec(IoCtxImpl& io, const object_t& oid, AioCompletionImpl *c,
- const char *cls, const char *method, bufferlist& inbl, bufferlist *outbl);
-
- struct C_PoolAsync_Safe : public Context {
- PoolAsyncCompletionImpl *c;
- C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c);
- void finish(int r);
- };
-
- static PoolAsyncCompletionImpl *pool_async_create_completion();
- static PoolAsyncCompletionImpl *pool_async_create_completion(void *cb_arg,
- rados_callback_t cb);
- static AioCompletionImpl *aio_create_completion();
- static AioCompletionImpl *aio_create_completion(void *cb_arg,
- rados_callback_t cb_complete,
- rados_callback_t cb_safe);
// watch/notify
- struct WatchContext {
- IoCtxImpl *io_ctx_impl;
- const object_t oid;
- uint64_t cookie;
- uint64_t ver;
- librados::WatchCtx *ctx;
- uint64_t linger_id;
-
- WatchContext(IoCtxImpl *io_ctx_impl_,
- const object_t& _oc,
- librados::WatchCtx *_ctx);
- ~WatchContext();
- void notify(RadosClient *client, MWatchNotify *m);
- };
-
- struct C_NotifyComplete : public librados::WatchCtx {
- Mutex *lock;
- Cond *cond;
- bool *done;
-
- C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d);
- void notify(uint8_t opcode, uint64_t ver, bufferlist& bl);
- };
-
uint64_t max_watch_cookie;
- map<uint64_t, WatchContext *> watchers;
-
- void set_sync_op_version(IoCtxImpl& io, eversion_t& ver);
+ map<uint64_t, librados::WatchContext *> watchers;
- void register_watcher(IoCtxImpl& io, const object_t& oid,
- librados::WatchCtx *ctx, uint64_t *cookie,
- WatchContext **pwc = NULL);
+ void register_watcher(librados::WatchContext *wc, const object_t& oid,
+ librados::WatchCtx *ctx, uint64_t *cookie);
void unregister_watcher(uint64_t cookie);
void watch_notify(MWatchNotify *m);
- int watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
- int unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie);
- int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl);
- int _notify_ack(IoCtxImpl& io, const object_t& oid, uint64_t notify_id, uint64_t ver);
-
- eversion_t last_version(IoCtxImpl& io);
- void set_assert_version(IoCtxImpl& io, uint64_t ver);
- void set_assert_src_version(IoCtxImpl& io, const object_t& oid, uint64_t ver);
- void set_notify_timeout(IoCtxImpl& io, uint32_t timeout);
};
#endif
diff --git a/src/librados/librados.cc b/src/librados/librados.cc
index 48c988fccbb..bd113f8b02a 100644
--- a/src/librados/librados.cc
+++ b/src/librados/librados.cc
@@ -534,47 +534,47 @@ void librados::IoCtx::dup(const IoCtx& rhs)
int librados::IoCtx::set_auid(uint64_t auid_)
{
- return io_ctx_impl->client->pool_change_auid(io_ctx_impl, auid_);
+ return io_ctx_impl->pool_change_auid(auid_);
}
int librados::IoCtx::set_auid_async(uint64_t auid_, PoolAsyncCompletion *c)
{
- return io_ctx_impl->client->pool_change_auid_async(io_ctx_impl, auid_, c->pc);
+ return io_ctx_impl->pool_change_auid_async(auid_, c->pc);
}
int librados::IoCtx::get_auid(uint64_t *auid_)
{
- return io_ctx_impl->client->pool_get_auid(io_ctx_impl, (unsigned long long *)auid_);
+ return rados_ioctx_pool_get_auid(io_ctx_impl, auid_);
}
int librados::IoCtx::create(const std::string& oid, bool exclusive)
{
object_t obj(oid);
- return io_ctx_impl->client->create(*io_ctx_impl, obj, exclusive);
+ return io_ctx_impl->create(obj, exclusive);
}
int librados::IoCtx::create(const std::string& oid, bool exclusive, const std::string& category)
{
object_t obj(oid);
- return io_ctx_impl->client->create(*io_ctx_impl, obj, exclusive, category);
+ return io_ctx_impl->create(obj, exclusive, category);
}
int librados::IoCtx::write(const std::string& oid, bufferlist& bl, size_t len, uint64_t off)
{
object_t obj(oid);
- return io_ctx_impl->client->write(*io_ctx_impl, obj, bl, len, off);
+ return io_ctx_impl->write(obj, bl, len, off);
}
int librados::IoCtx::append(const std::string& oid, bufferlist& bl, size_t len)
{
object_t obj(oid);
- return io_ctx_impl->client->append(*io_ctx_impl, obj, bl, len);
+ return io_ctx_impl->append(obj, bl, len);
}
int librados::IoCtx::write_full(const std::string& oid, bufferlist& bl)
{
object_t obj(oid);
- return io_ctx_impl->client->write_full(*io_ctx_impl, obj, bl);
+ return io_ctx_impl->write_full(obj, bl);
}
int librados::IoCtx::clone_range(const std::string& dst_oid, uint64_t dst_off,
@@ -582,94 +582,94 @@ int librados::IoCtx::clone_range(const std::string& dst_oid, uint64_t dst_off,
size_t len)
{
object_t src(src_oid), dst(dst_oid);
- return io_ctx_impl->client->clone_range(*io_ctx_impl, dst, dst_off, src, src_off, len);
+ return io_ctx_impl->clone_range(dst, dst_off, src, src_off, len);
}
int librados::IoCtx::read(const std::string& oid, bufferlist& bl, size_t len, uint64_t off)
{
object_t obj(oid);
- return io_ctx_impl->client->read(*io_ctx_impl, obj, bl, len, off);
+ return io_ctx_impl->read(obj, bl, len, off);
}
int librados::IoCtx::remove(const std::string& oid)
{
object_t obj(oid);
- return io_ctx_impl->client->remove(*io_ctx_impl, obj);
+ return io_ctx_impl->remove(obj);
}
int librados::IoCtx::trunc(const std::string& oid, uint64_t size)
{
object_t obj(oid);
- return io_ctx_impl->client->trunc(*io_ctx_impl, obj, size);
+ return io_ctx_impl->trunc(obj, size);
}
int librados::IoCtx::mapext(const std::string& oid, uint64_t off, size_t len,
std::map<uint64_t,uint64_t>& m)
{
object_t obj(oid);
- return io_ctx_impl->client->mapext(*io_ctx_impl, oid, off, len, m);
+ return io_ctx_impl->mapext(oid, off, len, m);
}
int librados::IoCtx::sparse_read(const std::string& oid, std::map<uint64_t,uint64_t>& m,
bufferlist& bl, size_t len, uint64_t off)
{
object_t obj(oid);
- return io_ctx_impl->client->sparse_read(*io_ctx_impl, oid, m, bl, len, off);
+ return io_ctx_impl->sparse_read(oid, m, bl, len, off);
}
int librados::IoCtx::getxattr(const std::string& oid, const char *name, bufferlist& bl)
{
object_t obj(oid);
- return io_ctx_impl->client->getxattr(*io_ctx_impl, obj, name, bl);
+ return io_ctx_impl->getxattr(obj, name, bl);
}
int librados::IoCtx::getxattrs(const std::string& oid, map<std::string, bufferlist>& attrset)
{
object_t obj(oid);
- return io_ctx_impl->client->getxattrs(*io_ctx_impl, obj, attrset);
+ return io_ctx_impl->getxattrs(obj, attrset);
}
int librados::IoCtx::setxattr(const std::string& oid, const char *name, bufferlist& bl)
{
object_t obj(oid);
- return io_ctx_impl->client->setxattr(*io_ctx_impl, obj, name, bl);
+ return io_ctx_impl->setxattr(obj, name, bl);
}
int librados::IoCtx::rmxattr(const std::string& oid, const char *name)
{
object_t obj(oid);
- return io_ctx_impl->client->rmxattr(*io_ctx_impl, obj, name);
+ return io_ctx_impl->rmxattr(obj, name);
}
int librados::IoCtx::stat(const std::string& oid, uint64_t *psize, time_t *pmtime)
{
object_t obj(oid);
- return io_ctx_impl->client->stat(*io_ctx_impl, oid, psize, pmtime);
+ return io_ctx_impl->stat(oid, psize, pmtime);
}
int librados::IoCtx::exec(const std::string& oid, const char *cls, const char *method,
bufferlist& inbl, bufferlist& outbl)
{
object_t obj(oid);
- return io_ctx_impl->client->exec(*io_ctx_impl, obj, cls, method, inbl, outbl);
+ return io_ctx_impl->exec(obj, cls, method, inbl, outbl);
}
int librados::IoCtx::tmap_update(const std::string& oid, bufferlist& cmdbl)
{
object_t obj(oid);
- return io_ctx_impl->client->tmap_update(*io_ctx_impl, obj, cmdbl);
+ return io_ctx_impl->tmap_update(obj, cmdbl);
}
int librados::IoCtx::tmap_put(const std::string& oid, bufferlist& bl)
{
object_t obj(oid);
- return io_ctx_impl->client->tmap_put(*io_ctx_impl, obj, bl);
+ return io_ctx_impl->tmap_put(obj, bl);
}
int librados::IoCtx::tmap_get(const std::string& oid, bufferlist& bl)
{
object_t obj(oid);
- return io_ctx_impl->client->tmap_get(*io_ctx_impl, obj, bl);
+ return io_ctx_impl->tmap_get(obj, bl);
}
int librados::IoCtx::omap_get_vals(const std::string& oid,
@@ -767,25 +767,25 @@ int librados::IoCtx::omap_rm_keys(const std::string& oid,
int librados::IoCtx::operate(const std::string& oid, librados::ObjectWriteOperation *o)
{
object_t obj(oid);
- return io_ctx_impl->client->operate(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, o->pmtime);
+ return io_ctx_impl->operate(obj, (::ObjectOperation*)o->impl, o->pmtime);
}
int librados::IoCtx::operate(const std::string& oid, librados::ObjectReadOperation *o, bufferlist *pbl)
{
object_t obj(oid);
- return io_ctx_impl->client->operate_read(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, pbl);
+ return io_ctx_impl->operate_read(obj, (::ObjectOperation*)o->impl, pbl);
}
int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectWriteOperation *o)
{
object_t obj(oid);
- return io_ctx_impl->client->aio_operate(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, c->pc);
+ return io_ctx_impl->aio_operate(obj, (::ObjectOperation*)o->impl, c->pc);
}
int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectReadOperation *o, bufferlist *pbl)
{
object_t obj(oid);
- return io_ctx_impl->client->aio_operate_read(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, c->pc, pbl);
+ return io_ctx_impl->aio_operate_read(obj, (::ObjectOperation*)o->impl, c->pc, pbl);
}
void librados::IoCtx::snap_set_read(snap_t seq)
@@ -804,55 +804,54 @@ int librados::IoCtx::selfmanaged_snap_set_write_ctx(snap_t seq, vector<snap_t>&
int librados::IoCtx::snap_create(const char *snapname)
{
- return io_ctx_impl->client->snap_create(io_ctx_impl, snapname);
+ return io_ctx_impl->snap_create(snapname);
}
int librados::IoCtx::snap_lookup(const char *name, snap_t *snapid)
{
- return io_ctx_impl->client->snap_lookup(io_ctx_impl, name, snapid);
+ return io_ctx_impl->snap_lookup(name, snapid);
}
int librados::IoCtx::snap_get_stamp(snap_t snapid, time_t *t)
{
- return io_ctx_impl->client->snap_get_stamp(io_ctx_impl, snapid, t);
+ return io_ctx_impl->snap_get_stamp(snapid, t);
}
int librados::IoCtx::snap_get_name(snap_t snapid, std::string *s)
{
- return io_ctx_impl->client->snap_get_name(io_ctx_impl, snapid, s);
+ return io_ctx_impl->snap_get_name(snapid, s);
}
int librados::IoCtx::snap_remove(const char *snapname)
{
- return io_ctx_impl->client->snap_remove(io_ctx_impl, snapname);
+ return io_ctx_impl->snap_remove(snapname);
}
int librados::IoCtx::snap_list(std::vector<snap_t> *snaps)
{
- return io_ctx_impl->client->snap_list(io_ctx_impl, snaps);
+ return io_ctx_impl->snap_list(snaps);
}
int librados::IoCtx::rollback(const std::string& oid, const char *snapname)
{
- return io_ctx_impl->client->rollback(io_ctx_impl, oid, snapname);
+ return io_ctx_impl->rollback(oid, snapname);
}
int librados::IoCtx::selfmanaged_snap_create(uint64_t *snapid)
{
- return io_ctx_impl->client->selfmanaged_snap_create(io_ctx_impl, snapid);
+ return io_ctx_impl->selfmanaged_snap_create(snapid);
}
int librados::IoCtx::selfmanaged_snap_remove(uint64_t snapid)
{
- return io_ctx_impl->client->selfmanaged_snap_remove(io_ctx_impl, snapid);
+ return io_ctx_impl->selfmanaged_snap_remove(snapid);
}
int librados::IoCtx::selfmanaged_snap_rollback(const std::string& oid, uint64_t snapid)
{
- return io_ctx_impl->client->selfmanaged_snap_rollback_object(io_ctx_impl,
- oid,
- io_ctx_impl->snapc,
- snapid);
+ return io_ctx_impl->selfmanaged_snap_rollback_object(oid,
+ io_ctx_impl->snapc,
+ snapid);
}
librados::ObjectIterator librados::IoCtx::objects_begin()
@@ -871,48 +870,50 @@ const librados::ObjectIterator& librados::IoCtx::objects_end() const
uint64_t librados::IoCtx::get_last_version()
{
- eversion_t ver = io_ctx_impl->client->last_version(*io_ctx_impl);
+ eversion_t ver = io_ctx_impl->last_version();
return ver.version;
}
int librados::IoCtx::aio_read(const std::string& oid, librados::AioCompletion *c,
bufferlist *pbl, size_t len, uint64_t off)
{
- return io_ctx_impl->client->aio_read(*io_ctx_impl, oid, c->pc, pbl, len, off);
+ return io_ctx_impl->aio_read(oid, c->pc, pbl, len, off);
}
-int librados::IoCtx::aio_exec(const std::string& oid, librados::AioCompletion *c, const char *cls, const char *method,
- bufferlist& inbl, bufferlist *outbl)
+int librados::IoCtx::aio_exec(const std::string& oid,
+ librados::AioCompletion *c, const char *cls,
+ const char *method, bufferlist& inbl,
+ bufferlist *outbl)
{
object_t obj(oid);
- return io_ctx_impl->client->aio_exec(*io_ctx_impl, obj, c->pc, cls, method, inbl, outbl);
+ return io_ctx_impl->aio_exec(obj, c->pc, cls, method, inbl, outbl);
}
int librados::IoCtx::aio_sparse_read(const std::string& oid, librados::AioCompletion *c,
std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
size_t len, uint64_t off)
{
- return io_ctx_impl->client->aio_sparse_read(*io_ctx_impl, oid, c->pc,
- m, data_bl, len, off);
+ return io_ctx_impl->aio_sparse_read(oid, c->pc,
+ m, data_bl, len, off);
}
int librados::IoCtx::aio_write(const std::string& oid, librados::AioCompletion *c,
const bufferlist& bl, size_t len, uint64_t off)
{
- return io_ctx_impl->client->aio_write(*io_ctx_impl, oid, c->pc, bl, len, off);
+ return io_ctx_impl->aio_write(oid, c->pc, bl, len, off);
}
int librados::IoCtx::aio_append(const std::string& oid, librados::AioCompletion *c,
const bufferlist& bl, size_t len)
{
- return io_ctx_impl->client->aio_append(*io_ctx_impl, oid, c->pc, bl, len);
+ return io_ctx_impl->aio_append(oid, c->pc, bl, len);
}
int librados::IoCtx::aio_write_full(const std::string& oid, librados::AioCompletion *c,
const bufferlist& bl)
{
object_t obj(oid);
- return io_ctx_impl->client->aio_write_full(*io_ctx_impl, obj, c->pc, bl);
+ return io_ctx_impl->aio_write_full(obj, c->pc, bl);
}
int librados::IoCtx::aio_flush()
@@ -925,36 +926,36 @@ int librados::IoCtx::watch(const string& oid, uint64_t ver, uint64_t *cookie,
librados::WatchCtx *ctx)
{
object_t obj(oid);
- return io_ctx_impl->client->watch(*io_ctx_impl, obj, ver, cookie, ctx);
+ return io_ctx_impl->watch(obj, ver, cookie, ctx);
}
int librados::IoCtx::unwatch(const string& oid, uint64_t handle)
{
uint64_t cookie = handle;
object_t obj(oid);
- return io_ctx_impl->client->unwatch(*io_ctx_impl, obj, cookie);
+ return io_ctx_impl->unwatch(obj, cookie);
}
int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl)
{
object_t obj(oid);
- return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver, bl);
+ return io_ctx_impl->notify(obj, ver, bl);
}
void librados::IoCtx::set_notify_timeout(uint32_t timeout)
{
- io_ctx_impl->client->set_notify_timeout(*io_ctx_impl, timeout);
+ io_ctx_impl->set_notify_timeout(timeout);
}
void librados::IoCtx::set_assert_version(uint64_t ver)
{
- io_ctx_impl->client->set_assert_version(*io_ctx_impl, ver);
+ io_ctx_impl->set_assert_version(ver);
}
void librados::IoCtx::set_assert_src_version(const std::string& oid, uint64_t ver)
{
object_t obj(oid);
- io_ctx_impl->client->set_assert_src_version(*io_ctx_impl, obj, ver);
+ io_ctx_impl->set_assert_src_version(obj, ver);
}
const std::string& librados::IoCtx::get_pool_name() const
@@ -1199,13 +1200,13 @@ int librados::Rados::cluster_stat(cluster_stat_t& result)
librados::PoolAsyncCompletion *librados::Rados::pool_async_create_completion()
{
- PoolAsyncCompletionImpl *c = RadosClient::pool_async_create_completion();
+ PoolAsyncCompletionImpl *c = new PoolAsyncCompletionImpl;
return new PoolAsyncCompletion(c);
}
librados::AioCompletion *librados::Rados::aio_create_completion()
{
- AioCompletionImpl *c = RadosClient::aio_create_completion();
+ AioCompletionImpl *c = new AioCompletionImpl;
return new AioCompletion(c);
}
@@ -1213,11 +1214,12 @@ librados::AioCompletion *librados::Rados::aio_create_completion(void *cb_arg,
callback_t cb_complete,
callback_t cb_safe)
{
- AioCompletionImpl *c = RadosClient::aio_create_completion(cb_arg, cb_complete, cb_safe);
+ AioCompletionImpl *c;
+ int r = rados_aio_create_completion(cb_arg, cb_complete, cb_safe, (void**)&c);
+ assert(r == 0);
return new AioCompletion(c);
}
-
librados::ObjectOperation::ObjectOperation()
{
impl = (ObjectOperationImpl *)new ::ObjectOperation;
@@ -1399,14 +1401,13 @@ extern "C" int rados_pool_list(rados_t cluster, char *buf, size_t len)
extern "C" int rados_ioctx_create(rados_t cluster, const char *name, rados_ioctx_t *io)
{
- librados::RadosClient *radosp = (librados::RadosClient *)cluster;
- int64_t poolid = radosp->lookup_pool(name);
- if (poolid < 0)
- return (int)poolid;
+ librados::RadosClient *client = (librados::RadosClient *)cluster;
+ librados::IoCtxImpl *ctx;
+
+ int r = client->create_ioctx(name, &ctx);
+ if (r < 0)
+ return r;
- librados::IoCtxImpl *ctx = new librados::IoCtxImpl(radosp, poolid, name, CEPH_NOSNAP);
- if (!ctx)
- return -ENOMEM;
*io = ctx;
ctx->get();
return 0;
@@ -1474,7 +1475,7 @@ extern "C" int rados_write(rados_ioctx_t io, const char *o, const char *buf, siz
object_t oid(o);
bufferlist bl;
bl.append(buf, len);
- return ctx->client->write(*ctx, oid, bl, len, off);
+ return ctx->write(oid, bl, len, off);
}
extern "C" int rados_append(rados_ioctx_t io, const char *o, const char *buf, size_t len)
@@ -1483,7 +1484,7 @@ extern "C" int rados_append(rados_ioctx_t io, const char *o, const char *buf, si
object_t oid(o);
bufferlist bl;
bl.append(buf, len);
- return ctx->client->append(*ctx, oid, bl, len);
+ return ctx->append(oid, bl, len);
}
extern "C" int rados_write_full(rados_ioctx_t io, const char *o, const char *buf, size_t len)
@@ -1492,7 +1493,7 @@ extern "C" int rados_write_full(rados_ioctx_t io, const char *o, const char *buf
object_t oid(o);
bufferlist bl;
bl.append(buf, len);
- return ctx->client->write_full(*ctx, oid, bl);
+ return ctx->write_full(oid, bl);
}
extern "C" int rados_clone_range(rados_ioctx_t io, const char *dst, uint64_t dst_off,
@@ -1500,21 +1501,21 @@ extern "C" int rados_clone_range(rados_ioctx_t io, const char *dst, uint64_t dst
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t dst_oid(dst), src_oid(src);
- return ctx->client->clone_range(*ctx, dst_oid, dst_off, src_oid, src_off, len);
+ return ctx->clone_range(dst_oid, dst_off, src_oid, src_off, len);
}
extern "C" int rados_trunc(rados_ioctx_t io, const char *o, uint64_t size)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
- return ctx->client->trunc(*ctx, oid, size);
+ return ctx->trunc(oid, size);
}
extern "C" int rados_remove(rados_ioctx_t io, const char *o)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
- return ctx->client->remove(*ctx, oid);
+ return ctx->remove(oid);
}
extern "C" int rados_read(rados_ioctx_t io, const char *o, char *buf, size_t len, uint64_t off)
@@ -1527,7 +1528,7 @@ extern "C" int rados_read(rados_ioctx_t io, const char *o, char *buf, size_t len
bufferptr bp = buffer::create_static(len, buf);
bl.push_back(bp);
- ret = ctx->client->read(*ctx, oid, bl, len, off);
+ ret = ctx->read(oid, bl, len, off);
if (ret >= 0) {
if (bl.length() > len)
return -ERANGE;
@@ -1542,7 +1543,7 @@ extern "C" int rados_read(rados_ioctx_t io, const char *o, char *buf, size_t len
extern "C" uint64_t rados_get_last_version(rados_ioctx_t io)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- eversion_t ver = ctx->client->last_version(*ctx);
+ eversion_t ver = ctx->last_version();
return ver.version;
}
@@ -1586,13 +1587,13 @@ extern "C" int rados_pool_delete(rados_t cluster, const char *pool_name)
extern "C" int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- return ctx->client->pool_change_auid(ctx, auid);
+ return ctx->pool_change_auid(auid);
}
extern "C" int rados_ioctx_pool_get_auid(rados_ioctx_t io, uint64_t *auid)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- return ctx->client->pool_get_auid(ctx, (unsigned long long *)auid);
+ return ctx->client->pool_get_auid(ctx->get_id(), (unsigned long long *)auid);
}
extern "C" void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
@@ -1614,34 +1615,34 @@ extern "C" int64_t rados_ioctx_get_id(rados_ioctx_t io)
extern "C" int rados_ioctx_snap_create(rados_ioctx_t io, const char *snapname)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- return ctx->client->snap_create(ctx, snapname);
+ return ctx->snap_create(snapname);
}
extern "C" int rados_ioctx_snap_remove(rados_ioctx_t io, const char *snapname)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- return ctx->client->snap_remove(ctx, snapname);
+ return ctx->snap_remove(snapname);
}
extern "C" int rados_rollback(rados_ioctx_t io, const char *oid,
const char *snapname)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- return ctx->client->rollback(ctx, oid, snapname);
+ return ctx->rollback(oid, snapname);
}
extern "C" int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
uint64_t *snapid)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- return ctx->client->selfmanaged_snap_create(ctx, snapid);
+ return ctx->selfmanaged_snap_create(snapid);
}
extern "C" int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io,
uint64_t snapid)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- return ctx->client->selfmanaged_snap_remove(ctx, snapid);
+ return ctx->selfmanaged_snap_remove(snapid);
}
extern "C" int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io,
@@ -1649,7 +1650,7 @@ extern "C" int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io,
uint64_t snapid)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- return ctx->client->selfmanaged_snap_rollback_object(ctx, oid, ctx->snapc, snapid);
+ return ctx->selfmanaged_snap_rollback_object(oid, ctx->snapc, snapid);
}
extern "C" int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t *snaps,
@@ -1657,7 +1658,7 @@ extern "C" int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t *snaps,
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
vector<uint64_t> snapvec;
- int r = ctx->client->snap_list(ctx, &snapvec);
+ int r = ctx->snap_list(&snapvec);
if (r < 0)
return r;
if ((int)snapvec.size() <= maxlen) {
@@ -1672,7 +1673,7 @@ extern "C" int rados_ioctx_snap_lookup(rados_ioctx_t io, const char *name,
rados_snap_t *id)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- return ctx->client->snap_lookup(ctx, name, (uint64_t *)id);
+ return ctx->snap_lookup(name, (uint64_t *)id);
}
extern "C" int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id,
@@ -1680,7 +1681,7 @@ extern "C" int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id,
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
std::string sname;
- int r = ctx->client->snap_get_name(ctx, id, &sname);
+ int r = ctx->snap_get_name(id, &sname);
if (r < 0)
return r;
if ((int)sname.length() >= maxlen)
@@ -1692,7 +1693,7 @@ extern "C" int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id,
extern "C" int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t *t)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- return ctx->client->snap_get_stamp(ctx, id, t);
+ return ctx->snap_get_stamp(id, t);
}
extern "C" int rados_getxattr(rados_ioctx_t io, const char *o, const char *name,
@@ -1702,7 +1703,7 @@ extern "C" int rados_getxattr(rados_ioctx_t io, const char *o, const char *name,
int ret;
object_t oid(o);
bufferlist bl;
- ret = ctx->client->getxattr(*ctx, oid, name, bl);
+ ret = ctx->getxattr(oid, name, bl);
if (ret >= 0) {
if (bl.length() > len)
return -ERANGE;
@@ -1738,7 +1739,7 @@ extern "C" int rados_getxattrs(rados_ioctx_t io, const char *oid,
return -ENOMEM;
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t obj(oid);
- int ret = ctx->client->getxattrs(*ctx, obj, it->attrset);
+ int ret = ctx->getxattrs(obj, it->attrset);
if (ret) {
delete it;
return ret;
@@ -1788,21 +1789,21 @@ extern "C" int rados_setxattr(rados_ioctx_t io, const char *o, const char *name,
object_t oid(o);
bufferlist bl;
bl.append(buf, len);
- return ctx->client->setxattr(*ctx, oid, name, bl);
+ return ctx->setxattr(oid, name, bl);
}
extern "C" int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
- return ctx->client->rmxattr(*ctx, oid, name);
+ return ctx->rmxattr(oid, name);
}
extern "C" int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
- return ctx->client->stat(*ctx, oid, psize, pmtime);
+ return ctx->stat(oid, psize, pmtime);
}
extern "C" int rados_tmap_update(rados_ioctx_t io, const char *o, const char *cmdbuf, size_t cmdbuflen)
@@ -1811,7 +1812,7 @@ extern "C" int rados_tmap_update(rados_ioctx_t io, const char *o, const char *cm
object_t oid(o);
bufferlist cmdbl;
cmdbl.append(cmdbuf, cmdbuflen);
- return ctx->client->tmap_update(*ctx, oid, cmdbl);
+ return ctx->tmap_update(oid, cmdbl);
}
extern "C" int rados_tmap_put(rados_ioctx_t io, const char *o, const char *buf, size_t buflen)
@@ -1820,7 +1821,7 @@ extern "C" int rados_tmap_put(rados_ioctx_t io, const char *o, const char *buf,
object_t oid(o);
bufferlist bl;
bl.append(buf, buflen);
- return ctx->client->tmap_put(*ctx, oid, bl);
+ return ctx->tmap_put(oid, bl);
}
extern "C" int rados_tmap_get(rados_ioctx_t io, const char *o, char *buf, size_t buflen)
@@ -1828,7 +1829,7 @@ extern "C" int rados_tmap_get(rados_ioctx_t io, const char *o, char *buf, size_t
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
bufferlist bl;
- int r = ctx->client->tmap_get(*ctx, oid, bl);
+ int r = ctx->tmap_get(oid, bl);
if (r < 0)
return r;
if (bl.length() > buflen)
@@ -1845,7 +1846,7 @@ extern "C" int rados_exec(rados_ioctx_t io, const char *o, const char *cls, cons
bufferlist inbl, outbl;
int ret;
inbl.append(inbuf, in_len);
- ret = ctx->client->exec(*ctx, oid, cls, method, inbl, outbl);
+ ret = ctx->exec(oid, cls, method, inbl, outbl);
if (ret >= 0) {
if (outbl.length()) {
if (outbl.length() > out_len)
@@ -1887,7 +1888,7 @@ extern "C" int rados_objects_list_next(rados_list_ctx_t listctx, const char **en
h->list.pop_front();
if (h->list.empty()) {
- ret = lh->ctx->client->list(lh->lc, RADOS_LIST_MAX_ENTRIES);
+ ret = lh->ctx->list(lh->lc, RADOS_LIST_MAX_ENTRIES);
if (ret < 0)
return ret;
if (h->list.empty())
@@ -1910,10 +1911,17 @@ extern "C" int rados_objects_list_next(rados_list_ctx_t listctx, const char **en
// -------------------------
// aio
-extern "C" int rados_aio_create_completion(void *cb_arg, rados_callback_t cb_complete,
- rados_callback_t cb_safe, rados_completion_t *pc)
-{
- *pc = librados::RadosClient::aio_create_completion(cb_arg, cb_complete, cb_safe);
+extern "C" int rados_aio_create_completion(void *cb_arg,
+ rados_callback_t cb_complete,
+ rados_callback_t cb_safe,
+ rados_completion_t *pc)
+{
+ librados::AioCompletionImpl *c = new librados::AioCompletionImpl;
+ if (cb_complete)
+ c->set_complete_callback(cb_arg, cb_complete);
+ if (cb_safe)
+ c->set_safe_callback(cb_arg, cb_safe);
+ *pc = c;
return 0;
}
@@ -1958,8 +1966,8 @@ extern "C" int rados_aio_read(rados_ioctx_t io, const char *o,
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
- return ctx->client->aio_read(*ctx, oid,
- (librados::AioCompletionImpl*)completion, buf, len, off);
+ return ctx->aio_read(oid, (librados::AioCompletionImpl*)completion,
+ buf, len, off);
}
extern "C" int rados_aio_write(rados_ioctx_t io, const char *o,
@@ -1970,8 +1978,8 @@ extern "C" int rados_aio_write(rados_ioctx_t io, const char *o,
object_t oid(o);
bufferlist bl;
bl.append(buf, len);
- return ctx->client->aio_write(*ctx, oid,
- (librados::AioCompletionImpl*)completion, bl, len, off);
+ return ctx->aio_write(oid, (librados::AioCompletionImpl*)completion,
+ bl, len, off);
}
extern "C" int rados_aio_append(rados_ioctx_t io, const char *o,
@@ -1982,20 +1990,19 @@ extern "C" int rados_aio_append(rados_ioctx_t io, const char *o,
object_t oid(o);
bufferlist bl;
bl.append(buf, len);
- return ctx->client->aio_append(*ctx, oid,
- (librados::AioCompletionImpl*)completion, bl, len);
+ return ctx->aio_append(oid, (librados::AioCompletionImpl*)completion,
+ bl, len);
}
extern "C" int rados_aio_write_full(rados_ioctx_t io, const char *o,
- rados_completion_t completion,
- const char *buf, size_t len)
+ rados_completion_t completion,
+ const char *buf, size_t len)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
bufferlist bl;
bl.append(buf, len);
- return ctx->client->aio_write_full(*ctx, oid,
- (librados::AioCompletionImpl*)completion, bl);
+ return ctx->aio_write_full(oid, (librados::AioCompletionImpl*)completion, bl);
}
extern "C" int rados_aio_flush(rados_ioctx_t io)
@@ -2021,7 +2028,7 @@ int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle,
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
C_WatchCB *wc = new C_WatchCB(watchcb, arg);
- return ctx->client->watch(*ctx, oid, ver, cookie, wc);
+ return ctx->watch(oid, ver, cookie, wc);
}
int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle)
@@ -2029,7 +2036,7 @@ int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle)
uint64_t cookie = handle;
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
- return ctx->client->unwatch(*ctx, oid, cookie);
+ return ctx->unwatch(oid, cookie);
}
int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len)
@@ -2042,5 +2049,5 @@ int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf,
memcpy(p.c_str(), buf, buf_len);
bl.push_back(p);
}
- return ctx->client->notify(*ctx, oid, ver, bl);
+ return ctx->notify(oid, ver, bl);
}