summaryrefslogtreecommitdiff
path: root/cpp/src/client/ClientConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/client/ClientConnection.cpp')
-rw-r--r--cpp/src/client/ClientConnection.cpp156
1 files changed, 156 insertions, 0 deletions
diff --git a/cpp/src/client/ClientConnection.cpp b/cpp/src/client/ClientConnection.cpp
new file mode 100644
index 0000000000..365311ab37
--- /dev/null
+++ b/cpp/src/client/ClientConnection.cpp
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <algorithm>
+#include <boost/format.hpp>
+#include <boost/bind.hpp>
+
+#include "Connection.h"
+#include "ClientChannel.h"
+#include "ClientMessage.h"
+#include "../QpidError.h"
+#include <iostream>
+#include <sstream>
+#include "MethodBodyInstances.h"
+#include <functional>
+
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+
+namespace qpid {
+namespace client {
+
+const std::string Connection::OK("OK");
+
+Connection::Connection(
+ bool _debug, uint32_t _max_frame_size,
+ framing::ProtocolVersion _version
+) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
+ defaultConnector(version, _debug, _max_frame_size),
+ isOpen(false), debug(_debug)
+{
+ setConnector(defaultConnector);
+}
+
+Connection::~Connection(){}
+
+void Connection::setConnector(Connector& con)
+{
+ connector = &con;
+ connector->setInputHandler(this);
+ connector->setTimeoutHandler(this);
+ connector->setShutdownHandler(this);
+ out = connector->getOutputHandler();
+}
+
+void Connection::open(
+ const std::string& host, int port,
+ const std::string& uid, const std::string& pwd, const std::string& vhost)
+{
+ if (isOpen)
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
+ connector->connect(host, port);
+ channels[0] = &channel0;
+ channel0.open(0, *this);
+ channel0.protocolInit(uid, pwd, vhost);
+ isOpen = true;
+}
+
+void Connection::shutdown() {
+ close();
+}
+
+void Connection::close(
+ ReplyCode code, const string& msg, ClassId classId, MethodId methodId
+)
+{
+ if(isOpen) {
+ // TODO aconway 2007-01-29: Exception handling - could end up
+ // partly closed with threads left unjoined.
+ isOpen = false;
+ channel0.sendAndReceive<ConnectionCloseOkBody>(
+ new ConnectionCloseBody(
+ getVersion(), code, msg, classId, methodId));
+
+ using boost::bind;
+ for_each(channels.begin(), channels.end(),
+ bind(&Channel::closeInternal,
+ bind(&ChannelMap::value_type::second, _1)));
+ channels.clear();
+ connector->close();
+ }
+}
+
+void Connection::openChannel(Channel& channel) {
+ ChannelId id = ++channelIdCounter;
+ assert (channels.find(id) == channels.end());
+ assert(out);
+ channels[id] = &channel;
+ channel.open(id, *this);
+}
+
+void Connection::erase(ChannelId id) {
+ channels.erase(id);
+}
+
+void Connection::received(AMQFrame* frame){
+ // FIXME aconway 2007-01-25: Mutex
+ ChannelId id = frame->getChannel();
+ Channel* channel = channels[id];
+ // FIXME aconway 2007-01-26: Exception thrown here is hanging the
+ // client. Need to review use of exceptions.
+ if (channel == 0)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR+504,
+ (boost::format("Invalid channel number %g") % id).str());
+ try{
+ channel->handleBody(frame->getBody());
+ }catch(const qpid::QpidError& e){
+ channelException(
+ *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
+ }
+}
+
+void Connection::send(AMQFrame* frame) {
+ out->send(frame);
+}
+
+void Connection::channelException(
+ Channel& channel, AMQMethodBody* method, const QpidError& e)
+{
+ int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
+ string msg = e.msg;
+ if(method == 0)
+ channel.close(code, msg);
+ else
+ channel.close(
+ code, msg, method->amqpClassId(), method->amqpMethodId());
+}
+
+void Connection::idleIn(){
+ connector->close();
+}
+
+void Connection::idleOut(){
+ out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
+}
+
+}} // namespace qpid::client