/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_ #define _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_ 1 // for inet_ntop -- #include #include #include #include namespace apache { namespace thrift { namespace server { using namespace apache::thrift; using namespace apache::thrift::transport; using namespace apache::thrift::concurrency; using boost::shared_ptr; using std::string; using std::vector; /** * StableVector -- a minimal vector class where growth is automatic and * vector elements never move as the vector grows. Allocates new space * as needed, but does not copy old values. * * A level vector stores a list of storage vectors containing the actual * elements. Levels are added as needed, doubling in size each time. * Locking is only done when a level is added. Access is amortized * constant time. */ template class StableVector { /// The initial allocation as an exponent of 2 static const uint32_t kInitialSizePowOf2 = 10; /// The initial allocation size static const uint32_t kInitialVectorSize = 1 << kInitialSizePowOf2; /// This bound is guaranteed not to be exceeded on 64-bit archs static const int kMaxLevels = 64; /// Values are kept in one or more of these typedef vector Vect; /// One or more value vectors are kept in one of these typedef vector LevelVector; Mutex mutex_; /// current size size_t size_; _Atomic_word vectLvl_; LevelVector vects_; public: /** * Constructor -- initialize the level vector and allocate the * initial storage vector */ StableVector() : size_(0) , vectLvl_(0) { vects_.reserve(kMaxLevels); Vect* storageVector(new Vect(1 << kInitialSizePowOf2)); vects_.push_back(storageVector); } private: /** * make sure the requested number of storage levels have been allocated. */ void expand(uint32_t level) { // we need the guard to insure that we only allocate once. Guard g(mutex_); while (level > vectLvl_) { Vect* levelVect(new Vect(1 << (vectLvl_ + kInitialSizePowOf2))); vects_.push_back(levelVect); // we need to make sure this is done after levelVect is inserted // (what we want is effectively a memory barrier here). __gnu_cxx::__atomic_add(&vectLvl_, 1); } } /** * Given an index, determine which level and element of that level is * required. Grows if needed. */ void which(uint32_t n, uint32_t* vno, uint32_t* idx) { if (n >= size_) { size_ = n + 1; } if (n < kInitialVectorSize) { *idx = n; *vno = 0; } else { uint32_t upper = n >> kInitialSizePowOf2; *vno = CHAR_BIT*sizeof(upper) - __builtin_clz(upper); *idx = n - (1 << (*vno + kInitialSizePowOf2 - 1)); if (*vno > vectLvl_) { expand(*vno); } } } public: /** * Given an index, return a reference to that element, perhaps after * allocating additional space. * * @param n a positive integer */ T& operator[](uint32_t n) { uint32_t vno; uint32_t idx; which(n, &vno, &idx); return (*vects_[vno])[idx]; } /** * Return the present size of the vector. */ size_t size() const { return size_; } }; /** * This class embodies the representation of a single connection during * processing. We'll keep one of these per file descriptor in TClientInfo. */ class TClientInfoConnection { public: const static int kNameLen = 32; private: typedef union IPAddrUnion { sockaddr_in ipv4; sockaddr_in6 ipv6; }; char call_[kNameLen]; ///< The name of the thrift call IPAddrUnion addr_; ///< The client's IP address timespec time_; ///< Time processing started uint64_t ncalls_; ///< # of calls processed public: /** * Constructor; insure that no client address or thrift call name is * represented. */ TClientInfoConnection(); /** * A connection has been made; record its address. Since this is the * first we'll know of a connection we start the timer here as well. */ void recordAddr(const sockaddr* addr); /** * Mark the address as empty/unknown. */ void eraseAddr(); /** * Return a string representing the present address, or nullptr if none. * Copies the string into the buffer provided. */ const char* getAddr(char* buf, int len) const; /** * A call has been made on this connection; record its name. Since this is * called for every thrift call processed, we also do our call count here. */ void recordCall(const char* name); /** * Invoked when processing has ended to clear the call name. */ void eraseCall(); /** * Return as string the thrift call either currently being processed or * most recently processed if the connection is still open for additional * calls. Returns nullptr if a call hasn't been made yet or processing * has ended. */ const char* getCall() const; /** * Get the timespec for the start of this connection (specifically, when * recordAddr() was first called). */ void getTime(timespec* time) const; /** * Return the number of calls made on this connection. */ uint64_t getNCalls() const; private: void initTime(); }; /** * Store for info about a server's clients -- specifically, the client's IP * address and the call it is executing. This information is indexed by * socket file descriptor and in the present implementation is updated * asynchronously, so it may only approximate reality. */ class TClientInfo { private: StableVector info_; public: /** * Return the info object for a given file descriptor. If "grow" is true * extend the info vector if required (such as for a file descriptor not seen * before). If "grow" is false and the info vector isn't large enough, * or if "fd" is negative, return nullptr. */ TClientInfoConnection* getConnection(int fd, bool grow); size_t size() const; }; /** * This derivation of TServerEventHandler encapsulates the main status vector * and provides context to the server's processing loop via overrides. * Together with TClientInfoCallHandler (derived from TProcessorEventHandler) * it integrates client info collection into the server. */ class TClientInfoServerHandler : public TServerEventHandler { private: TClientInfo clientInfo_; public: /** * One of these is constructed for each open connection/descriptor and links * to both the status vector (clientInfo_) and that descriptor's entry * within it. */ struct Connect { TClientInfo* clientInfo_; TClientInfoConnection* callInfo_; explicit Connect(TClientInfo* clientInfo) : clientInfo_(clientInfo) , callInfo_(nullptr) { } }; /** * Generate processor context; we don't know what descriptor we belong to * yet -- we'll get hooked up in contextProcess(). */ void* createContext(boost::shared_ptr input, boost::shared_ptr output); /** * Mark our slot as unused and delete the context created in createContext(). */ void deleteContext(void* processorContext, boost::shared_ptr input, boost::shared_ptr output); /** * Called in the processing loop just before the server invokes the * processor itself, on the first call we establish which descriptor * we correspond to and set it to that socket's peer IP address. This * also has the side effect of initializing call counting and connection * timing. We won't know which call we're handling until the handler * first gets called in TClientInfoCallHandler::getContext(). */ void processContext(void* processorContext, shared_ptr transport); /** * Get status report for server in the form of a vector of strings. * Each active client appears as one string in the format: * * FD IPADDR CALLNAME DURATION NCALLS * * where "FD" is the file descriptor for the client's socket, "IPADDR" * is the IP address (as reported by accept()), "CALLNAME" is the * current or most recent Thrift function name, "DURATION" is the * duration of the connection, while NCALLS is the number of Thrift * calls made since the connection was made. A single space separates * fields. */ void getStatsStrings(vector& result); }; /** * This class derives from TProcessorEventHandler to gain access to the * function name for the current Thrift call. We need two versions of * this -- TClientInfoCallStatsHandler is the other -- since in the latter * case we pass through to TFunctionStatHandler to perform Thrift call * stats. */ class TClientInfoCallHandler : public TProcessorEventHandler { public: virtual void* getContext(const char* fn_name, void* serverContext); }; } } } // namespace apache::thrift::server #endif // !_FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_