summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Sage Weil <sage@inktank.com> 2013-01-22 16:13:14 -0800
committer Sage Weil <sage@inktank.com> 2013-01-22 16:13:14 -0800
commit 8eee815fb2686d505ec0ad4e3d3239e323990610 (patch)
tree dd679e33ff3ecb7d86a42bde6ee65cb38688fc9c
parent eaf20fa94bf23f268a6d84fa0e9845fc1adf4c79 (diff)
parent 73a969366c8bbd105579611320c43e2334907fef (diff)
download ceph-8eee815fb2686d505ec0ad4e3d3239e323990610.tar.gz
Merge remote-tracking branch 'gh/wip-3833-b'
Conflicts:
    src/osd/OSD.cc
    src/osd/OSD.h

Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- src/common/AsyncReserver.h 9
-rw-r--r-- src/common/PrioritizedQueue.h 89
-rw-r--r-- src/common/config_opts.h 12
-rw-r--r-- src/messages/MOSDSubOp.h 6
-rw-r--r-- src/msg/DispatchQueue.cc 4
-rw-r--r-- src/msg/DispatchQueue.h 2
-rw-r--r-- src/msg/Message.h 4
-rw-r--r-- src/os/FileStore.cc 12
-rw-r--r-- src/osd/OSD.cc 111
-rw-r--r-- src/osd/OSD.h 34
-rw-r--r-- src/osd/OpRequest.cc 3
-rw-r--r-- src/osd/OpRequest.h 23
-rw-r--r-- src/osd/PG.cc 2
-rw-r--r-- src/osd/ReplicatedPG.cc 78
14 files changed, 279 insertions, 110 deletions
diff --git a/src/common/AsyncReserver.h b/src/common/AsyncReserver.h
index 755d11a6753..8cc2258d7b4 100644
--- a/src/common/AsyncReserver.h
+++ b/src/common/AsyncReserver.h
@@ -28,7 +28,7 @@
template <typename T>
class AsyncReserver {
Finisher *f;
- const unsigned max_allowed;
+ unsigned max_allowed;
Mutex lock;
list<pair<T, Context*> > queue;
@@ -51,6 +51,13 @@ public:
unsigned max_allowed)
: f(f), max_allowed(max_allowed), lock("AsyncReserver::lock") {}
+ void set_max(unsigned max) {
+ Mutex::Locker l(lock);
+ assert(max > 0);
+ max_allowed = max;
+ do_queues();
+ }
+
/**
* Requests a reservation
*
diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h
index 9c27ddc3860..6dcb519da40 100644
--- a/src/common/PrioritizedQueue.h
+++ b/src/common/PrioritizedQueue.h
@@ -16,6 +16,7 @@
#define PRIORITY_QUEUE_H
#include "common/Mutex.h"
+#include "common/Formatter.h"
#include <map>
#include <utility>
@@ -45,6 +46,8 @@
template <typename T, typename K>
class PrioritizedQueue {
int64_t total_priority;
+ int64_t max_tokens_per_subqueue;
+ int64_t min_cost;
template <class F>
static unsigned filter_list_pairs(
@@ -76,22 +79,39 @@ class PrioritizedQueue {
struct SubQueue {
private:
map<K, list<pair<unsigned, T> > > q;
- unsigned bucket;
+ unsigned tokens, max_tokens;
int64_t size;
typename map<K, list<pair<unsigned, T> > >::iterator cur;
public:
SubQueue(const SubQueue &other)
- : q(other.q), bucket(other.bucket), size(other.size),
+ : q(other.q),
+ tokens(other.tokens),
+ max_tokens(other.max_tokens),
+ size(other.size),
cur(q.begin()) {}
- SubQueue() : bucket(0), size(0), cur(q.begin()) {}
+ SubQueue()
+ : tokens(0),
+ max_tokens(0),
+ size(0) {}
+ void set_max_tokens(unsigned mt) {
+ max_tokens = mt;
+ }
+ unsigned get_max_tokens() const {
+ return max_tokens;
+ }
unsigned num_tokens() const {
- return bucket;
+ return tokens;
}
- void put_tokens(unsigned tokens) {
- bucket += tokens;
+ void put_tokens(unsigned t) {
+ tokens += t;
+ if (tokens > max_tokens)
+ tokens = max_tokens;
}
- void take_tokens(unsigned tokens) {
- bucket -= tokens;
+ void take_tokens(unsigned t) {
+ if (tokens > t)
+ tokens -= t;
+ else
+ tokens = 0;
}
void enqueue(K cl, unsigned cost, T item) {
q[cl].push_back(make_pair(cost, item));
@@ -166,6 +186,13 @@ class PrioritizedQueue {
}
q.erase(i);
}
+
+ void dump(Formatter *f) const {
+ f->dump_int("tokens", tokens);
+ f->dump_int("max_tokens", max_tokens);
+ f->dump_int("size", size);
+ f->dump_int("num_keys", q.size());
+ }
};
map<unsigned, SubQueue> high_queue;
map<unsigned, SubQueue> queue;
@@ -175,7 +202,9 @@ class PrioritizedQueue {
if (p != queue.end())
return &p->second;
total_priority += priority;
- return &queue[priority];
+ SubQueue *sq = &queue[priority];
+ sq->set_max_tokens(max_tokens_per_subqueue);
+ return sq;
}
void remove_queue(unsigned priority) {
@@ -196,7 +225,11 @@ class PrioritizedQueue {
}
public:
- PrioritizedQueue() : total_priority(0) {}
+ PrioritizedQueue(unsigned max_per, unsigned min_c)
+ : total_priority(0),
+ max_tokens_per_subqueue(max_per),
+ min_cost(min_c)
+ {}
unsigned length() {
unsigned total = 0;
@@ -276,10 +309,14 @@ public:
}
void enqueue(K cl, unsigned priority, unsigned cost, T item) {
+ if (cost < min_cost)
+ cost = min_cost;
create_queue(priority)->enqueue(cl, cost, item);
}
void enqueue_front(K cl, unsigned priority, unsigned cost, T item) {
+ if (cost < min_cost)
+ cost = min_cost;
create_queue(priority)->enqueue_front(cl, cost, item);
}
@@ -300,6 +337,9 @@ public:
return ret;
}
+ // if there are multiple buckets/subqueues with sufficient tokens,
+ // we behave like a strict priority queue among all subqueues that
+ // are eligible to run.
for (typename map<unsigned, SubQueue>::iterator i = queue.begin();
i != queue.end();
++i) {
@@ -315,6 +355,9 @@ public:
return ret;
}
}
+
+ // if no subqueues have sufficient tokens, we behave like a strict
+ // priority queue.
T ret = queue.rbegin()->second.front().second;
unsigned cost = queue.rbegin()->second.front().first;
queue.rbegin()->second.pop_front();
@@ -323,6 +366,32 @@ public:
distribute_tokens(cost);
return ret;
}
+
+ void dump(Formatter *f) const {
+ f->dump_int("total_priority", total_priority);
+ f->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue);
+ f->dump_int("min_cost", min_cost);
+ f->open_array_section("high_queues");
+ for (typename map<unsigned, SubQueue>::const_iterator p = high_queue.begin();
+ p != high_queue.end();
+ ++p) {
+ f->open_object_section("subqueue");
+ f->dump_int("priority", p->first);
+ p->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ f->open_array_section("queues");
+ for (typename map<unsigned, SubQueue>::const_iterator p = queue.begin();
+ p != queue.end();
+ ++p) {
+ f->open_object_section("subqueue");
+ f->dump_int("priority", p->first);
+ p->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ }
};
#endif
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index a5df112576f..7eb3e94fadc 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -105,6 +105,8 @@ OPTION(ms_bind_port_min, OPT_INT, 6800)
OPTION(ms_bind_port_max, OPT_INT, 7100)
OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
OPTION(ms_tcp_read_timeout, OPT_U64, 900)
+OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 4194304)
+OPTION(ms_pq_min_cost, OPT_U64, 65536)
OPTION(ms_inject_socket_failures, OPT_U64, 0)
OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed
OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds
@@ -315,6 +317,8 @@ OPTION(osd_map_dedup, OPT_BOOL, true)
OPTION(osd_map_cache_size, OPT_INT, 500)
OPTION(osd_map_message_max, OPT_INT, 100) // max maps per MOSDMap message
OPTION(osd_op_threads, OPT_INT, 2) // 0 == no threading
+OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64, 4194304)
+OPTION(osd_op_pq_min_cost, OPT_U64, 65536)
OPTION(osd_disk_threads, OPT_INT, 1)
OPTION(osd_recovery_threads, OPT_INT, 1)
OPTION(osd_recover_clone_overlap, OPT_BOOL, true) // preserve clone_overlap during recovery/migration
@@ -370,7 +374,7 @@ OPTION(osd_debug_drop_pg_create_duration, OPT_INT, 1)
OPTION(osd_debug_drop_op_probability, OPT_DOUBLE, 0) // probability of stalling/dropping a client op
OPTION(osd_op_history_size, OPT_U32, 20) // Max number of completed ops to track
OPTION(osd_op_history_duration, OPT_U32, 600) // Oldest completed op to track
-OPTION(osd_target_transaction_size, OPT_INT, 300) // to adjust various transactions that batch smaller items
+OPTION(osd_target_transaction_size, OPT_INT, 30) // to adjust various transactions that batch smaller items
/**
* osd_client_op_priority and osd_recovery_op_priority adjust the relative
@@ -409,10 +413,10 @@ OPTION(filestore_sync_flush, OPT_BOOL, false)
OPTION(filestore_journal_parallel, OPT_BOOL, false)
OPTION(filestore_journal_writeahead, OPT_BOOL, false)
OPTION(filestore_journal_trailing, OPT_BOOL, false)
-OPTION(filestore_queue_max_ops, OPT_INT, 500)
+OPTION(filestore_queue_max_ops, OPT_INT, 50)
OPTION(filestore_queue_max_bytes, OPT_INT, 100 << 20)
-OPTION(filestore_queue_committing_max_ops, OPT_INT, 500) // this is ON TOP of filestore_queue_max_*
-OPTION(filestore_queue_committing_max_bytes, OPT_INT, 100 << 20) // "
+OPTION(filestore_queue_committing_max_ops, OPT_INT, 0) // this is ON TOP of filestore_queue_max_*
+OPTION(filestore_queue_committing_max_bytes, OPT_INT, 0) // "
OPTION(filestore_op_threads, OPT_INT, 2)
OPTION(filestore_op_thread_timeout, OPT_INT, 60)
OPTION(filestore_op_thread_suicide_timeout, OPT_INT, 180)
diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h
index e69042b121a..50b1a926957 100644
--- a/src/messages/MOSDSubOp.h
+++ b/src/messages/MOSDSubOp.h
@@ -86,6 +86,12 @@ public:
// indicates that we must fix hobject_t encoding
bool hobject_incorrect_pool;
+ int get_cost() const {
+ if (ops.size() == 1 && ops[0].op.op == CEPH_OSD_OP_PULL)
+ return ops[0].op.extent.length;
+ return data.length();
+ }
+
virtual void decode_payload() {
hobject_incorrect_pool = false;
bufferlist::iterator p = payload.begin();
diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc
index 04e405581a7..31f37cfd5f6 100644
--- a/src/msg/DispatchQueue.cc
+++ b/src/msg/DispatchQueue.cc
@@ -37,7 +37,7 @@ void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
id, priority, QueueItem(m));
} else {
mqueue.enqueue(
- id, priority, m->get_data().length(), QueueItem(m));
+ id, priority, m->get_cost(), QueueItem(m));
}
cond.Signal();
}
@@ -51,7 +51,7 @@ void DispatchQueue::local_delivery(Message *m, int priority)
0, priority, QueueItem(m));
} else {
mqueue.enqueue(
- 0, priority, m->get_data().length(), QueueItem(m));
+ 0, priority, m->get_cost(), QueueItem(m));
}
cond.Signal();
}
diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h
index ea44c165d56..884e0269342 100644
--- a/src/msg/DispatchQueue.h
+++ b/src/msg/DispatchQueue.h
@@ -151,6 +151,8 @@ class DispatchQueue {
DispatchQueue(CephContext *cct, SimpleMessenger *msgr)
: cct(cct), msgr(msgr),
lock("SimpleMessenger::DispatchQeueu::lock"),
+ mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
+ cct->_conf->ms_pq_min_cost),
next_pipe_id(1),
dispatch_thread(this),
stop(false)
diff --git a/src/msg/Message.h b/src/msg/Message.h
index 95f165a43dd..5bdd4d463b6 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -424,6 +424,10 @@ public:
footer.data_crc = data.crc32c(0);
}
+ virtual int get_cost() const {
+ return data.length();
+ }
+
// type
int get_type() const { return header.type; }
void set_type(int t) { header.type = t; }
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index 4e208350902..5976e68b6d2 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -4629,6 +4629,10 @@ const char** FileStore::get_tracked_conf_keys() const
static const char* KEYS[] = {
"filestore_min_sync_interval",
"filestore_max_sync_interval",
+ "filestore_queue_max_ops",
+ "filestore_queue_max_bytes",
+ "filestore_queue_committing_max_ops",
+ "filestore_queue_committing_max_bytes",
"filestore_flusher",
"filestore_flusher_max_fds",
"filestore_sync_flush",
@@ -4646,6 +4650,10 @@ void FileStore::handle_conf_change(const struct md_config_t *conf,
{
if (changed.count("filestore_min_sync_interval") ||
changed.count("filestore_max_sync_interval") ||
+ changed.count("filestore_queue_max_ops") ||
+ changed.count("filestore_queue_max_bytes") ||
+ changed.count("filestore_queue_committing_max_ops") ||
+ changed.count("filestore_queue_committing_max_bytes") ||
changed.count("filestore_flusher_max_fds") ||
changed.count("filestore_flush_min") ||
changed.count("filestore_kill_at") ||
@@ -4653,6 +4661,10 @@ void FileStore::handle_conf_change(const struct md_config_t *conf,
Mutex::Locker l(lock);
m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
+ m_filestore_queue_max_ops = conf->filestore_queue_max_ops;
+ m_filestore_queue_max_bytes = conf->filestore_queue_max_bytes;
+ m_filestore_queue_committing_max_ops = conf->filestore_queue_committing_max_ops;
+ m_filestore_queue_committing_max_bytes = conf->filestore_queue_committing_max_bytes;
m_filestore_flusher = conf->filestore_flusher;
m_filestore_flusher_max_fds = conf->filestore_flusher_max_fds;
m_filestore_flush_min = conf->filestore_flush_min;
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 34ec3678f08..a17fec0fea8 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -762,6 +762,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
whoami(id),
dev_path(dev), journal_path(jdev),
dispatch_running(false),
+ asok_hook(NULL),
osd_compat(get_osd_compat_set()),
state(STATE_INITIALIZING), boot_epoch(0), up_epoch(0), bind_epoch(0),
op_tp(external_messenger->cct, "OSD::op_tp", g_conf->osd_op_threads, "osd_op_threads"),
@@ -777,8 +778,6 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
heartbeat_dispatcher(this),
stat_lock("OSD::stat_lock"),
finished_lock("OSD::finished_lock"),
- admin_ops_hook(NULL),
- historic_ops_hook(NULL),
op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200),
map_lock("OSD::map_lock"),
@@ -842,33 +841,42 @@ int OSD::pre_init()
<< "currently in use. (Is ceph-osd already running?)" << dendl;
return -EBUSY;
}
+
+ g_conf->add_observer(this);
return 0;
}
-class HistoricOpsSocketHook : public AdminSocketHook {
+// asok
+
+class OSDSocketHook : public AdminSocketHook {
OSD *osd;
public:
- HistoricOpsSocketHook(OSD *o) : osd(o) {}
+ OSDSocketHook(OSD *o) : osd(o) {}
bool call(std::string command, std::string args, bufferlist& out) {
stringstream ss;
- osd->dump_historic_ops(ss);
+ bool r = osd->asok_command(command, args, ss);
out.append(ss);
- return true;
+ return r;
}
};
-
-class OpsFlightSocketHook : public AdminSocketHook {
- OSD *osd;
-public:
- OpsFlightSocketHook(OSD *o) : osd(o) {}
- bool call(std::string command, std::string args, bufferlist& out) {
- stringstream ss;
- osd->dump_ops_in_flight(ss);
- out.append(ss);
- return true;
+bool OSD::asok_command(string command, string args, ostream& ss)
+{
+ if (command == "dump_ops_in_flight") {
+ op_tracker.dump_ops_in_flight(ss);
+ } else if (command == "dump_historic_ops") {
+ op_tracker.dump_historic_ops(ss);
+ } else if (command == "dump_op_pq_state") {
+ JSONFormatter f(true);
+ f.open_object_section("pq");
+ op_wq.dump(&f);
+ f.close_section();
+ f.flush(ss);
+ } else {
+ assert(0 == "broken asok registration");
}
-};
+ return true;
+}
class TestOpsSocketHook : public AdminSocketHook {
OSDService *service;
@@ -979,14 +987,16 @@ int OSD::init()
// tick
timer.add_event_after(g_conf->osd_heartbeat_interval, new C_Tick(this));
- admin_ops_hook = new OpsFlightSocketHook(this);
AdminSocket *admin_socket = cct->get_admin_socket();
- r = admin_socket->register_command("dump_ops_in_flight", admin_ops_hook,
- "show the ops currently in flight");
+ asok_hook = new OSDSocketHook(this);
+ r = admin_socket->register_command("dump_ops_in_flight", asok_hook,
+ "show the ops currently in flight");
assert(r == 0);
- historic_ops_hook = new HistoricOpsSocketHook(this);
- r = admin_socket->register_command("dump_historic_ops", historic_ops_hook,
- "show slowest recent ops");
+ r = admin_socket->register_command("dump_historic_ops", asok_hook,
+ "show slowest recent ops");
+ assert(r == 0);
+ r = admin_socket->register_command("dump_op_pq_state", asok_hook,
+ "dump op priority queue state");
assert(r == 0);
test_ops_hook = new TestOpsSocketHook(&(this->service), this->store);
r = admin_socket->register_command("setomapval", test_ops_hook,
@@ -1121,6 +1131,8 @@ void OSD::suicide(int exitcode)
int OSD::shutdown()
{
+ g_conf->remove_observer(this);
+
service.shutdown();
g_ceph_context->_conf->set_val("debug_osd", "100");
g_ceph_context->_conf->set_val("debug_journal", "100");
@@ -1152,10 +1164,10 @@ int OSD::shutdown()
cct->get_admin_socket()->unregister_command("dump_ops_in_flight");
cct->get_admin_socket()->unregister_command("dump_historic_ops");
- delete admin_ops_hook;
- delete historic_ops_hook;
- admin_ops_hook = NULL;
- historic_ops_hook = NULL;
+ cct->get_admin_socket()->unregister_command("dump_op_pq_state");
+ delete asok_hook;
+ asok_hook = NULL;
+
cct->get_admin_socket()->unregister_command("setomapval");
cct->get_admin_socket()->unregister_command("rmomapkey");
cct->get_admin_socket()->unregister_command("setomapheader");
@@ -2324,11 +2336,6 @@ void OSD::check_ops_in_flight()
return;
}
-void OSD::dump_ops_in_flight(ostream& ss)
-{
- op_tracker.dump_ops_in_flight(ss);
-}
-
// Usage:
// setomapval <pool-id> <obj-name> <key> <val>
// rmomapkey <pool-id> <obj-name> <key>
@@ -3828,7 +3835,7 @@ void OSD::wait_for_new_map(OpRequestRef op)
}
waiting_for_osdmap.push_back(op);
- op->mark_delayed();
+ op->mark_delayed("wait for new map");
}
@@ -6056,7 +6063,7 @@ void OSD::handle_op(OpRequestRef op)
if (osdmap->get_pg_acting_role(pgid, whoami) >= 0) {
dout(7) << "we are valid target for op, waiting" << dendl;
waiting_for_pg[pgid].push_back(op);
- op->mark_delayed();
+ op->mark_delayed("waiting for pg to exist locally");
return;
}
@@ -6188,14 +6195,18 @@ bool OSD::op_is_discardable(MOSDOp *op)
*/
void OSD::enqueue_op(PG *pg, OpRequestRef op)
{
- dout(15) << "enqueue_op " << op << " " << *(op->request) << dendl;
+ utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp();
+ dout(15) << "enqueue_op " << op << " prio " << op->request->get_priority()
+ << " cost " << op->request->get_cost()
+ << " latency " << latency
+ << " " << *(op->request) << dendl;
op_wq.queue(make_pair(PGRef(pg), op));
}
void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
{
unsigned priority = item.second->request->get_priority();
- unsigned cost = item.second->request->get_data().length();
+ unsigned cost = item.second->request->get_cost();
if (priority >= CEPH_MSG_PRIO_LOW)
pqueue.enqueue_strict(
item.second->request->get_source_inst(),
@@ -6217,7 +6228,7 @@ void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item)
}
}
unsigned priority = item.second->request->get_priority();
- unsigned cost = item.second->request->get_data().length();
+ unsigned cost = item.second->request->get_cost();
if (priority >= CEPH_MSG_PRIO_LOW)
pqueue.enqueue_strict_front(
item.second->request->get_source_inst(),
@@ -6273,7 +6284,11 @@ void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
*/
void OSD::dequeue_op(PGRef pg, OpRequestRef op)
{
- dout(10) << "dequeue_op " << op << " " << *(op->request)
+ utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp();
+ dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority()
+ << " cost " << op->request->get_cost()
+ << " latency " << latency
+ << " " << *(op->request)
<< " pg " << *pg << dendl;
if (pg->deleting)
return;
@@ -6364,6 +6379,26 @@ void OSD::process_peering_events(const list<PG*> &pgs)
// --------------------------------
+const char** OSD::get_tracked_conf_keys() const
+{
+ static const char* KEYS[] = {
+ "osd_max_backfills",
+ NULL
+ };
+ return KEYS;
+}
+
+void OSD::handle_conf_change(const struct md_config_t *conf,
+ const std::set <std::string> &changed)
+{
+ if (changed.count("osd_max_backfills")) {
+ service.local_reserver.set_max(g_conf->osd_max_backfills);
+ service.remote_reserver.set_max(g_conf->osd_max_backfills);
+ }
+}
+
+// --------------------------------
+
int OSD::init_op_flags(OpRequestRef op)
{
MOSDOp *m = (MOSDOp*)op->request;
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index be4ed11791e..359d75396fe 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -395,8 +395,15 @@ public:
OSDService(OSD *osd);
};
-class OSD : public Dispatcher {
+class OSD : public Dispatcher,
+ public md_config_obs_t {
/** OSD **/
+public:
+ // config observer bits
+ virtual const char** get_tracked_conf_keys() const;
+ virtual void handle_conf_change(const struct md_config_t *conf,
+ const std::set <std::string> &changed);
+
protected:
Mutex osd_lock; // global lock
SafeTimer timer; // safe timer (osd_lock)
@@ -434,6 +441,11 @@ protected:
void check_osdmap_features();
+ // asok
+ friend class OSDSocketHook;
+ class OSDSocketHook *asok_hook;
+ bool asok_command(string command, string args, ostream& ss);
+
public:
ClassHandler *class_handler;
int get_nodeid() { return whoami; }
@@ -614,18 +626,10 @@ private:
// -- op tracking --
OpTracker op_tracker;
void check_ops_in_flight();
- void dump_ops_in_flight(ostream& ss);
- void dump_historic_ops(ostream& ss) {
- return op_tracker.dump_historic_ops(ss);
- }
void test_ops(std::string command, std::string args, ostream& ss);
- friend class OpsFlightSocketHook;
- friend class HistoricOpsSocketHook;
friend class TestOpsSocketHook;
- friend class C_CompleteSplits;
- OpsFlightSocketHook *admin_ops_hook;
- HistoricOpsSocketHook *historic_ops_hook;
TestOpsSocketHook *test_ops_hook;
+ friend class C_CompleteSplits;
// -- op queue --
@@ -639,7 +643,15 @@ private:
: ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef >(
"OSD::OpWQ", ti, ti*10, tp),
qlock("OpWQ::qlock"),
- osd(o) {}
+ osd(o),
+ pqueue(o->cct->_conf->osd_op_pq_max_tokens_per_priority,
+ o->cct->_conf->osd_op_pq_min_cost)
+ {}
+
+ void dump(Formatter *f) {
+ Mutex::Locker l(qlock);
+ pqueue.dump(f);
+ }
void _enqueue_front(pair<PGRef, OpRequestRef> item);
void _enqueue(pair<PGRef, OpRequestRef> item);
diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc
index 436e2de4176..b3d95367ec5 100644
--- a/src/osd/OpRequest.cc
+++ b/src/osd/OpRequest.cc
@@ -153,7 +153,8 @@ bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)
utime_t age = now - (*i)->received_time;
stringstream ss;
ss << "slow request " << age << " seconds old, received at " << (*i)->received_time
- << ": " << *((*i)->request) << " currently " << (*i)->state_string();
+ << ": " << *((*i)->request) << " currently "
+ << ((*i)->current.size() ? (*i)->current : (*i)->state_string());
warning_vector.push_back(ss.str());
// only those that have been shown will backoff
diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h
index 0b35fd89f70..432bddb1672 100644
--- a/src/osd/OpRequest.h
+++ b/src/osd/OpRequest.h
@@ -128,6 +128,7 @@ struct OpRequest : public TrackedOp {
private:
list<pair<utime_t, string> > events;
+ string current;
Mutex lock;
OpTracker *tracker;
osd_reqid_t reqid;
@@ -139,6 +140,7 @@ private:
static const uint8_t flag_delayed = 1 << 2;
static const uint8_t flag_started = 1 << 3;
static const uint8_t flag_sub_op_sent = 1 << 4;
+ static const uint8_t flag_commit_sent = 1 << 5;
OpRequest(Message *req, OpTracker *tracker) :
request(req), xitem(this),
@@ -162,11 +164,13 @@ public:
bool been_delayed() { return hit_flag_points & flag_delayed; }
bool been_started() { return hit_flag_points & flag_started; }
bool been_sub_op_sent() { return hit_flag_points & flag_sub_op_sent; }
+ bool been_commit_sent() { return hit_flag_points & flag_commit_sent; }
bool currently_queued_for_pg() { return latest_flag_point & flag_queued_for_pg; }
bool currently_reached_pg() { return latest_flag_point & flag_reached_pg; }
bool currently_delayed() { return latest_flag_point & flag_delayed; }
bool currently_started() { return latest_flag_point & flag_started; }
bool currently_sub_op_sent() { return latest_flag_point & flag_sub_op_sent; }
+ bool currently_commit_sent() { return latest_flag_point & flag_commit_sent; }
const char *state_string() const {
switch(latest_flag_point) {
@@ -175,6 +179,7 @@ public:
case flag_delayed: return "delayed";
case flag_started: return "started";
case flag_sub_op_sent: return "waiting for sub ops";
+ case flag_commit_sent: return "commit sent; apply or cleanup";
default: break;
}
return "no flag points reached";
@@ -182,28 +187,40 @@ public:
void mark_queued_for_pg() {
mark_event("queued_for_pg");
+ current = "queued for pg";
hit_flag_points |= flag_queued_for_pg;
latest_flag_point = flag_queued_for_pg;
}
void mark_reached_pg() {
mark_event("reached_pg");
+ current = "reached pg";
hit_flag_points |= flag_reached_pg;
latest_flag_point = flag_reached_pg;
}
- void mark_delayed() {
+ void mark_delayed(string s) {
+ mark_event(s);
+ current = s;
hit_flag_points |= flag_delayed;
latest_flag_point = flag_delayed;
}
void mark_started() {
mark_event("started");
+ current = "started";
hit_flag_points |= flag_started;
latest_flag_point = flag_started;
}
- void mark_sub_op_sent() {
- mark_event("sub_op_sent");
+ void mark_sub_op_sent(string s) {
+ mark_event(s);
+ current = s;
hit_flag_points |= flag_sub_op_sent;
latest_flag_point = flag_sub_op_sent;
}
+ void mark_commit_sent() {
+ mark_event("commit_sent");
+ current = "commit sent";
+ hit_flag_points |= flag_commit_sent;
+ latest_flag_point = flag_commit_sent;
+ }
void mark_event(const string &event);
osd_reqid_t get_reqid() const {
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 2899738c346..51dbee46f61 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -4964,7 +4964,7 @@ bool PG::can_discard_op(OpRequestRef op)
dout(7) << " queueing replay at " << m->get_version()
<< " for " << *m << dendl;
replay_queue[m->get_version()] = op;
- op->mark_delayed();
+ op->mark_delayed("waiting for replay");
return true;
}
}
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 2e15e945b10..552fb89bed2 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -122,7 +122,7 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef o
pull(soid, v, g_conf->osd_client_op_priority);
}
waiting_for_missing_object[soid].push_back(op);
- op->mark_delayed();
+ op->mark_delayed("waiting for missing object");
}
void ReplicatedPG::wait_for_all_missing(OpRequestRef op)
@@ -178,7 +178,7 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef
recover_object_replicas(soid, v, g_conf->osd_client_op_priority);
}
waiting_for_degraded_object[soid].push_back(op);
- op->mark_delayed();
+ op->mark_delayed("waiting for degraded object");
}
void ReplicatedPG::wait_for_backfill_pos(OpRequestRef op)
@@ -631,7 +631,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (op->may_write() && scrubber.write_blocked_by_scrub(head)) {
dout(20) << __func__ << ": waiting for scrub" << dendl;
waiting_for_active.push_back(op);
- op->mark_delayed();
+ op->mark_delayed("waiting for scrub");
return;
}
@@ -734,7 +734,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (!ok) {
dout(10) << "do_op waiting on mode " << mode << dendl;
mode.waiting.push_back(op);
- op->mark_delayed();
+ op->mark_delayed("waiting on pg mode");
return;
}
@@ -876,7 +876,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
}
dout(10) << " waiting for " << oldv << " to commit" << dendl;
waiting_for_ondisk[oldv].push_back(op); // always queue ondisk waiters, so that we can requeue if needed
- op->mark_delayed();
+ op->mark_delayed("waiting for ondisk");
}
return;
}
@@ -1086,55 +1086,46 @@ void ReplicatedPG::do_sub_op(OpRequestRef op)
assert(m->get_header().type == MSG_OSD_SUBOP);
dout(15) << "do_sub_op " << *op->request << dendl;
+ OSDOp *first = NULL;
if (m->ops.size() >= 1) {
- OSDOp& first = m->ops[0];
- switch (first.op.op) {
+ first = &m->ops[0];
+ switch (first->op.op) {
case CEPH_OSD_OP_PULL:
sub_op_pull(op);
return;
+ }
+ }
+
+ if (!is_active()) {
+ waiting_for_active.push_back(op);
+ op->mark_delayed("waiting for active");
+ return;
+ }
+
+ if (first) {
+ switch (first->op.op) {
case CEPH_OSD_OP_PUSH:
- if (!is_active())
- waiting_for_active.push_back(op);
- else
- sub_op_push(op);
+ sub_op_push(op);
return;
case CEPH_OSD_OP_DELETE:
- if (!is_active())
- waiting_for_active.push_back(op);
- else
- sub_op_remove(op);
+ sub_op_remove(op);
return;
case CEPH_OSD_OP_SCRUB_RESERVE:
- if (!is_active())
- waiting_for_active.push_back(op);
- else
- sub_op_scrub_reserve(op);
+ sub_op_scrub_reserve(op);
return;
case CEPH_OSD_OP_SCRUB_UNRESERVE:
- if (!is_active())
- waiting_for_active.push_back(op);
- else
- sub_op_scrub_unreserve(op);
+ sub_op_scrub_unreserve(op);
return;
case CEPH_OSD_OP_SCRUB_STOP:
- if (!is_active())
- waiting_for_active.push_back(op);
- else
- sub_op_scrub_stop(op);
+ sub_op_scrub_stop(op);
return;
case CEPH_OSD_OP_SCRUB_MAP:
- if (!is_active())
- waiting_for_active.push_back(op);
- else
- sub_op_scrub_map(op);
+ sub_op_scrub_map(op);
return;
}
}
- if (!is_active())
- waiting_for_active.push_back(op);
- else
- sub_op_modify(op);
+ sub_op_modify(op);
}
void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
@@ -3876,6 +3867,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
osd->send_message_osd_client(reply, m->get_connection());
repop->sent_disk = true;
+ repop->ctx->op->mark_commit_sent();
}
}
@@ -3962,9 +3954,12 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+ if (ctx->op && acting.size() > 1) {
+ ostringstream ss;
+ ss << "waiting for subops from " << vector<int>(acting.begin() + 1, acting.end());
+ ctx->op->mark_sub_op_sent(ss.str());
+ }
for (unsigned i=1; i<acting.size(); i++) {
- if (ctx->op)
- ctx->op->mark_sub_op_sent();
int peer = acting[i];
pg_info_t &pinfo = peer_info[peer];
@@ -4698,6 +4693,8 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
}
}
+ op->mark_started();
+
Context *oncommit = new C_OSD_RepModifyCommit(rm);
Context *onapply = new C_OSD_RepModifyApply(rm);
int r = osd->store->queue_transactions(osr.get(), rm->tls, onapply, oncommit, 0, op);
@@ -4753,7 +4750,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
{
lock();
- rm->op->mark_event("sub_op_commit");
+ rm->op->mark_commit_sent();
rm->committed = true;
if (rm->epoch_started >= last_peering_reset) {
@@ -5192,6 +5189,7 @@ int ReplicatedPG::send_pull(int prio, int peer,
subop->set_priority(prio);
subop->ops = vector<OSDOp>(1);
subop->ops[0].op.op = CEPH_OSD_OP_PULL;
+ subop->ops[0].op.extent.length = g_conf->osd_recovery_max_chunk;
subop->recovery_info = recovery_info;
subop->recovery_progress = progress;
@@ -5780,8 +5778,10 @@ void ReplicatedPG::_committed_pushed_object(OpRequestRef op, epoch_t same_since,
dout(10) << "_committed_pushed_object pg has changed, not touching last_complete_ondisk" << dendl;
}
- if (op)
+ if (op) {
log_subop_stats(op, l_osd_sop_push_inb, l_osd_sop_push_lat);
+ op->mark_event("committed");
+ }
unlock();
}