diff options
Diffstat (limited to 'cpp/src/client/ClientConnection.cpp')
-rw-r--r-- | cpp/src/client/ClientConnection.cpp | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/cpp/src/client/ClientConnection.cpp b/cpp/src/client/ClientConnection.cpp new file mode 100644 index 0000000000..365311ab37 --- /dev/null +++ b/cpp/src/client/ClientConnection.cpp @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <algorithm> +#include <boost/format.hpp> +#include <boost/bind.hpp> + +#include "Connection.h" +#include "ClientChannel.h" +#include "ClientMessage.h" +#include "../QpidError.h" +#include <iostream> +#include <sstream> +#include "MethodBodyInstances.h" +#include <functional> + +using namespace qpid::framing; +using namespace qpid::sys; + + +namespace qpid { +namespace client { + +const std::string Connection::OK("OK"); + +Connection::Connection( + bool _debug, uint32_t _max_frame_size, + framing::ProtocolVersion _version +) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), + defaultConnector(version, _debug, _max_frame_size), + isOpen(false), debug(_debug) +{ + setConnector(defaultConnector); +} + +Connection::~Connection(){} + +void Connection::setConnector(Connector& con) +{ + connector = &con; + connector->setInputHandler(this); + connector->setTimeoutHandler(this); + connector->setShutdownHandler(this); + out = connector->getOutputHandler(); +} + +void Connection::open( + const std::string& host, int port, + const std::string& uid, const std::string& pwd, const std::string& vhost) +{ + if (isOpen) + THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); + connector->connect(host, port); + channels[0] = &channel0; + channel0.open(0, *this); + channel0.protocolInit(uid, pwd, vhost); + isOpen = true; +} + +void Connection::shutdown() { + close(); +} + +void Connection::close( + ReplyCode code, const string& msg, ClassId classId, MethodId methodId +) +{ + if(isOpen) { + // TODO aconway 2007-01-29: Exception handling - could end up + // partly closed with threads left unjoined. + isOpen = false; + channel0.sendAndReceive<ConnectionCloseOkBody>( + new ConnectionCloseBody( + getVersion(), code, msg, classId, methodId)); + + using boost::bind; + for_each(channels.begin(), channels.end(), + bind(&Channel::closeInternal, + bind(&ChannelMap::value_type::second, _1))); + channels.clear(); + connector->close(); + } +} + +void Connection::openChannel(Channel& channel) { + ChannelId id = ++channelIdCounter; + assert (channels.find(id) == channels.end()); + assert(out); + channels[id] = &channel; + channel.open(id, *this); +} + +void Connection::erase(ChannelId id) { + channels.erase(id); +} + +void Connection::received(AMQFrame* frame){ + // FIXME aconway 2007-01-25: Mutex + ChannelId id = frame->getChannel(); + Channel* channel = channels[id]; + // FIXME aconway 2007-01-26: Exception thrown here is hanging the + // client. Need to review use of exceptions. + if (channel == 0) + THROW_QPID_ERROR( + PROTOCOL_ERROR+504, + (boost::format("Invalid channel number %g") % id).str()); + try{ + channel->handleBody(frame->getBody()); + }catch(const qpid::QpidError& e){ + channelException( + *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); + } +} + +void Connection::send(AMQFrame* frame) { + out->send(frame); +} + +void Connection::channelException( + Channel& channel, AMQMethodBody* method, const QpidError& e) +{ + int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500; + string msg = e.msg; + if(method == 0) + channel.close(code, msg); + else + channel.close( + code, msg, method->amqpClassId(), method->amqpMethodId()); +} + +void Connection::idleIn(){ + connector->close(); +} + +void Connection::idleOut(){ + out->send(new AMQFrame(version, 0, new AMQHeartbeatBody())); +} + +}} // namespace qpid::client |