summaryrefslogtreecommitdiff
path: root/src/mongo/util/net/message_port.h
blob: 9988f15a58b046ea44fcb15bbf655fabe0456dff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// message_port.h

/*    Copyright 2009 10gen Inc.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

#pragma once

#include "mongo/util/net/message.h"
#include "mongo/util/net/sock.h"

namespace mongo {

    class MessagingPort;
    class PiggyBackData;

    typedef AtomicUInt MSGID;

    class AbstractMessagingPort : boost::noncopyable {
    public:
        AbstractMessagingPort() : tag(0), _connectionId(0) {}
        virtual ~AbstractMessagingPort() { }
        virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available
        virtual void reply(Message& received, Message& response) = 0;

        virtual HostAndPort remote() const = 0;
        virtual unsigned remotePort() const = 0;

        long long connectionId() const { return _connectionId; }
        void setConnectionId( long long connectionId );

    public:
        // TODO make this private with some helpers

        /* ports can be tagged with various classes.  see closeAllSockets(tag). defaults to 0. */
        unsigned tag;

    private:
        long long _connectionId;
    };

    class MessagingPort : public AbstractMessagingPort {
    public:
        MessagingPort(int fd, const SockAddr& remote);

        // in some cases the timeout will actually be 2x this value - eg we do a partial send,
        // then the timeout fires, then we try to send again, then the timeout fires again with
        // no data sent, then we detect that the other side is down
        MessagingPort(double so_timeout = 0, int logLevel = 0 );

        MessagingPort(boost::shared_ptr<Socket> socket);

        virtual ~MessagingPort();

        void setSocketTimeout(double timeout);

        void shutdown();

        /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
           also, the Message data will go out of scope on the subsequent recv call.
        */
        bool recv(Message& m);
        void reply(Message& received, Message& response, MSGID responseTo);
        void reply(Message& received, Message& response);
        bool call(Message& toSend, Message& response);

        void say(Message& toSend, int responseTo = -1);

        /**
         * this is used for doing 'async' queries
         * instead of doing call( to , from )
         * you would do
         * say( to )
         * recv( from )
         * Note: if you fail to call recv and someone else uses this port,
         *       horrible things will happen
         */
        bool recv( const Message& sent , Message& response );

        void piggyBack( Message& toSend , int responseTo = -1 );

        unsigned remotePort() const { return psock->remotePort(); }
        virtual HostAndPort remote() const;

        boost::shared_ptr<Socket> psock;
                
        void send( const char * data , int len, const char *context ) {
            psock->send( data, len, context );
        }
        void send( const vector< pair< char *, int > > &data, const char *context ) {
            psock->send( data, context );
        }
        bool connect(SockAddr& farEnd) {
            return psock->connect( farEnd );
        }
#ifdef MONGO_SSL
        /**
         * Initiates the TLS/SSL handshake on this MessagingPort.
         * When this function returns, further communication on this
         * MessagingPort will be encrypted.
         */
        void secure( SSLManagerInterface* ssl ) {
            psock->secure( ssl );
        }
#endif

        uint64_t getSockCreationMicroSec() const {
            return psock->getSockCreationMicroSec();
        }

    private:
        
        PiggyBackData * piggyBackData;
        
        // this is the parsed version of remote
        // mutable because its initialized only on call to remote()
        mutable HostAndPort _remoteParsed; 

    public:
        static void closeAllSockets(unsigned tagMask = 0xffffffff);

        friend class PiggyBackData;
    };


} // namespace mongo