diff options
author | Josh Durgin <josh.durgin@dreamhost.com> | 2012-02-29 16:28:20 -0800 |
---|---|---|
committer | Josh Durgin <josh.durgin@dreamhost.com> | 2012-03-13 11:46:02 -0700 |
commit | 8f278647f4aac364c543059a039610d8a28a6b4d (patch) | |
tree | c6cfe8f5d619b5155766d42f66171f0add3d908a /src/librados | |
parent | 3bba6b72e472d000cac53d67ebbf738446c111f6 (diff) | |
download | ceph-8f278647f4aac364c543059a039610d8a28a6b4d.tar.gz |
librados: split into separate files and remove unnecessary headers
Signed-off-by: Josh Durgin <josh.durgin@dreamhost.com>
Diffstat (limited to 'src/librados')
-rw-r--r-- | src/librados/AioCompletionImpl.h | 133 | ||||
-rw-r--r-- | src/librados/IoCtxImpl.cc | 87 | ||||
-rw-r--r-- | src/librados/IoCtxImpl.h | 87 | ||||
-rw-r--r-- | src/librados/PoolAsyncCompletionImpl.h | 88 | ||||
-rw-r--r-- | src/librados/RadosClient.cc | 2014 | ||||
-rw-r--r-- | src/librados/RadosClient.h | 234 | ||||
-rw-r--r-- | src/librados/librados.cc | 2046 |
7 files changed, 4689 insertions, 0 deletions
diff --git a/src/librados/AioCompletionImpl.h b/src/librados/AioCompletionImpl.h new file mode 100644 index 00000000000..02be2f3984d --- /dev/null +++ b/src/librados/AioCompletionImpl.h @@ -0,0 +1,133 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_LIBRADOS_AIOCOMPLETIONIMPL_H +#define CEPH_LIBRADOS_AIOCOMPLETIONIMPL_H + +#include "common/Cond.h" +#include "common/Mutex.h" + +#include "include/buffer.h" +#include "include/rados/librados.h" +#include "include/rados/librados.hpp" +#include "include/xlist.h" +#include "osd/osd_types.h" + +class IoCtxImpl; + +struct librados::AioCompletionImpl { + Mutex lock; + Cond cond; + int ref, rval; + bool released; + bool ack, safe; + eversion_t objver; + + rados_callback_t callback_complete, callback_safe; + void *callback_arg; + + // for read + bufferlist bl, *pbl; + char *buf; + unsigned maxlen; + + IoCtxImpl *io; + tid_t aio_write_seq; + xlist<AioCompletionImpl*>::item aio_write_list_item; + + AioCompletionImpl() : lock("AioCompletionImpl lock"), + ref(1), rval(0), released(false), ack(false), safe(false), + callback_complete(0), callback_safe(0), callback_arg(0), + pbl(0), buf(0), maxlen(0), + io(NULL), aio_write_seq(0), aio_write_list_item(this) { } + + int set_complete_callback(void *cb_arg, rados_callback_t cb) { + lock.Lock(); + callback_complete = cb; + callback_arg = cb_arg; + lock.Unlock(); + return 0; + } + int set_safe_callback(void *cb_arg, rados_callback_t cb) { + lock.Lock(); + callback_safe = cb; + callback_arg = cb_arg; + lock.Unlock(); + return 0; + } + int wait_for_complete() { + lock.Lock(); + while (!ack) + cond.Wait(lock); + lock.Unlock(); + return 0; + } + int wait_for_safe() { + lock.Lock(); + while (!safe) + cond.Wait(lock); + lock.Unlock(); + return 0; + } + int is_complete() { + lock.Lock(); + int r = ack; + lock.Unlock(); + return r; + } + int is_safe() { + lock.Lock(); + int r = safe; + lock.Unlock(); + return r; + } + int get_return_value() { + lock.Lock(); + int r = rval; + lock.Unlock(); + return r; + } + uint64_t get_version() { + lock.Lock(); + eversion_t v = objver; + lock.Unlock(); + return v.version; + } + + void get() { + lock.Lock(); + assert(ref > 0); + ref++; + lock.Unlock(); + } + void release() { + lock.Lock(); + assert(!released); + released = true; + put_unlock(); + } + void put() { + lock.Lock(); + put_unlock(); + } + void put_unlock() { + assert(ref > 0); + int n = --ref; + lock.Unlock(); + if (!n) + delete this; + } +}; + +#endif diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc new file mode 100644 index 00000000000..f32556d8a6d --- /dev/null +++ b/src/librados/IoCtxImpl.cc @@ -0,0 +1,87 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "IoCtxImpl.h" + +#include "librados/AioCompletionImpl.h" +#include "librados/RadosClient.h" + +#define DOUT_SUBSYS rados +#undef dout_prefix +#define dout_prefix *_dout << "librados: " + +librados::IoCtxImpl::IoCtxImpl() + : aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock") +{ +} + +librados::IoCtxImpl::IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s) + : ref_cnt(0), client(c), poolid(pid), + pool_name(pool_name_), snap_seq(s), assert_ver(0), + notify_timeout(c->cct->_conf->client_notify_timeout), oloc(pid), + aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), aio_write_seq(0) +{ +} + +void librados::IoCtxImpl::set_snap_read(snapid_t s) +{ + if (!s) + s = CEPH_NOSNAP; + ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl; + snap_seq = s; +} + +int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector<snapid_t>& snaps) +{ + ::SnapContext n; + ldout(client->cct, 10) << "set snap write context: seq = " << seq << " and snaps = " << snaps << dendl; + n.seq = seq; + n.snaps = snaps; + if (!n.is_valid()) + return -EINVAL; + snapc = n; + return 0; +} + +void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c) +{ + get(); + aio_write_list_lock.Lock(); + assert(!c->io); + c->io = this; + c->aio_write_seq = ++aio_write_seq; + aio_write_list.push_back(&c->aio_write_list_item); + aio_write_list_lock.Unlock(); +} + +void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c) +{ + aio_write_list_lock.Lock(); + assert(c->io == this); + c->io = NULL; + c->aio_write_list_item.remove_myself(); + aio_write_cond.Signal(); + aio_write_list_lock.Unlock(); + put(); +} + +void librados::IoCtxImpl::flush_aio_writes() +{ + aio_write_list_lock.Lock(); + tid_t seq = aio_write_seq; + while (!aio_write_list.empty() && + aio_write_list.front()->aio_write_seq <= seq) + aio_write_cond.Wait(aio_write_list_lock); + aio_write_list_lock.Unlock(); +} diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h new file mode 100644 index 00000000000..950e165e2eb --- /dev/null +++ b/src/librados/IoCtxImpl.h @@ -0,0 +1,87 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_LIBRADOS_IOCTXIMPL_H +#define CEPH_LIBRADOS_IOCTXIMPL_H + +#include "common/Cond.h" +#include "common/Mutex.h" +#include "common/snap_types.h" +#include "include/atomic.h" +#include "include/rados.h" +#include "include/rados/librados.h" +#include "include/rados/librados.hpp" +#include "include/types.h" +#include "include/xlist.h" +#include "osd/osd_types.h" + +class RadosClient; + +struct librados::IoCtxImpl { + atomic_t ref_cnt; + RadosClient *client; + int64_t poolid; + string pool_name; + snapid_t snap_seq; + ::SnapContext snapc; + uint64_t assert_ver; + map<object_t, uint64_t> assert_src_version; + eversion_t last_objver; + uint32_t notify_timeout; + object_locator_t oloc; + + Mutex aio_write_list_lock; + tid_t aio_write_seq; + Cond aio_write_cond; + xlist<AioCompletionImpl*> aio_write_list; + + IoCtxImpl(); + IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s); + + void dup(const IoCtxImpl& rhs) { + // Copy everything except the ref count + client = rhs.client; + poolid = rhs.poolid; + pool_name = rhs.pool_name; + snap_seq = rhs.snap_seq; + snapc = rhs.snapc; + assert_ver = rhs.assert_ver; + assert_src_version = rhs.assert_src_version; + last_objver = rhs.last_objver; + notify_timeout = rhs.notify_timeout; + oloc = rhs.oloc; + } + + void set_snap_read(snapid_t s); + int set_snap_write_context(snapid_t seq, vector<snapid_t>& snaps); + + void get() { + ref_cnt.inc(); + } + + void put() { + if (ref_cnt.dec() == 0) + delete this; + } + + void queue_aio_write(struct AioCompletionImpl *c); + void complete_aio_write(struct AioCompletionImpl *c); + void flush_aio_writes(); + + int64_t get_id() { + return poolid; + } +}; + +#endif diff --git a/src/librados/PoolAsyncCompletionImpl.h b/src/librados/PoolAsyncCompletionImpl.h new file mode 100644 index 00000000000..e204aaf2a53 --- /dev/null +++ b/src/librados/PoolAsyncCompletionImpl.h @@ -0,0 +1,88 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_LIBRADOS_POOLASYNCCOMPLETIONIMPL_H +#define CEPH_LIBRADOS_POOLASYNCCOMPLETIONIMPL_H + +#include "common/Cond.h" +#include "common/Mutex.h" +#include "include/rados/librados.h" +#include "include/rados/librados.hpp" + +struct librados::PoolAsyncCompletionImpl { + Mutex lock; + Cond cond; + int ref, rval; + bool released; + bool done; + + rados_callback_t callback; + void *callback_arg; + + PoolAsyncCompletionImpl() : lock("PoolAsyncCompletionImpl lock"), + ref(1), rval(0), released(false), done(false), + callback(0), callback_arg(0) {} + + int set_callback(void *cb_arg, rados_callback_t cb) { + lock.Lock(); + callback = cb; + callback_arg = cb_arg; + lock.Unlock(); + return 0; + } + int wait() { + lock.Lock(); + while (!done) + cond.Wait(lock); + lock.Unlock(); + return 0; + } + int is_complete() { + lock.Lock(); + int r = done; + lock.Unlock(); + return r; + } + int get_return_value() { + lock.Lock(); + int r = rval; + lock.Unlock(); + return r; + } + void get() { + lock.Lock(); + assert(ref > 0); + ref++; + lock.Unlock(); + } + void release() { + lock.Lock(); + assert(!released); + released = true; + put_unlock(); + } + void put() { + lock.Lock(); + put_unlock(); + } + void put_unlock() { + assert(ref > 0); + int n = --ref; + lock.Unlock(); + if (!n) + delete this; + } +}; + +#endif diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc new file mode 100644 index 00000000000..b760534a234 --- /dev/null +++ b/src/librados/RadosClient.cc @@ -0,0 +1,2014 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include <sys/stat.h> +#include <iostream> +#include <string> +#include <pthread.h> +#include <errno.h> + +#include "common/ceph_context.h" +#include "common/config.h" +#include "common/common_init.h" +#include "common/errno.h" +#include "include/buffer.h" + +#include "messages/MWatchNotify.h" +#include "msg/SimpleMessenger.h" + +#include "AioCompletionImpl.h" +#include "IoCtxImpl.h" +#include "PoolAsyncCompletionImpl.h" +#include "RadosClient.h" + +#define DOUT_SUBSYS rados +#undef dout_prefix +#define dout_prefix *_dout << "librados: " + +static atomic_t rados_instance; + +librados::PoolAsyncCompletionImpl *librados::RadosClient::pool_async_create_completion() +{ + return new librados::PoolAsyncCompletionImpl; +} + +librados::PoolAsyncCompletionImpl *librados::RadosClient::pool_async_create_completion(void *cb_arg, rados_callback_t cb) +{ + librados::PoolAsyncCompletionImpl *c = new librados::PoolAsyncCompletionImpl; + if (cb) + c->set_callback(cb_arg, cb); + return c; +} + +librados::AioCompletionImpl *librados::RadosClient::aio_create_completion() +{ + return new librados::AioCompletionImpl; +} + +librados::AioCompletionImpl *librados::RadosClient::aio_create_completion(void *cb_arg, + rados_callback_t cb_complete, + rados_callback_t cb_safe) +{ + librados::AioCompletionImpl *c = new librados::AioCompletionImpl; + if (cb_complete) + c->set_complete_callback(cb_arg, cb_complete); + if (cb_safe) + c->set_safe_callback(cb_arg, cb_safe); + return c; +} + +bool librados::RadosClient::ms_get_authorizer(int dest_type, + AuthAuthorizer **authorizer, + bool force_new) { + //ldout(cct, 0) << "RadosClient::ms_get_authorizer type=" << dest_type << dendl; + /* monitor authorization is being handled on different layer */ + if (dest_type == CEPH_ENTITY_TYPE_MON) + return true; + *authorizer = monclient.auth->build_authorizer(dest_type); + return *authorizer != NULL; +} + +librados::RadosClient::RadosClient(CephContext *cct_) : Dispatcher(cct_), + cct(cct_), + conf(cct_->_conf), + state(DISCONNECTED), + monclient(cct_), + messenger(NULL), + objecter(NULL), + lock("radosclient"), + timer(cct, lock), + max_watch_cookie(0) +{ +} + +int64_t librados::RadosClient::lookup_pool(const char *name) { + int64_t ret = osdmap.lookup_pg_pool_name(name); + if (ret < 0) + return -ENOENT; + return ret; +} + +const char *librados::RadosClient::get_pool_name(int64_t poolid_) +{ + return osdmap.get_pool_name(poolid_); +} + +int librados::RadosClient::connect() +{ + common_init_finish(cct); + + int err; + uint64_t nonce; + + // already connected? + if (state == CONNECTING) + return -EINPROGRESS; + if (state == CONNECTED) + return -EISCONN; + state = CONNECTING; + + // get monmap + err = monclient.build_initial_monmap(); + if (err < 0) + goto out; + + err = -ENOMEM; + nonce = getpid() + (1000000 * (uint64_t)rados_instance.inc()); + messenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1), nonce); + if (!messenger) + goto out; + + // require OSDREPLYMUX feature. this means we will fail to talk to + // old servers. this is necessary because otherwise we won't know + // how to decompose the reply data into its consituent pieces. + messenger->set_default_policy(Messenger::Policy::client(0, CEPH_FEATURE_OSDREPLYMUX)); + + ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl; + + ldout(cct, 1) << "starting objecter" << dendl; + + err = -ENOMEM; + objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer); + if (!objecter) + goto out; + objecter->set_balanced_budget(); + + monclient.set_messenger(messenger); + + messenger->add_dispatcher_head(this); + + messenger->start(); + + ldout(cct, 1) << "setting wanted keys" << dendl; + monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD); + ldout(cct, 1) << "calling monclient init" << dendl; + err = monclient.init(); + if (err) { + ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl; + shutdown(); + goto out; + } + + err = monclient.authenticate(conf->client_mount_timeout); + if (err) { + ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl; + shutdown(); + goto out; + } + messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id())); + + lock.Lock(); + + timer.init(); + + objecter->set_client_incarnation(0); + objecter->init(); + monclient.renew_subs(); + + while (osdmap.get_epoch() == 0) { + ldout(cct, 1) << "waiting for osdmap" << dendl; + cond.Wait(lock); + } + state = CONNECTED; + lock.Unlock(); + + ldout(cct, 1) << "init done" << dendl; + err = 0; + + out: + if (err) + state = DISCONNECTED; + return err; +} + +void librados::RadosClient::shutdown() +{ + lock.Lock(); + if (state == DISCONNECTED) { + lock.Unlock(); + return; + } + monclient.shutdown(); + if (objecter && state == CONNECTED) + objecter->shutdown(); + state = DISCONNECTED; + timer.shutdown(); // will drop+retake lock + lock.Unlock(); + if (messenger) { + messenger->shutdown(); + messenger->wait(); + } + ldout(cct, 1) << "shutdown" << dendl; +} + +librados::RadosClient::~RadosClient() +{ + if (messenger) + delete messenger; + if (objecter) + delete objecter; + common_destroy_context(cct); + cct = NULL; +} + + +bool librados::RadosClient::ms_dispatch(Message *m) +{ + bool ret; + + lock.Lock(); + if (state == DISCONNECTED) { + ldout(cct, 10) << "disconnected, discarding " << *m << dendl; + m->put(); + ret = true; + } else { + ret = _dispatch(m); + } + lock.Unlock(); + return ret; +} + +void librados::RadosClient::ms_handle_connect(Connection *con) +{ + Mutex::Locker l(lock); + objecter->ms_handle_connect(con); +} + +bool librados::RadosClient::ms_handle_reset(Connection *con) +{ + Mutex::Locker l(lock); + objecter->ms_handle_reset(con); + return false; +} + +void librados::RadosClient::ms_handle_remote_reset(Connection *con) +{ + Mutex::Locker l(lock); + objecter->ms_handle_remote_reset(con); +} + + +bool librados::RadosClient::_dispatch(Message *m) +{ + switch (m->get_type()) { + // OSD + case CEPH_MSG_OSD_OPREPLY: + objecter->handle_osd_op_reply((class MOSDOpReply*)m); + break; + case CEPH_MSG_OSD_MAP: + objecter->handle_osd_map((MOSDMap*)m); + cond.Signal(); + break; + case MSG_GETPOOLSTATSREPLY: + objecter->handle_get_pool_stats_reply((MGetPoolStatsReply*)m); + break; + + case CEPH_MSG_MDS_MAP: + break; + + case CEPH_MSG_STATFS_REPLY: + objecter->handle_fs_stats_reply((MStatfsReply*)m); + break; + + case CEPH_MSG_POOLOP_REPLY: + objecter->handle_pool_op_reply((MPoolOpReply*)m); + break; + + case CEPH_MSG_WATCH_NOTIFY: + watch_notify((MWatchNotify *)m); + break; + default: + return false; + } + + return true; +} + +int librados::RadosClient::pool_list(std::list<std::string>& v) +{ + Mutex::Locker l(lock); + for (map<int64_t,pg_pool_t>::const_iterator p = osdmap.get_pools().begin(); + p != osdmap.get_pools().end(); + p++) + v.push_back(osdmap.get_pool_name(p->first)); + return 0; +} + +int librados::RadosClient::get_pool_stats(std::list<string>& pools, + map<string,::pool_stat_t>& result) +{ + Mutex mylock("RadosClient::get_pool_stats::mylock"); + Cond cond; + bool done; + + lock.Lock(); + objecter->get_pool_stats(pools, &result, new C_SafeCond(&mylock, &cond, &done)); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + return 0; +} + +int librados::RadosClient::get_fs_stats(ceph_statfs& stats) +{ + Mutex mylock ("RadosClient::get_fs_stats::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->get_fs_stats(stats, new C_SafeCond(&mylock, &cond, &done)); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + + return 0; +} + + +// SNAPS + +int librados::RadosClient::snap_create(rados_ioctx_t io, const char *snapName) +{ + int reply; + int64_t poolID = ((IoCtxImpl *)io)->poolid; + string sName(snapName); + + Mutex mylock ("RadosClient::snap_create::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->create_pool_snap(poolID, + sName, + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock.Unlock(); + + mylock.Lock(); + while(!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::selfmanaged_snap_create(rados_ioctx_t io, uint64_t *psnapid) +{ + int reply; + int64_t poolID = ((IoCtxImpl *)io)->poolid; + + Mutex mylock("RadosClient::selfmanaged_snap_create::mylock"); + Cond cond; + bool done; + lock.Lock(); + snapid_t snapid; + objecter->allocate_selfmanaged_snap(poolID, &snapid, + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + if (reply == 0) + *psnapid = snapid; + return reply; +} + +int librados::RadosClient::snap_remove(rados_ioctx_t io, const char *snapName) +{ + int reply; + int64_t poolID = ((IoCtxImpl *)io)->poolid; + string sName(snapName); + + Mutex mylock ("RadosClient::snap_remove::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->delete_pool_snap(poolID, + sName, + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock.Unlock(); + + mylock.Lock(); + while(!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::selfmanaged_snap_rollback_object(rados_ioctx_t io, + const object_t& oid, + ::SnapContext& snapc, + uint64_t snapid) +{ + int reply; + IoCtxImpl* ctx = (IoCtxImpl *) io; + + Mutex mylock("RadosClient::snap_rollback::mylock"); + Cond cond; + bool done; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply); + + lock.Lock(); + objecter->rollback_object(oid, ctx->oloc, snapc, snapid, + ceph_clock_now(cct), onack, NULL); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::rollback(rados_ioctx_t io_, const object_t& oid, + const char *snapName) +{ + IoCtxImpl* io = (IoCtxImpl *) io_; + string sName(snapName); + + lock.Lock(); + snapid_t snap; + const map<int64_t, pg_pool_t>& pools = objecter->osdmap->get_pools(); + const pg_pool_t& pg_pool = pools.find(io->poolid)->second; + map<snapid_t, pool_snap_info_t>::const_iterator p; + for (p = pg_pool.snaps.begin(); + p != pg_pool.snaps.end(); + ++p) { + if (p->second.name == snapName) { + snap = p->first; + break; + } + } + if (p == pg_pool.snaps.end()) { + lock.Unlock(); + return -ENOENT; + } + lock.Unlock(); + + return selfmanaged_snap_rollback_object(io_, oid, io->snapc, snap); +} + +int librados::RadosClient::selfmanaged_snap_remove(rados_ioctx_t io, uint64_t snapid) +{ + int reply; + int64_t poolID = ((IoCtxImpl *)io)->poolid; + + Mutex mylock("RadosClient::selfmanaged_snap_remove::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->delete_selfmanaged_snap(poolID, snapid_t(snapid), + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + return (int)reply; +} + +int librados::RadosClient::pool_create(string& name, unsigned long long auid, + __u8 crush_rule) +{ + int reply; + + Mutex mylock ("RadosClient::pool_create::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->create_pool(name, + new C_SafeCond(&mylock, &cond, &done, &reply), + auid, crush_rule); + lock.Unlock(); + + mylock.Lock(); + while(!done) + cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::pool_create_async(string& name, PoolAsyncCompletionImpl *c, + unsigned long long auid, + __u8 crush_rule) +{ + Mutex::Locker l(lock); + objecter->create_pool(name, + new C_PoolAsync_Safe(c), + auid, crush_rule); + return 0; +} + +int librados::RadosClient::pool_delete(const char *name) +{ + int tmp_pool_id = osdmap.lookup_pg_pool_name(name); + if (tmp_pool_id < 0) + return -ENOENT; + + Mutex mylock("RadosClient::pool_delete::mylock"); + Cond cond; + bool done; + lock.Lock(); + int reply = 0; + objecter->delete_pool(tmp_pool_id, new C_SafeCond(&mylock, &cond, &done, &reply)); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c) +{ + int tmp_pool_id = osdmap.lookup_pg_pool_name(name); + if (tmp_pool_id < 0) + return -ENOENT; + + Mutex::Locker l(lock); + objecter->delete_pool(tmp_pool_id, new C_PoolAsync_Safe(c)); + + return 0; +} + +int librados::RadosClient::pool_change_auid(rados_ioctx_t io, unsigned long long auid) +{ + int reply; + + int64_t poolID = ((IoCtxImpl *)io)->poolid; + + Mutex mylock("RadosClient::pool_change_auid::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->change_pool_auid(poolID, + new C_SafeCond(&mylock, &cond, &done, &reply), + auid); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, + PoolAsyncCompletionImpl *c) +{ + int64_t poolID = ((IoCtxImpl *)io)->poolid; + + Mutex::Locker l(lock); + objecter->change_pool_auid(poolID, + new C_PoolAsync_Safe(c), + auid); + return 0; +} + +int librados::RadosClient::pool_get_auid(rados_ioctx_t io, unsigned long long *auid) +{ + Mutex::Locker l(lock); + int64_t pool_id = ((IoCtxImpl *)io)->poolid; + const pg_pool_t *pg = osdmap.get_pg_pool(pool_id); + if (!pg) + return -ENOENT; + *auid = pg->auid; + return 0; +} + +int librados::RadosClient::snap_list(IoCtxImpl *io, vector<uint64_t> *snaps) +{ + Mutex::Locker l(lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); + for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin(); + p != pi->snaps.end(); + p++) + snaps->push_back(p->first); + return 0; +} + +int librados::RadosClient::snap_lookup(IoCtxImpl *io, const char *name, uint64_t *snapid) +{ + Mutex::Locker l(lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); + for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin(); + p != pi->snaps.end(); + p++) { + if (p->second.name == name) { + *snapid = p->first; + return 0; + } + } + return -ENOENT; +} + +int librados::RadosClient::snap_get_name(IoCtxImpl *io, uint64_t snapid, std::string *s) +{ + Mutex::Locker l(lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); + map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.find(snapid); + if (p == pi->snaps.end()) + return -ENOENT; + *s = p->second.name.c_str(); + return 0; +} + +int librados::RadosClient::snap_get_stamp(IoCtxImpl *io, uint64_t snapid, time_t *t) +{ + Mutex::Locker l(lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); + map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.find(snapid); + if (p == pi->snaps.end()) + return -ENOENT; + *t = p->second.stamp.sec(); + return 0; +} + + +// IO + +int librados::RadosClient::list(Objecter::ListContext *context, int max_entries) +{ + Cond cond; + bool done; + int r = 0; + object_t oid; + Mutex mylock("RadosClient::list::mylock"); + + if (context->at_end) + return 0; + + context->max_entries = max_entries; + + lock.Lock(); + objecter->list_objects(context, new C_SafeCond(&mylock, &cond, &done, &r)); + lock.Unlock(); + + mylock.Lock(); + while(!done) + cond.Wait(mylock); + mylock.Unlock(); + + return r; +} + +int librados::RadosClient::create(IoCtxImpl& io, const object_t& oid, bool exclusive) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::create::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + objecter->create(oid, io.oloc, + io.snapc, ut, 0, (exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0), + onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::create(IoCtxImpl& io, const object_t& oid, bool exclusive, + const std::string& category) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::create::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + ::ObjectOperation o; + o.create(exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0, category); + + lock.Lock(); + objecter->mutate(oid, io.oloc, o, io.snapc, ut, 0, onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +/* + * add any version assert operations that are appropriate given the + * stat in the IoCtx, either the target version assert or any src + * object asserts. these affect a single ioctx operation, so clear + * the ioctx state when we're doing. + * + * return a pointer to the ObjectOperation if we added any events; + * this is convenient for passing the extra_ops argument into Objecter + * methods. + */ +::ObjectOperation *librados::RadosClient::prepare_assert_ops(IoCtxImpl *io, ::ObjectOperation *op) +{ + ::ObjectOperation *pop = NULL; + if (io->assert_ver) { + op->assert_version(io->assert_ver); + io->assert_ver = 0; + pop = op; + } + while (!io->assert_src_version.empty()) { + map<object_t,uint64_t>::iterator p = io->assert_src_version.begin(); + op->assert_src_version(p->first, CEPH_NOSNAP, p->second); + io->assert_src_version.erase(p); + pop = op; + } + return pop; +} + +int librados::RadosClient::write(IoCtxImpl& io, const object_t& oid, bufferlist& bl, + size_t len, uint64_t off) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::write::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + // extra ops? + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->write(oid, io.oloc, + off, len, io.snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + return len; +} + +int librados::RadosClient::append(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::append::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->append(oid, io.oloc, + len, io.snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + return len; +} + +int librados::RadosClient::write_full(IoCtxImpl& io, const object_t& oid, bufferlist& bl) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::write_full::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->write_full(oid, io.oloc, + io.snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::clone_range(IoCtxImpl& io, + const object_t& dst_oid, uint64_t dst_offset, + const object_t& src_oid, uint64_t src_offset, + uint64_t len) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::clone_range::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock.Lock(); + ::ObjectOperation wr; + prepare_assert_ops(&io, &wr); + wr.clone_range(src_oid, src_offset, len, dst_offset); + objecter->mutate(dst_oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::operate(IoCtxImpl& io, const object_t& oid, + ::ObjectOperation *o, time_t *pmtime) +{ + utime_t ut; + if (pmtime) { + ut = utime_t(*pmtime, 0); + } else { + ut = ceph_clock_now(cct); + } + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + if (!o->size()) + return 0; + + Mutex mylock("RadosClient::mutate::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + objecter->mutate(oid, io.oloc, + *o, io.snapc, ut, 0, + onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::operate_read(IoCtxImpl& io, const object_t& oid, + ::ObjectOperation *o, bufferlist *pbl) +{ + if (!o->size()) + return 0; + + Mutex mylock("RadosClient::mutate::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + objecter->read(oid, io.oloc, + *o, io.snap_seq, pbl, 0, + onack, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::aio_operate_read(IoCtxImpl& io, const object_t &oid, + ::ObjectOperation *o, + AioCompletionImpl *c, bufferlist *pbl) +{ + Context *onack = new C_aio_Ack(c); + + c->pbl = pbl; + + Mutex::Locker l(lock); + objecter->read(oid, io.oloc, + *o, io.snap_seq, pbl, 0, + onack, 0); + return 0; +} + +int librados::RadosClient::aio_operate(IoCtxImpl& io, const object_t& oid, + ::ObjectOperation *o, AioCompletionImpl *c) +{ + utime_t ut = ceph_clock_now(cct); + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Context *onack = new C_aio_Ack(c); + Context *oncommit = new C_aio_Safe(c); + + io.queue_aio_write(c); + + Mutex::Locker l(lock); + objecter->mutate(oid, io.oloc, *o, io.snapc, ut, 0, onack, oncommit, &c->objver); + + return 0; +} + +int librados::RadosClient::aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, + bufferlist *pbl, size_t len, uint64_t off) +{ + + Context *onack = new C_aio_Ack(c); + eversion_t ver; + + c->pbl = pbl; + + Mutex::Locker l(lock); + objecter->read(oid, io.oloc, + off, len, io.snap_seq, &c->bl, 0, + onack, &c->objver); + return 0; +} + +int librados::RadosClient::aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, + char *buf, size_t len, uint64_t off) +{ + Context *onack = new C_aio_Ack(c); + + c->buf = buf; + c->maxlen = len; + + Mutex::Locker l(lock); + objecter->read(oid, io.oloc, + off, len, io.snap_seq, &c->bl, 0, + onack, &c->objver); + + return 0; +} + +int librados::RadosClient::aio_sparse_read(IoCtxImpl& io, const object_t oid, + AioCompletionImpl *c, std::map<uint64_t,uint64_t> *m, + bufferlist *data_bl, size_t len, uint64_t off) +{ + + C_aio_sparse_read_Ack *onack = new C_aio_sparse_read_Ack(c); + onack->m = m; + onack->data_bl = data_bl; + eversion_t ver; + + c->pbl = NULL; + + Mutex::Locker l(lock); + objecter->sparse_read(oid, io.oloc, + off, len, io.snap_seq, &c->bl, 0, + onack); + return 0; +} + +int librados::RadosClient::aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len, uint64_t off) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + io.queue_aio_write(c); + + Context *onack = new C_aio_Ack(c); + Context *onsafe = new C_aio_Safe(c); + + Mutex::Locker l(lock); + objecter->write(oid, io.oloc, + off, len, io.snapc, bl, ut, 0, + onack, onsafe, &c->objver); + + return 0; +} + +int librados::RadosClient::aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + io.queue_aio_write(c); + + Context *onack = new C_aio_Ack(c); + Context *onsafe = new C_aio_Safe(c); + + Mutex::Locker l(lock); + objecter->append(oid, io.oloc, + len, io.snapc, bl, ut, 0, + onack, onsafe, &c->objver); + + return 0; +} + +int librados::RadosClient::aio_write_full(IoCtxImpl& io, const object_t &oid, + AioCompletionImpl *c, const bufferlist& bl) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + io.queue_aio_write(c); + + Context *onack = new C_aio_Ack(c); + Context *onsafe = new C_aio_Safe(c); + + Mutex::Locker l(lock); + objecter->write_full(oid, io.oloc, + io.snapc, bl, ut, 0, + onack, onsafe, &c->objver); + + return 0; +} + +int librados::RadosClient::remove(IoCtxImpl& io, const object_t& oid) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::remove::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->remove(oid, io.oloc, + io.snapc, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::trunc(IoCtxImpl& io, const object_t& oid, uint64_t size) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::write_full::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->trunc(oid, io.oloc, + io.snapc, ut, 0, + size, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::tmap_update(IoCtxImpl& io, const object_t& oid, bufferlist& cmdbl) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::tmap_update::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock.Lock(); + ::ObjectOperation wr; + prepare_assert_ops(&io, &wr); + wr.tmap_update(cmdbl); + objecter->mutate(oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::tmap_put(IoCtxImpl& io, const object_t& oid, bufferlist& bl) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::tmap_put::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock.Lock(); + ::ObjectOperation wr; + prepare_assert_ops(&io, &wr); + wr.tmap_put(bl); + objecter->mutate(oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::tmap_get(IoCtxImpl& io, const object_t& oid, bufferlist& bl) +{ + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::tmap_put::mylock"); + Cond cond; + bool done; + int r = 0; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock.Lock(); + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + rd.tmap_get(&bl, NULL); + objecter->read(oid, io.oloc, rd, io.snap_seq, 0, 0, onack, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::exec(IoCtxImpl& io, const object_t& oid, + const char *cls, const char *method, + bufferlist& inbl, bufferlist& outbl) +{ + Mutex mylock("RadosClient::exec::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + + lock.Lock(); + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + rd.call(cls, method, inbl); + objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::aio_exec(IoCtxImpl& io, const object_t& oid, AioCompletionImpl *c, + const char *cls, const char *method, + bufferlist& inbl, bufferlist *outbl) +{ + Context *onack = new C_aio_Ack(c); + + Mutex::Locker l(lock); + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + rd.call(cls, method, inbl); + objecter->read(oid, io.oloc, rd, io.snap_seq, outbl, 0, onack, &c->objver); + + return 0; +} + +int librados::RadosClient::read(IoCtxImpl& io, const object_t& oid, + bufferlist& bl, size_t len, uint64_t off) +{ + CephContext *cct = io.client->cct; + Mutex mylock("RadosClient::read::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->read(oid, io.oloc, + off, len, io.snap_seq, &bl, 0, + onack, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + if (bl.length() < len) { + ldout(cct, 10) << "Returned length " << bl.length() + << " less than original length "<< len << dendl; + } + + return bl.length(); +} + +int librados::RadosClient::mapext(IoCtxImpl& io, const object_t& oid, + uint64_t off, size_t len, std::map<uint64_t,uint64_t>& m) +{ + CephContext *cct = io.client->cct; + bufferlist bl; + + Mutex mylock("RadosClient::read::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + objecter->mapext(oid, io.oloc, + off, len, io.snap_seq, &bl, 0, + onack); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; + + if (r < 0) + return r; + + bufferlist::iterator iter = bl.begin(); + ::decode(m, iter); + + return m.size(); +} + +int librados::RadosClient::sparse_read(IoCtxImpl& io, const object_t& oid, + std::map<uint64_t,uint64_t>& m, + bufferlist& data_bl, size_t len, uint64_t off) +{ + CephContext *cct = io.client->cct; + bufferlist bl; + + Mutex mylock("RadosClient::read::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + objecter->sparse_read(oid, io.oloc, + off, len, io.snap_seq, &bl, 0, + onack); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; + + if (r < 0) + return r; + + bufferlist::iterator iter = bl.begin(); + ::decode(m, iter); + ::decode(data_bl, iter); + + return m.size(); +} + +int librados::RadosClient::stat(IoCtxImpl& io, const object_t& oid, uint64_t *psize, time_t *pmtime) +{ + CephContext *cct = io.client->cct; + Mutex mylock("RadosClient::stat::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + uint64_t size; + utime_t mtime; + eversion_t ver; + + if (!psize) + psize = &size; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->stat(oid, io.oloc, + io.snap_seq, psize, &mtime, 0, + onack, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(cct, 10) << "Objecter returned from stat" << dendl; + + if (r >= 0 && pmtime) { + *pmtime = mtime.sec(); + } + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::getxattr(IoCtxImpl& io, const object_t& oid, + const char *name, bufferlist& bl) +{ + CephContext *cct = io.client->cct; + Mutex mylock("RadosClient::getxattr::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->getxattr(oid, io.oloc, + name, io.snap_seq, &bl, 0, + onack, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(cct, 10) << "Objecter returned from getxattr" << dendl; + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + return bl.length(); +} + +int librados::RadosClient::rmxattr(IoCtxImpl& io, const object_t& oid, const char *name) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::rmxattr::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->removexattr(oid, io.oloc, name, + io.snapc, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + return 0; +} + +int librados::RadosClient::setxattr(IoCtxImpl& io, const object_t& oid, + const char *name, bufferlist& bl) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::setxattr::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->setxattr(oid, io.oloc, name, + io.snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + return 0; +} + +int librados::RadosClient::getxattrs(IoCtxImpl& io, const object_t& oid, + map<std::string, bufferlist>& attrset) +{ + CephContext *cct = io.client->cct; + Mutex mylock("RadosClient::getexattrs::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + map<string, bufferlist> aset; + objecter->getxattrs(oid, io.oloc, io.snap_seq, + aset, + 0, onack, &ver, pop); + lock.Unlock(); + + attrset.clear(); + + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); p++) { + ldout(cct, 10) << "RadosClient::getxattrs: xattr=" << p->first << dendl; + attrset[p->first.c_str()] = p->second; + } + + set_sync_op_version(io, ver); + + return r; +} + +void librados::RadosClient::watch_notify(MWatchNotify *m) +{ + assert(lock.is_locked()); + WatchContext *wc = NULL; + map<uint64_t, WatchContext *>::iterator iter = watchers.find(m->cookie); + if (iter != watchers.end()) + wc = iter->second; + + if (!wc) + return; + + wc->notify(this, m); + + m->put(); +} + +int librados::RadosClient::watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, + uint64_t *cookie, librados::WatchCtx *ctx) +{ + ::ObjectOperation rd; + Mutex mylock("RadosClient::watch::mylock"); + Cond cond; + bool done; + int r; + Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t objver; + + lock.Lock(); + + WatchContext *wc; + register_watcher(io, oid, ctx, cookie, &wc); + prepare_assert_ops(&io, &rd); + rd.watch(*cookie, ver, 1); + bufferlist bl; + wc->linger_id = objecter->linger(oid, io.oloc, rd, io.snap_seq, bl, NULL, 0, NULL, onfinish, &objver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, objver); + + if (r < 0) { + lock.Lock(); + unregister_watcher(*cookie); + lock.Unlock(); + } + + return r; +} + + +/* this is called with RadosClient::lock held */ +int librados::RadosClient::_notify_ack(IoCtxImpl& io, const object_t& oid, + uint64_t notify_id, uint64_t ver) +{ + Mutex mylock("RadosClient::watch::mylock"); + Cond cond; + eversion_t objver; + + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + rd.notify_ack(notify_id, ver); + objecter->read(oid, io.oloc, rd, io.snap_seq, (bufferlist*)NULL, 0, 0, 0); + + return 0; +} + +int librados::RadosClient::unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie) +{ + bufferlist inbl, outbl; + + Mutex mylock("RadosClient::watch::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + lock.Lock(); + + unregister_watcher(cookie); + + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + rd.watch(cookie, 0, 0); + objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl) +{ + bufferlist inbl, outbl; + + Mutex mylock("RadosClient::notify::mylock"); + Mutex mylock_all("RadosClient::notify::mylock_all"); + Cond cond, cond_all; + bool done, done_all; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t objver; + uint64_t cookie; + C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all); + + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + + lock.Lock(); + WatchContext *wc; + register_watcher(io, oid, ctx, &cookie, &wc); + uint32_t prot_ver = 1; + uint32_t timeout = io.notify_timeout; + ::encode(prot_ver, inbl); + ::encode(timeout, inbl); + ::encode(bl, inbl); + rd.notify(cookie, ver, inbl); + wc->linger_id = objecter->linger(oid, io.oloc, rd, io.snap_seq, inbl, NULL, + 0, onack, NULL, &objver); + lock.Unlock(); + + mylock_all.Lock(); + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + if (r == 0) { + while (!done_all) + cond_all.Wait(mylock_all); + } + + mylock_all.Unlock(); + + lock.Lock(); + unregister_watcher(cookie); + lock.Unlock(); + + set_sync_op_version(io, objver); + delete ctx; + + return r; +} + +void librados::RadosClient::set_sync_op_version(IoCtxImpl& io, eversion_t& ver) +{ + io.last_objver = ver; +} + +void librados::RadosClient::register_watcher(IoCtxImpl& io, + const object_t& oid, + librados::WatchCtx *ctx, + uint64_t *cookie, + WatchContext **pwc) +{ + assert(lock.is_locked()); + WatchContext *wc = new WatchContext(&io, oid, ctx); + *cookie = ++max_watch_cookie; + watchers[*cookie] = wc; + if (pwc) + *pwc = wc; +} + +void librados::RadosClient::unregister_watcher(uint64_t cookie) +{ + assert(lock.is_locked()); + map<uint64_t, WatchContext *>::iterator iter = watchers.find(cookie); + if (iter != watchers.end()) { + WatchContext *ctx = iter->second; + if (ctx->linger_id) + objecter->unregister_linger(ctx->linger_id); + delete ctx; + watchers.erase(iter); + } +} + +eversion_t librados::RadosClient::last_version(IoCtxImpl& io) +{ + return io.last_objver; +} + +void librados::RadosClient::set_assert_version(IoCtxImpl& io, uint64_t ver) +{ + io.assert_ver = ver; +} +void librados::RadosClient::set_assert_src_version(IoCtxImpl& io, + const object_t& oid, + uint64_t ver) +{ + io.assert_src_version[oid] = ver; +} + +void librados::RadosClient::set_notify_timeout(IoCtxImpl& io, uint32_t timeout) +{ + io.notify_timeout = timeout; +} + +///////////////////////////// C_aio_Ack //////////////////////////////// + +librados::RadosClient::C_aio_Ack::C_aio_Ack(AioCompletionImpl *_c) : c(_c) +{ + c->get(); +} + +void librados::RadosClient::C_aio_Ack::finish(int r) +{ + c->lock.Lock(); + c->rval = r; + c->ack = true; + c->cond.Signal(); + + if (c->buf && c->bl.length() > 0) { + unsigned l = MIN(c->bl.length(), c->maxlen); + c->bl.copy(0, l, c->buf); + c->rval = c->bl.length(); + } + if (c->pbl) { + *c->pbl = c->bl; + } + + if (c->callback_complete) { + rados_callback_t cb = c->callback_complete; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->put_unlock(); +} + +/////////////////////// C_aio_sparse_read_Ack ////////////////////////// + +librados::RadosClient::C_aio_sparse_read_Ack::C_aio_sparse_read_Ack(AioCompletionImpl *_c) + : c(_c) +{ + c->get(); +} + +void librados::RadosClient::C_aio_sparse_read_Ack::finish(int r) +{ + c->lock.Lock(); + c->rval = r; + c->ack = true; + c->cond.Signal(); + + bufferlist::iterator iter = c->bl.begin(); + if (r >= 0) { + ::decode(*m, iter); + ::decode(*data_bl, iter); + } + + if (c->callback_complete) { + rados_callback_t cb = c->callback_complete; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->put_unlock(); +} + +//////////////////////////// C_aio_Safe //////////////////////////////// + +librados::RadosClient::C_aio_Safe::C_aio_Safe(AioCompletionImpl *_c) : c(_c) +{ + c->get(); +} + +void librados::RadosClient::C_aio_Safe::finish(int r) +{ + c->lock.Lock(); + if (!c->ack) { + c->rval = r; + c->ack = true; + } + c->safe = true; + c->cond.Signal(); + + if (c->callback_safe) { + rados_callback_t cb = c->callback_safe; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->io->complete_aio_write(c); + + c->put_unlock(); +} + +///////////////////////// C_PoolAsync_Safe ///////////////////////////// + +librados::RadosClient::C_PoolAsync_Safe::C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c) + : c(_c) +{ + c->get(); +} + +void librados::RadosClient::C_PoolAsync_Safe::finish(int r) +{ + c->lock.Lock(); + c->rval = r; + c->done = true; + c->cond.Signal(); + + if (c->callback) { + rados_callback_t cb = c->callback; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->put_unlock(); +} + +///////////////////////// C_NotifyComplete ///////////////////////////// + +librados::RadosClient::C_NotifyComplete::C_NotifyComplete(Mutex *_l, + Cond *_c, + bool *_d) + : lock(_l), cond(_c), done(_d) +{ + *done = false; +} + +void librados::RadosClient::C_NotifyComplete::notify(uint8_t opcode, + uint64_t ver, + bufferlist& bl) +{ + *done = true; + cond->Signal(); +} + +/////////////////////////// WatchContext /////////////////////////////// + +librados::RadosClient::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_, + const object_t& _oc, + librados::WatchCtx *_ctx) + : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0) +{ + io_ctx_impl->get(); +} + +librados::RadosClient::WatchContext::~WatchContext() +{ + io_ctx_impl->put(); +} + +void librados::RadosClient::WatchContext::notify(RadosClient *client, + MWatchNotify *m) +{ + ctx->notify(m->opcode, m->ver, m->bl); + if (m->opcode != WATCH_NOTIFY_COMPLETE) { + client->_notify_ack(*io_ctx_impl, oid, m->notify_id, m->ver); + } +} diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h new file mode 100644 index 00000000000..98082df30b7 --- /dev/null +++ b/src/librados/RadosClient.h @@ -0,0 +1,234 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_LIBRADOS_RADOSCLIENT_H +#define CEPH_LIBRADOS_RADOSCLIENT_H + +#include "common/Cond.h" +#include "common/Mutex.h" +#include "common/Timer.h" +#include "include/rados/librados.h" +#include "include/rados/librados.hpp" +#include "mon/MonClient.h" +#include "msg/Dispatcher.h" +#include "osd/OSDMap.h" +#include "osdc/Objecter.h" + +class AuthAuthorizer; +class CephContext; +class Connection; +struct md_config_t; +class Message; +class MWatchNotify; +class SimpleMessenger; + +class librados::RadosClient : public Dispatcher +{ +public: + CephContext *cct; + md_config_t *conf; +private: + enum { + DISCONNECTED, + CONNECTING, + CONNECTED, + } state; + + OSDMap osdmap; + MonClient monclient; + SimpleMessenger *messenger; + + bool _dispatch(Message *m); + bool ms_dispatch(Message *m); + + bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new); + void ms_handle_connect(Connection *con); + bool ms_handle_reset(Connection *con); + void ms_handle_remote_reset(Connection *con); + + Objecter *objecter; + + Mutex lock; + Cond cond; + SafeTimer timer; + +public: + + RadosClient(CephContext *cct_); + ~RadosClient(); + int connect(); + void shutdown(); + + int64_t lookup_pool(const char *name); + const char *get_pool_name(int64_t poolid_); + ::ObjectOperation *prepare_assert_ops(IoCtxImpl *io, ::ObjectOperation *op); + + // snaps + int snap_list(IoCtxImpl *io, vector<uint64_t> *snaps); + int snap_lookup(IoCtxImpl *io, const char *name, uint64_t *snapid); + int snap_get_name(IoCtxImpl *io, uint64_t snapid, std::string *s); + int snap_get_stamp(IoCtxImpl *io, uint64_t snapid, time_t *t); + int snap_create(rados_ioctx_t io, const char* snapname); + int selfmanaged_snap_create(rados_ioctx_t io, uint64_t *snapid); + int snap_remove(rados_ioctx_t io, const char* snapname); + int rollback(rados_ioctx_t io_, const object_t& oid, const char *snapName); + int selfmanaged_snap_remove(rados_ioctx_t io, uint64_t snapid); + int selfmanaged_snap_rollback_object(rados_ioctx_t io, const object_t& oid, + ::SnapContext& snapc, uint64_t snapid); + + // io + int create(IoCtxImpl& io, const object_t& oid, bool exclusive); + int create(IoCtxImpl& io, const object_t& oid, bool exclusive, const std::string& category); + int write(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len, uint64_t off); + int append(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len); + int write_full(IoCtxImpl& io, const object_t& oid, bufferlist& bl); + int clone_range(IoCtxImpl& io, const object_t& dst_oid, uint64_t dst_offset, + const object_t& src_oid, uint64_t src_offset, uint64_t len); + int read(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len, uint64_t off); + int mapext(IoCtxImpl& io, const object_t& oid, uint64_t off, size_t len, + std::map<uint64_t,uint64_t>& m); + int sparse_read(IoCtxImpl& io, const object_t& oid, std::map<uint64_t,uint64_t>& m, + bufferlist& bl, size_t len, uint64_t off); + int remove(IoCtxImpl& io, const object_t& oid); + int stat(IoCtxImpl& io, const object_t& oid, uint64_t *psize, time_t *pmtime); + int trunc(IoCtxImpl& io, const object_t& oid, uint64_t size); + + int tmap_update(IoCtxImpl& io, const object_t& oid, bufferlist& cmdbl); + int tmap_put(IoCtxImpl& io, const object_t& oid, bufferlist& bl); + int tmap_get(IoCtxImpl& io, const object_t& oid, bufferlist& bl); + + int exec(IoCtxImpl& io, const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl); + + int getxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl); + int setxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl); + int getxattrs(IoCtxImpl& io, const object_t& oid, map<string, bufferlist>& attrset); + int rmxattr(IoCtxImpl& io, const object_t& oid, const char *name); + + int pool_list(std::list<string>& ls); + int get_pool_stats(std::list<string>& ls, map<string,::pool_stat_t>& result); + int get_fs_stats(ceph_statfs& result); + + int pool_create(string& name, unsigned long long auid=0, __u8 crush_rule=0); + int pool_create_async(string& name, PoolAsyncCompletionImpl *c, unsigned long long auid=0, + __u8 crush_rule=0); + int pool_delete(const char *name); + int pool_change_auid(rados_ioctx_t io, unsigned long long auid); + int pool_get_auid(rados_ioctx_t io, unsigned long long *auid); + + int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c); + int pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, PoolAsyncCompletionImpl *c); + + int list(Objecter::ListContext *context, int max_entries); + + int operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, time_t *pmtime); + int operate_read(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, bufferlist *pbl); + int aio_operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c); + int aio_operate_read(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c, bufferlist *pbl); + + struct C_aio_Ack : public Context { + librados::AioCompletionImpl *c; + C_aio_Ack(AioCompletionImpl *_c); + void finish(int r); + }; + + struct C_aio_sparse_read_Ack : public Context { + AioCompletionImpl *c; + bufferlist *data_bl; + std::map<uint64_t,uint64_t> *m; + C_aio_sparse_read_Ack(AioCompletionImpl *_c); + void finish(int r); + }; + + struct C_aio_Safe : public Context { + AioCompletionImpl *c; + C_aio_Safe(AioCompletionImpl *_c); + void finish(int r); + }; + + int aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, + bufferlist *pbl, size_t len, uint64_t off); + int aio_read(IoCtxImpl& io, object_t oid, AioCompletionImpl *c, + char *buf, size_t len, uint64_t off); + int aio_sparse_read(IoCtxImpl& io, const object_t oid, + AioCompletionImpl *c, std::map<uint64_t,uint64_t> *m, + bufferlist *data_bl, size_t len, uint64_t off); + int aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len, uint64_t off); + int aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len); + int aio_write_full(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl); + int aio_exec(IoCtxImpl& io, const object_t& oid, AioCompletionImpl *c, + const char *cls, const char *method, bufferlist& inbl, bufferlist *outbl); + + struct C_PoolAsync_Safe : public Context { + PoolAsyncCompletionImpl *c; + C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c); + void finish(int r); + }; + + static PoolAsyncCompletionImpl *pool_async_create_completion(); + static PoolAsyncCompletionImpl *pool_async_create_completion(void *cb_arg, + rados_callback_t cb); + static AioCompletionImpl *aio_create_completion(); + static AioCompletionImpl *aio_create_completion(void *cb_arg, + rados_callback_t cb_complete, + rados_callback_t cb_safe); + + // watch/notify + struct WatchContext { + IoCtxImpl *io_ctx_impl; + const object_t oid; + uint64_t cookie; + uint64_t ver; + librados::WatchCtx *ctx; + uint64_t linger_id; + + WatchContext(IoCtxImpl *io_ctx_impl_, + const object_t& _oc, + librados::WatchCtx *_ctx); + ~WatchContext(); + void notify(RadosClient *client, MWatchNotify *m); + }; + + struct C_NotifyComplete : public librados::WatchCtx { + Mutex *lock; + Cond *cond; + bool *done; + + C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d); + void notify(uint8_t opcode, uint64_t ver, bufferlist& bl); + }; + + uint64_t max_watch_cookie; + map<uint64_t, WatchContext *> watchers; + + void set_sync_op_version(IoCtxImpl& io, eversion_t& ver); + + void register_watcher(IoCtxImpl& io, const object_t& oid, + librados::WatchCtx *ctx, uint64_t *cookie, + WatchContext **pwc = NULL); + void unregister_watcher(uint64_t cookie); + void watch_notify(MWatchNotify *m); + int watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx); + int unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie); + int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl); + int _notify_ack(IoCtxImpl& io, const object_t& oid, uint64_t notify_id, uint64_t ver); + + eversion_t last_version(IoCtxImpl& io); + void set_assert_version(IoCtxImpl& io, uint64_t ver); + void set_assert_src_version(IoCtxImpl& io, const object_t& oid, uint64_t ver); + void set_notify_timeout(IoCtxImpl& io, uint32_t timeout); +}; + +#endif diff --git a/src/librados/librados.cc b/src/librados/librados.cc new file mode 100644 index 00000000000..48c988fccbb --- /dev/null +++ b/src/librados/librados.cc @@ -0,0 +1,2046 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +using namespace std; + +#include "common/config.h" +#include "common/errno.h" +#include "common/ceph_argparse.h" +#include "common/common_init.h" +#include "include/rados/librados.h" +#include "include/rados/librados.hpp" +#include "include/types.h" + +#include "librados/AioCompletionImpl.h" +#include "librados/IoCtxImpl.h" +#include "librados/PoolAsyncCompletionImpl.h" +#include "librados/RadosClient.h" + +#define DOUT_SUBSYS rados +#undef dout_prefix +#define dout_prefix *_dout << "librados: " + +#define RADOS_LIST_MAX_ENTRIES 1024 + +/* + * Structure of this file + * + * RadosClient and the related classes are the internal implementation of librados. + * Above that layer sits the C API, found in include/rados/librados.h, and + * the C++ API, found in include/rados/librados.hpp + * + * The C++ API sometimes implements things in terms of the C API. + * Both the C++ and C API rely on RadosClient. + * + * Visually: + * +--------------------------------------+ + * | C++ API | + * +--------------------+ | + * | C API | | + * +--------------------+-----------------+ + * | RadosClient | + * +--------------------------------------+ + */ + +size_t librados::ObjectOperation::size() +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + return o->size(); +} + +void librados::ObjectOperation::set_op_flags(ObjectOperationFlags flags) +{ + int rados_flags = 0; + if (flags & OP_EXCL) + rados_flags |= CEPH_OSD_OP_FLAG_EXCL; + if (flags & OP_FAILOK) + rados_flags |= CEPH_OSD_OP_FLAG_FAILOK; + + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->set_last_op_flags(rados_flags); +} + +void librados::ObjectOperation::cmpxattr(const char *name, uint8_t op, const bufferlist& v) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->cmpxattr(name, op, CEPH_OSD_CMPXATTR_MODE_STRING, v); +} + +void librados::ObjectOperation::cmpxattr(const char *name, uint8_t op, uint64_t v) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist bl; + ::encode(v, bl); + o->cmpxattr(name, op, CEPH_OSD_CMPXATTR_MODE_U64, bl); +} + +void librados::ObjectOperation::src_cmpxattr(const std::string& src_oid, + const char *name, int op, const bufferlist& v) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + object_t oid(src_oid); + o->src_cmpxattr(oid, CEPH_NOSNAP, name, v, op, CEPH_OSD_CMPXATTR_MODE_STRING); +} + +void librados::ObjectOperation::src_cmpxattr(const std::string& src_oid, + const char *name, int op, uint64_t val) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + object_t oid(src_oid); + bufferlist bl; + ::encode(val, bl); + o->src_cmpxattr(oid, CEPH_NOSNAP, name, bl, op, CEPH_OSD_CMPXATTR_MODE_U64); +} + +void librados::ObjectOperation::exec(const char *cls, const char *method, bufferlist& inbl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->call(cls, method, inbl); +} + +void librados::ObjectReadOperation::stat(uint64_t *psize, time_t *pmtime, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->stat(psize, pmtime, prval); +} + +void librados::ObjectReadOperation::read(size_t off, uint64_t len, bufferlist *pbl, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->read(off, len, pbl, prval); +} + +void librados::ObjectReadOperation::tmap_get(bufferlist *pbl, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->tmap_get(pbl, prval); +} + +void librados::ObjectReadOperation::getxattr(const char *name, bufferlist *pbl, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->getxattr(name, pbl, prval); +} + +void librados::ObjectReadOperation::omap_get_vals( + const std::string &start_after, + const std::string &filter_prefix, + uint64_t max_return, + std::map<std::string, bufferlist> *out_vals, + int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_get_vals(start_after, filter_prefix, max_return, out_vals, prval); +} + +void librados::ObjectReadOperation::omap_get_vals( + const std::string &start_after, + uint64_t max_return, + std::map<std::string, bufferlist> *out_vals, + int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_get_vals(start_after, "", max_return, out_vals, prval); +} + +void librados::ObjectReadOperation::omap_get_keys( + const std::string &start_after, + uint64_t max_return, + std::set<std::string> *out_keys, + int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_get_keys(start_after, max_return, out_keys, prval); +} + +void librados::ObjectReadOperation::omap_get_header(bufferlist *bl, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_get_header(bl, prval); +} + +void librados::ObjectReadOperation::omap_get_vals_by_keys( + const std::set<std::string> &keys, + std::map<std::string, bufferlist> *map, + int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_get_vals_by_keys(keys, map, prval); +} + +int librados::IoCtx::omap_get_vals(const std::string& oid, + const std::string& start_after, + const std::string& filter_prefix, + uint64_t max_return, + std::map<std::string, bufferlist> *out_vals) +{ + ObjectReadOperation op; + int r; + op.omap_get_vals(start_after, filter_prefix, max_return, out_vals, &r); + bufferlist bl; + int ret = operate(oid, &op, &bl); + if (ret < 0) + return ret; + + return r; +} + +void librados::ObjectReadOperation::getxattrs(map<string, bufferlist> *pattrs, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->getxattrs(pattrs, prval); +} + +void librados::ObjectWriteOperation::create(bool exclusive) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->create(exclusive); +} + +void librados::ObjectWriteOperation::create(bool exclusive, const std::string& category) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->create(exclusive, category); +} + +void librados::ObjectWriteOperation::write(uint64_t off, const bufferlist& bl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist c = bl; + o->write(off, c); +} + +void librados::ObjectWriteOperation::write_full(const bufferlist& bl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist c = bl; + o->write_full(c); +} + +void librados::ObjectWriteOperation::append(const bufferlist& bl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist c = bl; + o->append(c); +} + +void librados::ObjectWriteOperation::remove() +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->remove(); +} + +void librados::ObjectWriteOperation::truncate(uint64_t off) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->truncate(off); +} + +void librados::ObjectWriteOperation::zero(uint64_t off, uint64_t len) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->zero(off, len); +} + +void librados::ObjectWriteOperation::rmxattr(const char *name) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->rmxattr(name); +} + +void librados::ObjectWriteOperation::setxattr(const char *name, const bufferlist& v) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->setxattr(name, v); +} + +void librados::ObjectWriteOperation::omap_set( + const map<string, bufferlist> &map) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_set(map); +} + +void librados::ObjectWriteOperation::omap_set_header(const bufferlist &bl) +{ + bufferlist c = bl; + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_set_header(c); +} + +void librados::ObjectWriteOperation::omap_clear() +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_clear(); +} + +void librados::ObjectWriteOperation::omap_rm_keys( + const std::set<std::string> &to_rm) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_rm_keys(to_rm); +} + +void librados::ObjectWriteOperation::tmap_put(const bufferlist &bl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist c = bl; + o->tmap_put(c); +} + +void librados::ObjectWriteOperation::tmap_update(const bufferlist& cmdbl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist c = cmdbl; + o->tmap_update(c); +} + +void librados::ObjectWriteOperation::clone_range(uint64_t dst_off, + const std::string& src_oid, uint64_t src_off, + size_t len) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->clone_range(src_oid, src_off, len, dst_off); +} + +librados::WatchCtx:: +~WatchCtx() +{ +} + + +struct librados::ObjListCtx { + librados::IoCtxImpl *ctx; + Objecter::ListContext *lc; + + ObjListCtx(IoCtxImpl *c, Objecter::ListContext *l) : ctx(c), lc(l) {} + ~ObjListCtx() { + delete lc; + } +}; + +///////////////////////////// ObjectIterator ///////////////////////////// +librados::ObjectIterator::ObjectIterator(ObjListCtx *ctx_) + : ctx(ctx_) +{ +} + +librados::ObjectIterator::~ObjectIterator() +{ + ctx.reset(); +} + +bool librados::ObjectIterator::operator==(const librados::ObjectIterator& rhs) const { + return (ctx.get() == rhs.ctx.get()); +} + +bool librados::ObjectIterator::operator!=(const librados::ObjectIterator& rhs) const { + return (ctx.get() != rhs.ctx.get()); +} + +const pair<std::string, std::string>& librados::ObjectIterator::operator*() const { + return cur_obj; +} + +const pair<std::string, std::string>* librados::ObjectIterator::operator->() const { + return &cur_obj; +} + +librados::ObjectIterator& librados::ObjectIterator::operator++() +{ + get_next(); + return *this; +} + +librados::ObjectIterator librados::ObjectIterator::operator++(int) +{ + librados::ObjectIterator ret(*this); + get_next(); + return ret; +} + +void librados::ObjectIterator::get_next() +{ + const char *entry, *key; + int ret = rados_objects_list_next(ctx.get(), &entry, &key); + if (ret == -ENOENT) { + ctx.reset(); + *this = __EndObjectIterator; + return; + } + else if (ret) { + ostringstream oss; + oss << "rados returned " << cpp_strerror(ret); + throw std::runtime_error(oss.str()); + } + + cur_obj = make_pair(entry, key ? key : string()); +} + +const librados::ObjectIterator librados::ObjectIterator::__EndObjectIterator(NULL); + +///////////////////////////// PoolAsyncCompletion ////////////////////////////// +int librados::PoolAsyncCompletion::PoolAsyncCompletion::set_callback(void *cb_arg, + rados_callback_t cb) +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->set_callback(cb_arg, cb); +} + +int librados::PoolAsyncCompletion::PoolAsyncCompletion::wait() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->wait(); +} + +bool librados::PoolAsyncCompletion::PoolAsyncCompletion::is_complete() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->is_complete(); +} + +int librados::PoolAsyncCompletion::PoolAsyncCompletion::get_return_value() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->get_return_value(); +} + +void librados::PoolAsyncCompletion::PoolAsyncCompletion::release() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + c->release(); + delete this; +} + +///////////////////////////// AioCompletion ////////////////////////////// +int librados::AioCompletion::AioCompletion::set_complete_callback(void *cb_arg, rados_callback_t cb) +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->set_complete_callback(cb_arg, cb); +} + +int librados::AioCompletion::AioCompletion::set_safe_callback(void *cb_arg, rados_callback_t cb) +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->set_safe_callback(cb_arg, cb); +} + +int librados::AioCompletion::AioCompletion::wait_for_complete() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->wait_for_complete(); +} + +int librados::AioCompletion::AioCompletion::wait_for_safe() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->wait_for_safe(); +} + +bool librados::AioCompletion::AioCompletion::is_complete() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->is_complete(); +} + +bool librados::AioCompletion::AioCompletion::is_safe() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->is_safe(); +} + +int librados::AioCompletion::AioCompletion::get_return_value() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->get_return_value(); +} + +int librados::AioCompletion::AioCompletion::get_version() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->get_version(); +} + +void librados::AioCompletion::AioCompletion::release() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + c->release(); + delete this; +} + +///////////////////////////// IoCtx ////////////////////////////// +librados::IoCtx::IoCtx() : io_ctx_impl(NULL) +{ +} + +void librados::IoCtx::from_rados_ioctx_t(rados_ioctx_t p, IoCtx &io) +{ + IoCtxImpl *io_ctx_impl = (IoCtxImpl*)p; + + io.io_ctx_impl = io_ctx_impl; + if (io_ctx_impl) { + io_ctx_impl->get(); + } +} + +librados::IoCtx::IoCtx(const IoCtx& rhs) +{ + io_ctx_impl = rhs.io_ctx_impl; + if (io_ctx_impl) { + io_ctx_impl->get(); + } +} + +librados::IoCtx& librados::IoCtx::operator=(const IoCtx& rhs) +{ + if (io_ctx_impl) + io_ctx_impl->put(); + io_ctx_impl = rhs.io_ctx_impl; + io_ctx_impl->get(); + return *this; +} + +librados::IoCtx::~IoCtx() +{ + close(); +} + +void librados::IoCtx::close() +{ + if (io_ctx_impl) + io_ctx_impl->put(); + io_ctx_impl = 0; +} + +void librados::IoCtx::dup(const IoCtx& rhs) +{ + if (io_ctx_impl) + io_ctx_impl->put(); + io_ctx_impl = new IoCtxImpl(); + io_ctx_impl->get(); + io_ctx_impl->dup(*rhs.io_ctx_impl); +} + +int librados::IoCtx::set_auid(uint64_t auid_) +{ + return io_ctx_impl->client->pool_change_auid(io_ctx_impl, auid_); +} + +int librados::IoCtx::set_auid_async(uint64_t auid_, PoolAsyncCompletion *c) +{ + return io_ctx_impl->client->pool_change_auid_async(io_ctx_impl, auid_, c->pc); +} + +int librados::IoCtx::get_auid(uint64_t *auid_) +{ + return io_ctx_impl->client->pool_get_auid(io_ctx_impl, (unsigned long long *)auid_); +} + +int librados::IoCtx::create(const std::string& oid, bool exclusive) +{ + object_t obj(oid); + return io_ctx_impl->client->create(*io_ctx_impl, obj, exclusive); +} + +int librados::IoCtx::create(const std::string& oid, bool exclusive, const std::string& category) +{ + object_t obj(oid); + return io_ctx_impl->client->create(*io_ctx_impl, obj, exclusive, category); +} + +int librados::IoCtx::write(const std::string& oid, bufferlist& bl, size_t len, uint64_t off) +{ + object_t obj(oid); + return io_ctx_impl->client->write(*io_ctx_impl, obj, bl, len, off); +} + +int librados::IoCtx::append(const std::string& oid, bufferlist& bl, size_t len) +{ + object_t obj(oid); + return io_ctx_impl->client->append(*io_ctx_impl, obj, bl, len); +} + +int librados::IoCtx::write_full(const std::string& oid, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->write_full(*io_ctx_impl, obj, bl); +} + +int librados::IoCtx::clone_range(const std::string& dst_oid, uint64_t dst_off, + const std::string& src_oid, uint64_t src_off, + size_t len) +{ + object_t src(src_oid), dst(dst_oid); + return io_ctx_impl->client->clone_range(*io_ctx_impl, dst, dst_off, src, src_off, len); +} + +int librados::IoCtx::read(const std::string& oid, bufferlist& bl, size_t len, uint64_t off) +{ + object_t obj(oid); + return io_ctx_impl->client->read(*io_ctx_impl, obj, bl, len, off); +} + +int librados::IoCtx::remove(const std::string& oid) +{ + object_t obj(oid); + return io_ctx_impl->client->remove(*io_ctx_impl, obj); +} + +int librados::IoCtx::trunc(const std::string& oid, uint64_t size) +{ + object_t obj(oid); + return io_ctx_impl->client->trunc(*io_ctx_impl, obj, size); +} + +int librados::IoCtx::mapext(const std::string& oid, uint64_t off, size_t len, + std::map<uint64_t,uint64_t>& m) +{ + object_t obj(oid); + return io_ctx_impl->client->mapext(*io_ctx_impl, oid, off, len, m); +} + +int librados::IoCtx::sparse_read(const std::string& oid, std::map<uint64_t,uint64_t>& m, + bufferlist& bl, size_t len, uint64_t off) +{ + object_t obj(oid); + return io_ctx_impl->client->sparse_read(*io_ctx_impl, oid, m, bl, len, off); +} + +int librados::IoCtx::getxattr(const std::string& oid, const char *name, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->getxattr(*io_ctx_impl, obj, name, bl); +} + +int librados::IoCtx::getxattrs(const std::string& oid, map<std::string, bufferlist>& attrset) +{ + object_t obj(oid); + return io_ctx_impl->client->getxattrs(*io_ctx_impl, obj, attrset); +} + +int librados::IoCtx::setxattr(const std::string& oid, const char *name, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->setxattr(*io_ctx_impl, obj, name, bl); +} + +int librados::IoCtx::rmxattr(const std::string& oid, const char *name) +{ + object_t obj(oid); + return io_ctx_impl->client->rmxattr(*io_ctx_impl, obj, name); +} + +int librados::IoCtx::stat(const std::string& oid, uint64_t *psize, time_t *pmtime) +{ + object_t obj(oid); + return io_ctx_impl->client->stat(*io_ctx_impl, oid, psize, pmtime); +} + +int librados::IoCtx::exec(const std::string& oid, const char *cls, const char *method, + bufferlist& inbl, bufferlist& outbl) +{ + object_t obj(oid); + return io_ctx_impl->client->exec(*io_ctx_impl, obj, cls, method, inbl, outbl); +} + +int librados::IoCtx::tmap_update(const std::string& oid, bufferlist& cmdbl) +{ + object_t obj(oid); + return io_ctx_impl->client->tmap_update(*io_ctx_impl, obj, cmdbl); +} + +int librados::IoCtx::tmap_put(const std::string& oid, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->tmap_put(*io_ctx_impl, obj, bl); +} + +int librados::IoCtx::tmap_get(const std::string& oid, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->tmap_get(*io_ctx_impl, obj, bl); +} + +int librados::IoCtx::omap_get_vals(const std::string& oid, + const std::string& start_after, + uint64_t max_return, + std::map<std::string, bufferlist> *out_vals) +{ + ObjectReadOperation op; + int r; + op.omap_get_vals(start_after, max_return, out_vals, &r); + bufferlist bl; + int ret = operate(oid, &op, &bl); + if (ret < 0) + return ret; + + return r; +} + +int librados::IoCtx::omap_get_keys(const std::string& oid, + const std::string& start_after, + uint64_t max_return, + std::set<std::string> *out_keys) +{ + ObjectReadOperation op; + int r; + op.omap_get_keys(start_after, max_return, out_keys, &r); + bufferlist bl; + int ret = operate(oid, &op, &bl); + if (ret < 0) + return ret; + + return r; +} + +int librados::IoCtx::omap_get_header(const std::string& oid, + bufferlist *bl) +{ + ObjectReadOperation op; + int r; + op.omap_get_header(bl, &r); + bufferlist b; + int ret = operate(oid, &op, &b); + if (ret < 0) + return ret; + + return r; +} + +int librados::IoCtx::omap_get_vals_by_keys(const std::string& oid, + const std::set<std::string>& keys, + std::map<std::string, bufferlist> *vals) +{ + ObjectReadOperation op; + int r; + bufferlist bl; + op.omap_get_vals_by_keys(keys, vals, &r); + int ret = operate(oid, &op, &bl); + if (ret < 0) + return ret; + + return r; +} + +int librados::IoCtx::omap_set(const std::string& oid, + const map<string, bufferlist>& m) +{ + ObjectWriteOperation op; + op.omap_set(m); + return operate(oid, &op); +} + +int librados::IoCtx::omap_set_header(const std::string& oid, + const bufferlist& bl) +{ + ObjectWriteOperation op; + op.omap_set_header(bl); + return operate(oid, &op); +} + +int librados::IoCtx::omap_clear(const std::string& oid) +{ + ObjectWriteOperation op; + op.omap_clear(); + return operate(oid, &op); +} + +int librados::IoCtx::omap_rm_keys(const std::string& oid, + const std::set<std::string>& keys) +{ + ObjectWriteOperation op; + op.omap_rm_keys(keys); + return operate(oid, &op); +} + +int librados::IoCtx::operate(const std::string& oid, librados::ObjectWriteOperation *o) +{ + object_t obj(oid); + return io_ctx_impl->client->operate(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, o->pmtime); +} + +int librados::IoCtx::operate(const std::string& oid, librados::ObjectReadOperation *o, bufferlist *pbl) +{ + object_t obj(oid); + return io_ctx_impl->client->operate_read(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, pbl); +} + +int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectWriteOperation *o) +{ + object_t obj(oid); + return io_ctx_impl->client->aio_operate(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, c->pc); +} + +int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectReadOperation *o, bufferlist *pbl) +{ + object_t obj(oid); + return io_ctx_impl->client->aio_operate_read(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, c->pc, pbl); +} + +void librados::IoCtx::snap_set_read(snap_t seq) +{ + io_ctx_impl->set_snap_read(seq); +} + +int librados::IoCtx::selfmanaged_snap_set_write_ctx(snap_t seq, vector<snap_t>& snaps) +{ + vector<snapid_t> snv; + snv.resize(snaps.size()); + for (unsigned i=0; i<snaps.size(); i++) + snv[i] = snaps[i]; + return io_ctx_impl->set_snap_write_context(seq, snv); +} + +int librados::IoCtx::snap_create(const char *snapname) +{ + return io_ctx_impl->client->snap_create(io_ctx_impl, snapname); +} + +int librados::IoCtx::snap_lookup(const char *name, snap_t *snapid) +{ + return io_ctx_impl->client->snap_lookup(io_ctx_impl, name, snapid); +} + +int librados::IoCtx::snap_get_stamp(snap_t snapid, time_t *t) +{ + return io_ctx_impl->client->snap_get_stamp(io_ctx_impl, snapid, t); +} + +int librados::IoCtx::snap_get_name(snap_t snapid, std::string *s) +{ + return io_ctx_impl->client->snap_get_name(io_ctx_impl, snapid, s); +} + +int librados::IoCtx::snap_remove(const char *snapname) +{ + return io_ctx_impl->client->snap_remove(io_ctx_impl, snapname); +} + +int librados::IoCtx::snap_list(std::vector<snap_t> *snaps) +{ + return io_ctx_impl->client->snap_list(io_ctx_impl, snaps); +} + +int librados::IoCtx::rollback(const std::string& oid, const char *snapname) +{ + return io_ctx_impl->client->rollback(io_ctx_impl, oid, snapname); +} + +int librados::IoCtx::selfmanaged_snap_create(uint64_t *snapid) +{ + return io_ctx_impl->client->selfmanaged_snap_create(io_ctx_impl, snapid); +} + +int librados::IoCtx::selfmanaged_snap_remove(uint64_t snapid) +{ + return io_ctx_impl->client->selfmanaged_snap_remove(io_ctx_impl, snapid); +} + +int librados::IoCtx::selfmanaged_snap_rollback(const std::string& oid, uint64_t snapid) +{ + return io_ctx_impl->client->selfmanaged_snap_rollback_object(io_ctx_impl, + oid, + io_ctx_impl->snapc, + snapid); +} + +librados::ObjectIterator librados::IoCtx::objects_begin() +{ + rados_list_ctx_t listh; + rados_objects_list_open(io_ctx_impl, &listh); + ObjectIterator iter((ObjListCtx*)listh); + iter.get_next(); + return iter; +} + +const librados::ObjectIterator& librados::IoCtx::objects_end() const +{ + return ObjectIterator::__EndObjectIterator; +} + +uint64_t librados::IoCtx::get_last_version() +{ + eversion_t ver = io_ctx_impl->client->last_version(*io_ctx_impl); + return ver.version; +} + +int librados::IoCtx::aio_read(const std::string& oid, librados::AioCompletion *c, + bufferlist *pbl, size_t len, uint64_t off) +{ + return io_ctx_impl->client->aio_read(*io_ctx_impl, oid, c->pc, pbl, len, off); +} + +int librados::IoCtx::aio_exec(const std::string& oid, librados::AioCompletion *c, const char *cls, const char *method, + bufferlist& inbl, bufferlist *outbl) +{ + object_t obj(oid); + return io_ctx_impl->client->aio_exec(*io_ctx_impl, obj, c->pc, cls, method, inbl, outbl); +} + +int librados::IoCtx::aio_sparse_read(const std::string& oid, librados::AioCompletion *c, + std::map<uint64_t,uint64_t> *m, bufferlist *data_bl, + size_t len, uint64_t off) +{ + return io_ctx_impl->client->aio_sparse_read(*io_ctx_impl, oid, c->pc, + m, data_bl, len, off); +} + +int librados::IoCtx::aio_write(const std::string& oid, librados::AioCompletion *c, + const bufferlist& bl, size_t len, uint64_t off) +{ + return io_ctx_impl->client->aio_write(*io_ctx_impl, oid, c->pc, bl, len, off); +} + +int librados::IoCtx::aio_append(const std::string& oid, librados::AioCompletion *c, + const bufferlist& bl, size_t len) +{ + return io_ctx_impl->client->aio_append(*io_ctx_impl, oid, c->pc, bl, len); +} + +int librados::IoCtx::aio_write_full(const std::string& oid, librados::AioCompletion *c, + const bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->aio_write_full(*io_ctx_impl, obj, c->pc, bl); +} + +int librados::IoCtx::aio_flush() +{ + io_ctx_impl->flush_aio_writes(); + return 0; +} + +int librados::IoCtx::watch(const string& oid, uint64_t ver, uint64_t *cookie, + librados::WatchCtx *ctx) +{ + object_t obj(oid); + return io_ctx_impl->client->watch(*io_ctx_impl, obj, ver, cookie, ctx); +} + +int librados::IoCtx::unwatch(const string& oid, uint64_t handle) +{ + uint64_t cookie = handle; + object_t obj(oid); + return io_ctx_impl->client->unwatch(*io_ctx_impl, obj, cookie); +} + +int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver, bl); +} + +void librados::IoCtx::set_notify_timeout(uint32_t timeout) +{ + io_ctx_impl->client->set_notify_timeout(*io_ctx_impl, timeout); +} + +void librados::IoCtx::set_assert_version(uint64_t ver) +{ + io_ctx_impl->client->set_assert_version(*io_ctx_impl, ver); +} + +void librados::IoCtx::set_assert_src_version(const std::string& oid, uint64_t ver) +{ + object_t obj(oid); + io_ctx_impl->client->set_assert_src_version(*io_ctx_impl, obj, ver); +} + +const std::string& librados::IoCtx::get_pool_name() const +{ + return io_ctx_impl->pool_name; +} + +void librados::IoCtx::locator_set_key(const string& key) +{ + io_ctx_impl->oloc.key = key; +} + +int64_t librados::IoCtx::get_id() +{ + return io_ctx_impl->get_id(); +} + +librados::config_t librados::IoCtx::cct() +{ + return (config_t)io_ctx_impl->client->cct; +} + +librados::IoCtx::IoCtx(IoCtxImpl *io_ctx_impl_) + : io_ctx_impl(io_ctx_impl_) +{ +} + +///////////////////////////// Rados ////////////////////////////// +void librados::Rados::version(int *major, int *minor, int *extra) +{ + rados_version(major, minor, extra); +} + +librados::Rados::Rados() : client(NULL) +{ +} + +librados::Rados::~Rados() +{ + shutdown(); +} + +int librados::Rados::init(const char * const id) +{ + return rados_create((rados_t *)&client, id); +} + +int librados::Rados::init_with_context(config_t cct_) +{ + return rados_create_with_context((rados_t *)&client, (rados_config_t)cct_); +} + +int librados::Rados::connect() +{ + return client->connect(); +} + +librados::config_t librados::Rados::cct() +{ + return (config_t)client->cct; +} + +void librados::Rados::shutdown() +{ + if (!client) + return; + client->shutdown(); + delete client; + client = NULL; +} + +int librados::Rados::conf_read_file(const char * const path) const +{ + return rados_conf_read_file((rados_t)client, path); +} + +int librados::Rados::conf_parse_argv(int argc, const char ** argv) const +{ + return rados_conf_parse_argv((rados_t)client, argc, argv); +} + +int librados::Rados::conf_parse_env(const char *name) const +{ + return rados_conf_parse_env((rados_t)client, name); +} + +int librados::Rados::conf_set(const char *option, const char *value) +{ + return rados_conf_set((rados_t)client, option, value); +} + +int librados::Rados::conf_get(const char *option, std::string &val) +{ + char *str; + md_config_t *conf = client->cct->_conf; + int ret = conf->get_val(option, &str, -1); + if (ret) + return ret; + val = str; + free(str); + return 0; +} + +int librados::Rados::pool_create(const char *name) +{ + string str(name); + return client->pool_create(str); +} + +int librados::Rados::pool_create(const char *name, uint64_t auid) +{ + string str(name); + return client->pool_create(str, auid); +} + +int librados::Rados::pool_create(const char *name, uint64_t auid, __u8 crush_rule) +{ + string str(name); + return client->pool_create(str, auid, crush_rule); +} + +int librados::Rados::pool_create_async(const char *name, PoolAsyncCompletion *c) +{ + string str(name); + return client->pool_create_async(str, c->pc); +} + +int librados::Rados::pool_create_async(const char *name, uint64_t auid, PoolAsyncCompletion *c) +{ + string str(name); + return client->pool_create_async(str, c->pc, auid); +} + +int librados::Rados::pool_create_async(const char *name, uint64_t auid, __u8 crush_rule, + PoolAsyncCompletion *c) +{ + string str(name); + return client->pool_create_async(str, c->pc, auid, crush_rule); +} + +int librados::Rados::pool_delete(const char *name) +{ + return client->pool_delete(name); +} + +int librados::Rados::pool_delete_async(const char *name, PoolAsyncCompletion *c) +{ + return client->pool_delete_async(name, c->pc); +} + +int librados::Rados::pool_list(std::list<std::string>& v) +{ + return client->pool_list(v); +} + +int64_t librados::Rados::pool_lookup(const char *name) +{ + return client->lookup_pool(name); +} + +int librados::Rados::ioctx_create(const char *name, IoCtx &io) +{ + rados_ioctx_t p; + int ret = rados_ioctx_create((rados_t)client, name, &p); + if (ret) + return ret; + io.io_ctx_impl = (IoCtxImpl*)p; + return 0; +} + +int librados::Rados::get_pool_stats(std::list<string>& v, std::map<string, stats_map>& result) +{ + string category; + return get_pool_stats(v, category, result); +} + +int librados::Rados::get_pool_stats(std::list<string>& v, string& category, + std::map<string, stats_map>& result) +{ + map<string,::pool_stat_t> rawresult; + int r = client->get_pool_stats(v, rawresult); + for (map<string,::pool_stat_t>::iterator p = rawresult.begin(); + p != rawresult.end(); + p++) { + stats_map& c = result[p->first]; + + string cat; + vector<string> cats; + + if (!category.size()) { + cats.push_back(cat); + map<string,object_stat_sum_t>::iterator iter; + for (iter = p->second.stats.cat_sum.begin(); iter != p->second.stats.cat_sum.end(); ++iter) { + cats.push_back(iter->first); + } + } else { + cats.push_back(category); + } + + vector<string>::iterator cat_iter; + for (cat_iter = cats.begin(); cat_iter != cats.end(); ++cat_iter) { + string& cur_category = *cat_iter; + object_stat_sum_t *sum; + + if (!cur_category.size()) { + sum = &p->second.stats.sum; + } else { + map<string,object_stat_sum_t>::iterator iter = p->second.stats.cat_sum.find(cur_category); + if (iter == p->second.stats.cat_sum.end()) + continue; + sum = &iter->second; + } + + pool_stat_t& pv = c[cur_category]; + pv.num_kb = SHIFT_ROUND_UP(sum->num_bytes, 10); + pv.num_bytes = sum->num_bytes; + pv.num_objects = sum->num_objects; + pv.num_object_clones = sum->num_object_clones; + pv.num_object_copies = sum->num_object_copies; + pv.num_objects_missing_on_primary = sum->num_objects_missing_on_primary; + pv.num_objects_unfound = sum->num_objects_unfound; + pv.num_objects_degraded = sum->num_objects_degraded; + pv.num_rd = sum->num_rd; + pv.num_rd_kb = sum->num_rd_kb; + pv.num_wr = sum->num_wr; + pv.num_wr_kb = sum->num_wr_kb; + } + } + return r; +} + +int librados::Rados::cluster_stat(cluster_stat_t& result) +{ + ceph_statfs stats; + int r = client->get_fs_stats(stats); + result.kb = stats.kb; + result.kb_used = stats.kb_used; + result.kb_avail = stats.kb_avail; + result.num_objects = stats.num_objects; + return r; +} + +librados::PoolAsyncCompletion *librados::Rados::pool_async_create_completion() +{ + PoolAsyncCompletionImpl *c = RadosClient::pool_async_create_completion(); + return new PoolAsyncCompletion(c); +} + +librados::AioCompletion *librados::Rados::aio_create_completion() +{ + AioCompletionImpl *c = RadosClient::aio_create_completion(); + return new AioCompletion(c); +} + +librados::AioCompletion *librados::Rados::aio_create_completion(void *cb_arg, + callback_t cb_complete, + callback_t cb_safe) +{ + AioCompletionImpl *c = RadosClient::aio_create_completion(cb_arg, cb_complete, cb_safe); + return new AioCompletion(c); +} + + +librados::ObjectOperation::ObjectOperation() +{ + impl = (ObjectOperationImpl *)new ::ObjectOperation; +} + +librados::ObjectOperation::~ObjectOperation() +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + delete o; +} + +///////////////////////////// C API ////////////////////////////// +extern "C" int rados_create(rados_t *pcluster, const char * const id) +{ + CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT); + if (id) { + iparams.name.set(CEPH_ENTITY_TYPE_CLIENT, id); + } + + CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, 0); + cct->_conf->parse_env(); // environment variables override + cct->_conf->apply_changes(NULL); + + librados::RadosClient *radosp = new librados::RadosClient(cct); + *pcluster = (void *)radosp; + return 0; +} + +/* This function is intended for use by Ceph daemons. These daemons have + * already called global_init and want to use that particular configuration for + * their cluster. + */ +extern "C" int rados_create_with_context(rados_t *pcluster, rados_config_t cct_) +{ + CephContext *cct = (CephContext *)cct_; + librados::RadosClient *radosp = new librados::RadosClient(cct); + *pcluster = (void *)radosp; + return 0; +} + +extern "C" rados_config_t rados_cct(rados_t cluster) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + return (rados_config_t)client->cct; +} + +extern "C" int rados_connect(rados_t cluster) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + return client->connect(); +} + +extern "C" void rados_shutdown(rados_t cluster) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + radosp->shutdown(); + delete radosp; +} + +extern "C" void rados_version(int *major, int *minor, int *extra) +{ + if (major) + *major = LIBRADOS_VER_MAJOR; + if (minor) + *minor = LIBRADOS_VER_MINOR; + if (extra) + *extra = LIBRADOS_VER_EXTRA; +} + + +// -- config -- +extern "C" int rados_conf_read_file(rados_t cluster, const char *path_list) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + md_config_t *conf = client->cct->_conf; + std::deque<std::string> parse_errors; + int ret = conf->parse_config_files(path_list, &parse_errors, 0); + if (ret) + return ret; + conf->parse_env(); // environment variables override + + conf->apply_changes(NULL); + complain_about_parse_errors(client->cct, &parse_errors); + return 0; +} + +extern "C" int rados_conf_parse_argv(rados_t cluster, int argc, const char **argv) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + md_config_t *conf = client->cct->_conf; + vector<const char*> args; + argv_to_vec(argc, argv, args); + int ret = conf->parse_argv(args); + if (ret) + return ret; + conf->apply_changes(NULL); + return 0; +} + +extern "C" int rados_conf_parse_env(rados_t cluster, const char *env) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + md_config_t *conf = client->cct->_conf; + vector<const char*> args; + env_to_vec(args, env); + int ret = conf->parse_argv(args); + if (ret) + return ret; + conf->apply_changes(NULL); + return 0; +} + +extern "C" int rados_conf_set(rados_t cluster, const char *option, const char *value) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + md_config_t *conf = client->cct->_conf; + int ret = conf->set_val(option, value); + if (ret) + return ret; + conf->apply_changes(NULL); + return 0; +} + +/* cluster info */ +extern "C" int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + + ceph_statfs stats; + int r = client->get_fs_stats(stats); + result->kb = stats.kb; + result->kb_used = stats.kb_used; + result->kb_avail = stats.kb_avail; + result->num_objects = stats.num_objects; + return r; +} + +extern "C" int rados_conf_get(rados_t cluster, const char *option, char *buf, size_t len) +{ + char *tmp = buf; + librados::RadosClient *client = (librados::RadosClient *)cluster; + md_config_t *conf = client->cct->_conf; + return conf->get_val(option, &tmp, len); +} + +extern "C" int64_t rados_pool_lookup(rados_t cluster, const char *name) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + return radosp->lookup_pool(name); +} + +extern "C" int rados_pool_list(rados_t cluster, char *buf, size_t len) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + std::list<std::string> pools; + client->pool_list(pools); + + char *b = buf; + if (b) + memset(b, 0, len); + int needed = 0; + std::list<std::string>::const_iterator i = pools.begin(); + std::list<std::string>::const_iterator p_end = pools.end(); + for (; i != p_end; ++i) { + if (len <= 0) + break; + int rl = i->length() + 1; + strncat(b, i->c_str(), len - 2); // leave space for two NULLs + needed += rl; + len -= rl; + b += rl; + } + for (; i != p_end; ++i) { + int rl = i->length() + 1; + needed += rl; + } + return needed + 1; +} + +extern "C" int rados_ioctx_create(rados_t cluster, const char *name, rados_ioctx_t *io) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + int64_t poolid = radosp->lookup_pool(name); + if (poolid < 0) + return (int)poolid; + + librados::IoCtxImpl *ctx = new librados::IoCtxImpl(radosp, poolid, name, CEPH_NOSNAP); + if (!ctx) + return -ENOMEM; + *io = ctx; + ctx->get(); + return 0; +} + +extern "C" void rados_ioctx_destroy(rados_ioctx_t io) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + ctx->put(); +} + +extern "C" int rados_ioctx_pool_stat(rados_ioctx_t io, struct rados_pool_stat_t *stats) +{ + librados::IoCtxImpl *io_ctx_impl = (librados::IoCtxImpl *)io; + list<string> ls; + ls.push_back(io_ctx_impl->pool_name); + map<string, ::pool_stat_t> rawresult; + + int err = io_ctx_impl->client->get_pool_stats(ls, rawresult); + if (err) + return err; + + ::pool_stat_t& r = rawresult[io_ctx_impl->pool_name]; + stats->num_kb = SHIFT_ROUND_UP(r.stats.sum.num_bytes, 10); + stats->num_bytes = r.stats.sum.num_bytes; + stats->num_objects = r.stats.sum.num_objects; + stats->num_object_clones = r.stats.sum.num_object_clones; + stats->num_object_copies = r.stats.sum.num_object_copies; + stats->num_objects_missing_on_primary = r.stats.sum.num_objects_missing_on_primary; + stats->num_objects_unfound = r.stats.sum.num_objects_unfound; + stats->num_objects_degraded = r.stats.sum.num_objects_degraded; + stats->num_rd = r.stats.sum.num_rd; + stats->num_rd_kb = r.stats.sum.num_rd_kb; + stats->num_wr = r.stats.sum.num_wr; + stats->num_wr_kb = r.stats.sum.num_wr_kb; + return 0; +} + +extern "C" rados_config_t rados_ioctx_cct(rados_ioctx_t io) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return (rados_config_t)ctx->client->cct; +} + +extern "C" void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t seq) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + ctx->set_snap_read((snapid_t)seq); +} + +extern "C" int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io, + rados_snap_t seq, rados_snap_t *snaps, int num_snaps) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + vector<snapid_t> snv; + snv.resize(num_snaps); + for (int i=0; i<num_snaps; i++) + snv[i] = (snapid_t)snaps[i]; + return ctx->set_snap_write_context((snapid_t)seq, snv); +} + +extern "C" int rados_write(rados_ioctx_t io, const char *o, const char *buf, size_t len, uint64_t off) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->write(*ctx, oid, bl, len, off); +} + +extern "C" int rados_append(rados_ioctx_t io, const char *o, const char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->append(*ctx, oid, bl, len); +} + +extern "C" int rados_write_full(rados_ioctx_t io, const char *o, const char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->write_full(*ctx, oid, bl); +} + +extern "C" int rados_clone_range(rados_ioctx_t io, const char *dst, uint64_t dst_off, + const char *src, uint64_t src_off, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t dst_oid(dst), src_oid(src); + return ctx->client->clone_range(*ctx, dst_oid, dst_off, src_oid, src_off, len); +} + +extern "C" int rados_trunc(rados_ioctx_t io, const char *o, uint64_t size) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->trunc(*ctx, oid, size); +} + +extern "C" int rados_remove(rados_ioctx_t io, const char *o) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->remove(*ctx, oid); +} + +extern "C" int rados_read(rados_ioctx_t io, const char *o, char *buf, size_t len, uint64_t off) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + int ret; + object_t oid(o); + + bufferlist bl; + bufferptr bp = buffer::create_static(len, buf); + bl.push_back(bp); + + ret = ctx->client->read(*ctx, oid, bl, len, off); + if (ret >= 0) { + if (bl.length() > len) + return -ERANGE; + if (bl.c_str() != buf) + bl.copy(0, bl.length(), buf); + ret = bl.length(); // hrm :/ + } + + return ret; +} + +extern "C" uint64_t rados_get_last_version(rados_ioctx_t io) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + eversion_t ver = ctx->client->last_version(*ctx); + return ver.version; +} + +extern "C" int rados_pool_create(rados_t cluster, const char *name) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + string sname(name); + return radosp->pool_create(sname); +} + +extern "C" int rados_pool_create_with_auid(rados_t cluster, const char *name, + uint64_t auid) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + string sname(name); + return radosp->pool_create(sname, auid); +} + +extern "C" int rados_pool_create_with_crush_rule(rados_t cluster, const char *name, + __u8 crush_rule_num) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + string sname(name); + return radosp->pool_create(sname, 0, crush_rule_num); +} + +extern "C" int rados_pool_create_with_all(rados_t cluster, const char *name, + uint64_t auid, __u8 crush_rule_num) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + string sname(name); + return radosp->pool_create(sname, auid, crush_rule_num); +} + +extern "C" int rados_pool_delete(rados_t cluster, const char *pool_name) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + return client->pool_delete(pool_name); +} + +extern "C" int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->pool_change_auid(ctx, auid); +} + +extern "C" int rados_ioctx_pool_get_auid(rados_ioctx_t io, uint64_t *auid) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->pool_get_auid(ctx, (unsigned long long *)auid); +} + +extern "C" void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + if (key) + ctx->oloc.key = key; + else + ctx->oloc.key = ""; +} + +extern "C" int64_t rados_ioctx_get_id(rados_ioctx_t io) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->get_id(); +} +// snaps + +extern "C" int rados_ioctx_snap_create(rados_ioctx_t io, const char *snapname) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->snap_create(ctx, snapname); +} + +extern "C" int rados_ioctx_snap_remove(rados_ioctx_t io, const char *snapname) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->snap_remove(ctx, snapname); +} + +extern "C" int rados_rollback(rados_ioctx_t io, const char *oid, + const char *snapname) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->rollback(ctx, oid, snapname); +} + +extern "C" int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io, + uint64_t *snapid) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->selfmanaged_snap_create(ctx, snapid); +} + +extern "C" int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io, + uint64_t snapid) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->selfmanaged_snap_remove(ctx, snapid); +} + +extern "C" int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, + const char *oid, + uint64_t snapid) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->selfmanaged_snap_rollback_object(ctx, oid, ctx->snapc, snapid); +} + +extern "C" int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t *snaps, + int maxlen) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + vector<uint64_t> snapvec; + int r = ctx->client->snap_list(ctx, &snapvec); + if (r < 0) + return r; + if ((int)snapvec.size() <= maxlen) { + for (unsigned i=0; i<snapvec.size(); i++) + snaps[i] = snapvec[i]; + return snapvec.size(); + } + return -ERANGE; +} + +extern "C" int rados_ioctx_snap_lookup(rados_ioctx_t io, const char *name, + rados_snap_t *id) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->snap_lookup(ctx, name, (uint64_t *)id); +} + +extern "C" int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, + char *name, int maxlen) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + std::string sname; + int r = ctx->client->snap_get_name(ctx, id, &sname); + if (r < 0) + return r; + if ((int)sname.length() >= maxlen) + return -ERANGE; + strncpy(name, sname.c_str(), maxlen); + return 0; +} + +extern "C" int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t *t) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->snap_get_stamp(ctx, id, t); +} + +extern "C" int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, + char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + int ret; + object_t oid(o); + bufferlist bl; + ret = ctx->client->getxattr(*ctx, oid, name, bl); + if (ret >= 0) { + if (bl.length() > len) + return -ERANGE; + bl.copy(0, bl.length(), buf); + ret = bl.length(); + } + + return ret; +} + +class RadosXattrsIter { +public: + RadosXattrsIter() + : val(NULL) + { + i = attrset.end(); + } + ~RadosXattrsIter() + { + free(val); + val = NULL; + } + std::map<std::string, bufferlist> attrset; + std::map<std::string, bufferlist>::iterator i; + char *val; +}; + +extern "C" int rados_getxattrs(rados_ioctx_t io, const char *oid, + rados_xattrs_iter_t *iter) +{ + RadosXattrsIter *it = new RadosXattrsIter(); + if (!it) + return -ENOMEM; + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t obj(oid); + int ret = ctx->client->getxattrs(*ctx, obj, it->attrset); + if (ret) { + delete it; + return ret; + } + it->i = it->attrset.begin(); + + RadosXattrsIter **iret = (RadosXattrsIter**)iter; + *iret = it; + *iter = it; + return 0; +} + +extern "C" int rados_getxattrs_next(rados_xattrs_iter_t iter, + const char **name, const char **val, size_t *len) +{ + RadosXattrsIter *it = (RadosXattrsIter*)iter; + if (it->i == it->attrset.end()) { + *name = NULL; + *val = NULL; + *len = 0; + return 0; + } + free(it->val); + const std::string &s(it->i->first); + *name = s.c_str(); + bufferlist &bl(it->i->second); + size_t bl_len = bl.length(); + it->val = (char*)malloc(bl_len); + if (!it->val) + return -ENOMEM; + memcpy(it->val, bl.c_str(), bl_len); + *val = it->val; + *len = bl_len; + ++it->i; + return 0; +} + +extern "C" void rados_getxattrs_end(rados_xattrs_iter_t iter) +{ + RadosXattrsIter *it = (RadosXattrsIter*)iter; + delete it; +} + +extern "C" int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->setxattr(*ctx, oid, name, bl); +} + +extern "C" int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->rmxattr(*ctx, oid, name); +} + +extern "C" int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->stat(*ctx, oid, psize, pmtime); +} + +extern "C" int rados_tmap_update(rados_ioctx_t io, const char *o, const char *cmdbuf, size_t cmdbuflen) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist cmdbl; + cmdbl.append(cmdbuf, cmdbuflen); + return ctx->client->tmap_update(*ctx, oid, cmdbl); +} + +extern "C" int rados_tmap_put(rados_ioctx_t io, const char *o, const char *buf, size_t buflen) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, buflen); + return ctx->client->tmap_put(*ctx, oid, bl); +} + +extern "C" int rados_tmap_get(rados_ioctx_t io, const char *o, char *buf, size_t buflen) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + int r = ctx->client->tmap_get(*ctx, oid, bl); + if (r < 0) + return r; + if (bl.length() > buflen) + return -ERANGE; + bl.copy(0, bl.length(), buf); + return bl.length(); +} + +extern "C" int rados_exec(rados_ioctx_t io, const char *o, const char *cls, const char *method, + const char *inbuf, size_t in_len, char *buf, size_t out_len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist inbl, outbl; + int ret; + inbl.append(inbuf, in_len); + ret = ctx->client->exec(*ctx, oid, cls, method, inbl, outbl); + if (ret >= 0) { + if (outbl.length()) { + if (outbl.length() > out_len) + return -ERANGE; + outbl.copy(0, outbl.length(), buf); + ret = outbl.length(); // hrm :/ + } + } + return ret; +} + +/* list objects */ + +extern "C" int rados_objects_list_open(rados_ioctx_t io, rados_list_ctx_t *listh) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + Objecter::ListContext *h = new Objecter::ListContext; + h->pool_id = ctx->poolid; + h->pool_snap_seq = ctx->snap_seq; + *listh = (void *)new librados::ObjListCtx(ctx, h); + return 0; +} + +extern "C" void rados_objects_list_close(rados_list_ctx_t h) +{ + librados::ObjListCtx *lh = (librados::ObjListCtx *)h; + delete lh; +} + +extern "C" int rados_objects_list_next(rados_list_ctx_t listctx, const char **entry, const char **key) +{ + librados::ObjListCtx *lh = (librados::ObjListCtx *)listctx; + Objecter::ListContext *h = lh->lc; + int ret; + + // if the list is non-empty, this method has been called before + if (!h->list.empty()) + // so let's kill the previously-returned object + h->list.pop_front(); + + if (h->list.empty()) { + ret = lh->ctx->client->list(lh->lc, RADOS_LIST_MAX_ENTRIES); + if (ret < 0) + return ret; + if (h->list.empty()) + return -ENOENT; + } + + *entry = h->list.front().first.name.c_str(); + + if (key) { + if (h->list.front().second.size()) + *key = h->list.front().second.c_str(); + else + *key = NULL; + } + return 0; +} + + + +// ------------------------- +// aio + +extern "C" int rados_aio_create_completion(void *cb_arg, rados_callback_t cb_complete, + rados_callback_t cb_safe, rados_completion_t *pc) +{ + *pc = librados::RadosClient::aio_create_completion(cb_arg, cb_complete, cb_safe); + return 0; +} + +extern "C" int rados_aio_wait_for_complete(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->wait_for_complete(); +} + +extern "C" int rados_aio_wait_for_safe(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->wait_for_safe(); +} + +extern "C" int rados_aio_is_complete(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->is_complete(); +} + +extern "C" int rados_aio_is_safe(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->is_safe(); +} + +extern "C" int rados_aio_get_return_value(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->get_return_value(); +} + +extern "C" uint64_t rados_aio_get_version(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->get_version(); +} + +extern "C" void rados_aio_release(rados_completion_t c) +{ + ((librados::AioCompletionImpl*)c)->put(); +} + +extern "C" int rados_aio_read(rados_ioctx_t io, const char *o, + rados_completion_t completion, + char *buf, size_t len, uint64_t off) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->aio_read(*ctx, oid, + (librados::AioCompletionImpl*)completion, buf, len, off); +} + +extern "C" int rados_aio_write(rados_ioctx_t io, const char *o, + rados_completion_t completion, + const char *buf, size_t len, uint64_t off) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->aio_write(*ctx, oid, + (librados::AioCompletionImpl*)completion, bl, len, off); +} + +extern "C" int rados_aio_append(rados_ioctx_t io, const char *o, + rados_completion_t completion, + const char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->aio_append(*ctx, oid, + (librados::AioCompletionImpl*)completion, bl, len); +} + +extern "C" int rados_aio_write_full(rados_ioctx_t io, const char *o, + rados_completion_t completion, + const char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->aio_write_full(*ctx, oid, + (librados::AioCompletionImpl*)completion, bl); +} + +extern "C" int rados_aio_flush(rados_ioctx_t io) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + ctx->flush_aio_writes(); + return 0; +} + +struct C_WatchCB : public librados::WatchCtx { + rados_watchcb_t wcb; + void *arg; + C_WatchCB(rados_watchcb_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {} + void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) { + wcb(opcode, ver, arg); + } +}; + +int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle, + rados_watchcb_t watchcb, void *arg) +{ + uint64_t *cookie = handle; + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + C_WatchCB *wc = new C_WatchCB(watchcb, arg); + return ctx->client->watch(*ctx, oid, ver, cookie, wc); +} + +int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle) +{ + uint64_t cookie = handle; + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->unwatch(*ctx, oid, cookie); +} + +int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + if (buf) { + bufferptr p = buffer::create(buf_len); + memcpy(p.c_str(), buf, buf_len); + bl.push_back(p); + } + return ctx->client->notify(*ctx, oid, ver, bl); +} |