// listen.h
/* Copyright 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#pragma once
#include
#include
#include
#include "mongo/config.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/sock.h"
namespace mongo {
const int DEFAULT_MAX_CONN = 1000000;
class Listener {
MONGO_DISALLOW_COPYING(Listener);
public:
Listener(const std::string& name, const std::string& ip, int port, bool logConnect = true);
virtual ~Listener();
void initAndListen(); // never returns unless error (start a thread)
/* spawn a thread, etc., then return */
virtual void accepted(AbstractMessagingPort* mp) = 0;
const int _port;
/**
* @return a rough estimate of elapsed time since the server started
todo:
1) consider adding some sort of relaxedLoad semantic to the reading here of
_elapsedTime
2) curTimeMillis() implementations have gotten faster. consider eliminating
this code? would have to measure it first. if eliminated be careful if
syscall used isn't skewable. Note also if #2 is done, listen() doesn't
then have to keep waking up and maybe that helps on a developer's laptop
battery usage...
*/
long long getMyElapsedTimeMillis() const {
return _elapsedTime;
}
/**
* Allocate sockets for the listener and set _setupSocketsSuccessful to true
* iff the process was successful.
* Returns _setupSocketsSuccessful.
*/
bool setupSockets();
void setAsTimeTracker() {
_timeTracker = this;
}
// TODO(spencer): Remove this and get the global Listener via the
// globalEnvironmentExperiment
static const Listener* getTimeTracker() {
return _timeTracker;
}
static long long getElapsedTimeMillis() {
if (_timeTracker)
return _timeTracker->getMyElapsedTimeMillis();
// should this assert or throw? seems like callers may not expect to get zero back,
// certainly not forever.
return 0;
}
/**
* Blocks until initAndListen has been called on this instance and gotten far enough that
* it is ready to receive incoming network requests.
*/
void waitUntilListening() const;
private:
std::vector _mine;
std::vector _socks;
std::string _name;
std::string _ip;
bool _setupSocketsSuccessful;
bool _logConnect;
long long _elapsedTime;
mutable stdx::mutex _readyMutex; // Protects _ready
mutable stdx::condition_variable _readyCondition; // Used to wait for changes to _ready
// Boolean that indicates whether this Listener is ready to accept incoming network requests
bool _ready;
virtual void _accepted(const std::shared_ptr& psocket, long long connectionId);
#ifdef MONGO_CONFIG_SSL
SSLManagerInterface* _ssl;
#endif
void _logListen(int port, bool ssl);
static const Listener* _timeTracker;
virtual bool useUnixSockets() const {
return false;
}
public:
/** the "next" connection number. every connection to this process has a unique number */
static AtomicInt64 globalConnectionNumber;
/** keeps track of how many allowed connections there are and how many are being used*/
static TicketHolder globalTicketHolder;
/** makes sure user input is sane */
static void checkTicketNumbers();
/**
* This will close implementations of AbstractMessagingPort, skipping any that have tags
* matching `skipMask`.
*/
static void closeMessagingPorts(
AbstractMessagingPort::Tag skipMask = AbstractMessagingPort::kSkipAllMask);
};
class ListeningSockets {
public:
ListeningSockets() : _sockets(new std::set()), _socketPaths(new std::set()) {}
void add(int sock) {
stdx::lock_guard lk(_mutex);
_sockets->insert(sock);
}
void addPath(const std::string& path) {
stdx::lock_guard lk(_mutex);
_socketPaths->insert(path);
}
void remove(int sock) {
stdx::lock_guard lk(_mutex);
_sockets->erase(sock);
}
void closeAll();
static ListeningSockets* get();
private:
stdx::mutex _mutex;
std::set* _sockets;
std::set* _socketPaths; // for unix domain sockets
static ListeningSockets* _instance;
};
}