summaryrefslogtreecommitdiff
path: root/contrib/fb303/TClientInfo.h
blob: e3859a71e5055db35e11a3bb10537288de7e65ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_
#define _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_ 1

// for inet_ntop --
#include <arpa/inet.h>
#include <thrift/server/TServer.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/Mutex.h>

namespace apache { namespace thrift { namespace server {

// NOTE(review): these using-directives/declarations sit at namespace scope in
// a header, so they take effect inside apache::thrift::server for every
// translation unit that includes this file.  The declarations below rely on
// them for the unqualified names vector, string, shared_ptr, Mutex, Guard,
// TProtocol and TTransport, so they cannot simply be dropped without
// qualifying those uses first.
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using boost::shared_ptr;
using std::string;
using std::vector;

/**
 * StableVector -- a minimal vector class where growth is automatic and
 * vector elements never move as the vector grows.  Allocates new space
 * as needed, but does not copy old values.
 *
 * A level vector stores a list of storage vectors containing the actual
 * elements.  Levels are added as needed, doubling in size each time.
 * Locking is only done when a level is added.  Access is amortized
 * constant time.
 */
template <typename T>
class StableVector {
  /// The initial allocation as an exponent of 2
  static const uint32_t kInitialSizePowOf2 = 10;
  /// The initial allocation size
  static const uint32_t kInitialVectorSize = 1 << kInitialSizePowOf2;
  /// This bound is guaranteed not to be exceeded on 64-bit archs
  static const int kMaxLevels = 64;

  /// Values are kept in one or more of these
  typedef vector<T> Vect;
  /// One or more value vectors are kept in one of these
  typedef vector<Vect*> LevelVector;

  Mutex mutex_;
  /// current size
  size_t size_;
  _Atomic_word vectLvl_;
  LevelVector vects_;

 public:
  /**
   * Constructor -- initialize the level vector and allocate the
   * initial storage vector
   */
  StableVector()
    : size_(0) 
    , vectLvl_(0) {
    vects_.reserve(kMaxLevels);
    Vect* storageVector(new Vect(1 << kInitialSizePowOf2));
    vects_.push_back(storageVector);
  }

 private:
  /**
   * make sure the requested number of storage levels have been allocated.
   */
  void expand(uint32_t level) {
    // we need the guard to insure that we only allocate once.
    Guard g(mutex_);
    while (level > vectLvl_) {
      Vect* levelVect(new Vect(1 << (vectLvl_ + kInitialSizePowOf2)));
      vects_.push_back(levelVect);
      // we need to make sure this is done after levelVect is inserted
      // (what we want is effectively a memory barrier here).
      __gnu_cxx::__atomic_add(&vectLvl_, 1);
    }
  }

  /**
   * Given an index, determine which level and element of that level is
   * required.  Grows if needed.
   */
  void which(uint32_t n, uint32_t* vno, uint32_t* idx) {
    if (n >= size_) {
      size_ = n + 1;
    }
    if (n < kInitialVectorSize) {
      *idx = n;
      *vno = 0;
    } else {
      uint32_t upper = n >> kInitialSizePowOf2;
      *vno = CHAR_BIT*sizeof(upper) - __builtin_clz(upper);
      *idx = n - (1 << (*vno + kInitialSizePowOf2 - 1));
      if (*vno > vectLvl_) {
        expand(*vno);
      }
    }
  }

 public:
  /**
   * Given an index, return a reference to that element, perhaps after
   * allocating additional space.
   *
   * @param n a positive integer
   */
  T& operator[](uint32_t n) {
    uint32_t vno;
    uint32_t idx;
    which(n, &vno, &idx);
    return (*vects_[vno])[idx];
  }

  /**
   * Return the present size of the vector.
   */
  size_t size() const { return size_; }
};


/**
 * This class embodies the representation of a single connection during
 * processing.  We'll keep one of these per file descriptor in TClientInfo.
 */
class TClientInfoConnection {
 public:
  /// Size in bytes of the buffer holding the thrift call name
  static const int kNameLen = 32;

 private:
  /// Storage large enough for either an IPv4 or an IPv6 socket address.
  /// (Declared as a plain union: the former `typedef union ... { };` named
  /// no alias, so the typedef keyword had no effect.)
  union IPAddrUnion {
    sockaddr_in ipv4;
    sockaddr_in6 ipv6;
  };

  char call_[kNameLen];            ///< The name of the thrift call
  IPAddrUnion addr_;               ///< The client's IP address
  timespec time_;                  ///< Time processing started
  uint64_t ncalls_;                ///< # of calls processed

 public:
  /**
   * Constructor; ensure that no client address or thrift call name is
   * represented.
   */
  TClientInfoConnection();

  /**
   * A connection has been made; record its address.  Since this is the
   * first we'll know of a connection we start the timer here as well.
   */
  void recordAddr(const sockaddr* addr);

  /**
   * Mark the address as empty/unknown.
   */
  void eraseAddr();

  /**
   * Return a string representing the present address, or nullptr if none.
   * Copies the string into the buffer provided.
   *
   * @param buf destination buffer for the textual address
   * @param len size of buf in bytes
   */
  const char* getAddr(char* buf, int len) const;

  /**
   * A call has been made on this connection; record its name.  Since this is
   * called for every thrift call processed, we also do our call count here.
   */
  void recordCall(const char* name);

  /**
   * Invoked when processing has ended to clear the call name.
   */
  void eraseCall();

  /**
   * Return as string the thrift call either currently being processed or
   * most recently processed if the connection is still open for additional
   * calls.  Returns nullptr if a call hasn't been made yet or processing
   * has ended.
   */
  const char* getCall() const;

  /**
   * Get the timespec for the start of this connection (specifically, when
   * recordAddr() was first called).
   *
   * @param time receives the recorded start time
   */
  void getTime(timespec* time) const;

  /**
   * Return the number of calls made on this connection.
   */
  uint64_t getNCalls() const;

 private:
  /// Helper for the connection timer; defined outside this header.
  void initTime();
};


/**
 * Holds what the server knows about its clients: for each one, the IP
 * address and the thrift call it is running.  Entries are looked up by
 * socket file descriptor.  Updates happen asynchronously in the present
 * implementation, so the contents may only approximate reality.
 */
class TClientInfo {
  /// One record per file descriptor
  StableVector<TClientInfoConnection> info_;

 public:
  /**
   * Look up the record belonging to a file descriptor.
   *
   * @param fd   socket file descriptor used as the index
   * @param grow when true, the info vector is extended if required (for
   *             example for a descriptor not seen before)
   * @return the record, or nullptr when "fd" is negative or when "grow" is
   *         false and the info vector isn't large enough
   */
  TClientInfoConnection* getConnection(int fd, bool grow);

  /**
   * Size of the info vector.  Defined outside this header; presumably it
   * reports info_.size() -- confirm against the implementation file.
   */
  size_t size() const;
};

/**
 * A TServerEventHandler that owns the main status vector and supplies
 * context to the server's processing loop through the handler callbacks
 * declared below.  Client info collection is wired into the server by this
 * class together with TClientInfoCallHandler (which derives from
 * TProcessorEventHandler).
 */
class TClientInfoServerHandler : public TServerEventHandler {
  /// The status vector shared by every connection context
  TClientInfo clientInfo_;

 public:
  /**
   * Per-connection context.  One is constructed for each open
   * connection/descriptor; it points at the status vector (clientInfo_)
   * and at that descriptor's entry within it.
   */
  struct Connect {
    TClientInfo* clientInfo_;          ///< the owning status vector
    TClientInfoConnection* callInfo_;  ///< this descriptor's entry; starts out nullptr

    explicit Connect(TClientInfo* clientInfo)
      : clientInfo_(clientInfo), callInfo_(nullptr) {}
  };

  /**
   * Generate processor context.  The descriptor this context belongs to is
   * not known yet -- the hook-up happens in processContext().
   */
  void* createContext(boost::shared_ptr<TProtocol> input,
                      boost::shared_ptr<TProtocol> output);

  /**
   * Mark our slot as unused and delete the context created in
   * createContext().
   */
  void deleteContext(void* processorContext,
                     boost::shared_ptr<TProtocol> input,
                     boost::shared_ptr<TProtocol> output);

  /**
   * Called in the processing loop just before the server invokes the
   * processor itself.  On the first call we establish which descriptor we
   * correspond to and set it to that socket's peer IP address; as a side
   * effect this also initializes call counting and connection timing.  The
   * call being handled is not known until the handler first gets called in
   * TClientInfoCallHandler::getContext().
   */
  void processContext(void* processorContext,
                      boost::shared_ptr<TTransport> transport);

  /**
   * Get status report for server in the form of a vector of strings.
   * Each active client appears as one string in the format:
   *
   *     FD IPADDR CALLNAME DURATION NCALLS
   *
   * Fields are separated by a single space:
   *   - FD:       file descriptor for the client's socket
   *   - IPADDR:   the IP address (as reported by accept())
   *   - CALLNAME: current or most recent Thrift function name
   *   - DURATION: duration of the connection
   *   - NCALLS:   number of Thrift calls made since the connection was made
   *
   * @param result receives the status strings
   */
  void getStatsStrings(vector<string>& result);
};

/**
 * A TProcessorEventHandler whose purpose is to get at the function name of
 * the current Thrift call.  There are two versions of this handler --
 * TClientInfoCallStatsHandler is the other one; that variant passes through
 * to TFunctionStatHandler to perform Thrift call stats.
 */
class TClientInfoCallHandler : public TProcessorEventHandler {
 public:
  /**
   * Hook that receives the name of the Thrift function being called.
   * Defined outside this header.
   *
   * @param fn_name       name of the Thrift function
   * @param serverContext opaque server context; presumably the Connect
   *                      object made by
   *                      TClientInfoServerHandler::createContext() -- confirm
   *                      against the implementation file
   */
  virtual void* getContext(const char* fn_name,
                           void* serverContext);
};

} } } // namespace apache::thrift::server

#endif // !_FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_