summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Link.h
blob: 01ddc68d9747c0c5e6e25e3ea97dd90dc120a1a2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#ifndef _broker_Link_h
#define _broker_Link_h

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include <boost/shared_ptr.hpp>
#include "qpid/Url.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/PersistableConfig.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/sys/Mutex.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Link.h"
#include <boost/ptr_container/ptr_vector.hpp>

namespace qpid {

namespace sys {
class TimerTask;
}

namespace broker {

class LinkRegistry;
class Broker;
class Connection;
class LinkExchange;

class Link : public PersistableConfig, public management::Manageable {
  private:
    mutable sys::Mutex  lock;
    const std::string   name;
    LinkRegistry*       links;

    // these remain constant across failover - used to identify this link
    const std::string   configuredTransport;
    const std::string   configuredHost;
    const uint16_t      configuredPort;
    // these reflect the current address of remote - will change during failover
    std::string         host;
    uint16_t            port;
    std::string         transport;

    bool durable;

    std::string        authMechanism;
    std::string        username;
    std::string        password;
    mutable uint64_t    persistenceId;
    qmf::org::apache::qpid::broker::Link::shared_ptr mgmtObject;
    Broker* broker;
    int     state;
    uint32_t visitCount;
    uint32_t currentInterval;
    Url      url;       // URL can contain many addresses.
    size_t   reconnectNext; // Index for next re-connect attempt

    typedef std::vector<Bridge::shared_ptr> Bridges;
    Bridges created;   // Bridges pending creation
    Bridges active;    // Bridges active
    Bridges cancellations;    // Bridges pending cancellation
    framing::ChannelId nextFreeChannel;
    RangeSet<framing::ChannelId> freeChannels;
    Connection* connection;
    management::ManagementAgent* agent;
    boost::function<void(Link*)> listener;
    boost::intrusive_ptr<sys::TimerTask> timerTask;
    boost::shared_ptr<broker::LinkExchange> failoverExchange;  // subscribed to remote's amq.failover exchange
    bool failover; // Do we subscribe to a failover exchange?
    uint failoverChannel;
    std::string failoverSession;

    static const int STATE_WAITING     = 1;
    static const int STATE_CONNECTING  = 2;
    static const int STATE_OPERATIONAL = 3;
    static const int STATE_FAILED      = 4;
    static const int STATE_CLOSED      = 5;
    static const int STATE_CLOSING     = 6;  // Waiting for outstanding connect to complete first

    static const uint32_t MAX_INTERVAL = 32;

    void setStateLH (int newState);
    void startConnectionLH();        // Start the IO Connection
    void destroy();                  // Cleanup connection before link goes away
    void ioThreadProcessing();       // Called on connection's IO thread by request
    bool tryFailoverLH();            // Called during maintenance visit
    void reconnectLH(const Address&); //called by LinkRegistry

    // connection management (called by LinkRegistry)
    void established(Connection*); // Called when connection is created
    void opened();      // Called when connection is open (after create)
    void closed(int, std::string);   // Called when connection goes away
    void notifyConnectionForced(const std::string text);
    void closeConnection(const std::string& reason);

    friend class LinkRegistry; // to call established, opened, closed

  public:
    typedef boost::shared_ptr<Link> shared_ptr;
    typedef boost::function<void(Link*)> DestroyedListener;

    Link(const std::string&       name,
         LinkRegistry* links,
         const std::string&       host,
         uint16_t      port,
         const std::string&       transport,
         DestroyedListener        l,
         bool          durable,
         const std::string&       authMechanism,
         const std::string&       username,
         const std::string&       password,
         Broker*       broker,
         management::Manageable* parent = 0,
         bool failover=true);
    virtual ~Link();

    /** these return the *configured* transport/host/port, which does not change over the
        lifetime of the Link */
    std::string getHost() const { return configuredHost; }
    uint16_t    getPort() const { return configuredPort; }
    std::string getTransport() const { return configuredTransport; }

    /** returns the current address of the remote, which may be different from the
        configured transport/host/port due to failover. Returns true if connection is
        active */
    QPID_BROKER_EXTERN bool getRemoteAddress(qpid::Address& addr) const;

    bool isDurable() { return durable; }
    void maintenanceVisit ();
    framing::ChannelId nextChannel();        // allocate channel from link free pool
    void returnChannel(framing::ChannelId);  // return channel to link free pool
    void add(Bridge::shared_ptr);
    void cancel(Bridge::shared_ptr);

    QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection.

    // Close the link.
    QPID_BROKER_EXTERN void close();

    std::string getAuthMechanism() { return authMechanism; }
    std::string getUsername()      { return username; }
    std::string getPassword()      { return password; }
    Broker* getBroker()       { return broker; }

    bool isConnecting() const { return state == STATE_CONNECTING; }

    // PersistableConfig:
    void     setPersistenceId(uint64_t id) const;
    uint64_t getPersistenceId() const { return persistenceId; }
    uint32_t encodedSize() const;
    void     encode(framing::Buffer& buffer) const;
    const std::string& getName() const;

    static const std::string ENCODED_IDENTIFIER;
    static const std::string ENCODED_IDENTIFIER_V1;
    static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
    static bool isEncodedLink(const std::string& key);

    // Manageable entry points
    management::ManagementObject::shared_ptr GetManagementObject(void) const;
    management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);

    // manage the exchange owned by this link
    static const std::string exchangeTypeName;
    static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& name);

    /** create a name for a link (if none supplied by user config) */
    static std::string createName(const std::string& transport,
                                  const std::string& host,
                                  uint16_t  port);

    /** The current connction for this link. Note returns 0 if the link is not
     * presently connected.
     */
    Connection* getConnection() { return connection; }
};
}
}


#endif  /*!_broker_Link.cpp_h*/