diff options
author | Josh Durgin <josh.durgin@dreamhost.com> | 2012-03-01 12:08:33 -0800 |
---|---|---|
committer | Josh Durgin <josh.durgin@dreamhost.com> | 2012-03-13 11:46:02 -0700 |
commit | 5f92f338be3e0a7afff1957edc0d7027b23e0378 (patch) | |
tree | 6550de46a8ff419b6d45fe96ede4afecd61ddd1c /src/librados | |
parent | 8f278647f4aac364c543059a039610d8a28a6b4d (diff) | |
download | ceph-5f92f338be3e0a7afff1957edc0d7027b23e0378.tar.gz |
librados: move methods that require an IoCtx to IoCtxImpl
RadosClient still does a few different things, but at least it
no longer does all the work of an IoCtx.
Signed-off-by: Josh Durgin <josh.durgin@dreamhost.com>
Diffstat (limited to 'src/librados')
-rw-r--r-- | src/librados/IoCtxImpl.cc | 1534 | ||||
-rw-r--r-- | src/librados/IoCtxImpl.h | 134 | ||||
-rw-r--r-- | src/librados/PoolAsyncCompletionImpl.h | 150 | ||||
-rw-r--r-- | src/librados/RadosClient.cc | 1644 | ||||
-rw-r--r-- | src/librados/RadosClient.h | 152 | ||||
-rw-r--r-- | src/librados/librados.cc | 235 |
6 files changed, 1914 insertions, 1935 deletions
diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index f32556d8a6d..d22aebc0f81 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -15,8 +15,10 @@ #include "IoCtxImpl.h" #include "librados/AioCompletionImpl.h" +#include "librados/PoolAsyncCompletionImpl.h" #include "librados/RadosClient.h" + #define DOUT_SUBSYS rados #undef dout_prefix #define dout_prefix *_dout << "librados: " @@ -26,11 +28,14 @@ librados::IoCtxImpl::IoCtxImpl() { } -librados::IoCtxImpl::IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s) - : ref_cnt(0), client(c), poolid(pid), - pool_name(pool_name_), snap_seq(s), assert_ver(0), - notify_timeout(c->cct->_conf->client_notify_timeout), oloc(pid), - aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), aio_write_seq(0) +librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter, + Mutex *client_lock, int poolid, + const char *pool_name, snapid_t s) + : ref_cnt(0), client(c), poolid(poolid), pool_name(pool_name), snap_seq(s), + assert_ver(0), notify_timeout(c->cct->_conf->client_notify_timeout), + oloc(poolid), + aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), + aio_write_seq(0), lock(client_lock), objecter(objecter) { } @@ -45,7 +50,8 @@ void librados::IoCtxImpl::set_snap_read(snapid_t s) int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector<snapid_t>& snaps) { ::SnapContext n; - ldout(client->cct, 10) << "set snap write context: seq = " << seq << " and snaps = " << snaps << dendl; + ldout(client->cct, 10) << "set snap write context: seq = " << seq + << " and snaps = " << snaps << dendl; n.seq = seq; n.snaps = snaps; if (!n.is_valid()) @@ -85,3 +91,1519 @@ void librados::IoCtxImpl::flush_aio_writes() aio_write_cond.Wait(aio_write_list_lock); aio_write_list_lock.Unlock(); } + +// SNAPS + +int librados::IoCtxImpl::snap_create(const char *snapName) +{ + int reply; + string sName(snapName); + + Mutex mylock ("IoCtxImpl::snap_create::mylock"); + Cond cond; + bool done; + lock->Lock(); + objecter->create_pool_snap(poolid, + sName, + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock->Unlock(); + + mylock.Lock(); + while(!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid) +{ + int reply; + + Mutex mylock("IoCtxImpl::selfmanaged_snap_create::mylock"); + Cond cond; + bool done; + lock->Lock(); + snapid_t snapid; + objecter->allocate_selfmanaged_snap(poolid, &snapid, + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock->Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + if (reply == 0) + *psnapid = snapid; + return reply; +} + +int librados::IoCtxImpl::snap_remove(const char *snapName) +{ + int reply; + string sName(snapName); + + Mutex mylock ("IoCtxImpl::snap_remove::mylock"); + Cond cond; + bool done; + lock->Lock(); + objecter->delete_pool_snap(poolid, + sName, + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock->Unlock(); + + mylock.Lock(); + while(!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid, + ::SnapContext& snapc, + uint64_t snapid) +{ + int reply; + + Mutex mylock("IoCtxImpl::snap_rollback::mylock"); + Cond cond; + bool done; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply); + + lock->Lock(); + objecter->rollback_object(oid, oloc, snapc, snapid, + ceph_clock_now(client->cct), onack, NULL); + lock->Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::IoCtxImpl::rollback(const object_t& oid, const char *snapName) +{ + string sName(snapName); + + lock->Lock(); + snapid_t snap; + const map<int64_t, pg_pool_t>& pools = objecter->osdmap->get_pools(); + const pg_pool_t& pg_pool = pools.find(poolid)->second; + map<snapid_t, pool_snap_info_t>::const_iterator p; + for (p = pg_pool.snaps.begin(); + p != pg_pool.snaps.end(); + ++p) { + if (p->second.name == snapName) { + snap = p->first; + break; + } + } + if (p == pg_pool.snaps.end()) { + lock->Unlock(); + return -ENOENT; + } + lock->Unlock(); + + return selfmanaged_snap_rollback_object(oid, snapc, snap); +} + +int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid) +{ + int reply; + + Mutex mylock("IoCtxImpl::selfmanaged_snap_remove::mylock"); + Cond cond; + bool done; + lock->Lock(); + objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid), + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock->Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + return (int)reply; +} + +int librados::IoCtxImpl::pool_change_auid(unsigned long long auid) +{ + int reply; + + Mutex mylock("IoCtxImpl::pool_change_auid::mylock"); + Cond cond; + bool done; + lock->Lock(); + objecter->change_pool_auid(poolid, + new C_SafeCond(&mylock, &cond, &done, &reply), + auid); + lock->Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::IoCtxImpl::pool_change_auid_async(unsigned long long auid, + PoolAsyncCompletionImpl *c) +{ + Mutex::Locker l(*lock); + objecter->change_pool_auid(poolid, + new C_PoolAsync_Safe(c), + auid); + return 0; +} + +int librados::IoCtxImpl::snap_list(vector<uint64_t> *snaps) +{ + Mutex::Locker l(*lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(poolid); + for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin(); + p != pi->snaps.end(); + p++) + snaps->push_back(p->first); + return 0; +} + +int librados::IoCtxImpl::snap_lookup(const char *name, uint64_t *snapid) +{ + Mutex::Locker l(*lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(poolid); + for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin(); + p != pi->snaps.end(); + p++) { + if (p->second.name == name) { + *snapid = p->first; + return 0; + } + } + return -ENOENT; +} + +int librados::IoCtxImpl::snap_get_name(uint64_t snapid, std::string *s) +{ + Mutex::Locker l(*lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(poolid); + map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.find(snapid); + if (p == pi->snaps.end()) + return -ENOENT; + *s = p->second.name.c_str(); + return 0; +} + +int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid, time_t *t) +{ + Mutex::Locker l(*lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(poolid); + map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.find(snapid); + if (p == pi->snaps.end()) + return -ENOENT; + *t = p->second.stamp.sec(); + return 0; +} + + +// IO + +int librados::IoCtxImpl::list(Objecter::ListContext *context, int max_entries) +{ + Cond cond; + bool done; + int r = 0; + object_t oid; + Mutex mylock("IoCtxImpl::list::mylock"); + + if (context->at_end) + return 0; + + context->max_entries = max_entries; + + lock->Lock(); + objecter->list_objects(context, new C_SafeCond(&mylock, &cond, &done, &r)); + lock->Unlock(); + + mylock.Lock(); + while(!done) + cond.Wait(mylock); + mylock.Unlock(); + + return r; +} + +int librados::IoCtxImpl::create(const object_t& oid, bool exclusive) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::create::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock->Lock(); + objecter->create(oid, oloc, + snapc, ut, 0, (exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0), + onack, NULL, &ver); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::create(const object_t& oid, bool exclusive, + const std::string& category) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::create::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + ::ObjectOperation o; + o.create(exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0, category); + + lock->Lock(); + objecter->mutate(oid, oloc, o, snapc, ut, 0, onack, NULL, &ver); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +/* + * add any version assert operations that are appropriate given the + * stat in the IoCtx, either the target version assert or any src + * object asserts. these affect a single ioctx operation, so clear + * the ioctx state when we're doing. + * + * return a pointer to the ObjectOperation if we added any events; + * this is convenient for passing the extra_ops argument into Objecter + * methods. + */ +::ObjectOperation *librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation *op) +{ + ::ObjectOperation *pop = NULL; + if (assert_ver) { + op->assert_version(assert_ver); + assert_ver = 0; + pop = op; + } + while (!assert_src_version.empty()) { + map<object_t,uint64_t>::iterator p = assert_src_version.begin(); + op->assert_src_version(p->first, CEPH_NOSNAP, p->second); + assert_src_version.erase(p); + pop = op; + } + return pop; +} + +int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl, + size_t len, uint64_t off) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::write::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + // extra ops? + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&op); + + lock->Lock(); + objecter->write(oid, oloc, + off, len, snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + if (r < 0) + return r; + + return len; +} + +int librados::IoCtxImpl::append(const object_t& oid, bufferlist& bl, size_t len) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::append::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&op); + + lock->Lock(); + objecter->append(oid, oloc, + len, snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + if (r < 0) + return r; + + return len; +} + +int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::write_full::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&op); + + lock->Lock(); + objecter->write_full(oid, oloc, + snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::clone_range(const object_t& dst_oid, + uint64_t dst_offset, + const object_t& src_oid, + uint64_t src_offset, + uint64_t len) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::clone_range::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock->Lock(); + ::ObjectOperation wr; + prepare_assert_ops(&wr); + wr.clone_range(src_oid, src_offset, len, dst_offset); + objecter->mutate(dst_oid, oloc, wr, snapc, ut, 0, onack, NULL, &ver); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o, + time_t *pmtime) +{ + utime_t ut; + if (pmtime) { + ut = utime_t(*pmtime, 0); + } else { + ut = ceph_clock_now(client->cct); + } + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + if (!o->size()) + return 0; + + Mutex mylock("IoCtxImpl::mutate::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock->Lock(); + objecter->mutate(oid, oloc, + *o, snapc, ut, 0, + onack, NULL, &ver); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::operate_read(const object_t& oid, + ::ObjectOperation *o, bufferlist *pbl) +{ + if (!o->size()) + return 0; + + Mutex mylock("IoCtxImpl::mutate::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock->Lock(); + objecter->read(oid, oloc, + *o, snap_seq, pbl, 0, + onack, &ver); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::aio_operate_read(const object_t &oid, + ::ObjectOperation *o, + AioCompletionImpl *c, bufferlist *pbl) +{ + Context *onack = new C_aio_Ack(c); + + c->pbl = pbl; + + Mutex::Locker l(*lock); + objecter->read(oid, oloc, + *o, snap_seq, pbl, 0, + onack, 0); + return 0; +} + +int librados::IoCtxImpl::aio_operate(const object_t& oid, + ::ObjectOperation *o, AioCompletionImpl *c) +{ + utime_t ut = ceph_clock_now(client->cct); + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Context *onack = new C_aio_Ack(c); + Context *oncommit = new C_aio_Safe(c); + + queue_aio_write(c); + + Mutex::Locker l(*lock); + objecter->mutate(oid, oloc, *o, snapc, ut, 0, onack, oncommit, &c->objver); + + return 0; +} + +int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c, + bufferlist *pbl, size_t len, uint64_t off) +{ + + Context *onack = new C_aio_Ack(c); + eversion_t ver; + + c->pbl = pbl; + + Mutex::Locker l(*lock); + objecter->read(oid, oloc, + off, len, snap_seq, &c->bl, 0, + onack, &c->objver); + return 0; +} + +int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c, + char *buf, size_t len, uint64_t off) +{ + Context *onack = new C_aio_Ack(c); + + c->buf = buf; + c->maxlen = len; + + Mutex::Locker l(*lock); + objecter->read(oid, oloc, + off, len, snap_seq, &c->bl, 0, + onack, &c->objver); + + return 0; +} + +int librados::IoCtxImpl::aio_sparse_read(const object_t oid, + AioCompletionImpl *c, + std::map<uint64_t,uint64_t> *m, + bufferlist *data_bl, size_t len, + uint64_t off) +{ + + C_aio_sparse_read_Ack *onack = new C_aio_sparse_read_Ack(c); + onack->m = m; + onack->data_bl = data_bl; + eversion_t ver; + + c->pbl = NULL; + + Mutex::Locker l(*lock); + objecter->sparse_read(oid, oloc, + off, len, snap_seq, &c->bl, 0, + onack); + return 0; +} + +int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len, + uint64_t off) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + queue_aio_write(c); + + Context *onack = new C_aio_Ack(c); + Context *onsafe = new C_aio_Safe(c); + + Mutex::Locker l(*lock); + objecter->write(oid, oloc, + off, len, snapc, bl, ut, 0, + onack, onsafe, &c->objver); + + return 0; +} + +int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + queue_aio_write(c); + + Context *onack = new C_aio_Ack(c); + Context *onsafe = new C_aio_Safe(c); + + Mutex::Locker l(*lock); + objecter->append(oid, oloc, + len, snapc, bl, ut, 0, + onack, onsafe, &c->objver); + + return 0; +} + +int librados::IoCtxImpl::aio_write_full(const object_t &oid, + AioCompletionImpl *c, + const bufferlist& bl) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + queue_aio_write(c); + + Context *onack = new C_aio_Ack(c); + Context *onsafe = new C_aio_Safe(c); + + Mutex::Locker l(*lock); + objecter->write_full(oid, oloc, + snapc, bl, ut, 0, + onack, onsafe, &c->objver); + + return 0; +} + +int librados::IoCtxImpl::remove(const object_t& oid) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::remove::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&op); + + lock->Lock(); + objecter->remove(oid, oloc, + snapc, ut, 0, + onack, NULL, &ver, pop); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::trunc(const object_t& oid, uint64_t size) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::write_full::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&op); + + lock->Lock(); + objecter->trunc(oid, oloc, + snapc, ut, 0, + size, 0, + onack, NULL, &ver, pop); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::tmap_update(const object_t& oid, bufferlist& cmdbl) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::tmap_update::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock->Lock(); + ::ObjectOperation wr; + prepare_assert_ops(&wr); + wr.tmap_update(cmdbl); + objecter->mutate(oid, oloc, wr, snapc, ut, 0, onack, NULL, &ver); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::tmap_put(const object_t& oid, bufferlist& bl) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::tmap_put::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock->Lock(); + ::ObjectOperation wr; + prepare_assert_ops(&wr); + wr.tmap_put(bl); + objecter->mutate(oid, oloc, wr, snapc, ut, 0, onack, NULL, &ver); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::tmap_get(const object_t& oid, bufferlist& bl) +{ + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::tmap_put::mylock"); + Cond cond; + bool done; + int r = 0; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock->Lock(); + ::ObjectOperation rd; + prepare_assert_ops(&rd); + rd.tmap_get(&bl, NULL); + objecter->read(oid, oloc, rd, snap_seq, 0, 0, onack, &ver); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + + +int librados::IoCtxImpl::exec(const object_t& oid, + const char *cls, const char *method, + bufferlist& inbl, bufferlist& outbl) +{ + Mutex mylock("IoCtxImpl::exec::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + + lock->Lock(); + ::ObjectOperation rd; + prepare_assert_ops(&rd); + rd.call(cls, method, inbl); + objecter->read(oid, oloc, rd, snap_seq, &outbl, 0, onack, &ver); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c, + const char *cls, const char *method, + bufferlist& inbl, bufferlist *outbl) +{ + Context *onack = new C_aio_Ack(c); + + Mutex::Locker l(*lock); + ::ObjectOperation rd; + prepare_assert_ops(&rd); + rd.call(cls, method, inbl); + objecter->read(oid, oloc, rd, snap_seq, outbl, 0, onack, &c->objver); + + return 0; +} + +int librados::IoCtxImpl::read(const object_t& oid, + bufferlist& bl, size_t len, uint64_t off) +{ + Mutex mylock("IoCtxImpl::read::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&op); + + lock->Lock(); + objecter->read(oid, oloc, + off, len, snap_seq, &bl, 0, + onack, &ver, pop); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl; + + set_sync_op_version(ver); + + if (r < 0) + return r; + + if (bl.length() < len) { + ldout(client->cct, 10) << "Returned length " << bl.length() + << " less than original length "<< len << dendl; + } + + return bl.length(); +} + +int librados::IoCtxImpl::mapext(const object_t& oid, + uint64_t off, size_t len, + std::map<uint64_t,uint64_t>& m) +{ + bufferlist bl; + + Mutex mylock("IoCtxImpl::read::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock->Lock(); + objecter->mapext(oid, oloc, + off, len, snap_seq, &bl, 0, + onack); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl; + + if (r < 0) + return r; + + bufferlist::iterator iter = bl.begin(); + ::decode(m, iter); + + return m.size(); +} + +int librados::IoCtxImpl::sparse_read(const object_t& oid, + std::map<uint64_t,uint64_t>& m, + bufferlist& data_bl, size_t len, + uint64_t off) +{ + bufferlist bl; + + Mutex mylock("IoCtxImpl::read::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock->Lock(); + objecter->sparse_read(oid, oloc, + off, len, snap_seq, &bl, 0, + onack); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl; + + if (r < 0) + return r; + + bufferlist::iterator iter = bl.begin(); + ::decode(m, iter); + ::decode(data_bl, iter); + + return m.size(); +} + +int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime) +{ + Mutex mylock("IoCtxImpl::stat::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + uint64_t size; + utime_t mtime; + eversion_t ver; + + if (!psize) + psize = &size; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&op); + + lock->Lock(); + objecter->stat(oid, oloc, + snap_seq, psize, &mtime, 0, + onack, &ver, pop); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(client->cct, 10) << "Objecter returned from stat" << dendl; + + if (r >= 0 && pmtime) { + *pmtime = mtime.sec(); + } + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::getxattr(const object_t& oid, + const char *name, bufferlist& bl) +{ + Mutex mylock("IoCtxImpl::getxattr::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&op); + + lock->Lock(); + objecter->getxattr(oid, oloc, + name, snap_seq, &bl, 0, + onack, &ver, pop); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(client->cct, 10) << "Objecter returned from getxattr" << dendl; + + set_sync_op_version(ver); + + if (r < 0) + return r; + + return bl.length(); +} + +int librados::IoCtxImpl::rmxattr(const object_t& oid, const char *name) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::rmxattr::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&op); + + lock->Lock(); + objecter->removexattr(oid, oloc, name, + snapc, ut, 0, + onack, NULL, &ver, pop); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + if (r < 0) + return r; + + return 0; +} + +int librados::IoCtxImpl::setxattr(const object_t& oid, + const char *name, bufferlist& bl) +{ + utime_t ut = ceph_clock_now(client->cct); + + /* can't write to a snapshot */ + if (snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("IoCtxImpl::setxattr::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&op); + + lock->Lock(); + objecter->setxattr(oid, oloc, name, + snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + if (r < 0) + return r; + + return 0; +} + +int librados::IoCtxImpl::getxattrs(const object_t& oid, + map<std::string, bufferlist>& attrset) +{ + Mutex mylock("IoCtxImpl::getexattrs::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&op); + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock->Lock(); + map<string, bufferlist> aset; + objecter->getxattrs(oid, oloc, snap_seq, + aset, + 0, onack, &ver, pop); + lock->Unlock(); + + attrset.clear(); + + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); p++) { + ldout(client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl; + attrset[p->first.c_str()] = p->second; + } + + set_sync_op_version(ver); + + return r; +} + +void librados::IoCtxImpl::set_sync_op_version(eversion_t& ver) +{ + last_objver = ver; +} + +int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver, + uint64_t *cookie, librados::WatchCtx *ctx) +{ + ::ObjectOperation rd; + Mutex mylock("IoCtxImpl::watch::mylock"); + Cond cond; + bool done; + int r; + Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t objver; + + lock->Lock(); + + WatchContext *wc = new WatchContext(this, oid, ctx); + client->register_watcher(wc, oid, ctx, cookie); + prepare_assert_ops(&rd); + rd.watch(*cookie, ver, 1); + bufferlist bl; + wc->linger_id = objecter->linger(oid, oloc, rd, snap_seq, bl, NULL, 0, + NULL, onfinish, &objver); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(objver); + + if (r < 0) { + lock->Lock(); + client->unregister_watcher(*cookie); + lock->Unlock(); + } + + return r; +} + + +/* this is called with IoCtxImpl::lock held */ +int librados::IoCtxImpl::_notify_ack(const object_t& oid, + uint64_t notify_id, uint64_t ver) +{ + Mutex mylock("IoCtxImpl::watch::mylock"); + Cond cond; + eversion_t objver; + + ::ObjectOperation rd; + prepare_assert_ops(&rd); + rd.notify_ack(notify_id, ver); + objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0); + + return 0; +} + +int librados::IoCtxImpl::unwatch(const object_t& oid, uint64_t cookie) +{ + bufferlist inbl, outbl; + + Mutex mylock("IoCtxImpl::watch::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + lock->Lock(); + + client->unregister_watcher(cookie); + + ::ObjectOperation rd; + prepare_assert_ops(&rd); + rd.watch(cookie, 0, 0); + objecter->read(oid, oloc, rd, snap_seq, &outbl, 0, onack, &ver); + lock->Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(ver); + + return r; +} + +int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& bl) +{ + bufferlist inbl, outbl; + + Mutex mylock("IoCtxImpl::notify::mylock"); + Mutex mylock_all("IoCtxImpl::notify::mylock_all"); + Cond cond, cond_all; + bool done, done_all; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t objver; + uint64_t cookie; + C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all); + + ::ObjectOperation rd; + prepare_assert_ops(&rd); + + lock->Lock(); + WatchContext *wc = new WatchContext(this, oid, ctx); + client->register_watcher(wc, oid, ctx, &cookie); + uint32_t prot_ver = 1; + uint32_t timeout = notify_timeout; + ::encode(prot_ver, inbl); + ::encode(timeout, inbl); + ::encode(bl, inbl); + rd.notify(cookie, ver, inbl); + wc->linger_id = objecter->linger(oid, oloc, rd, snap_seq, inbl, NULL, + 0, onack, NULL, &objver); + lock->Unlock(); + + mylock_all.Lock(); + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + if (r == 0) { + while (!done_all) + cond_all.Wait(mylock_all); + } + + mylock_all.Unlock(); + + lock->Lock(); + client->unregister_watcher(cookie); + lock->Unlock(); + + set_sync_op_version(objver); + delete ctx; + + return r; +} + +eversion_t librados::IoCtxImpl::last_version() +{ + return last_objver; +} + +void librados::IoCtxImpl::set_assert_version(uint64_t ver) +{ + assert_ver = ver; +} +void librados::IoCtxImpl::set_assert_src_version(const object_t& oid, + uint64_t ver) +{ + assert_src_version[oid] = ver; +} + +void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout) +{ + notify_timeout = timeout; +} + +///////////////////////////// C_aio_Ack //////////////////////////////// + +librados::IoCtxImpl::C_aio_Ack::C_aio_Ack(AioCompletionImpl *_c) : c(_c) +{ + c->get(); +} + +void librados::IoCtxImpl::C_aio_Ack::finish(int r) +{ + c->lock.Lock(); + c->rval = r; + c->ack = true; + c->cond.Signal(); + + if (c->buf && c->bl.length() > 0) { + unsigned l = MIN(c->bl.length(), c->maxlen); + c->bl.copy(0, l, c->buf); + c->rval = c->bl.length(); + } + if (c->pbl) { + *c->pbl = c->bl; + } + + if (c->callback_complete) { + rados_callback_t cb = c->callback_complete; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->put_unlock(); +} + +/////////////////////// C_aio_sparse_read_Ack ////////////////////////// + +librados::IoCtxImpl::C_aio_sparse_read_Ack::C_aio_sparse_read_Ack(AioCompletionImpl *_c) + : c(_c) +{ + c->get(); +} + +void librados::IoCtxImpl::C_aio_sparse_read_Ack::finish(int r) +{ + c->lock.Lock(); + c->rval = r; + c->ack = true; + c->cond.Signal(); + + bufferlist::iterator iter = c->bl.begin(); + if (r >= 0) { + ::decode(*m, iter); + ::decode(*data_bl, iter); + } + + if (c->callback_complete) { + rados_callback_t cb = c->callback_complete; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->put_unlock(); +} + +//////////////////////////// C_aio_Safe //////////////////////////////// + +librados::IoCtxImpl::C_aio_Safe::C_aio_Safe(AioCompletionImpl *_c) : c(_c) +{ + c->get(); +} + +void librados::IoCtxImpl::C_aio_Safe::finish(int r) +{ + c->lock.Lock(); + if (!c->ack) { + c->rval = r; + c->ack = true; + } + c->safe = true; + c->cond.Signal(); + + if (c->callback_safe) { + rados_callback_t cb = c->callback_safe; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->io->complete_aio_write(c); + + c->put_unlock(); +} + +///////////////////////// C_NotifyComplete ///////////////////////////// + +librados::IoCtxImpl::C_NotifyComplete::C_NotifyComplete(Mutex *_l, + Cond *_c, + bool *_d) + : lock(_l), cond(_c), done(_d) +{ + *done = false; +} + +void librados::IoCtxImpl::C_NotifyComplete::notify(uint8_t opcode, + uint64_t ver, + bufferlist& bl) +{ + *done = true; + cond->Signal(); +} + +/////////////////////////// WatchContext /////////////////////////////// + +librados::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_, + const object_t& _oc, + librados::WatchCtx *_ctx) + : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0) +{ + io_ctx_impl->get(); +} + +librados::WatchContext::~WatchContext() +{ + io_ctx_impl->put(); +} + +void librados::WatchContext::notify(uint8_t opcode, + uint64_t ver, + uint64_t notify_id, + bufferlist& payload) +{ + ctx->notify(opcode, ver, payload); + if (opcode != WATCH_NOTIFY_COMPLETE) { + io_ctx_impl->_notify_ack(oid, notify_id, ver); + } +} diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index 950e165e2eb..d0e99d2a71b 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -25,6 +25,7 @@ #include "include/types.h" #include "include/xlist.h" #include "osd/osd_types.h" +#include "osdc/Objecter.h" class RadosClient; @@ -46,8 +47,12 @@ struct librados::IoCtxImpl { Cond aio_write_cond; xlist<AioCompletionImpl*> aio_write_list; + Mutex *lock; + Objecter *objecter; + IoCtxImpl(); - IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s); + IoCtxImpl(RadosClient *c, Objecter *objecter, Mutex *client_lock, + int poolid, const char *pool_name, snapid_t s); void dup(const IoCtxImpl& rhs) { // Copy everything except the ref count @@ -61,6 +66,8 @@ struct librados::IoCtxImpl { last_objver = rhs.last_objver; notify_timeout = rhs.notify_timeout; oloc = rhs.oloc; + lock = rhs.lock; + objecter = rhs.objecter; } void set_snap_read(snapid_t s); @@ -82,6 +89,131 @@ struct librados::IoCtxImpl { int64_t get_id() { return poolid; } + + ::ObjectOperation *prepare_assert_ops(::ObjectOperation *op); + + // snaps + int snap_list(vector<uint64_t> *snaps); + int snap_lookup(const char *name, uint64_t *snapid); + int snap_get_name(uint64_t snapid, std::string *s); + int snap_get_stamp(uint64_t snapid, time_t *t); + int snap_create(const char* snapname); + int selfmanaged_snap_create(uint64_t *snapid); + int snap_remove(const char* snapname); + int rollback(const object_t& oid, const char *snapName); + int selfmanaged_snap_remove(uint64_t snapid); + int selfmanaged_snap_rollback_object(const object_t& oid, + ::SnapContext& snapc, uint64_t snapid); + + // io + int list(Objecter::ListContext *context, int max_entries); + int create(const object_t& oid, bool exclusive); + int create(const object_t& oid, bool exclusive, const std::string& category); + int write(const object_t& oid, bufferlist& bl, size_t len, uint64_t off); + int append(const object_t& oid, bufferlist& bl, size_t len); + int write_full(const object_t& oid, bufferlist& bl); + int clone_range(const object_t& dst_oid, uint64_t dst_offset, + const object_t& src_oid, uint64_t src_offset, uint64_t len); + int read(const object_t& oid, bufferlist& bl, size_t len, uint64_t off); + int mapext(const object_t& oid, uint64_t off, size_t len, + std::map<uint64_t,uint64_t>& m); + int sparse_read(const object_t& oid, std::map<uint64_t,uint64_t>& m, + bufferlist& bl, size_t len, uint64_t off); + int remove(const object_t& oid); + int stat(const object_t& oid, uint64_t *psize, time_t *pmtime); + int trunc(const object_t& oid, uint64_t size); + + int tmap_update(const object_t& oid, bufferlist& cmdbl); + int tmap_put(const object_t& oid, bufferlist& bl); + int tmap_get(const object_t& oid, bufferlist& bl); + + int exec(const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl); + + int getxattr(const object_t& oid, const char *name, bufferlist& bl); + int setxattr(const object_t& oid, const char *name, bufferlist& bl); + int getxattrs(const object_t& oid, map<string, bufferlist>& attrset); + int rmxattr(const object_t& oid, const char *name); + + int operate(const object_t& oid, ::ObjectOperation *o, time_t *pmtime); + int operate_read(const object_t& oid, ::ObjectOperation *o, bufferlist *pbl); + int aio_operate(const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c); + int aio_operate_read(const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c, bufferlist *pbl); + + struct C_aio_Ack : public Context { + librados::AioCompletionImpl *c; + C_aio_Ack(AioCompletionImpl *_c); + void finish(int r); + }; + + struct C_aio_sparse_read_Ack : public Context { + AioCompletionImpl *c; + bufferlist *data_bl; + std::map<uint64_t,uint64_t> *m; + C_aio_sparse_read_Ack(AioCompletionImpl *_c); + void finish(int r); + }; + + struct C_aio_Safe : public Context { + AioCompletionImpl *c; + C_aio_Safe(AioCompletionImpl *_c); + void finish(int r); + }; + + int aio_read(const object_t oid, AioCompletionImpl *c, + bufferlist *pbl, size_t len, uint64_t off); + int aio_read(object_t oid, AioCompletionImpl *c, + char *buf, size_t len, uint64_t off); + int aio_sparse_read(const object_t oid, AioCompletionImpl *c, + std::map<uint64_t,uint64_t> *m, bufferlist *data_bl, + size_t len, uint64_t off); + int aio_write(const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len, uint64_t off); + int aio_append(const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len); + int aio_write_full(const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl); + int aio_exec(const object_t& oid, AioCompletionImpl *c, const char *cls, + const char *method, bufferlist& inbl, bufferlist *outbl); + + int pool_change_auid(unsigned long long auid); + int pool_change_auid_async(unsigned long long auid, PoolAsyncCompletionImpl *c); + + void set_sync_op_version(eversion_t& ver); + int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx); + int unwatch(const object_t& oid, uint64_t cookie); + int notify(const object_t& oid, uint64_t ver, bufferlist& bl); + int _notify_ack(const object_t& oid, uint64_t notify_id, uint64_t ver); + + eversion_t last_version(); + void set_assert_version(uint64_t ver); + void set_assert_src_version(const object_t& oid, uint64_t ver); + void set_notify_timeout(uint32_t timeout); + + struct C_NotifyComplete : public librados::WatchCtx { + Mutex *lock; + Cond *cond; + bool *done; + + C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d); + void notify(uint8_t opcode, uint64_t ver, bufferlist& bl); + }; }; +namespace librados { +struct WatchContext { + IoCtxImpl *io_ctx_impl; + const object_t oid; + uint64_t cookie; + uint64_t ver; + librados::WatchCtx *ctx; + uint64_t linger_id; + + WatchContext(IoCtxImpl *io_ctx_impl_, + const object_t& _oc, + librados::WatchCtx *_ctx); + ~WatchContext(); + void notify(uint8_t opcode, uint64_t ver, uint64_t notify_id, + bufferlist& payload); +}; +} #endif diff --git a/src/librados/PoolAsyncCompletionImpl.h b/src/librados/PoolAsyncCompletionImpl.h index e204aaf2a53..efb89641466 100644 --- a/src/librados/PoolAsyncCompletionImpl.h +++ b/src/librados/PoolAsyncCompletionImpl.h @@ -17,72 +17,100 @@ #include "common/Cond.h" #include "common/Mutex.h" +#include "include/Context.h" #include "include/rados/librados.h" #include "include/rados/librados.hpp" -struct librados::PoolAsyncCompletionImpl { - Mutex lock; - Cond cond; - int ref, rval; - bool released; - bool done; +namespace librados { + struct PoolAsyncCompletionImpl { + Mutex lock; + Cond cond; + int ref, rval; + bool released; + bool done; - rados_callback_t callback; - void *callback_arg; + rados_callback_t callback; + void *callback_arg; - PoolAsyncCompletionImpl() : lock("PoolAsyncCompletionImpl lock"), - ref(1), rval(0), released(false), done(false), - callback(0), callback_arg(0) {} + PoolAsyncCompletionImpl() : lock("PoolAsyncCompletionImpl lock"), + ref(1), rval(0), released(false), done(false), + callback(0), callback_arg(0) {} - int set_callback(void *cb_arg, rados_callback_t cb) { - lock.Lock(); - callback = cb; - callback_arg = cb_arg; - lock.Unlock(); - return 0; - } - int wait() { - lock.Lock(); - while (!done) - cond.Wait(lock); - lock.Unlock(); - return 0; - } - int is_complete() { - lock.Lock(); - int r = done; - lock.Unlock(); - return r; - } - int get_return_value() { - lock.Lock(); - int r = rval; - lock.Unlock(); - return r; - } - void get() { - lock.Lock(); - assert(ref > 0); - ref++; - lock.Unlock(); - } - void release() { - lock.Lock(); - assert(!released); - released = true; - put_unlock(); - } - void put() { - lock.Lock(); - put_unlock(); - } - void put_unlock() { - assert(ref > 0); - int n = --ref; - lock.Unlock(); - if (!n) - delete this; - } -}; + int set_callback(void *cb_arg, rados_callback_t cb) { + lock.Lock(); + callback = cb; + callback_arg = cb_arg; + lock.Unlock(); + return 0; + } + int wait() { + lock.Lock(); + while (!done) + cond.Wait(lock); + lock.Unlock(); + return 0; + } + int is_complete() { + lock.Lock(); + int r = done; + lock.Unlock(); + return r; + } + int get_return_value() { + lock.Lock(); + int r = rval; + lock.Unlock(); + return r; + } + void get() { + lock.Lock(); + assert(ref > 0); + ref++; + lock.Unlock(); + } + void release() { + lock.Lock(); + assert(!released); + released = true; + put_unlock(); + } + void put() { + lock.Lock(); + put_unlock(); + } + void put_unlock() { + assert(ref > 0); + int n = --ref; + lock.Unlock(); + if (!n) + delete this; + } + }; + class C_PoolAsync_Safe : public Context { + PoolAsyncCompletionImpl *c; + + public: + C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c) : c(_c) { + c->get(); + } + + void finish(int r) { + c->lock.Lock(); + c->rval = r; + c->done = true; + c->cond.Signal(); + + if (c->callback) { + rados_callback_t cb = c->callback; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->put_unlock(); + } + }; +} #endif diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index b760534a234..2c0fc086834 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -42,36 +42,6 @@ static atomic_t rados_instance; -librados::PoolAsyncCompletionImpl *librados::RadosClient::pool_async_create_completion() -{ - return new librados::PoolAsyncCompletionImpl; -} - -librados::PoolAsyncCompletionImpl *librados::RadosClient::pool_async_create_completion(void *cb_arg, rados_callback_t cb) -{ - librados::PoolAsyncCompletionImpl *c = new librados::PoolAsyncCompletionImpl; - if (cb) - c->set_callback(cb_arg, cb); - return c; -} - -librados::AioCompletionImpl *librados::RadosClient::aio_create_completion() -{ - return new librados::AioCompletionImpl; -} - -librados::AioCompletionImpl *librados::RadosClient::aio_create_completion(void *cb_arg, - rados_callback_t cb_complete, - rados_callback_t cb_safe) -{ - librados::AioCompletionImpl *c = new librados::AioCompletionImpl; - if (cb_complete) - c->set_complete_callback(cb_arg, cb_complete); - if (cb_safe) - c->set_safe_callback(cb_arg, cb_safe); - return c; -} - bool librados::RadosClient::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) { @@ -103,9 +73,19 @@ int64_t librados::RadosClient::lookup_pool(const char *name) { return ret; } -const char *librados::RadosClient::get_pool_name(int64_t poolid_) +const char *librados::RadosClient::get_pool_name(int64_t pool_id) +{ + return osdmap.get_pool_name(pool_id); +} + +int librados::RadosClient::pool_get_auid(uint64_t pool_id, unsigned long long *auid) { - return osdmap.get_pool_name(poolid_); + Mutex::Locker l(lock); + const pg_pool_t *pg = osdmap.get_pg_pool(pool_id); + if (!pg) + return -ENOENT; + *auid = pg->auid; + return 0; } int librados::RadosClient::connect() @@ -226,6 +206,16 @@ librados::RadosClient::~RadosClient() cct = NULL; } +int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io) +{ + int64_t poolid = lookup_pool(name); + if (poolid < 0) + return (int)poolid; + + *io = new librados::IoCtxImpl(this, objecter, &lock, poolid, name, + CEPH_NOSNAP); + return 0; +} bool librados::RadosClient::ms_dispatch(Message *m) { @@ -344,144 +334,6 @@ int librados::RadosClient::get_fs_stats(ceph_statfs& stats) return 0; } - -// SNAPS - -int librados::RadosClient::snap_create(rados_ioctx_t io, const char *snapName) -{ - int reply; - int64_t poolID = ((IoCtxImpl *)io)->poolid; - string sName(snapName); - - Mutex mylock ("RadosClient::snap_create::mylock"); - Cond cond; - bool done; - lock.Lock(); - objecter->create_pool_snap(poolID, - sName, - new C_SafeCond(&mylock, &cond, &done, &reply)); - lock.Unlock(); - - mylock.Lock(); - while(!done) cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::RadosClient::selfmanaged_snap_create(rados_ioctx_t io, uint64_t *psnapid) -{ - int reply; - int64_t poolID = ((IoCtxImpl *)io)->poolid; - - Mutex mylock("RadosClient::selfmanaged_snap_create::mylock"); - Cond cond; - bool done; - lock.Lock(); - snapid_t snapid; - objecter->allocate_selfmanaged_snap(poolID, &snapid, - new C_SafeCond(&mylock, &cond, &done, &reply)); - lock.Unlock(); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - if (reply == 0) - *psnapid = snapid; - return reply; -} - -int librados::RadosClient::snap_remove(rados_ioctx_t io, const char *snapName) -{ - int reply; - int64_t poolID = ((IoCtxImpl *)io)->poolid; - string sName(snapName); - - Mutex mylock ("RadosClient::snap_remove::mylock"); - Cond cond; - bool done; - lock.Lock(); - objecter->delete_pool_snap(poolID, - sName, - new C_SafeCond(&mylock, &cond, &done, &reply)); - lock.Unlock(); - - mylock.Lock(); - while(!done) cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::RadosClient::selfmanaged_snap_rollback_object(rados_ioctx_t io, - const object_t& oid, - ::SnapContext& snapc, - uint64_t snapid) -{ - int reply; - IoCtxImpl* ctx = (IoCtxImpl *) io; - - Mutex mylock("RadosClient::snap_rollback::mylock"); - Cond cond; - bool done; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply); - - lock.Lock(); - objecter->rollback_object(oid, ctx->oloc, snapc, snapid, - ceph_clock_now(cct), onack, NULL); - lock.Unlock(); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::RadosClient::rollback(rados_ioctx_t io_, const object_t& oid, - const char *snapName) -{ - IoCtxImpl* io = (IoCtxImpl *) io_; - string sName(snapName); - - lock.Lock(); - snapid_t snap; - const map<int64_t, pg_pool_t>& pools = objecter->osdmap->get_pools(); - const pg_pool_t& pg_pool = pools.find(io->poolid)->second; - map<snapid_t, pool_snap_info_t>::const_iterator p; - for (p = pg_pool.snaps.begin(); - p != pg_pool.snaps.end(); - ++p) { - if (p->second.name == snapName) { - snap = p->first; - break; - } - } - if (p == pg_pool.snaps.end()) { - lock.Unlock(); - return -ENOENT; - } - lock.Unlock(); - - return selfmanaged_snap_rollback_object(io_, oid, io->snapc, snap); -} - -int librados::RadosClient::selfmanaged_snap_remove(rados_ioctx_t io, uint64_t snapid) -{ - int reply; - int64_t poolID = ((IoCtxImpl *)io)->poolid; - - Mutex mylock("RadosClient::selfmanaged_snap_remove::mylock"); - Cond cond; - bool done; - lock.Lock(); - objecter->delete_selfmanaged_snap(poolID, snapid_t(snapid), - new C_SafeCond(&mylock, &cond, &done, &reply)); - lock.Unlock(); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - return (int)reply; -} - int librados::RadosClient::pool_create(string& name, unsigned long long auid, __u8 crush_rule) { @@ -546,1273 +398,14 @@ int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompleti return 0; } -int librados::RadosClient::pool_change_auid(rados_ioctx_t io, unsigned long long auid) -{ - int reply; - - int64_t poolID = ((IoCtxImpl *)io)->poolid; - - Mutex mylock("RadosClient::pool_change_auid::mylock"); - Cond cond; - bool done; - lock.Lock(); - objecter->change_pool_auid(poolID, - new C_SafeCond(&mylock, &cond, &done, &reply), - auid); - lock.Unlock(); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::RadosClient::pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, - PoolAsyncCompletionImpl *c) -{ - int64_t poolID = ((IoCtxImpl *)io)->poolid; - - Mutex::Locker l(lock); - objecter->change_pool_auid(poolID, - new C_PoolAsync_Safe(c), - auid); - return 0; -} - -int librados::RadosClient::pool_get_auid(rados_ioctx_t io, unsigned long long *auid) -{ - Mutex::Locker l(lock); - int64_t pool_id = ((IoCtxImpl *)io)->poolid; - const pg_pool_t *pg = osdmap.get_pg_pool(pool_id); - if (!pg) - return -ENOENT; - *auid = pg->auid; - return 0; -} - -int librados::RadosClient::snap_list(IoCtxImpl *io, vector<uint64_t> *snaps) -{ - Mutex::Locker l(lock); - const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); - for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin(); - p != pi->snaps.end(); - p++) - snaps->push_back(p->first); - return 0; -} - -int librados::RadosClient::snap_lookup(IoCtxImpl *io, const char *name, uint64_t *snapid) -{ - Mutex::Locker l(lock); - const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); - for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin(); - p != pi->snaps.end(); - p++) { - if (p->second.name == name) { - *snapid = p->first; - return 0; - } - } - return -ENOENT; -} - -int librados::RadosClient::snap_get_name(IoCtxImpl *io, uint64_t snapid, std::string *s) -{ - Mutex::Locker l(lock); - const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); - map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.find(snapid); - if (p == pi->snaps.end()) - return -ENOENT; - *s = p->second.name.c_str(); - return 0; -} - -int librados::RadosClient::snap_get_stamp(IoCtxImpl *io, uint64_t snapid, time_t *t) -{ - Mutex::Locker l(lock); - const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); - map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.find(snapid); - if (p == pi->snaps.end()) - return -ENOENT; - *t = p->second.stamp.sec(); - return 0; -} - - -// IO - -int librados::RadosClient::list(Objecter::ListContext *context, int max_entries) -{ - Cond cond; - bool done; - int r = 0; - object_t oid; - Mutex mylock("RadosClient::list::mylock"); - - if (context->at_end) - return 0; - - context->max_entries = max_entries; - - lock.Lock(); - objecter->list_objects(context, new C_SafeCond(&mylock, &cond, &done, &r)); - lock.Unlock(); - - mylock.Lock(); - while(!done) - cond.Wait(mylock); - mylock.Unlock(); - - return r; -} - -int librados::RadosClient::create(IoCtxImpl& io, const object_t& oid, bool exclusive) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::create::mylock"); - Cond cond; - bool done; - int r; - eversion_t ver; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - objecter->create(oid, io.oloc, - io.snapc, ut, 0, (exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0), - onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::create(IoCtxImpl& io, const object_t& oid, bool exclusive, - const std::string& category) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::create::mylock"); - Cond cond; - bool done; - int r; - eversion_t ver; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - ::ObjectOperation o; - o.create(exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0, category); - - lock.Lock(); - objecter->mutate(oid, io.oloc, o, io.snapc, ut, 0, onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -/* - * add any version assert operations that are appropriate given the - * stat in the IoCtx, either the target version assert or any src - * object asserts. these affect a single ioctx operation, so clear - * the ioctx state when we're doing. - * - * return a pointer to the ObjectOperation if we added any events; - * this is convenient for passing the extra_ops argument into Objecter - * methods. - */ -::ObjectOperation *librados::RadosClient::prepare_assert_ops(IoCtxImpl *io, ::ObjectOperation *op) -{ - ::ObjectOperation *pop = NULL; - if (io->assert_ver) { - op->assert_version(io->assert_ver); - io->assert_ver = 0; - pop = op; - } - while (!io->assert_src_version.empty()) { - map<object_t,uint64_t>::iterator p = io->assert_src_version.begin(); - op->assert_src_version(p->first, CEPH_NOSNAP, p->second); - io->assert_src_version.erase(p); - pop = op; - } - return pop; -} - -int librados::RadosClient::write(IoCtxImpl& io, const object_t& oid, bufferlist& bl, - size_t len, uint64_t off) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::write::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - // extra ops? - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->write(oid, io.oloc, - off, len, io.snapc, bl, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - return len; -} - -int librados::RadosClient::append(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::append::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->append(oid, io.oloc, - len, io.snapc, bl, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - return len; -} - -int librados::RadosClient::write_full(IoCtxImpl& io, const object_t& oid, bufferlist& bl) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::write_full::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->write_full(oid, io.oloc, - io.snapc, bl, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::clone_range(IoCtxImpl& io, - const object_t& dst_oid, uint64_t dst_offset, - const object_t& src_oid, uint64_t src_offset, - uint64_t len) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::clone_range::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - bufferlist outbl; - - lock.Lock(); - ::ObjectOperation wr; - prepare_assert_ops(&io, &wr); - wr.clone_range(src_oid, src_offset, len, dst_offset); - objecter->mutate(dst_oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::operate(IoCtxImpl& io, const object_t& oid, - ::ObjectOperation *o, time_t *pmtime) -{ - utime_t ut; - if (pmtime) { - ut = utime_t(*pmtime, 0); - } else { - ut = ceph_clock_now(cct); - } - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - if (!o->size()) - return 0; - - Mutex mylock("RadosClient::mutate::mylock"); - Cond cond; - bool done; - int r; - eversion_t ver; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - objecter->mutate(oid, io.oloc, - *o, io.snapc, ut, 0, - onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::operate_read(IoCtxImpl& io, const object_t& oid, - ::ObjectOperation *o, bufferlist *pbl) -{ - if (!o->size()) - return 0; - - Mutex mylock("RadosClient::mutate::mylock"); - Cond cond; - bool done; - int r; - eversion_t ver; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - objecter->read(oid, io.oloc, - *o, io.snap_seq, pbl, 0, - onack, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::aio_operate_read(IoCtxImpl& io, const object_t &oid, - ::ObjectOperation *o, - AioCompletionImpl *c, bufferlist *pbl) -{ - Context *onack = new C_aio_Ack(c); - - c->pbl = pbl; - - Mutex::Locker l(lock); - objecter->read(oid, io.oloc, - *o, io.snap_seq, pbl, 0, - onack, 0); - return 0; -} - -int librados::RadosClient::aio_operate(IoCtxImpl& io, const object_t& oid, - ::ObjectOperation *o, AioCompletionImpl *c) -{ - utime_t ut = ceph_clock_now(cct); - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Context *onack = new C_aio_Ack(c); - Context *oncommit = new C_aio_Safe(c); - - io.queue_aio_write(c); - - Mutex::Locker l(lock); - objecter->mutate(oid, io.oloc, *o, io.snapc, ut, 0, onack, oncommit, &c->objver); - - return 0; -} - -int librados::RadosClient::aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, - bufferlist *pbl, size_t len, uint64_t off) -{ - - Context *onack = new C_aio_Ack(c); - eversion_t ver; - - c->pbl = pbl; - - Mutex::Locker l(lock); - objecter->read(oid, io.oloc, - off, len, io.snap_seq, &c->bl, 0, - onack, &c->objver); - return 0; -} - -int librados::RadosClient::aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, - char *buf, size_t len, uint64_t off) -{ - Context *onack = new C_aio_Ack(c); - - c->buf = buf; - c->maxlen = len; - - Mutex::Locker l(lock); - objecter->read(oid, io.oloc, - off, len, io.snap_seq, &c->bl, 0, - onack, &c->objver); - - return 0; -} - -int librados::RadosClient::aio_sparse_read(IoCtxImpl& io, const object_t oid, - AioCompletionImpl *c, std::map<uint64_t,uint64_t> *m, - bufferlist *data_bl, size_t len, uint64_t off) -{ - - C_aio_sparse_read_Ack *onack = new C_aio_sparse_read_Ack(c); - onack->m = m; - onack->data_bl = data_bl; - eversion_t ver; - - c->pbl = NULL; - - Mutex::Locker l(lock); - objecter->sparse_read(oid, io.oloc, - off, len, io.snap_seq, &c->bl, 0, - onack); - return 0; -} - -int librados::RadosClient::aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl, size_t len, uint64_t off) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - io.queue_aio_write(c); - - Context *onack = new C_aio_Ack(c); - Context *onsafe = new C_aio_Safe(c); - - Mutex::Locker l(lock); - objecter->write(oid, io.oloc, - off, len, io.snapc, bl, ut, 0, - onack, onsafe, &c->objver); - - return 0; -} - -int librados::RadosClient::aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl, size_t len) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - io.queue_aio_write(c); - - Context *onack = new C_aio_Ack(c); - Context *onsafe = new C_aio_Safe(c); - - Mutex::Locker l(lock); - objecter->append(oid, io.oloc, - len, io.snapc, bl, ut, 0, - onack, onsafe, &c->objver); - - return 0; -} - -int librados::RadosClient::aio_write_full(IoCtxImpl& io, const object_t &oid, - AioCompletionImpl *c, const bufferlist& bl) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - io.queue_aio_write(c); - - Context *onack = new C_aio_Ack(c); - Context *onsafe = new C_aio_Safe(c); - - Mutex::Locker l(lock); - objecter->write_full(oid, io.oloc, - io.snapc, bl, ut, 0, - onack, onsafe, &c->objver); - - return 0; -} - -int librados::RadosClient::remove(IoCtxImpl& io, const object_t& oid) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::remove::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->remove(oid, io.oloc, - io.snapc, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::trunc(IoCtxImpl& io, const object_t& oid, uint64_t size) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::write_full::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->trunc(oid, io.oloc, - io.snapc, ut, 0, - size, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::tmap_update(IoCtxImpl& io, const object_t& oid, bufferlist& cmdbl) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::tmap_update::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - bufferlist outbl; - - lock.Lock(); - ::ObjectOperation wr; - prepare_assert_ops(&io, &wr); - wr.tmap_update(cmdbl); - objecter->mutate(oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::tmap_put(IoCtxImpl& io, const object_t& oid, bufferlist& bl) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::tmap_put::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - bufferlist outbl; - - lock.Lock(); - ::ObjectOperation wr; - prepare_assert_ops(&io, &wr); - wr.tmap_put(bl); - objecter->mutate(oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::tmap_get(IoCtxImpl& io, const object_t& oid, bufferlist& bl) -{ - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::tmap_put::mylock"); - Cond cond; - bool done; - int r = 0; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - bufferlist outbl; - - lock.Lock(); - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - rd.tmap_get(&bl, NULL); - objecter->read(oid, io.oloc, rd, io.snap_seq, 0, 0, onack, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::exec(IoCtxImpl& io, const object_t& oid, - const char *cls, const char *method, - bufferlist& inbl, bufferlist& outbl) -{ - Mutex mylock("RadosClient::exec::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - - lock.Lock(); - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - rd.call(cls, method, inbl); - objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::aio_exec(IoCtxImpl& io, const object_t& oid, AioCompletionImpl *c, - const char *cls, const char *method, - bufferlist& inbl, bufferlist *outbl) -{ - Context *onack = new C_aio_Ack(c); - - Mutex::Locker l(lock); - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - rd.call(cls, method, inbl); - objecter->read(oid, io.oloc, rd, io.snap_seq, outbl, 0, onack, &c->objver); - - return 0; -} - -int librados::RadosClient::read(IoCtxImpl& io, const object_t& oid, - bufferlist& bl, size_t len, uint64_t off) -{ - CephContext *cct = io.client->cct; - Mutex mylock("RadosClient::read::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->read(oid, io.oloc, - off, len, io.snap_seq, &bl, 0, - onack, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - if (bl.length() < len) { - ldout(cct, 10) << "Returned length " << bl.length() - << " less than original length "<< len << dendl; - } - - return bl.length(); -} - -int librados::RadosClient::mapext(IoCtxImpl& io, const object_t& oid, - uint64_t off, size_t len, std::map<uint64_t,uint64_t>& m) -{ - CephContext *cct = io.client->cct; - bufferlist bl; - - Mutex mylock("RadosClient::read::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - objecter->mapext(oid, io.oloc, - off, len, io.snap_seq, &bl, 0, - onack); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; - - if (r < 0) - return r; - - bufferlist::iterator iter = bl.begin(); - ::decode(m, iter); - - return m.size(); -} - -int librados::RadosClient::sparse_read(IoCtxImpl& io, const object_t& oid, - std::map<uint64_t,uint64_t>& m, - bufferlist& data_bl, size_t len, uint64_t off) -{ - CephContext *cct = io.client->cct; - bufferlist bl; - - Mutex mylock("RadosClient::read::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - objecter->sparse_read(oid, io.oloc, - off, len, io.snap_seq, &bl, 0, - onack); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; - - if (r < 0) - return r; - - bufferlist::iterator iter = bl.begin(); - ::decode(m, iter); - ::decode(data_bl, iter); - - return m.size(); -} - -int librados::RadosClient::stat(IoCtxImpl& io, const object_t& oid, uint64_t *psize, time_t *pmtime) -{ - CephContext *cct = io.client->cct; - Mutex mylock("RadosClient::stat::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - uint64_t size; - utime_t mtime; - eversion_t ver; - - if (!psize) - psize = &size; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->stat(oid, io.oloc, - io.snap_seq, psize, &mtime, 0, - onack, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(cct, 10) << "Objecter returned from stat" << dendl; - - if (r >= 0 && pmtime) { - *pmtime = mtime.sec(); - } - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::getxattr(IoCtxImpl& io, const object_t& oid, - const char *name, bufferlist& bl) -{ - CephContext *cct = io.client->cct; - Mutex mylock("RadosClient::getxattr::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->getxattr(oid, io.oloc, - name, io.snap_seq, &bl, 0, - onack, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(cct, 10) << "Objecter returned from getxattr" << dendl; - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - return bl.length(); -} - -int librados::RadosClient::rmxattr(IoCtxImpl& io, const object_t& oid, const char *name) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::rmxattr::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->removexattr(oid, io.oloc, name, - io.snapc, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - return 0; -} - -int librados::RadosClient::setxattr(IoCtxImpl& io, const object_t& oid, - const char *name, bufferlist& bl) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::setxattr::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->setxattr(oid, io.oloc, name, - io.snapc, bl, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - return 0; -} - -int librados::RadosClient::getxattrs(IoCtxImpl& io, const object_t& oid, - map<std::string, bufferlist>& attrset) -{ - CephContext *cct = io.client->cct; - Mutex mylock("RadosClient::getexattrs::mylock"); - Cond cond; - bool done; - int r; - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - map<string, bufferlist> aset; - objecter->getxattrs(oid, io.oloc, io.snap_seq, - aset, - 0, onack, &ver, pop); - lock.Unlock(); - - attrset.clear(); - - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); p++) { - ldout(cct, 10) << "RadosClient::getxattrs: xattr=" << p->first << dendl; - attrset[p->first.c_str()] = p->second; - } - - set_sync_op_version(io, ver); - - return r; -} - -void librados::RadosClient::watch_notify(MWatchNotify *m) -{ - assert(lock.is_locked()); - WatchContext *wc = NULL; - map<uint64_t, WatchContext *>::iterator iter = watchers.find(m->cookie); - if (iter != watchers.end()) - wc = iter->second; - - if (!wc) - return; - - wc->notify(this, m); - - m->put(); -} - -int librados::RadosClient::watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, - uint64_t *cookie, librados::WatchCtx *ctx) -{ - ::ObjectOperation rd; - Mutex mylock("RadosClient::watch::mylock"); - Cond cond; - bool done; - int r; - Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t objver; - - lock.Lock(); - - WatchContext *wc; - register_watcher(io, oid, ctx, cookie, &wc); - prepare_assert_ops(&io, &rd); - rd.watch(*cookie, ver, 1); - bufferlist bl; - wc->linger_id = objecter->linger(oid, io.oloc, rd, io.snap_seq, bl, NULL, 0, NULL, onfinish, &objver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, objver); - - if (r < 0) { - lock.Lock(); - unregister_watcher(*cookie); - lock.Unlock(); - } - - return r; -} - - -/* this is called with RadosClient::lock held */ -int librados::RadosClient::_notify_ack(IoCtxImpl& io, const object_t& oid, - uint64_t notify_id, uint64_t ver) -{ - Mutex mylock("RadosClient::watch::mylock"); - Cond cond; - eversion_t objver; - - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - rd.notify_ack(notify_id, ver); - objecter->read(oid, io.oloc, rd, io.snap_seq, (bufferlist*)NULL, 0, 0, 0); - - return 0; -} - -int librados::RadosClient::unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie) -{ - bufferlist inbl, outbl; - - Mutex mylock("RadosClient::watch::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - lock.Lock(); - - unregister_watcher(cookie); - - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - rd.watch(cookie, 0, 0); - objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl) -{ - bufferlist inbl, outbl; - - Mutex mylock("RadosClient::notify::mylock"); - Mutex mylock_all("RadosClient::notify::mylock_all"); - Cond cond, cond_all; - bool done, done_all; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t objver; - uint64_t cookie; - C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all); - - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - - lock.Lock(); - WatchContext *wc; - register_watcher(io, oid, ctx, &cookie, &wc); - uint32_t prot_ver = 1; - uint32_t timeout = io.notify_timeout; - ::encode(prot_ver, inbl); - ::encode(timeout, inbl); - ::encode(bl, inbl); - rd.notify(cookie, ver, inbl); - wc->linger_id = objecter->linger(oid, io.oloc, rd, io.snap_seq, inbl, NULL, - 0, onack, NULL, &objver); - lock.Unlock(); - - mylock_all.Lock(); - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - if (r == 0) { - while (!done_all) - cond_all.Wait(mylock_all); - } - - mylock_all.Unlock(); - - lock.Lock(); - unregister_watcher(cookie); - lock.Unlock(); - - set_sync_op_version(io, objver); - delete ctx; - - return r; -} - -void librados::RadosClient::set_sync_op_version(IoCtxImpl& io, eversion_t& ver) -{ - io.last_objver = ver; -} - -void librados::RadosClient::register_watcher(IoCtxImpl& io, +void librados::RadosClient::register_watcher(WatchContext *wc, const object_t& oid, librados::WatchCtx *ctx, - uint64_t *cookie, - WatchContext **pwc) + uint64_t *cookie) { assert(lock.is_locked()); - WatchContext *wc = new WatchContext(&io, oid, ctx); *cookie = ++max_watch_cookie; watchers[*cookie] = wc; - if (pwc) - *pwc = wc; } void librados::RadosClient::unregister_watcher(uint64_t cookie) @@ -1828,187 +421,18 @@ void librados::RadosClient::unregister_watcher(uint64_t cookie) } } -eversion_t librados::RadosClient::last_version(IoCtxImpl& io) -{ - return io.last_objver; -} - -void librados::RadosClient::set_assert_version(IoCtxImpl& io, uint64_t ver) -{ - io.assert_ver = ver; -} -void librados::RadosClient::set_assert_src_version(IoCtxImpl& io, - const object_t& oid, - uint64_t ver) -{ - io.assert_src_version[oid] = ver; -} - -void librados::RadosClient::set_notify_timeout(IoCtxImpl& io, uint32_t timeout) -{ - io.notify_timeout = timeout; -} - -///////////////////////////// C_aio_Ack //////////////////////////////// - -librados::RadosClient::C_aio_Ack::C_aio_Ack(AioCompletionImpl *_c) : c(_c) -{ - c->get(); -} - -void librados::RadosClient::C_aio_Ack::finish(int r) -{ - c->lock.Lock(); - c->rval = r; - c->ack = true; - c->cond.Signal(); - - if (c->buf && c->bl.length() > 0) { - unsigned l = MIN(c->bl.length(), c->maxlen); - c->bl.copy(0, l, c->buf); - c->rval = c->bl.length(); - } - if (c->pbl) { - *c->pbl = c->bl; - } - - if (c->callback_complete) { - rados_callback_t cb = c->callback_complete; - void *cb_arg = c->callback_arg; - c->lock.Unlock(); - cb(c, cb_arg); - c->lock.Lock(); - } - - c->put_unlock(); -} - -/////////////////////// C_aio_sparse_read_Ack ////////////////////////// - -librados::RadosClient::C_aio_sparse_read_Ack::C_aio_sparse_read_Ack(AioCompletionImpl *_c) - : c(_c) -{ - c->get(); -} - -void librados::RadosClient::C_aio_sparse_read_Ack::finish(int r) -{ - c->lock.Lock(); - c->rval = r; - c->ack = true; - c->cond.Signal(); - - bufferlist::iterator iter = c->bl.begin(); - if (r >= 0) { - ::decode(*m, iter); - ::decode(*data_bl, iter); - } - - if (c->callback_complete) { - rados_callback_t cb = c->callback_complete; - void *cb_arg = c->callback_arg; - c->lock.Unlock(); - cb(c, cb_arg); - c->lock.Lock(); - } - - c->put_unlock(); -} - -//////////////////////////// C_aio_Safe //////////////////////////////// - -librados::RadosClient::C_aio_Safe::C_aio_Safe(AioCompletionImpl *_c) : c(_c) -{ - c->get(); -} - -void librados::RadosClient::C_aio_Safe::finish(int r) -{ - c->lock.Lock(); - if (!c->ack) { - c->rval = r; - c->ack = true; - } - c->safe = true; - c->cond.Signal(); - - if (c->callback_safe) { - rados_callback_t cb = c->callback_safe; - void *cb_arg = c->callback_arg; - c->lock.Unlock(); - cb(c, cb_arg); - c->lock.Lock(); - } - - c->io->complete_aio_write(c); - - c->put_unlock(); -} - -///////////////////////// C_PoolAsync_Safe ///////////////////////////// - -librados::RadosClient::C_PoolAsync_Safe::C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c) - : c(_c) -{ - c->get(); -} - -void librados::RadosClient::C_PoolAsync_Safe::finish(int r) -{ - c->lock.Lock(); - c->rval = r; - c->done = true; - c->cond.Signal(); - - if (c->callback) { - rados_callback_t cb = c->callback; - void *cb_arg = c->callback_arg; - c->lock.Unlock(); - cb(c, cb_arg); - c->lock.Lock(); - } - - c->put_unlock(); -} - -///////////////////////// C_NotifyComplete ///////////////////////////// - -librados::RadosClient::C_NotifyComplete::C_NotifyComplete(Mutex *_l, - Cond *_c, - bool *_d) - : lock(_l), cond(_c), done(_d) -{ - *done = false; -} - -void librados::RadosClient::C_NotifyComplete::notify(uint8_t opcode, - uint64_t ver, - bufferlist& bl) +void librados::RadosClient::watch_notify(MWatchNotify *m) { - *done = true; - cond->Signal(); -} - -/////////////////////////// WatchContext /////////////////////////////// + assert(lock.is_locked()); + WatchContext *wc = NULL; + map<uint64_t, WatchContext *>::iterator iter = watchers.find(m->cookie); + if (iter != watchers.end()) + wc = iter->second; -librados::RadosClient::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_, - const object_t& _oc, - librados::WatchCtx *_ctx) - : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0) -{ - io_ctx_impl->get(); -} + if (!wc) + return; -librados::RadosClient::WatchContext::~WatchContext() -{ - io_ctx_impl->put(); -} + wc->notify(m->opcode, m->ver, m->notify_id, m->bl); -void librados::RadosClient::WatchContext::notify(RadosClient *client, - MWatchNotify *m) -{ - ctx->notify(m->opcode, m->ver, m->bl); - if (m->opcode != WATCH_NOTIFY_COMPLETE) { - client->_notify_ack(*io_ctx_impl, oid, m->notify_id, m->ver); - } + m->put(); } diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h index 98082df30b7..4ef76d1576f 100644 --- a/src/librados/RadosClient.h +++ b/src/librados/RadosClient.h @@ -22,7 +22,8 @@ #include "mon/MonClient.h" #include "msg/Dispatcher.h" #include "osd/OSDMap.h" -#include "osdc/Objecter.h" + +#include "IoCtxImpl.h" class AuthAuthorizer; class CephContext; @@ -69,50 +70,11 @@ public: int connect(); void shutdown(); + int create_ioctx(const char *name, IoCtxImpl **io); + int64_t lookup_pool(const char *name); - const char *get_pool_name(int64_t poolid_); - ::ObjectOperation *prepare_assert_ops(IoCtxImpl *io, ::ObjectOperation *op); - - // snaps - int snap_list(IoCtxImpl *io, vector<uint64_t> *snaps); - int snap_lookup(IoCtxImpl *io, const char *name, uint64_t *snapid); - int snap_get_name(IoCtxImpl *io, uint64_t snapid, std::string *s); - int snap_get_stamp(IoCtxImpl *io, uint64_t snapid, time_t *t); - int snap_create(rados_ioctx_t io, const char* snapname); - int selfmanaged_snap_create(rados_ioctx_t io, uint64_t *snapid); - int snap_remove(rados_ioctx_t io, const char* snapname); - int rollback(rados_ioctx_t io_, const object_t& oid, const char *snapName); - int selfmanaged_snap_remove(rados_ioctx_t io, uint64_t snapid); - int selfmanaged_snap_rollback_object(rados_ioctx_t io, const object_t& oid, - ::SnapContext& snapc, uint64_t snapid); - - // io - int create(IoCtxImpl& io, const object_t& oid, bool exclusive); - int create(IoCtxImpl& io, const object_t& oid, bool exclusive, const std::string& category); - int write(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len, uint64_t off); - int append(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len); - int write_full(IoCtxImpl& io, const object_t& oid, bufferlist& bl); - int clone_range(IoCtxImpl& io, const object_t& dst_oid, uint64_t dst_offset, - const object_t& src_oid, uint64_t src_offset, uint64_t len); - int read(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len, uint64_t off); - int mapext(IoCtxImpl& io, const object_t& oid, uint64_t off, size_t len, - std::map<uint64_t,uint64_t>& m); - int sparse_read(IoCtxImpl& io, const object_t& oid, std::map<uint64_t,uint64_t>& m, - bufferlist& bl, size_t len, uint64_t off); - int remove(IoCtxImpl& io, const object_t& oid); - int stat(IoCtxImpl& io, const object_t& oid, uint64_t *psize, time_t *pmtime); - int trunc(IoCtxImpl& io, const object_t& oid, uint64_t size); - - int tmap_update(IoCtxImpl& io, const object_t& oid, bufferlist& cmdbl); - int tmap_put(IoCtxImpl& io, const object_t& oid, bufferlist& bl); - int tmap_get(IoCtxImpl& io, const object_t& oid, bufferlist& bl); - - int exec(IoCtxImpl& io, const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl); - - int getxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl); - int setxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl); - int getxattrs(IoCtxImpl& io, const object_t& oid, map<string, bufferlist>& attrset); - int rmxattr(IoCtxImpl& io, const object_t& oid, const char *name); + const char *get_pool_name(int64_t pool_id); + int pool_get_auid(uint64_t pool_id, unsigned long long *auid); int pool_list(std::list<string>& ls); int get_pool_stats(std::list<string>& ls, map<string,::pool_stat_t>& result); @@ -122,113 +84,17 @@ public: int pool_create_async(string& name, PoolAsyncCompletionImpl *c, unsigned long long auid=0, __u8 crush_rule=0); int pool_delete(const char *name); - int pool_change_auid(rados_ioctx_t io, unsigned long long auid); - int pool_get_auid(rados_ioctx_t io, unsigned long long *auid); int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c); - int pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, PoolAsyncCompletionImpl *c); - - int list(Objecter::ListContext *context, int max_entries); - - int operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, time_t *pmtime); - int operate_read(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, bufferlist *pbl); - int aio_operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c); - int aio_operate_read(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c, bufferlist *pbl); - - struct C_aio_Ack : public Context { - librados::AioCompletionImpl *c; - C_aio_Ack(AioCompletionImpl *_c); - void finish(int r); - }; - - struct C_aio_sparse_read_Ack : public Context { - AioCompletionImpl *c; - bufferlist *data_bl; - std::map<uint64_t,uint64_t> *m; - C_aio_sparse_read_Ack(AioCompletionImpl *_c); - void finish(int r); - }; - - struct C_aio_Safe : public Context { - AioCompletionImpl *c; - C_aio_Safe(AioCompletionImpl *_c); - void finish(int r); - }; - - int aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, - bufferlist *pbl, size_t len, uint64_t off); - int aio_read(IoCtxImpl& io, object_t oid, AioCompletionImpl *c, - char *buf, size_t len, uint64_t off); - int aio_sparse_read(IoCtxImpl& io, const object_t oid, - AioCompletionImpl *c, std::map<uint64_t,uint64_t> *m, - bufferlist *data_bl, size_t len, uint64_t off); - int aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl, size_t len, uint64_t off); - int aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl, size_t len); - int aio_write_full(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl); - int aio_exec(IoCtxImpl& io, const object_t& oid, AioCompletionImpl *c, - const char *cls, const char *method, bufferlist& inbl, bufferlist *outbl); - - struct C_PoolAsync_Safe : public Context { - PoolAsyncCompletionImpl *c; - C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c); - void finish(int r); - }; - - static PoolAsyncCompletionImpl *pool_async_create_completion(); - static PoolAsyncCompletionImpl *pool_async_create_completion(void *cb_arg, - rados_callback_t cb); - static AioCompletionImpl *aio_create_completion(); - static AioCompletionImpl *aio_create_completion(void *cb_arg, - rados_callback_t cb_complete, - rados_callback_t cb_safe); // watch/notify - struct WatchContext { - IoCtxImpl *io_ctx_impl; - const object_t oid; - uint64_t cookie; - uint64_t ver; - librados::WatchCtx *ctx; - uint64_t linger_id; - - WatchContext(IoCtxImpl *io_ctx_impl_, - const object_t& _oc, - librados::WatchCtx *_ctx); - ~WatchContext(); - void notify(RadosClient *client, MWatchNotify *m); - }; - - struct C_NotifyComplete : public librados::WatchCtx { - Mutex *lock; - Cond *cond; - bool *done; - - C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d); - void notify(uint8_t opcode, uint64_t ver, bufferlist& bl); - }; - uint64_t max_watch_cookie; - map<uint64_t, WatchContext *> watchers; - - void set_sync_op_version(IoCtxImpl& io, eversion_t& ver); + map<uint64_t, librados::WatchContext *> watchers; - void register_watcher(IoCtxImpl& io, const object_t& oid, - librados::WatchCtx *ctx, uint64_t *cookie, - WatchContext **pwc = NULL); + void register_watcher(librados::WatchContext *wc, const object_t& oid, + librados::WatchCtx *ctx, uint64_t *cookie); void unregister_watcher(uint64_t cookie); void watch_notify(MWatchNotify *m); - int watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx); - int unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie); - int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl); - int _notify_ack(IoCtxImpl& io, const object_t& oid, uint64_t notify_id, uint64_t ver); - - eversion_t last_version(IoCtxImpl& io); - void set_assert_version(IoCtxImpl& io, uint64_t ver); - void set_assert_src_version(IoCtxImpl& io, const object_t& oid, uint64_t ver); - void set_notify_timeout(IoCtxImpl& io, uint32_t timeout); }; #endif diff --git a/src/librados/librados.cc b/src/librados/librados.cc index 48c988fccbb..bd113f8b02a 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -534,47 +534,47 @@ void librados::IoCtx::dup(const IoCtx& rhs) int librados::IoCtx::set_auid(uint64_t auid_) { - return io_ctx_impl->client->pool_change_auid(io_ctx_impl, auid_); + return io_ctx_impl->pool_change_auid(auid_); } int librados::IoCtx::set_auid_async(uint64_t auid_, PoolAsyncCompletion *c) { - return io_ctx_impl->client->pool_change_auid_async(io_ctx_impl, auid_, c->pc); + return io_ctx_impl->pool_change_auid_async(auid_, c->pc); } int librados::IoCtx::get_auid(uint64_t *auid_) { - return io_ctx_impl->client->pool_get_auid(io_ctx_impl, (unsigned long long *)auid_); + return rados_ioctx_pool_get_auid(io_ctx_impl, auid_); } int librados::IoCtx::create(const std::string& oid, bool exclusive) { object_t obj(oid); - return io_ctx_impl->client->create(*io_ctx_impl, obj, exclusive); + return io_ctx_impl->create(obj, exclusive); } int librados::IoCtx::create(const std::string& oid, bool exclusive, const std::string& category) { object_t obj(oid); - return io_ctx_impl->client->create(*io_ctx_impl, obj, exclusive, category); + return io_ctx_impl->create(obj, exclusive, category); } int librados::IoCtx::write(const std::string& oid, bufferlist& bl, size_t len, uint64_t off) { object_t obj(oid); - return io_ctx_impl->client->write(*io_ctx_impl, obj, bl, len, off); + return io_ctx_impl->write(obj, bl, len, off); } int librados::IoCtx::append(const std::string& oid, bufferlist& bl, size_t len) { object_t obj(oid); - return io_ctx_impl->client->append(*io_ctx_impl, obj, bl, len); + return io_ctx_impl->append(obj, bl, len); } int librados::IoCtx::write_full(const std::string& oid, bufferlist& bl) { object_t obj(oid); - return io_ctx_impl->client->write_full(*io_ctx_impl, obj, bl); + return io_ctx_impl->write_full(obj, bl); } int librados::IoCtx::clone_range(const std::string& dst_oid, uint64_t dst_off, @@ -582,94 +582,94 @@ int librados::IoCtx::clone_range(const std::string& dst_oid, uint64_t dst_off, size_t len) { object_t src(src_oid), dst(dst_oid); - return io_ctx_impl->client->clone_range(*io_ctx_impl, dst, dst_off, src, src_off, len); + return io_ctx_impl->clone_range(dst, dst_off, src, src_off, len); } int librados::IoCtx::read(const std::string& oid, bufferlist& bl, size_t len, uint64_t off) { object_t obj(oid); - return io_ctx_impl->client->read(*io_ctx_impl, obj, bl, len, off); + return io_ctx_impl->read(obj, bl, len, off); } int librados::IoCtx::remove(const std::string& oid) { object_t obj(oid); - return io_ctx_impl->client->remove(*io_ctx_impl, obj); + return io_ctx_impl->remove(obj); } int librados::IoCtx::trunc(const std::string& oid, uint64_t size) { object_t obj(oid); - return io_ctx_impl->client->trunc(*io_ctx_impl, obj, size); + return io_ctx_impl->trunc(obj, size); } int librados::IoCtx::mapext(const std::string& oid, uint64_t off, size_t len, std::map<uint64_t,uint64_t>& m) { object_t obj(oid); - return io_ctx_impl->client->mapext(*io_ctx_impl, oid, off, len, m); + return io_ctx_impl->mapext(oid, off, len, m); } int librados::IoCtx::sparse_read(const std::string& oid, std::map<uint64_t,uint64_t>& m, bufferlist& bl, size_t len, uint64_t off) { object_t obj(oid); - return io_ctx_impl->client->sparse_read(*io_ctx_impl, oid, m, bl, len, off); + return io_ctx_impl->sparse_read(oid, m, bl, len, off); } int librados::IoCtx::getxattr(const std::string& oid, const char *name, bufferlist& bl) { object_t obj(oid); - return io_ctx_impl->client->getxattr(*io_ctx_impl, obj, name, bl); + return io_ctx_impl->getxattr(obj, name, bl); } int librados::IoCtx::getxattrs(const std::string& oid, map<std::string, bufferlist>& attrset) { object_t obj(oid); - return io_ctx_impl->client->getxattrs(*io_ctx_impl, obj, attrset); + return io_ctx_impl->getxattrs(obj, attrset); } int librados::IoCtx::setxattr(const std::string& oid, const char *name, bufferlist& bl) { object_t obj(oid); - return io_ctx_impl->client->setxattr(*io_ctx_impl, obj, name, bl); + return io_ctx_impl->setxattr(obj, name, bl); } int librados::IoCtx::rmxattr(const std::string& oid, const char *name) { object_t obj(oid); - return io_ctx_impl->client->rmxattr(*io_ctx_impl, obj, name); + return io_ctx_impl->rmxattr(obj, name); } int librados::IoCtx::stat(const std::string& oid, uint64_t *psize, time_t *pmtime) { object_t obj(oid); - return io_ctx_impl->client->stat(*io_ctx_impl, oid, psize, pmtime); + return io_ctx_impl->stat(oid, psize, pmtime); } int librados::IoCtx::exec(const std::string& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl) { object_t obj(oid); - return io_ctx_impl->client->exec(*io_ctx_impl, obj, cls, method, inbl, outbl); + return io_ctx_impl->exec(obj, cls, method, inbl, outbl); } int librados::IoCtx::tmap_update(const std::string& oid, bufferlist& cmdbl) { object_t obj(oid); - return io_ctx_impl->client->tmap_update(*io_ctx_impl, obj, cmdbl); + return io_ctx_impl->tmap_update(obj, cmdbl); } int librados::IoCtx::tmap_put(const std::string& oid, bufferlist& bl) { object_t obj(oid); - return io_ctx_impl->client->tmap_put(*io_ctx_impl, obj, bl); + return io_ctx_impl->tmap_put(obj, bl); } int librados::IoCtx::tmap_get(const std::string& oid, bufferlist& bl) { object_t obj(oid); - return io_ctx_impl->client->tmap_get(*io_ctx_impl, obj, bl); + return io_ctx_impl->tmap_get(obj, bl); } int librados::IoCtx::omap_get_vals(const std::string& oid, @@ -767,25 +767,25 @@ int librados::IoCtx::omap_rm_keys(const std::string& oid, int librados::IoCtx::operate(const std::string& oid, librados::ObjectWriteOperation *o) { object_t obj(oid); - return io_ctx_impl->client->operate(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, o->pmtime); + return io_ctx_impl->operate(obj, (::ObjectOperation*)o->impl, o->pmtime); } int librados::IoCtx::operate(const std::string& oid, librados::ObjectReadOperation *o, bufferlist *pbl) { object_t obj(oid); - return io_ctx_impl->client->operate_read(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, pbl); + return io_ctx_impl->operate_read(obj, (::ObjectOperation*)o->impl, pbl); } int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectWriteOperation *o) { object_t obj(oid); - return io_ctx_impl->client->aio_operate(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, c->pc); + return io_ctx_impl->aio_operate(obj, (::ObjectOperation*)o->impl, c->pc); } int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectReadOperation *o, bufferlist *pbl) { object_t obj(oid); - return io_ctx_impl->client->aio_operate_read(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, c->pc, pbl); + return io_ctx_impl->aio_operate_read(obj, (::ObjectOperation*)o->impl, c->pc, pbl); } void librados::IoCtx::snap_set_read(snap_t seq) @@ -804,55 +804,54 @@ int librados::IoCtx::selfmanaged_snap_set_write_ctx(snap_t seq, vector<snap_t>& int librados::IoCtx::snap_create(const char *snapname) { - return io_ctx_impl->client->snap_create(io_ctx_impl, snapname); + return io_ctx_impl->snap_create(snapname); } int librados::IoCtx::snap_lookup(const char *name, snap_t *snapid) { - return io_ctx_impl->client->snap_lookup(io_ctx_impl, name, snapid); + return io_ctx_impl->snap_lookup(name, snapid); } int librados::IoCtx::snap_get_stamp(snap_t snapid, time_t *t) { - return io_ctx_impl->client->snap_get_stamp(io_ctx_impl, snapid, t); + return io_ctx_impl->snap_get_stamp(snapid, t); } int librados::IoCtx::snap_get_name(snap_t snapid, std::string *s) { - return io_ctx_impl->client->snap_get_name(io_ctx_impl, snapid, s); + return io_ctx_impl->snap_get_name(snapid, s); } int librados::IoCtx::snap_remove(const char *snapname) { - return io_ctx_impl->client->snap_remove(io_ctx_impl, snapname); + return io_ctx_impl->snap_remove(snapname); } int librados::IoCtx::snap_list(std::vector<snap_t> *snaps) { - return io_ctx_impl->client->snap_list(io_ctx_impl, snaps); + return io_ctx_impl->snap_list(snaps); } int librados::IoCtx::rollback(const std::string& oid, const char *snapname) { - return io_ctx_impl->client->rollback(io_ctx_impl, oid, snapname); + return io_ctx_impl->rollback(oid, snapname); } int librados::IoCtx::selfmanaged_snap_create(uint64_t *snapid) { - return io_ctx_impl->client->selfmanaged_snap_create(io_ctx_impl, snapid); + return io_ctx_impl->selfmanaged_snap_create(snapid); } int librados::IoCtx::selfmanaged_snap_remove(uint64_t snapid) { - return io_ctx_impl->client->selfmanaged_snap_remove(io_ctx_impl, snapid); + return io_ctx_impl->selfmanaged_snap_remove(snapid); } int librados::IoCtx::selfmanaged_snap_rollback(const std::string& oid, uint64_t snapid) { - return io_ctx_impl->client->selfmanaged_snap_rollback_object(io_ctx_impl, - oid, - io_ctx_impl->snapc, - snapid); + return io_ctx_impl->selfmanaged_snap_rollback_object(oid, + io_ctx_impl->snapc, + snapid); } librados::ObjectIterator librados::IoCtx::objects_begin() @@ -871,48 +870,50 @@ const librados::ObjectIterator& librados::IoCtx::objects_end() const uint64_t librados::IoCtx::get_last_version() { - eversion_t ver = io_ctx_impl->client->last_version(*io_ctx_impl); + eversion_t ver = io_ctx_impl->last_version(); return ver.version; } int librados::IoCtx::aio_read(const std::string& oid, librados::AioCompletion *c, bufferlist *pbl, size_t len, uint64_t off) { - return io_ctx_impl->client->aio_read(*io_ctx_impl, oid, c->pc, pbl, len, off); + return io_ctx_impl->aio_read(oid, c->pc, pbl, len, off); } -int librados::IoCtx::aio_exec(const std::string& oid, librados::AioCompletion *c, const char *cls, const char *method, - bufferlist& inbl, bufferlist *outbl) +int librados::IoCtx::aio_exec(const std::string& oid, + librados::AioCompletion *c, const char *cls, + const char *method, bufferlist& inbl, + bufferlist *outbl) { object_t obj(oid); - return io_ctx_impl->client->aio_exec(*io_ctx_impl, obj, c->pc, cls, method, inbl, outbl); + return io_ctx_impl->aio_exec(obj, c->pc, cls, method, inbl, outbl); } int librados::IoCtx::aio_sparse_read(const std::string& oid, librados::AioCompletion *c, std::map<uint64_t,uint64_t> *m, bufferlist *data_bl, size_t len, uint64_t off) { - return io_ctx_impl->client->aio_sparse_read(*io_ctx_impl, oid, c->pc, - m, data_bl, len, off); + return io_ctx_impl->aio_sparse_read(oid, c->pc, + m, data_bl, len, off); } int librados::IoCtx::aio_write(const std::string& oid, librados::AioCompletion *c, const bufferlist& bl, size_t len, uint64_t off) { - return io_ctx_impl->client->aio_write(*io_ctx_impl, oid, c->pc, bl, len, off); + return io_ctx_impl->aio_write(oid, c->pc, bl, len, off); } int librados::IoCtx::aio_append(const std::string& oid, librados::AioCompletion *c, const bufferlist& bl, size_t len) { - return io_ctx_impl->client->aio_append(*io_ctx_impl, oid, c->pc, bl, len); + return io_ctx_impl->aio_append(oid, c->pc, bl, len); } int librados::IoCtx::aio_write_full(const std::string& oid, librados::AioCompletion *c, const bufferlist& bl) { object_t obj(oid); - return io_ctx_impl->client->aio_write_full(*io_ctx_impl, obj, c->pc, bl); + return io_ctx_impl->aio_write_full(obj, c->pc, bl); } int librados::IoCtx::aio_flush() @@ -925,36 +926,36 @@ int librados::IoCtx::watch(const string& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx) { object_t obj(oid); - return io_ctx_impl->client->watch(*io_ctx_impl, obj, ver, cookie, ctx); + return io_ctx_impl->watch(obj, ver, cookie, ctx); } int librados::IoCtx::unwatch(const string& oid, uint64_t handle) { uint64_t cookie = handle; object_t obj(oid); - return io_ctx_impl->client->unwatch(*io_ctx_impl, obj, cookie); + return io_ctx_impl->unwatch(obj, cookie); } int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl) { object_t obj(oid); - return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver, bl); + return io_ctx_impl->notify(obj, ver, bl); } void librados::IoCtx::set_notify_timeout(uint32_t timeout) { - io_ctx_impl->client->set_notify_timeout(*io_ctx_impl, timeout); + io_ctx_impl->set_notify_timeout(timeout); } void librados::IoCtx::set_assert_version(uint64_t ver) { - io_ctx_impl->client->set_assert_version(*io_ctx_impl, ver); + io_ctx_impl->set_assert_version(ver); } void librados::IoCtx::set_assert_src_version(const std::string& oid, uint64_t ver) { object_t obj(oid); - io_ctx_impl->client->set_assert_src_version(*io_ctx_impl, obj, ver); + io_ctx_impl->set_assert_src_version(obj, ver); } const std::string& librados::IoCtx::get_pool_name() const @@ -1199,13 +1200,13 @@ int librados::Rados::cluster_stat(cluster_stat_t& result) librados::PoolAsyncCompletion *librados::Rados::pool_async_create_completion() { - PoolAsyncCompletionImpl *c = RadosClient::pool_async_create_completion(); + PoolAsyncCompletionImpl *c = new PoolAsyncCompletionImpl; return new PoolAsyncCompletion(c); } librados::AioCompletion *librados::Rados::aio_create_completion() { - AioCompletionImpl *c = RadosClient::aio_create_completion(); + AioCompletionImpl *c = new AioCompletionImpl; return new AioCompletion(c); } @@ -1213,11 +1214,12 @@ librados::AioCompletion *librados::Rados::aio_create_completion(void *cb_arg, callback_t cb_complete, callback_t cb_safe) { - AioCompletionImpl *c = RadosClient::aio_create_completion(cb_arg, cb_complete, cb_safe); + AioCompletionImpl *c; + int r = rados_aio_create_completion(cb_arg, cb_complete, cb_safe, (void**)&c); + assert(r == 0); return new AioCompletion(c); } - librados::ObjectOperation::ObjectOperation() { impl = (ObjectOperationImpl *)new ::ObjectOperation; @@ -1399,14 +1401,13 @@ extern "C" int rados_pool_list(rados_t cluster, char *buf, size_t len) extern "C" int rados_ioctx_create(rados_t cluster, const char *name, rados_ioctx_t *io) { - librados::RadosClient *radosp = (librados::RadosClient *)cluster; - int64_t poolid = radosp->lookup_pool(name); - if (poolid < 0) - return (int)poolid; + librados::RadosClient *client = (librados::RadosClient *)cluster; + librados::IoCtxImpl *ctx; + + int r = client->create_ioctx(name, &ctx); + if (r < 0) + return r; - librados::IoCtxImpl *ctx = new librados::IoCtxImpl(radosp, poolid, name, CEPH_NOSNAP); - if (!ctx) - return -ENOMEM; *io = ctx; ctx->get(); return 0; @@ -1474,7 +1475,7 @@ extern "C" int rados_write(rados_ioctx_t io, const char *o, const char *buf, siz object_t oid(o); bufferlist bl; bl.append(buf, len); - return ctx->client->write(*ctx, oid, bl, len, off); + return ctx->write(oid, bl, len, off); } extern "C" int rados_append(rados_ioctx_t io, const char *o, const char *buf, size_t len) @@ -1483,7 +1484,7 @@ extern "C" int rados_append(rados_ioctx_t io, const char *o, const char *buf, si object_t oid(o); bufferlist bl; bl.append(buf, len); - return ctx->client->append(*ctx, oid, bl, len); + return ctx->append(oid, bl, len); } extern "C" int rados_write_full(rados_ioctx_t io, const char *o, const char *buf, size_t len) @@ -1492,7 +1493,7 @@ extern "C" int rados_write_full(rados_ioctx_t io, const char *o, const char *buf object_t oid(o); bufferlist bl; bl.append(buf, len); - return ctx->client->write_full(*ctx, oid, bl); + return ctx->write_full(oid, bl); } extern "C" int rados_clone_range(rados_ioctx_t io, const char *dst, uint64_t dst_off, @@ -1500,21 +1501,21 @@ extern "C" int rados_clone_range(rados_ioctx_t io, const char *dst, uint64_t dst { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t dst_oid(dst), src_oid(src); - return ctx->client->clone_range(*ctx, dst_oid, dst_off, src_oid, src_off, len); + return ctx->clone_range(dst_oid, dst_off, src_oid, src_off, len); } extern "C" int rados_trunc(rados_ioctx_t io, const char *o, uint64_t size) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); - return ctx->client->trunc(*ctx, oid, size); + return ctx->trunc(oid, size); } extern "C" int rados_remove(rados_ioctx_t io, const char *o) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); - return ctx->client->remove(*ctx, oid); + return ctx->remove(oid); } extern "C" int rados_read(rados_ioctx_t io, const char *o, char *buf, size_t len, uint64_t off) @@ -1527,7 +1528,7 @@ extern "C" int rados_read(rados_ioctx_t io, const char *o, char *buf, size_t len bufferptr bp = buffer::create_static(len, buf); bl.push_back(bp); - ret = ctx->client->read(*ctx, oid, bl, len, off); + ret = ctx->read(oid, bl, len, off); if (ret >= 0) { if (bl.length() > len) return -ERANGE; @@ -1542,7 +1543,7 @@ extern "C" int rados_read(rados_ioctx_t io, const char *o, char *buf, size_t len extern "C" uint64_t rados_get_last_version(rados_ioctx_t io) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - eversion_t ver = ctx->client->last_version(*ctx); + eversion_t ver = ctx->last_version(); return ver.version; } @@ -1586,13 +1587,13 @@ extern "C" int rados_pool_delete(rados_t cluster, const char *pool_name) extern "C" int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->pool_change_auid(ctx, auid); + return ctx->pool_change_auid(auid); } extern "C" int rados_ioctx_pool_get_auid(rados_ioctx_t io, uint64_t *auid) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->pool_get_auid(ctx, (unsigned long long *)auid); + return ctx->client->pool_get_auid(ctx->get_id(), (unsigned long long *)auid); } extern "C" void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key) @@ -1614,34 +1615,34 @@ extern "C" int64_t rados_ioctx_get_id(rados_ioctx_t io) extern "C" int rados_ioctx_snap_create(rados_ioctx_t io, const char *snapname) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->snap_create(ctx, snapname); + return ctx->snap_create(snapname); } extern "C" int rados_ioctx_snap_remove(rados_ioctx_t io, const char *snapname) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->snap_remove(ctx, snapname); + return ctx->snap_remove(snapname); } extern "C" int rados_rollback(rados_ioctx_t io, const char *oid, const char *snapname) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->rollback(ctx, oid, snapname); + return ctx->rollback(oid, snapname); } extern "C" int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io, uint64_t *snapid) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->selfmanaged_snap_create(ctx, snapid); + return ctx->selfmanaged_snap_create(snapid); } extern "C" int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io, uint64_t snapid) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->selfmanaged_snap_remove(ctx, snapid); + return ctx->selfmanaged_snap_remove(snapid); } extern "C" int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, @@ -1649,7 +1650,7 @@ extern "C" int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, uint64_t snapid) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->selfmanaged_snap_rollback_object(ctx, oid, ctx->snapc, snapid); + return ctx->selfmanaged_snap_rollback_object(oid, ctx->snapc, snapid); } extern "C" int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t *snaps, @@ -1657,7 +1658,7 @@ extern "C" int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t *snaps, { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; vector<uint64_t> snapvec; - int r = ctx->client->snap_list(ctx, &snapvec); + int r = ctx->snap_list(&snapvec); if (r < 0) return r; if ((int)snapvec.size() <= maxlen) { @@ -1672,7 +1673,7 @@ extern "C" int rados_ioctx_snap_lookup(rados_ioctx_t io, const char *name, rados_snap_t *id) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->snap_lookup(ctx, name, (uint64_t *)id); + return ctx->snap_lookup(name, (uint64_t *)id); } extern "C" int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, @@ -1680,7 +1681,7 @@ extern "C" int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; std::string sname; - int r = ctx->client->snap_get_name(ctx, id, &sname); + int r = ctx->snap_get_name(id, &sname); if (r < 0) return r; if ((int)sname.length() >= maxlen) @@ -1692,7 +1693,7 @@ extern "C" int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, extern "C" int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t *t) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->snap_get_stamp(ctx, id, t); + return ctx->snap_get_stamp(id, t); } extern "C" int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, @@ -1702,7 +1703,7 @@ extern "C" int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, int ret; object_t oid(o); bufferlist bl; - ret = ctx->client->getxattr(*ctx, oid, name, bl); + ret = ctx->getxattr(oid, name, bl); if (ret >= 0) { if (bl.length() > len) return -ERANGE; @@ -1738,7 +1739,7 @@ extern "C" int rados_getxattrs(rados_ioctx_t io, const char *oid, return -ENOMEM; librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t obj(oid); - int ret = ctx->client->getxattrs(*ctx, obj, it->attrset); + int ret = ctx->getxattrs(obj, it->attrset); if (ret) { delete it; return ret; @@ -1788,21 +1789,21 @@ extern "C" int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, object_t oid(o); bufferlist bl; bl.append(buf, len); - return ctx->client->setxattr(*ctx, oid, name, bl); + return ctx->setxattr(oid, name, bl); } extern "C" int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); - return ctx->client->rmxattr(*ctx, oid, name); + return ctx->rmxattr(oid, name); } extern "C" int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); - return ctx->client->stat(*ctx, oid, psize, pmtime); + return ctx->stat(oid, psize, pmtime); } extern "C" int rados_tmap_update(rados_ioctx_t io, const char *o, const char *cmdbuf, size_t cmdbuflen) @@ -1811,7 +1812,7 @@ extern "C" int rados_tmap_update(rados_ioctx_t io, const char *o, const char *cm object_t oid(o); bufferlist cmdbl; cmdbl.append(cmdbuf, cmdbuflen); - return ctx->client->tmap_update(*ctx, oid, cmdbl); + return ctx->tmap_update(oid, cmdbl); } extern "C" int rados_tmap_put(rados_ioctx_t io, const char *o, const char *buf, size_t buflen) @@ -1820,7 +1821,7 @@ extern "C" int rados_tmap_put(rados_ioctx_t io, const char *o, const char *buf, object_t oid(o); bufferlist bl; bl.append(buf, buflen); - return ctx->client->tmap_put(*ctx, oid, bl); + return ctx->tmap_put(oid, bl); } extern "C" int rados_tmap_get(rados_ioctx_t io, const char *o, char *buf, size_t buflen) @@ -1828,7 +1829,7 @@ extern "C" int rados_tmap_get(rados_ioctx_t io, const char *o, char *buf, size_t librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); bufferlist bl; - int r = ctx->client->tmap_get(*ctx, oid, bl); + int r = ctx->tmap_get(oid, bl); if (r < 0) return r; if (bl.length() > buflen) @@ -1845,7 +1846,7 @@ extern "C" int rados_exec(rados_ioctx_t io, const char *o, const char *cls, cons bufferlist inbl, outbl; int ret; inbl.append(inbuf, in_len); - ret = ctx->client->exec(*ctx, oid, cls, method, inbl, outbl); + ret = ctx->exec(oid, cls, method, inbl, outbl); if (ret >= 0) { if (outbl.length()) { if (outbl.length() > out_len) @@ -1887,7 +1888,7 @@ extern "C" int rados_objects_list_next(rados_list_ctx_t listctx, const char **en h->list.pop_front(); if (h->list.empty()) { - ret = lh->ctx->client->list(lh->lc, RADOS_LIST_MAX_ENTRIES); + ret = lh->ctx->list(lh->lc, RADOS_LIST_MAX_ENTRIES); if (ret < 0) return ret; if (h->list.empty()) @@ -1910,10 +1911,17 @@ extern "C" int rados_objects_list_next(rados_list_ctx_t listctx, const char **en // ------------------------- // aio -extern "C" int rados_aio_create_completion(void *cb_arg, rados_callback_t cb_complete, - rados_callback_t cb_safe, rados_completion_t *pc) -{ - *pc = librados::RadosClient::aio_create_completion(cb_arg, cb_complete, cb_safe); +extern "C" int rados_aio_create_completion(void *cb_arg, + rados_callback_t cb_complete, + rados_callback_t cb_safe, + rados_completion_t *pc) +{ + librados::AioCompletionImpl *c = new librados::AioCompletionImpl; + if (cb_complete) + c->set_complete_callback(cb_arg, cb_complete); + if (cb_safe) + c->set_safe_callback(cb_arg, cb_safe); + *pc = c; return 0; } @@ -1958,8 +1966,8 @@ extern "C" int rados_aio_read(rados_ioctx_t io, const char *o, { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); - return ctx->client->aio_read(*ctx, oid, - (librados::AioCompletionImpl*)completion, buf, len, off); + return ctx->aio_read(oid, (librados::AioCompletionImpl*)completion, + buf, len, off); } extern "C" int rados_aio_write(rados_ioctx_t io, const char *o, @@ -1970,8 +1978,8 @@ extern "C" int rados_aio_write(rados_ioctx_t io, const char *o, object_t oid(o); bufferlist bl; bl.append(buf, len); - return ctx->client->aio_write(*ctx, oid, - (librados::AioCompletionImpl*)completion, bl, len, off); + return ctx->aio_write(oid, (librados::AioCompletionImpl*)completion, + bl, len, off); } extern "C" int rados_aio_append(rados_ioctx_t io, const char *o, @@ -1982,20 +1990,19 @@ extern "C" int rados_aio_append(rados_ioctx_t io, const char *o, object_t oid(o); bufferlist bl; bl.append(buf, len); - return ctx->client->aio_append(*ctx, oid, - (librados::AioCompletionImpl*)completion, bl, len); + return ctx->aio_append(oid, (librados::AioCompletionImpl*)completion, + bl, len); } extern "C" int rados_aio_write_full(rados_ioctx_t io, const char *o, - rados_completion_t completion, - const char *buf, size_t len) + rados_completion_t completion, + const char *buf, size_t len) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); bufferlist bl; bl.append(buf, len); - return ctx->client->aio_write_full(*ctx, oid, - (librados::AioCompletionImpl*)completion, bl); + return ctx->aio_write_full(oid, (librados::AioCompletionImpl*)completion, bl); } extern "C" int rados_aio_flush(rados_ioctx_t io) @@ -2021,7 +2028,7 @@ int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle, librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); C_WatchCB *wc = new C_WatchCB(watchcb, arg); - return ctx->client->watch(*ctx, oid, ver, cookie, wc); + return ctx->watch(oid, ver, cookie, wc); } int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle) @@ -2029,7 +2036,7 @@ int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle) uint64_t cookie = handle; librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); - return ctx->client->unwatch(*ctx, oid, cookie); + return ctx->unwatch(oid, cookie); } int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len) @@ -2042,5 +2049,5 @@ int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, memcpy(p.c_str(), buf, buf_len); bl.push_back(p); } - return ctx->client->notify(*ctx, oid, ver, bl); + return ctx->notify(oid, ver, bl); } |