summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h232
1 files changed, 232 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
new file mode 100644
index 0000000000..1bf6b2cee3
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
@@ -0,0 +1,232 @@
+#ifndef QPID_BROKER_AMQP_0_10_CONNECTION_H
+#define QPID_BROKER_AMQP_0_10_CONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <sstream>
+#include <vector>
+#include <queue>
+
+#include "qpid/broker/BrokerImportExport.h"
+
+#include "qpid/broker/ConnectionHandler.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/OwnershipToken.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/types/Variant.h"
+#include "qpid/RefCounted.h"
+#include "qpid/Url.h"
+#include "qpid/ptr_map.h"
+
+#include "qmf/org/apache/qpid/broker/Connection.h"
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include <algorithm>
+
+namespace qpid {
+namespace sys {
+class ConnectionOutputHandler;
+class Timer;
+class TimerTask;
+}
+namespace broker {
+
+class Broker;
+class LinkRegistry;
+class Queue;
+class SecureConnection;
+class SessionHandler;
+
+namespace amqp_0_10 {
+struct ConnectionTimeoutTask;
+
+class Connection : public sys::ConnectionInputHandler, public qpid::broker::Connection,
+ public management::Manageable,
+ public RefCounted
+{
+ public:
+ uint32_t getFrameMax() const { return framemax; }
+ uint16_t getHeartbeat() const { return heartbeat; }
+ uint16_t getHeartbeatMax() const { return heartbeatmax; }
+
+ void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
+ void setHeartbeat(uint16_t hb) { heartbeat = hb; }
+ void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
+
+
+ const management::ObjectId getObjectId() const { return GetManagementObject()->getObjectId(); };
+ const std::string& getUserId() const { return userId; }
+
+ bool isFederationLink() const { return federationPeerTag.size() > 0; }
+ void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
+ const std::string& getFederationPeerTag() const { return federationPeerTag; }
+ std::vector<Url>& getKnownHosts() { return knownHosts; }
+
+ /**@return true if user is the authenticated user on this connection.
+ * If id has the default realm will also compare plain username.
+ */
+ bool isAuthenticatedUser(const std::string& id) const {
+ return (id == userId || (isDefaultRealm && id == userName));
+ }
+
+ Broker& getBroker() { return broker; }
+
+ sys::ConnectionOutputHandler& getOutput() { return *out; }
+ void activateOutput();
+ void addOutputTask(OutputTask*);
+ void removeOutputTask(OutputTask*);
+ framing::ProtocolVersion getVersion() const { return version; }
+
+ Connection(sys::ConnectionOutputHandler* out,
+ Broker& broker,
+ const std::string& mgmtId,
+ const qpid::sys::SecuritySettings&,
+ bool isLink = false,
+ uint64_t objectId = 0);
+
+ ~Connection ();
+
+ /** Get the SessionHandler for channel. Create if it does not already exist */
+ SessionHandler& getChannel(framing::ChannelId channel);
+
+ /** Close the connection. Waits for the client to respond with close-ok
+ * before actually destroying the connection.
+ */
+ QPID_BROKER_EXTERN void close(
+ framing::connection::CloseCode code, const std::string& text);
+
+ /** Abort the connection. Close abruptly and immediately. */
+ QPID_BROKER_EXTERN void abort();
+
+ // ConnectionInputHandler methods
+ void received(framing::AMQFrame& frame);
+ bool doOutput();
+ void closed();
+
+ void closeChannel(framing::ChannelId channel);
+
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject(void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
+
+ void requestIOProcessing (boost::function0<void>);
+ void recordFromServer (const framing::AMQFrame& frame);
+ void recordFromClient (const framing::AMQFrame& frame);
+
+ // gets for configured federation links
+ std::string getAuthMechanism();
+ std::string getAuthCredentials();
+ std::string getUsername();
+ std::string getPassword();
+ std::string getHost();
+ uint16_t getPort();
+
+ void notifyConnectionForced(const std::string& text);
+ void setUserId(const std::string& uid);
+
+ // credentials for connected client
+ const std::string& getMgmtId() const { return mgmtId; }
+ management::ManagementAgent* getAgent() const { return agent; }
+
+ void setHeartbeatInterval(uint16_t heartbeat);
+ void sendHeartbeat();
+ void restartTimeout();
+
+ void setSecureConnection(SecureConnection* secured);
+
+ const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
+ {
+ return securitySettings;
+ }
+
+ /** @return true if the initial connection negotiation is complete. */
+ bool isOpen();
+
+ bool isLink() const { return link; }
+ void startLinkHeartbeatTimeoutTask();
+
+ void setClientProperties(const types::Variant::Map& cp) { clientProperties = cp; }
+ const types::Variant::Map& getClientProperties() const { return clientProperties; }
+
+ private:
+ // Management object is used in the constructor so must be early
+ qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
+
+ //contained output tasks
+ sys::AggregateOutput outputTasks;
+
+ boost::scoped_ptr<framing::FrameHandler> outboundTracker;
+ boost::scoped_ptr<sys::ConnectionOutputHandler> out;
+
+ Broker& broker;
+
+ framing::ProtocolVersion version;
+ uint32_t framemax;
+ uint16_t heartbeat;
+ uint16_t heartbeatmax;
+ std::string userId;
+ std::string federationPeerTag;
+ std::vector<Url> knownHosts;
+ std::string userName;
+ bool isDefaultRealm;
+
+ typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
+
+ ChannelMap channels;
+ qpid::sys::SecuritySettings securitySettings;
+ const bool link;
+ ConnectionHandler adapter;
+ bool mgmtClosing;
+ const std::string mgmtId;
+ sys::Mutex ioCallbackLock;
+ std::queue<boost::function0<void> > ioCallbacks;
+ LinkRegistry& links;
+ management::ManagementAgent* agent;
+ sys::Timer& timer;
+ boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer;
+ boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
+ uint64_t objectId;
+ types::Variant::Map clientProperties;
+
+ void raiseConnectEvent();
+
+friend class OutboundFrameTracker;
+
+ void sent(const framing::AMQFrame& f);
+ void doIoCallbacks();
+
+ public:
+
+ qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
+};
+
+}}}
+
+#endif /*!QPID_BROKER_AMQP_0_10_CONNECTION_H*/