diff options
Diffstat (limited to 'trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp')
-rw-r--r-- | trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp | 317 |
1 files changed, 317 insertions, 0 deletions
diff --git a/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp new file mode 100644 index 0000000000..8f1cc7b03f --- /dev/null +++ b/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -0,0 +1,317 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/ConnectionHandler.h" + +#include "qpid/client/SaslFactory.h" +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/all_method_bodies.h" +#include "qpid/framing/ClientInvoker.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Helpers.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/SystemInfo.h" + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::framing::connection; +using qpid::sys::SecurityLayer; +using qpid::sys::Duration; +using qpid::sys::TimerTask; +using qpid::sys::Timer; +using qpid::sys::AbsTime; +using qpid::sys::TIME_SEC; +using qpid::sys::ScopedLock; +using qpid::sys::Mutex; + +namespace { +const std::string OK("OK"); +const std::string PLAIN("PLAIN"); +const std::string en_US("en_US"); + +const std::string INVALID_STATE_START("start received in invalid state"); +const std::string INVALID_STATE_TUNE("tune received in invalid state"); +const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state"); +const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state"); + +const std::string SESSION_FLOW_CONTROL("qpid.session_flow"); +const std::string CLIENT_PROCESS_NAME("qpid.client_process"); +const std::string CLIENT_PID("qpid.client_pid"); +const std::string CLIENT_PPID("qpid.client_ppid"); +const int SESSION_FLOW_CONTROL_VER = 1; +} + +CloseCode ConnectionHandler::convert(uint16_t replyCode) +{ + switch (replyCode) { + case 200: return CLOSE_CODE_NORMAL; + case 320: return CLOSE_CODE_CONNECTION_FORCED; + case 402: return CLOSE_CODE_INVALID_PATH; + case 501: default: + return CLOSE_CODE_FRAMING_ERROR; + } +} + +ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v) + : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), + errorCode(CLOSE_CODE_NORMAL), version(v) +{ + insist = true; + + ESTABLISHED.insert(FAILED); + ESTABLISHED.insert(CLOSED); + ESTABLISHED.insert(OPEN); + + FINISHED.insert(FAILED); + FINISHED.insert(CLOSED); + + properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER); + properties.setString(CLIENT_PROCESS_NAME, sys::SystemInfo::getProcessName()); + properties.setInt(CLIENT_PID, sys::SystemInfo::getProcessId()); + properties.setInt(CLIENT_PPID, sys::SystemInfo::getParentProcessId()); +} + +void ConnectionHandler::incoming(AMQFrame& frame) +{ + if (getState() == CLOSED) { + throw Exception("Received frame on closed connection"); + } + + if (rcvTimeoutTask) { + // Received frame on connection so delay timeout + rcvTimeoutTask->restart(); + } + + AMQBody* body = frame.getBody(); + try { + if (frame.getChannel() != 0 || !invoke(static_cast<ConnectionOperations&>(*this), *body)) { + switch(getState()) { + case OPEN: + in(frame); + break; + case CLOSING: + QPID_LOG(warning, "Ignoring frame while closing connection: " << frame); + break; + default: + throw Exception("Cannot receive frames on non-zero channel until connection is established."); + } + } + }catch(std::exception& e){ + QPID_LOG(warning, "Closing connection due to " << e.what()); + setState(CLOSING); + errorCode = CLOSE_CODE_FRAMING_ERROR; + errorText = e.what(); + proxy.close(501, e.what()); + } +} + +void ConnectionHandler::outgoing(AMQFrame& frame) +{ + if (getState() == OPEN) + out(frame); + else + throw TransportFailure(errorText.empty() ? "Connection is not open." : errorText); +} + +void ConnectionHandler::waitForOpen() +{ + waitFor(ESTABLISHED); + if (getState() == FAILED || getState() == CLOSED) { + throw ConnectionException(errorCode, errorText); + } +} + +void ConnectionHandler::close() +{ + switch (getState()) { + case NEGOTIATING: + case OPENING: + fail("Connection closed before it was established"); + break; + case OPEN: + if (setState(CLOSING, OPEN)) { + proxy.close(200, OK); + waitFor(FINISHED);//FINISHED = CLOSED or FAILED + } + //else, state was changed from open after we checked, can only + //change to failed or closed, so nothing to do + break; + + // Nothing to do if already CLOSING, CLOSED, FAILED or if NOT_STARTED + } +} + +void ConnectionHandler::heartbeat() +{ + // Do nothing - the purpose of heartbeats is just to make sure that there is some + // traffic on the connection within the heart beat interval, we check for the + // traffic and don't need to do anything in response to heartbeats + + // Although the above is still true we're now using a received heartbeat as a trigger + // to send out our own heartbeat + proxy.heartbeat(); +} + +void ConnectionHandler::checkState(STATES s, const std::string& msg) +{ + if (getState() != s) { + throw CommandInvalidException(msg); + } +} + +void ConnectionHandler::fail(const std::string& message) +{ + errorCode = CLOSE_CODE_FRAMING_ERROR; + errorText = message; + QPID_LOG(warning, message); + setState(FAILED); +} + +namespace { +std::string SPACE(" "); +} + +void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/) +{ + checkState(NOT_STARTED, INVALID_STATE_START); + setState(NEGOTIATING); + sasl = SaslFactory::getInstance().create(*this); + + std::string mechlist; + bool chosenMechanismSupported = mechanism.empty(); + for (Array::const_iterator i = mechanisms.begin(); i != mechanisms.end(); ++i) { + if (!mechanism.empty() && mechanism == (*i)->get<std::string>()) { + chosenMechanismSupported = true; + mechlist = (*i)->get<std::string>() + SPACE + mechlist; + } else { + if (i != mechanisms.begin()) mechlist += SPACE; + mechlist += (*i)->get<std::string>(); + } + } + + if (!chosenMechanismSupported) { + fail("Selected mechanism not supported: " + mechanism); + } + + if (sasl.get()) { + string response = sasl->start(mechanism.empty() ? mechlist : mechanism, + getSSF ? getSSF() : 0); + proxy.startOk(properties, sasl->getMechanism(), response, locale); + } else { + //TODO: verify that desired mechanism and locale are supported + string response = ((char)0) + username + ((char)0) + password; + proxy.startOk(properties, mechanism, response, locale); + } +} + +void ConnectionHandler::secure(const std::string& challenge) +{ + if (sasl.get()) { + string response = sasl->step(challenge); + proxy.secureOk(response); + } else { + throw NotImplementedException("Challenge-response cycle not yet implemented in client"); + } +} + +void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed, + uint16_t heartbeatMin, uint16_t heartbeatMax) +{ + checkState(NEGOTIATING, INVALID_STATE_TUNE); + maxChannels = std::min(maxChannels, maxChannelsProposed); + maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed); + // Clip the requested heartbeat to the maximum/minimum offered + uint16_t heartbeat = ConnectionSettings::heartbeat; + heartbeat = heartbeat < heartbeatMin ? heartbeatMin : + heartbeat > heartbeatMax ? heartbeatMax : + heartbeat; + ConnectionSettings::heartbeat = heartbeat; + proxy.tuneOk(maxChannels, maxFrameSize, heartbeat); + setState(OPENING); + proxy.open(virtualhost, capabilities, insist); +} + +void ConnectionHandler::openOk ( const Array& knownBrokers ) +{ + checkState(OPENING, INVALID_STATE_OPEN_OK); + knownBrokersUrls.clear(); + framing::Array::ValueVector::const_iterator i; + for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i ) + knownBrokersUrls.push_back(Url((*i)->get<std::string>())); + if (sasl.get()) { + securityLayer = sasl->getSecurityLayer(maxFrameSize); + operUserId = sasl->getUserId(); + } + setState(OPEN); + QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls)); +} + + +void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/) +{ + throw NotImplementedException("Redirection received from broker; not yet implemented in client"); +} + +void ConnectionHandler::close(uint16_t replyCode, const std::string& replyText) +{ + proxy.closeOk(); + errorCode = convert(replyCode); + errorText = replyText; + setState(CLOSED); + QPID_LOG(warning, "Broker closed connection: " << replyCode << ", " << replyText); + if (onError) { + onError(replyCode, replyText); + } +} + +void ConnectionHandler::closeOk() +{ + checkState(CLOSING, INVALID_STATE_CLOSE_OK); + if (onError && errorCode != CLOSE_CODE_NORMAL) { + onError(errorCode, errorText); + } else if (onClose) { + onClose(); + } + setState(CLOSED); +} + +bool ConnectionHandler::isOpen() const +{ + return getState() == OPEN; +} + +bool ConnectionHandler::isClosed() const +{ + int s = getState(); + return s == CLOSED || s == FAILED; +} + +bool ConnectionHandler::isClosing() const { return getState() == CLOSING; } + +std::auto_ptr<qpid::sys::SecurityLayer> ConnectionHandler::getSecurityLayer() +{ + return securityLayer; +} + +void ConnectionHandler::setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask> t) +{ + rcvTimeoutTask = t; +} |