summaryrefslogtreecommitdiff
path: root/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp')
-rw-r--r--trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp317
1 files changed, 317 insertions, 0 deletions
diff --git a/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
new file mode 100644
index 0000000000..8f1cc7b03f
--- /dev/null
+++ b/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -0,0 +1,317 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ConnectionHandler.h"
+
+#include "qpid/client/SaslFactory.h"
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/all_method_bodies.h"
+#include "qpid/framing/ClientInvoker.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Helpers.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/SystemInfo.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::framing::connection;
+using qpid::sys::SecurityLayer;
+using qpid::sys::Duration;
+using qpid::sys::TimerTask;
+using qpid::sys::Timer;
+using qpid::sys::AbsTime;
+using qpid::sys::TIME_SEC;
+using qpid::sys::ScopedLock;
+using qpid::sys::Mutex;
+
+namespace {
+const std::string OK("OK");
+const std::string PLAIN("PLAIN");
+const std::string en_US("en_US");
+
+const std::string INVALID_STATE_START("start received in invalid state");
+const std::string INVALID_STATE_TUNE("tune received in invalid state");
+const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state");
+const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state");
+
+const std::string SESSION_FLOW_CONTROL("qpid.session_flow");
+const std::string CLIENT_PROCESS_NAME("qpid.client_process");
+const std::string CLIENT_PID("qpid.client_pid");
+const std::string CLIENT_PPID("qpid.client_ppid");
+const int SESSION_FLOW_CONTROL_VER = 1;
+}
+
+CloseCode ConnectionHandler::convert(uint16_t replyCode)
+{
+ switch (replyCode) {
+ case 200: return CLOSE_CODE_NORMAL;
+ case 320: return CLOSE_CODE_CONNECTION_FORCED;
+ case 402: return CLOSE_CODE_INVALID_PATH;
+ case 501: default:
+ return CLOSE_CODE_FRAMING_ERROR;
+ }
+}
+
+ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v)
+ : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler),
+ errorCode(CLOSE_CODE_NORMAL), version(v)
+{
+ insist = true;
+
+ ESTABLISHED.insert(FAILED);
+ ESTABLISHED.insert(CLOSED);
+ ESTABLISHED.insert(OPEN);
+
+ FINISHED.insert(FAILED);
+ FINISHED.insert(CLOSED);
+
+ properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER);
+ properties.setString(CLIENT_PROCESS_NAME, sys::SystemInfo::getProcessName());
+ properties.setInt(CLIENT_PID, sys::SystemInfo::getProcessId());
+ properties.setInt(CLIENT_PPID, sys::SystemInfo::getParentProcessId());
+}
+
+void ConnectionHandler::incoming(AMQFrame& frame)
+{
+ if (getState() == CLOSED) {
+ throw Exception("Received frame on closed connection");
+ }
+
+ if (rcvTimeoutTask) {
+ // Received frame on connection so delay timeout
+ rcvTimeoutTask->restart();
+ }
+
+ AMQBody* body = frame.getBody();
+ try {
+ if (frame.getChannel() != 0 || !invoke(static_cast<ConnectionOperations&>(*this), *body)) {
+ switch(getState()) {
+ case OPEN:
+ in(frame);
+ break;
+ case CLOSING:
+ QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);
+ break;
+ default:
+ throw Exception("Cannot receive frames on non-zero channel until connection is established.");
+ }
+ }
+ }catch(std::exception& e){
+ QPID_LOG(warning, "Closing connection due to " << e.what());
+ setState(CLOSING);
+ errorCode = CLOSE_CODE_FRAMING_ERROR;
+ errorText = e.what();
+ proxy.close(501, e.what());
+ }
+}
+
+void ConnectionHandler::outgoing(AMQFrame& frame)
+{
+ if (getState() == OPEN)
+ out(frame);
+ else
+ throw TransportFailure(errorText.empty() ? "Connection is not open." : errorText);
+}
+
+void ConnectionHandler::waitForOpen()
+{
+ waitFor(ESTABLISHED);
+ if (getState() == FAILED || getState() == CLOSED) {
+ throw ConnectionException(errorCode, errorText);
+ }
+}
+
+void ConnectionHandler::close()
+{
+ switch (getState()) {
+ case NEGOTIATING:
+ case OPENING:
+ fail("Connection closed before it was established");
+ break;
+ case OPEN:
+ if (setState(CLOSING, OPEN)) {
+ proxy.close(200, OK);
+ waitFor(FINISHED);//FINISHED = CLOSED or FAILED
+ }
+ //else, state was changed from open after we checked, can only
+ //change to failed or closed, so nothing to do
+ break;
+
+ // Nothing to do if already CLOSING, CLOSED, FAILED or if NOT_STARTED
+ }
+}
+
+void ConnectionHandler::heartbeat()
+{
+ // Do nothing - the purpose of heartbeats is just to make sure that there is some
+ // traffic on the connection within the heart beat interval, we check for the
+ // traffic and don't need to do anything in response to heartbeats
+
+ // Although the above is still true we're now using a received heartbeat as a trigger
+ // to send out our own heartbeat
+ proxy.heartbeat();
+}
+
+void ConnectionHandler::checkState(STATES s, const std::string& msg)
+{
+ if (getState() != s) {
+ throw CommandInvalidException(msg);
+ }
+}
+
+void ConnectionHandler::fail(const std::string& message)
+{
+ errorCode = CLOSE_CODE_FRAMING_ERROR;
+ errorText = message;
+ QPID_LOG(warning, message);
+ setState(FAILED);
+}
+
+namespace {
+std::string SPACE(" ");
+}
+
+void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/)
+{
+ checkState(NOT_STARTED, INVALID_STATE_START);
+ setState(NEGOTIATING);
+ sasl = SaslFactory::getInstance().create(*this);
+
+ std::string mechlist;
+ bool chosenMechanismSupported = mechanism.empty();
+ for (Array::const_iterator i = mechanisms.begin(); i != mechanisms.end(); ++i) {
+ if (!mechanism.empty() && mechanism == (*i)->get<std::string>()) {
+ chosenMechanismSupported = true;
+ mechlist = (*i)->get<std::string>() + SPACE + mechlist;
+ } else {
+ if (i != mechanisms.begin()) mechlist += SPACE;
+ mechlist += (*i)->get<std::string>();
+ }
+ }
+
+ if (!chosenMechanismSupported) {
+ fail("Selected mechanism not supported: " + mechanism);
+ }
+
+ if (sasl.get()) {
+ string response = sasl->start(mechanism.empty() ? mechlist : mechanism,
+ getSSF ? getSSF() : 0);
+ proxy.startOk(properties, sasl->getMechanism(), response, locale);
+ } else {
+ //TODO: verify that desired mechanism and locale are supported
+ string response = ((char)0) + username + ((char)0) + password;
+ proxy.startOk(properties, mechanism, response, locale);
+ }
+}
+
+void ConnectionHandler::secure(const std::string& challenge)
+{
+ if (sasl.get()) {
+ string response = sasl->step(challenge);
+ proxy.secureOk(response);
+ } else {
+ throw NotImplementedException("Challenge-response cycle not yet implemented in client");
+ }
+}
+
+void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed,
+ uint16_t heartbeatMin, uint16_t heartbeatMax)
+{
+ checkState(NEGOTIATING, INVALID_STATE_TUNE);
+ maxChannels = std::min(maxChannels, maxChannelsProposed);
+ maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
+ // Clip the requested heartbeat to the maximum/minimum offered
+ uint16_t heartbeat = ConnectionSettings::heartbeat;
+ heartbeat = heartbeat < heartbeatMin ? heartbeatMin :
+ heartbeat > heartbeatMax ? heartbeatMax :
+ heartbeat;
+ ConnectionSettings::heartbeat = heartbeat;
+ proxy.tuneOk(maxChannels, maxFrameSize, heartbeat);
+ setState(OPENING);
+ proxy.open(virtualhost, capabilities, insist);
+}
+
+void ConnectionHandler::openOk ( const Array& knownBrokers )
+{
+ checkState(OPENING, INVALID_STATE_OPEN_OK);
+ knownBrokersUrls.clear();
+ framing::Array::ValueVector::const_iterator i;
+ for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i )
+ knownBrokersUrls.push_back(Url((*i)->get<std::string>()));
+ if (sasl.get()) {
+ securityLayer = sasl->getSecurityLayer(maxFrameSize);
+ operUserId = sasl->getUserId();
+ }
+ setState(OPEN);
+ QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls));
+}
+
+
+void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/)
+{
+ throw NotImplementedException("Redirection received from broker; not yet implemented in client");
+}
+
+void ConnectionHandler::close(uint16_t replyCode, const std::string& replyText)
+{
+ proxy.closeOk();
+ errorCode = convert(replyCode);
+ errorText = replyText;
+ setState(CLOSED);
+ QPID_LOG(warning, "Broker closed connection: " << replyCode << ", " << replyText);
+ if (onError) {
+ onError(replyCode, replyText);
+ }
+}
+
+void ConnectionHandler::closeOk()
+{
+ checkState(CLOSING, INVALID_STATE_CLOSE_OK);
+ if (onError && errorCode != CLOSE_CODE_NORMAL) {
+ onError(errorCode, errorText);
+ } else if (onClose) {
+ onClose();
+ }
+ setState(CLOSED);
+}
+
+bool ConnectionHandler::isOpen() const
+{
+ return getState() == OPEN;
+}
+
+bool ConnectionHandler::isClosed() const
+{
+ int s = getState();
+ return s == CLOSED || s == FAILED;
+}
+
+bool ConnectionHandler::isClosing() const { return getState() == CLOSING; }
+
+std::auto_ptr<qpid::sys::SecurityLayer> ConnectionHandler::getSecurityLayer()
+{
+ return securityLayer;
+}
+
+void ConnectionHandler::setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask> t)
+{
+ rcvTimeoutTask = t;
+}