summaryrefslogtreecommitdiff
path: root/src/common/TrackedOp.h
blob: 9007a4d5bd2dab35481d1c51b60ddcbe203fc136 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */

#ifndef TRACKEDREQUEST_H_
#define TRACKEDREQUEST_H_
#include <sstream>
#include <stdint.h>
#include <include/utime.h>
#include "common/Mutex.h"
#include "include/histogram.h"
#include "include/xlist.h"
#include "msg/Message.h"
#include <tr1/memory>

// Forward declaration; the full TrackedOp definition appears later in this header.
class TrackedOp;
/// Reference-counted (std::tr1::shared_ptr) handle to a TrackedOp.
typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef;

class OpTracker;

/**
 * Container of TrackedOpRef handles owned by an OpTracker.
 *
 * The same handles are kept in two ordered sets, one keyed by a utime_t
 * and one keyed by a double (presumably arrival time and duration, going
 * by the member names -- the bookkeeping itself lives out of line).
 *
 * Both sets must already be empty when the object is destroyed; the
 * destructor asserts on it.
 */
class OpHistory {
  set<pair<utime_t, TrackedOpRef> > arrived;   /// handles ordered by utime_t key
  set<pair<double, TrackedOpRef> > duration;   /// handles ordered by double key
  /// Defined out of line.
  void cleanup(utime_t now);
  bool shutdown;        /// starts out false
  OpTracker *tracker;   /// the tracker passed to the constructor

public:
  /// Build an empty history attached to @p owner.
  OpHistory(OpTracker *owner)
    : shutdown(false),
      tracker(owner)
  {}

  /// Asserts that neither set still holds an entry.
  ~OpHistory()
  {
    assert(arrived.empty());
    assert(duration.empty());
  }

  /// Defined out of line.
  void insert(utime_t now, TrackedOpRef op);
  /// Defined out of line.
  void dump_ops(utime_t now, Formatter *f);
  /// Defined out of line; OpTracker::on_shutdown() calls it while holding
  /// its ops_in_flight_lock.
  void on_shutdown();
};

class OpTracker {
  class RemoveOnDelete {
    OpTracker *tracker;
  public:
    RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {}
    void operator()(TrackedOp *op);
  };
  friend class RemoveOnDelete;
  friend class OpRequest;
  friend class OpHistory;
  uint64_t seq;
  Mutex ops_in_flight_lock;
  xlist<TrackedOp *> ops_in_flight;
  OpHistory history;

protected:
  CephContext *cct;

public:
  OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), history(this), cct(cct_) {}
  void dump_ops_in_flight(Formatter *f);
  void dump_historic_ops(Formatter *f);
  void register_inflight_op(xlist<TrackedOp*>::item *i);
  void unregister_inflight_op(TrackedOp *i);

  void get_age_ms_histogram(pow2_hist_t *h);

  /**
   * Look for Ops which are too old, and insert warning
   * strings for each Op that is too old.
   *
   * @param warning_strings A vector<string> reference which is filled
   * with a warning string for each old Op.
   * @return True if there are any Ops to warn on, false otherwise.
   */
  bool check_ops_in_flight(std::vector<string> &warning_strings);
  void mark_event(TrackedOp *op, const string &evt);
  void _mark_event(TrackedOp *op, const string &evt, utime_t now);

  void on_shutdown() {
    Mutex::Locker l(ops_in_flight_lock);
    history.on_shutdown();
  }
  ~OpTracker() {
    assert(ops_in_flight.empty());
  }

  template <typename T, typename TRef>
  TRef create_request(Message *ref)
  {
    TRef retval(new T(ref, this),
                RemoveOnDelete(this));

    _mark_event(retval.get(), "header_read", ref->get_recv_stamp());
    _mark_event(retval.get(), "throttled", ref->get_throttle_stamp());
    _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp());
    _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp());

    retval->init_from_message();

    return retval;
  }
};

/**
 * Base class for one operation followed by an OpTracker.
 *
 * Holds the Message carrying the request together with a list of
 * (timestamp, description) events.  Construction hands the op's xlist
 * item to OpTracker::register_inflight_op(); destruction calls put() on
 * the message.  Subclasses must implement dump().
 */
class TrackedOp {
private:
  friend class OpHistory;
  friend class OpTracker;
  xlist<TrackedOp*>::item xitem; /// item passed to OpTracker::register_inflight_op()
protected:
  Message *request; /// the logical request we are tracking
  OpTracker *tracker; /// the tracker we are associated with

  list<pair<utime_t, string> > events; /// list of events and their times
  // NOTE(review): the const accessors below read `events` without taking
  // this lock -- confirm callers serialise access themselves.
  Mutex lock; /// to protect the events list
  string current; /// the current state the event is in
  uint64_t seq; /// a unique value set by the OpTracker

  uint8_t warn_interval_multiplier; // limits output of a given op warning

  /**
   * @param req the message to track; must be non-null by the time the op
   *            is destroyed, because the destructor calls put() on it
   * @param _tracker the tracker this op registers itself with
   */
  TrackedOp(Message *req, OpTracker *_tracker) :
    xitem(this),
    request(req),
    tracker(_tracker),
    lock("TrackedOp::lock"),
    seq(0),
    warn_interval_multiplier(1)
  {
    tracker->register_inflight_op(&xitem);
  }

  /// Hook invoked by OpTracker::create_request() once the initial events
  /// have been recorded; the default does nothing.
  virtual void init_from_message() {}

public:
  virtual ~TrackedOp() { assert(request); request->put(); }

  /// Receive stamp of the tracked message.
  utime_t get_arrived() const {
    return request->get_recv_stamp();
  }
  // This function maybe needs some work; assumes last event is completion time
  /// Time between get_arrived() and the most recent event, or 0.0 when no
  /// event has been recorded.
  double get_duration() const {
    // empty() rather than size(): std::list::size() may be linear time.
    return events.empty() ?
      0.0 :
      (events.rbegin()->first - get_arrived());
  }
  Message *get_req() const { return request; }

  virtual void mark_event(const string &event);
  /// Description of the most recent event, or "" when no event has been
  /// recorded.  The pointer refers to storage owned by the events list.
  virtual const char *state_string() const {
    // rbegin() on an empty list must not be dereferenced.
    if (events.empty())
      return "";
    return events.rbegin()->second.c_str();
  }
  virtual void dump(utime_t now, Formatter *f) const = 0;
};

#endif