summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r--M4-RCs/qpid/cpp/src/qpid/client/ConnectionImpl.cpp201
1 files changed, 0 insertions, 201 deletions
diff --git a/M4-RCs/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/M4-RCs/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
deleted file mode 100644
index 0d7ffa0288..0000000000
--- a/M4-RCs/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ConnectionImpl.h"
-#include "Connector.h"
-#include "ConnectionSettings.h"
-#include "SessionImpl.h"
-#include "FailoverListener.h"
-
-#include "qpid/log/Statement.h"
-#include "qpid/Url.h"
-#include "qpid/framing/enum.h"
-#include "qpid/framing/reply_exceptions.h"
-
-#include <boost/bind.hpp>
-#include <boost/format.hpp>
-
-
-namespace qpid {
-namespace client {
-
-using namespace qpid::framing;
-using namespace qpid::framing::connection;
-using namespace qpid::sys;
-using namespace qpid::framing::connection;//for connection error codes
-
-ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
- : Bounds(settings.maxFrameSize * settings.bounds),
- handler(settings, v),
- version(v),
- nextChannel(1)
-{
- QPID_LOG(debug, "ConnectionImpl created for " << version.toString());
- handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
- handler.out = boost::bind(&Connector::send, boost::ref(connector), _1);
- handler.onClose = boost::bind(&ConnectionImpl::closed, this,
- CLOSE_CODE_NORMAL, std::string());
- //only set error handler once open
- handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
-}
-
-ConnectionImpl::~ConnectionImpl() {
- // Important to close the connector first, to ensure the
- // connector thread does not call on us while the destructor
- // is running.
- failover.reset();
- if (connector) connector->close();
-}
-
-void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel)
-{
- Mutex::ScopedLock l(lock);
- session->setChannel(channel ? channel : nextChannel++);
- boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
- boost::shared_ptr<SessionImpl> ss = s.lock();
- if (ss) throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attachd to " << ss->getId()));
- s = session;
-}
-
-void ConnectionImpl::handle(framing::AMQFrame& frame)
-{
- handler.outgoing(frame);
-}
-
-void ConnectionImpl::incoming(framing::AMQFrame& frame)
-{
- boost::shared_ptr<SessionImpl> s;
- {
- Mutex::ScopedLock l(lock);
- s = sessions[frame.getChannel()].lock();
- }
- if (!s)
- throw NotAttachedException(QPID_MSG("Invalid channel: " << frame.getChannel()));
- s->in(frame);
-}
-
-bool ConnectionImpl::isOpen() const
-{
- return handler.isOpen();
-}
-
-
-void ConnectionImpl::open()
-{
- const std::string& protocol = handler.protocol;
- const std::string& host = handler.host;
- int port = handler.port;
- QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port);
-
- connector.reset(Connector::create(protocol, version, handler, this));
- connector->setInputHandler(&handler);
- connector->setShutdownHandler(this);
- connector->connect(host, port);
- connector->init();
- handler.waitForOpen();
-
- failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls));
-}
-
-void ConnectionImpl::idleIn()
-{
- close();
-}
-
-void ConnectionImpl::idleOut()
-{
- AMQFrame frame(in_place<AMQHeartbeatBody>());
- connector->send(frame);
-}
-
-void ConnectionImpl::close()
-{
- if (!handler.isOpen()) return;
- handler.close();
- closed(CLOSE_CODE_NORMAL, "Closed by client");
-}
-
-
-template <class F> void ConnectionImpl::closeInternal(const F& f) {
- {
- Mutex::ScopedUnlock u(lock);
- connector->close();
- }
- //notifying sessions of failure can result in those session being
- //deleted which in turn results in a call to erase(); this can
- //even happen on this thread, when 's' goes out of scope
- //below. Using a copy prevents the map being modified as we
- //iterate through.
- SessionMap copy;
- sessions.swap(copy);
- for (SessionMap::iterator i = copy.begin(); i != copy.end(); ++i) {
- boost::shared_ptr<SessionImpl> s = i->second.lock();
- if (s) f(s);
- }
-}
-
-void ConnectionImpl::closed(uint16_t code, const std::string& text) {
- Mutex::ScopedLock l(lock);
- setException(new ConnectionException(ConnectionHandler::convert(code), text));
- closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text));
-}
-
-static const std::string CONN_CLOSED("Connection closed");
-
-void ConnectionImpl::shutdown() {
- if ( failureCallback )
- failureCallback();
-
- if (handler.isClosed()) return;
-
- // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
- // an appropriate close-code. connection-forced is not right.
- bool isClosing = handler.isClosing();
- handler.fail(CONN_CLOSED);//ensure connection is marked as failed before notifying sessions
- Mutex::ScopedLock l(lock);
- if (!isClosing)
- closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONN_CLOSED));
- setException(new TransportFailure(CONN_CLOSED));
-}
-
-void ConnectionImpl::erase(uint16_t ch) {
- Mutex::ScopedLock l(lock);
- sessions.erase(ch);
-}
-
-const ConnectionSettings& ConnectionImpl::getNegotiatedSettings()
-{
- return handler;
-}
-
-std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
- return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
-}
-
-boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) {
- boost::shared_ptr<SessionImpl> simpl(new SessionImpl(name, shared_from_this()));
- addSession(simpl, channel);
- simpl->open(timeout);
- return simpl;
-}
-
-void ConnectionImpl::stopFailoverListener() { failover->stop(); }
-
-}} // namespace qpid::client