summaryrefslogtreecommitdiff
path: root/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp')
-rw-r--r--trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp317
1 files changed, 0 insertions, 317 deletions
diff --git a/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
deleted file mode 100644
index 8f1cc7b03f..0000000000
--- a/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/client/ConnectionHandler.h"
-
-#include "qpid/client/SaslFactory.h"
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/all_method_bodies.h"
-#include "qpid/framing/ClientInvoker.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Helpers.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/SystemInfo.h"
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::framing::connection;
-using qpid::sys::SecurityLayer;
-using qpid::sys::Duration;
-using qpid::sys::TimerTask;
-using qpid::sys::Timer;
-using qpid::sys::AbsTime;
-using qpid::sys::TIME_SEC;
-using qpid::sys::ScopedLock;
-using qpid::sys::Mutex;
-
-namespace {
-const std::string OK("OK");
-const std::string PLAIN("PLAIN");
-const std::string en_US("en_US");
-
-const std::string INVALID_STATE_START("start received in invalid state");
-const std::string INVALID_STATE_TUNE("tune received in invalid state");
-const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state");
-const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state");
-
-const std::string SESSION_FLOW_CONTROL("qpid.session_flow");
-const std::string CLIENT_PROCESS_NAME("qpid.client_process");
-const std::string CLIENT_PID("qpid.client_pid");
-const std::string CLIENT_PPID("qpid.client_ppid");
-const int SESSION_FLOW_CONTROL_VER = 1;
-}
-
-CloseCode ConnectionHandler::convert(uint16_t replyCode)
-{
- switch (replyCode) {
- case 200: return CLOSE_CODE_NORMAL;
- case 320: return CLOSE_CODE_CONNECTION_FORCED;
- case 402: return CLOSE_CODE_INVALID_PATH;
- case 501: default:
- return CLOSE_CODE_FRAMING_ERROR;
- }
-}
-
-ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v)
- : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler),
- errorCode(CLOSE_CODE_NORMAL), version(v)
-{
- insist = true;
-
- ESTABLISHED.insert(FAILED);
- ESTABLISHED.insert(CLOSED);
- ESTABLISHED.insert(OPEN);
-
- FINISHED.insert(FAILED);
- FINISHED.insert(CLOSED);
-
- properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER);
- properties.setString(CLIENT_PROCESS_NAME, sys::SystemInfo::getProcessName());
- properties.setInt(CLIENT_PID, sys::SystemInfo::getProcessId());
- properties.setInt(CLIENT_PPID, sys::SystemInfo::getParentProcessId());
-}
-
-void ConnectionHandler::incoming(AMQFrame& frame)
-{
- if (getState() == CLOSED) {
- throw Exception("Received frame on closed connection");
- }
-
- if (rcvTimeoutTask) {
- // Received frame on connection so delay timeout
- rcvTimeoutTask->restart();
- }
-
- AMQBody* body = frame.getBody();
- try {
- if (frame.getChannel() != 0 || !invoke(static_cast<ConnectionOperations&>(*this), *body)) {
- switch(getState()) {
- case OPEN:
- in(frame);
- break;
- case CLOSING:
- QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);
- break;
- default:
- throw Exception("Cannot receive frames on non-zero channel until connection is established.");
- }
- }
- }catch(std::exception& e){
- QPID_LOG(warning, "Closing connection due to " << e.what());
- setState(CLOSING);
- errorCode = CLOSE_CODE_FRAMING_ERROR;
- errorText = e.what();
- proxy.close(501, e.what());
- }
-}
-
-void ConnectionHandler::outgoing(AMQFrame& frame)
-{
- if (getState() == OPEN)
- out(frame);
- else
- throw TransportFailure(errorText.empty() ? "Connection is not open." : errorText);
-}
-
-void ConnectionHandler::waitForOpen()
-{
- waitFor(ESTABLISHED);
- if (getState() == FAILED || getState() == CLOSED) {
- throw ConnectionException(errorCode, errorText);
- }
-}
-
-void ConnectionHandler::close()
-{
- switch (getState()) {
- case NEGOTIATING:
- case OPENING:
- fail("Connection closed before it was established");
- break;
- case OPEN:
- if (setState(CLOSING, OPEN)) {
- proxy.close(200, OK);
- waitFor(FINISHED);//FINISHED = CLOSED or FAILED
- }
- //else, state was changed from open after we checked, can only
- //change to failed or closed, so nothing to do
- break;
-
- // Nothing to do if already CLOSING, CLOSED, FAILED or if NOT_STARTED
- }
-}
-
-void ConnectionHandler::heartbeat()
-{
- // Do nothing - the purpose of heartbeats is just to make sure that there is some
- // traffic on the connection within the heart beat interval, we check for the
- // traffic and don't need to do anything in response to heartbeats
-
- // Although the above is still true we're now using a received heartbeat as a trigger
- // to send out our own heartbeat
- proxy.heartbeat();
-}
-
-void ConnectionHandler::checkState(STATES s, const std::string& msg)
-{
- if (getState() != s) {
- throw CommandInvalidException(msg);
- }
-}
-
-void ConnectionHandler::fail(const std::string& message)
-{
- errorCode = CLOSE_CODE_FRAMING_ERROR;
- errorText = message;
- QPID_LOG(warning, message);
- setState(FAILED);
-}
-
-namespace {
-std::string SPACE(" ");
-}
-
-void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/)
-{
- checkState(NOT_STARTED, INVALID_STATE_START);
- setState(NEGOTIATING);
- sasl = SaslFactory::getInstance().create(*this);
-
- std::string mechlist;
- bool chosenMechanismSupported = mechanism.empty();
- for (Array::const_iterator i = mechanisms.begin(); i != mechanisms.end(); ++i) {
- if (!mechanism.empty() && mechanism == (*i)->get<std::string>()) {
- chosenMechanismSupported = true;
- mechlist = (*i)->get<std::string>() + SPACE + mechlist;
- } else {
- if (i != mechanisms.begin()) mechlist += SPACE;
- mechlist += (*i)->get<std::string>();
- }
- }
-
- if (!chosenMechanismSupported) {
- fail("Selected mechanism not supported: " + mechanism);
- }
-
- if (sasl.get()) {
- string response = sasl->start(mechanism.empty() ? mechlist : mechanism,
- getSSF ? getSSF() : 0);
- proxy.startOk(properties, sasl->getMechanism(), response, locale);
- } else {
- //TODO: verify that desired mechanism and locale are supported
- string response = ((char)0) + username + ((char)0) + password;
- proxy.startOk(properties, mechanism, response, locale);
- }
-}
-
-void ConnectionHandler::secure(const std::string& challenge)
-{
- if (sasl.get()) {
- string response = sasl->step(challenge);
- proxy.secureOk(response);
- } else {
- throw NotImplementedException("Challenge-response cycle not yet implemented in client");
- }
-}
-
-void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed,
- uint16_t heartbeatMin, uint16_t heartbeatMax)
-{
- checkState(NEGOTIATING, INVALID_STATE_TUNE);
- maxChannels = std::min(maxChannels, maxChannelsProposed);
- maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
- // Clip the requested heartbeat to the maximum/minimum offered
- uint16_t heartbeat = ConnectionSettings::heartbeat;
- heartbeat = heartbeat < heartbeatMin ? heartbeatMin :
- heartbeat > heartbeatMax ? heartbeatMax :
- heartbeat;
- ConnectionSettings::heartbeat = heartbeat;
- proxy.tuneOk(maxChannels, maxFrameSize, heartbeat);
- setState(OPENING);
- proxy.open(virtualhost, capabilities, insist);
-}
-
-void ConnectionHandler::openOk ( const Array& knownBrokers )
-{
- checkState(OPENING, INVALID_STATE_OPEN_OK);
- knownBrokersUrls.clear();
- framing::Array::ValueVector::const_iterator i;
- for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i )
- knownBrokersUrls.push_back(Url((*i)->get<std::string>()));
- if (sasl.get()) {
- securityLayer = sasl->getSecurityLayer(maxFrameSize);
- operUserId = sasl->getUserId();
- }
- setState(OPEN);
- QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls));
-}
-
-
-void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/)
-{
- throw NotImplementedException("Redirection received from broker; not yet implemented in client");
-}
-
-void ConnectionHandler::close(uint16_t replyCode, const std::string& replyText)
-{
- proxy.closeOk();
- errorCode = convert(replyCode);
- errorText = replyText;
- setState(CLOSED);
- QPID_LOG(warning, "Broker closed connection: " << replyCode << ", " << replyText);
- if (onError) {
- onError(replyCode, replyText);
- }
-}
-
-void ConnectionHandler::closeOk()
-{
- checkState(CLOSING, INVALID_STATE_CLOSE_OK);
- if (onError && errorCode != CLOSE_CODE_NORMAL) {
- onError(errorCode, errorText);
- } else if (onClose) {
- onClose();
- }
- setState(CLOSED);
-}
-
-bool ConnectionHandler::isOpen() const
-{
- return getState() == OPEN;
-}
-
-bool ConnectionHandler::isClosed() const
-{
- int s = getState();
- return s == CLOSED || s == FAILED;
-}
-
-bool ConnectionHandler::isClosing() const { return getState() == CLOSING; }
-
-std::auto_ptr<qpid::sys::SecurityLayer> ConnectionHandler::getSecurityLayer()
-{
- return securityLayer;
-}
-
-void ConnectionHandler::setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask> t)
-{
- rcvTimeoutTask = t;
-}