summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ConnectionHandler.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp196
1 files changed, 196 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
new file mode 100644
index 0000000000..ada3fa4fb0
--- /dev/null
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionHandler.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/amqp_framing.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace boost;
+
+namespace {
+const std::string OK("OK");
+}
+
+ConnectionHandler::ConnectionHandler()
+ : StateManager(NOT_STARTED)
+{
+
+ mechanism = "PLAIN";
+ locale = "en_US";
+ heartbeat = 0;
+ maxChannels = 32767;
+ maxFrameSize = 65536;
+ insist = true;
+ version = framing::highestProtocolVersion;
+
+ ESTABLISHED.insert(FAILED);
+ ESTABLISHED.insert(OPEN);
+}
+
+void ConnectionHandler::incoming(AMQFrame& frame)
+{
+ if (getState() == CLOSED) {
+ throw Exception("Connection is closed.");
+ }
+
+ AMQBody::shared_ptr body = frame.getBody();
+ if (frame.getChannel() == 0) {
+ if (body->type() == METHOD_BODY) {
+ handle(shared_polymorphic_cast<AMQMethodBody>(body));
+ } else {
+ error(503, "Cannot send content on channel zero.");
+ }
+ } else {
+ switch(getState()) {
+ case OPEN:
+ try {
+ in(frame);
+ }catch(ConnectionException& e){
+ error(e.code, e.toString(), body);
+ }catch(std::exception& e){
+ error(541/*internal error*/, e.what(), body);
+ }
+ break;
+ case CLOSING:
+ QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored.");
+ break;
+ default:
+ //must be in connection initialisation:
+ fail("Cannot receive frames on non-zero channel until connection is established.");
+ }
+ }
+}
+
+void ConnectionHandler::outgoing(AMQFrame& frame)
+{
+ if (getState() == OPEN) {
+ out(frame);
+ } else {
+ throw Exception("Connection is not open.");
+ }
+}
+
+void ConnectionHandler::waitForOpen()
+{
+ waitFor(ESTABLISHED);
+ if (getState() == FAILED) {
+ throw Exception("Failed to establish connection.");
+ }
+}
+
+void ConnectionHandler::close()
+{
+ setState(CLOSING);
+ send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0)));
+
+ waitFor(CLOSED);
+}
+
+void ConnectionHandler::send(framing::AMQBody::shared_ptr body)
+{
+ AMQFrame f;
+ f.setBody(body);
+ out(f);
+}
+
+void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
+{
+ setState(CLOSING);
+ send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId)));
+}
+
+void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body)
+{
+ if (body->type() == METHOD_BODY) {
+ AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
+ error(code, message, method->amqpClassId(), method->amqpMethodId());
+ } else {
+ error(code, message);
+ }
+}
+
+
+void ConnectionHandler::fail(const std::string& message)
+{
+ QPID_LOG(error, message);
+ setState(FAILED);
+}
+
+void ConnectionHandler::handle(AMQMethodBody::shared_ptr method)
+{
+ switch (getState()) {
+ case NOT_STARTED:
+ if (method->isA<ConnectionStartBody>()) {
+ setState(NEGOTIATING);
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale)));
+ } else {
+ fail("Bad method sequence, expected connection-start.");
+ }
+ break;
+ case NEGOTIATING:
+ if (method->isA<ConnectionTuneBody>()) {
+ ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast<ConnectionTuneBody>(method));
+ heartbeat = proposal->getHeartbeat();
+ maxChannels = proposal->getChannelMax();
+ send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)));
+ setState(OPENING);
+ send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist)));
+ //TODO: support for further security challenges
+ //} else if (method->isA<ConnectionSecureBody>()) {
+ } else {
+ fail("Unexpected method sequence, expected connection-tune.");
+ }
+ break;
+ case OPENING:
+ if (method->isA<ConnectionOpenOkBody>()) {
+ setState(OPEN);
+ //TODO: support for redirection
+ //} else if (method->isA<ConnectionRedirectBody>()) {
+ } else {
+ fail("Unexpected method sequence, expected connection-open-ok.");
+ }
+ break;
+ case OPEN:
+ if (method->isA<ConnectionCloseBody>()) {
+ send(make_shared_ptr(new ConnectionCloseOkBody(version)));
+ setState(CLOSED);
+ if (onClose) {
+ onClose();
+ }
+ } else {
+ error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId());
+ }
+ break;
+ case CLOSING:
+ if (method->isA<ConnectionCloseOkBody>()) {
+ setState(CLOSED);
+ if (onClose) {
+ onClose();
+ }
+ } else {
+ QPID_LOG(warning, "Received frame on channel zero while closing connection; frame ignored.");
+ }
+ break;
+ }
+}