diff options
Diffstat (limited to 'qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 1317 |
1 files changed, 1317 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp new file mode 100644 index 0000000000..1b8c848941 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -0,0 +1,1317 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionContext.h" +#include "DriverImpl.h" +#include "PnData.h" +#include "ReceiverContext.h" +#include "Sasl.h" +#include "SenderContext.h" +#include "SessionContext.h" +#include "Transaction.h" +#include "Transport.h" +#include "util.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Encoder.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/AddressImpl.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/Uuid.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/SecurityLayer.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" +#include "qpid/sys/urlAdd.h" +#include "config.h" +#include <boost/lexical_cast.hpp> +#include <boost/bind.hpp> +#include <vector> +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { +using types::Variant; + +namespace { + +void do_trace(pn_transport_t* transport, const char* message) +{ + ConnectionContext* c = reinterpret_cast<ConnectionContext*>(pn_transport_get_context(transport)); + if (c) c->trace(message); +} + +void set_tracer(pn_transport_t* transport, void* context) +{ + pn_transport_set_context(transport, context); + pn_transport_set_tracer(transport, &do_trace); +} + +#ifdef USE_PROTON_TRANSPORT_CONDITION +std::string get_error(pn_connection_t* connection, pn_transport_t* transport) +{ + std::stringstream text; + pn_error_t* cerror = pn_connection_error(connection); + if (cerror) text << "connection error " << pn_error_text(cerror) << " [" << cerror << "]"; + pn_condition_t* tcondition = pn_transport_condition(transport); + if (pn_condition_is_set(tcondition)) text << get_error_string(tcondition, "transport error", ": "); + return text.str(); +} +#else +std::string get_error(pn_connection_t* connection, pn_transport_t* transport) +{ + std::stringstream text; + pn_error_t* cerror = pn_connection_error(connection); + if (cerror) text << "connection error " << pn_error_text(cerror) << " [" << cerror << "]"; + pn_error_t* terror = pn_transport_error(transport); + if (terror) text << "transport error " << pn_error_text(terror) << " [" << terror << "]"; + return text.str(); +} +#endif + +class ConnectionTickerTask : public qpid::sys::TimerTask +{ + qpid::sys::Timer& timer; + ConnectionContext& connection; + public: + ConnectionTickerTask(const qpid::sys::Duration& interval, qpid::sys::Timer& t, ConnectionContext& c) : + TimerTask(interval, "ConnectionTicker"), + timer(t), + connection(c) + {} + + void fire() { + QPID_LOG(debug, "ConnectionTickerTask fired"); + // Setup next firing + setupNextFire(); + timer.add(this); + + // Send Ticker + connection.activateOutput(); + } +}; +} + +void ConnectionContext::trace(const char* message) const +{ + QPID_LOG_CAT(trace, protocol, "[" << identifier << "]: " << message); +} + +ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::Variant::Map& o) + : qpid::messaging::ConnectionOptions(o), + fullUrl(url, protocol.empty() ? qpid::Address::TCP : protocol), + engine(pn_transport()), + connection(pn_connection()), + //note: disabled read/write of header as now handled by engine + writeHeader(false), + readHeader(false), + haveOutput(false), + state(DISCONNECTED), + codecAdapter(*this), + notifyOnWrite(false) +{ + // Concatenate all known URLs into a single URL, get rid of duplicate addresses. + sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ? + qpid::Address::TCP : protocol); + if (identifier.empty()) { + identifier = qpid::types::Uuid(true).str(); + } + configureConnection(); +} + +ConnectionContext::~ConnectionContext() +{ + if (ticker) ticker->cancel(); + close(); + sessions.clear(); + pn_connection_free(connection); + pn_transport_free(engine); +} + +bool ConnectionContext::isOpen() const +{ + sys::Monitor::ScopedLock l(lock); + return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); +} + +void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn) +{ + sys::Monitor::ScopedLock l(lock); + syncLH(ssn, l); +} + +void ConnectionContext::syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&) { + while (!ssn->settled()) { + QPID_LOG(debug, "Waiting for sends to settle on sync()"); + wait(ssn);//wait until message has been confirmed + wakeupDriver(); + } + checkClosed(ssn); +} + +void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) +{ + sys::Monitor::ScopedLock l(lock); + if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { + //explicitly release messages that have yet to be fetched + for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); i != ssn->receivers.end(); ++i) { + drain_and_release_messages(ssn, i->second); + } + syncLH(ssn, l); + } + + if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { + pn_session_close(ssn->session); + } + sessions.erase(ssn->getName()); + + wakeupDriver(); +} + +void ConnectionContext::close() +{ + sys::Monitor::ScopedLock l(lock); + if (state != CONNECTED) return; + if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + syncLH(i->second, l); + if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) { + pn_session_close(i->second->session); + } + } + pn_connection_close(connection); + wakeupDriver(); + //wait for close to be confirmed by peer? + while (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) { + if (state == DISCONNECTED) { + QPID_LOG(warning, "Disconnected before close received from peer."); + break; + } + lock.wait(); + } + sessions.clear(); + } + if (state != DISCONNECTED) { + transport->close(); + while (state != DISCONNECTED) { + lock.wait(); + } + } + if (ticker) { + ticker->cancel(); + ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(); + } +} + +bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + /** + * For fetch() on a receiver with zero capacity, need to reissue the + * credit on reconnect, so track the fetches in progress. + */ + qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching); + { + sys::Monitor::ScopedLock l(lock); + checkClosed(ssn, lnk); + if (!lnk->capacity) { + pn_link_flow(lnk->receiver, 1); + wakeupDriver(); + } + } + if (get(ssn, lnk, message, timeout)) { + return true; + } else { + { + sys::Monitor::ScopedLock l(lock); + pn_link_drain(lnk->receiver, 0); + wakeupDriver(); + while (pn_link_draining(lnk->receiver) && !pn_link_queued(lnk->receiver)) { + QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); + wait(ssn, lnk); + } + if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) { + pn_link_flow(lnk->receiver, lnk->capacity); + } + } + if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) { + return true; + } else { + return false; + } + } +} + +qpid::sys::AbsTime convert(qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until; + uint64_t ms = timeout.getMilliseconds(); + if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) { + return qpid::sys::AbsTime(qpid::sys::now(), ms * qpid::sys::TIME_MSEC); + } else { + return qpid::sys::FAR_FUTURE; + } +} + +bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until(convert(timeout)); + while (true) { + sys::Monitor::ScopedLock l(lock); + checkClosed(ssn, lnk); + pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver); + QPID_LOG(debug, "In ConnectionContext::get(), current=" << current); + if (current) { + qpid::messaging::MessageImpl& impl = MessageImplAccess::get(message); + boost::shared_ptr<EncodedMessage> encoded(new EncodedMessage(pn_delivery_pending(current))); + encoded->setNestAnnotationsOption(nestAnnotations); + ssize_t read = pn_link_recv(lnk->receiver, encoded->getData(), encoded->getSize()); + if (read < 0) throw qpid::messaging::MessagingException("Failed to read message"); + encoded->trim((size_t) read); + QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: "); + encoded->init(impl); + impl.setEncoded(encoded); + impl.setInternalId(ssn->record(current)); + if (lnk->capacity) { + pn_link_flow(lnk->receiver, 1); + if (lnk->wakeupToIssueCredit()) { + wakeupDriver(); + } else { + haveOutput = true; + } + } + // Automatically ack messages if we are in a transaction. + if (ssn->transaction) + acknowledgeLH(ssn, &message, false, l); + return true; + } else if (until > qpid::sys::now()) { + waitUntil(ssn, lnk, until); + } else { + return false; + } + } + return false; +} + +boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until(convert(timeout)); + while (true) { + sys::Monitor::ScopedLock l(lock); + checkClosed(ssn); + boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver(); + if (r) { + return r; + } else if (until > qpid::sys::now()) { + waitUntil(ssn, until); + } else { + return boost::shared_ptr<ReceiverContext>(); + } + } +} + +void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) { + sys::Monitor::ScopedLock l(lock); + acknowledgeLH(ssn, message, cumulative, l); +} + +void ConnectionContext::acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&) +{ + checkClosed(ssn); + if (message) { + ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative); + } else { + ssn->acknowledge(); + } + wakeupDriver(); +} + +void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject) +{ + sys::Monitor::ScopedLock l(lock); + checkClosed(ssn); + ssn->nack(MessageImplAccess::get(message).getInternalId(), reject); + wakeupDriver(); +} + +void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + sys::Monitor::ScopedLock l(lock); + if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) { + lnk->close(); + } + wakeupDriver(); + while (pn_link_state(lnk->sender) & PN_REMOTE_ACTIVE) { + wait(ssn); + } + ssn->removeSender(lnk->getName()); +} + +void ConnectionContext::drain_and_release_messages(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + pn_link_drain(lnk->receiver, 0); + wakeupDriver(); + //Not all implementations handle drain correctly, so limit the + //time spent waiting for it + qpid::sys::AbsTime until(qpid::sys::now(), qpid::sys::TIME_SEC*2); + while (pn_link_credit(lnk->receiver) > pn_link_queued(lnk->receiver) && until > qpid::sys::now()) { + QPID_LOG(debug, "Waiting for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); + waitUntil(ssn, lnk, until); + } + //release as yet unfetched messages: + for (pn_delivery_t* d = pn_link_current(lnk->receiver); d; d = pn_link_current(lnk->receiver)) { + pn_link_advance(lnk->receiver); + pn_delivery_update(d, PN_RELEASED); + pn_delivery_settle(d); + } +} + +void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + sys::Monitor::ScopedLock l(lock); + drain_and_release_messages(ssn, lnk); + if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) { + lnk->close(); + } + wakeupDriver(); + while (pn_link_state(lnk->receiver) & PN_REMOTE_ACTIVE) { + wait(ssn); + } + ssn->removeReceiver(lnk->getName()); +} + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + lnk->configure(); + attach(ssn, lnk->sender); + checkClosed(ssn, lnk); + lnk->verify(); + QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget()); +} + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + lnk->configure(); + attach(ssn, lnk->receiver, lnk->capacity); + checkClosed(ssn, lnk); + lnk->verify(); + QPID_LOG(debug, "Attach succeeded from " << lnk->getSource()); +} + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t* link, int credit) +{ + pn_link_open(link); + QPID_LOG(debug, "Link attach sent for " << link << ", state=" << pn_link_state(link)); + if (credit) pn_link_flow(link, credit); + wakeupDriver(); + while (pn_link_state(link) & PN_REMOTE_UNINIT) { + QPID_LOG(debug, "Waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link) << "..."); + wait(ssn); + } +} + +boost::shared_ptr<SenderContext> ConnectionContext::createSender(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address) +{ + sys::Monitor::ScopedLock l(lock); + boost::shared_ptr<SenderContext> sender = session->createSender(address, setToOnSend); + try { + attach(session, sender); + return sender; + } catch (...) { + session->removeSender(sender->getName()); + throw; + } + +} +boost::shared_ptr<ReceiverContext> ConnectionContext::createReceiver(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address) +{ + sys::Monitor::ScopedLock l(lock); + boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address); + try { + attach(session, receiver); + return receiver; + } catch (...) { + session->removeReceiver(receiver->getName()); + throw; + } +} +boost::shared_ptr<SenderContext> ConnectionContext::getSender(boost::shared_ptr<SessionContext> session, const std::string& name) const +{ + sys::Monitor::ScopedLock l(lock); + return session->getSender(name); +} + +boost::shared_ptr<ReceiverContext> ConnectionContext::getReceiver(boost::shared_ptr<SessionContext> session, const std::string& name) const +{ + sys::Monitor::ScopedLock l(lock); + return session->getReceiver(name); +} + +void ConnectionContext::send( + boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery) +{ + sys::Monitor::ScopedLock l(lock); + sendLH(ssn, snd, message, sync, delivery, l); +} + +void ConnectionContext::sendLH( + boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery, + sys::Monitor::ScopedLock&) +{ + checkClosed(ssn); + while (pn_transport_pending(engine) > 65536) { + QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written..."); + notifyOnWrite = true; + wakeupDriver(); + wait(ssn, snd); + notifyOnWrite = false; + } + while (!snd->send(message, delivery)) { + QPID_LOG(debug, "Waiting for capacity..."); + wait(ssn, snd);//wait for capacity + } + wakeupDriver(); + if (sync && *delivery) { + while (!(*delivery)->delivered()) { + QPID_LOG(debug, "Waiting for confirmation..."); + wait(ssn, snd);//wait until message has been confirmed + } + if ((*delivery)->rejected()) { + throw MessageRejected("Message was rejected by peer"); + } + + } +} + +void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity) +{ + sys::Monitor::ScopedLock l(lock); + sender->setCapacity(capacity); +} +uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender) +{ + sys::Monitor::ScopedLock l(lock); + return sender->getCapacity(); +} +uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender) +{ + sys::Monitor::ScopedLock l(lock); + return sender->getUnsettled(); +} + +void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity) +{ + sys::Monitor::ScopedLock l(lock); + receiver->setCapacity(capacity); + pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity()); + wakeupDriver(); +} +uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver) +{ + sys::Monitor::ScopedLock l(lock); + return receiver->getCapacity(); +} +uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver) +{ + sys::Monitor::ScopedLock l(lock); + return receiver->getAvailable(); +} +uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver) +{ + sys::Monitor::ScopedLock l(lock); + return receiver->getUnsettled(); +} + +void ConnectionContext::activateOutput() +{ + sys::Monitor::ScopedLock l(lock); + if (state == CONNECTED) wakeupDriver(); +} +/** + * Expects lock to be held by caller + */ +void ConnectionContext::wakeupDriver() +{ + switch (state) { + case CONNECTED: + haveOutput = true; + transport->activateOutput(); + QPID_LOG(debug, "wakeupDriver()"); + break; + case DISCONNECTED: + case CONNECTING: + QPID_LOG(error, "wakeupDriver() called while not connected"); + break; + } +} + +namespace { +pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; +pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED; +} + +void ConnectionContext::reset() +{ + pn_connection_free(connection); + pn_transport_free(engine); + + engine = pn_transport(); + connection = pn_connection(); + configureConnection(); + + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + i->second->reset(connection); + } +} + +bool ConnectionContext::check() { + if (checkDisconnected()) { + if (ConnectionOptions::reconnect) { + QPID_LOG(notice, "Auto-reconnecting to " << fullUrl); + autoconnect(); + QPID_LOG(notice, "Auto-reconnected to " << currentUrl); + } else { + throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)"); + } + return true; + } + return false; +} + +bool ConnectionContext::checkDisconnected() { + if (state == DISCONNECTED) { + reset(); + } else { + if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + std::string text = get_error_string(pn_connection_remote_condition(connection), "Connection closed by peer"); + pn_connection_close(connection); + throw qpid::messaging::ConnectionError(text); + } + } + return state == DISCONNECTED; +} + +void ConnectionContext::wait() +{ + if (check()) return; // Reconnected, may need to re-test condition. + lock.wait(); + check(); +} +void ConnectionContext::waitUntil(qpid::sys::AbsTime until) +{ + lock.wait(until); + check(); +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn) +{ + wait(); + checkClosed(ssn); +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + wait(); + checkClosed(ssn, lnk); +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + wait(); + checkClosed(ssn, lnk); +} +void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, qpid::sys::AbsTime until) +{ + waitUntil(until); + checkClosed(ssn); +} +void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::sys::AbsTime until) +{ + waitUntil(until); + checkClosed(ssn, lnk); +} +void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk, qpid::sys::AbsTime until) +{ + waitUntil(until); + checkClosed(ssn, lnk); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn) +{ + check(); + ssn->error.raise(); + if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + std::string text = get_error_string(pn_session_remote_condition(ssn->session), "Session ended by peer"); + pn_session_close(ssn->session); + throw qpid::messaging::SessionError(text); + } else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) { + throw qpid::messaging::SessionClosed(); + } +} + +bool ConnectionContext::isClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + try { + checkClosed(ssn, lnk->receiver); + return false; + } catch (const LinkError&) { + return true; + } +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + checkClosed(ssn, lnk->receiver); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + checkClosed(ssn, lnk->sender); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_link_t* lnk) +{ + checkClosed(ssn); + if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + pn_condition_t* error = pn_link_remote_condition(lnk); + std::string text = get_error_string(error, "Link detached by peer"); + pn_link_close(lnk); + std::string name = pn_condition_get_name(error); + if (name == qpid::amqp::error_conditions::NOT_FOUND) { + throw qpid::messaging::NotFound(text); + } else if (name == qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS) { + throw qpid::messaging::UnauthorizedAccess(text); + } else { + throw qpid::messaging::LinkError(text); + } + } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) { + throw qpid::messaging::LinkError("Link is not attached"); + } +} + +void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s) +{ + if (s->error) return; + pn_session_open(s->session); + wakeupDriver(); + while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { + wait(); + } + + for (SessionContext::SenderMap::iterator i = s->senders.begin(); i != s->senders.end(); ++i) { + QPID_LOG(debug, id << " reattaching sender " << i->first); + attach(s, i->second->sender); + i->second->verify(); + QPID_LOG(debug, id << " sender " << i->first << " reattached"); + i->second->resend(); + } + for (SessionContext::ReceiverMap::iterator i = s->receivers.begin(); i != s->receivers.end(); ++i) { + QPID_LOG(debug, id << " reattaching receiver " << i->first); + if (i->second->capacity) { + attach(s, i->second->receiver, i->second->capacity); + } else { + attach(s, i->second->receiver, (uint32_t) i->second->fetching); + } + i->second->verify(); + QPID_LOG(debug, id << " receiver " << i->first << " reattached"); + } + wakeupDriver(); +} + +boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) +{ + boost::shared_ptr<SessionContext> session; + std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n; + { + sys::Monitor::ScopedLock l(lock); + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + session = boost::shared_ptr<SessionContext>(new SessionContext(connection)); + session->setName(name); + pn_session_open(session->session); + wakeupDriver(); + sessions[name] = session; // Add it now so it will be restarted if we reconnect in wait() + while (pn_session_state(session->session) & PN_REMOTE_UNINIT) { + wait(); + } + } else { + throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); + } + + } + if (transactional) { // Outside of lock + startTxSession(session); + } + return session; +} + +boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const +{ + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + throw qpid::messaging::KeyError(std::string("No such session") + name); + } else { + return i->second; + } +} + +void ConnectionContext::setOption(const std::string& name, const qpid::types::Variant& value) +{ + set(name, value); +} + +std::string ConnectionContext::getAuthenticatedUsername() +{ + return sasl.get() ? sasl->getAuthenticatedUsername() : std::string(); +} + +std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) +{ + sys::Monitor::ScopedLock l(lock); + QPID_LOG(trace, id << " decode(" << size << ")"); + if (readHeader) { + size_t decoded = readProtocolHeader(buffer, size); + if (decoded < size) { + decoded += decode(buffer + decoded, size - decoded); + } + return decoded; + } + + //TODO: Fix pn_engine_input() to take const buffer + ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size); + if (n > 0 || n == PN_EOS) { + // PN_EOS either means we received a Close (which also means we've + // consumed all the input), OR some Very Bad Thing happened and this + // connection is toast. + if (n == PN_EOS) + { + std::string error; + if (checkTransportError(error)) { + // "He's dead, Jim." + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + transport->abort(); + return 0; + } else { + n = size; // assume all consumed + } + } + QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size) + pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); + lock.notifyAll(); + return n; + } else if (n == PN_ERR) { + std::string error; + checkTransportError(error); + QPID_LOG_CAT(error, network, id << " connection error: " << error); + transport->abort(); + return 0; + } else { + return 0; + } + +} +std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) +{ + sys::Monitor::ScopedLock l(lock); + QPID_LOG(trace, id << " encode(" << size << ")"); + if (writeHeader) { + size_t encoded = writeProtocolHeader(buffer, size); + if (encoded < size) { + encoded += encode(buffer + encoded, size - encoded); + } + return encoded; + } + + ssize_t n = pn_transport_output(engine, buffer, size); + if (n > 0) { + QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) + haveOutput = true; + if (notifyOnWrite) lock.notifyAll(); + return n; + } else if (n == PN_ERR) { + std::string error; + checkTransportError(error); + QPID_LOG_CAT(error, network, id << " connection error: " << error); + transport->abort(); + return 0; + } else if (n == PN_EOS) { + haveOutput = false; + // Normal close, or error? + std::string error; + if (checkTransportError(error)) { + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + transport->abort(); + } + return 0; + } else { + haveOutput = false; + return 0; + } +} +bool ConnectionContext::canEncodePlain() +{ + sys::Monitor::ScopedLock l(lock); + pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); + return haveOutput && state == CONNECTED; +} +void ConnectionContext::closed() +{ + sys::Monitor::ScopedLock l(lock); + state = DISCONNECTED; + lock.notifyAll(); +} +void ConnectionContext::opened() +{ + sys::Monitor::ScopedLock l(lock); + state = CONNECTED; + lock.notifyAll(); +} +bool ConnectionContext::isClosed() const +{ + return !isOpen(); +} +namespace { +qpid::framing::ProtocolVersion AMQP_1_0_PLAIN(1,0,qpid::framing::ProtocolVersion::AMQP); +} + +std::string ConnectionContext::getError() +{ + return get_error(connection, engine); +} + +framing::ProtocolVersion ConnectionContext::getVersion() const +{ + return AMQP_1_0_PLAIN; +} + +std::size_t ConnectionContext::readProtocolHeader(const char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(getVersion()); + if (size >= pi.encodedSize()) { + readHeader = false; + qpid::framing::Buffer out(const_cast<char*>(buffer), size); + pi.decode(out); + QPID_LOG_CAT(debug, protocol, id << " read protocol header: " << pi); + return pi.encodedSize(); + } else { + return 0; + } +} +std::size_t ConnectionContext::writeProtocolHeader(char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(getVersion()); + if (size >= pi.encodedSize()) { + QPID_LOG_CAT(debug, protocol, id << " writing protocol header: " << pi); + writeHeader = false; + qpid::framing::Buffer out(buffer, size); + pi.encode(out); + return pi.encodedSize(); + } else { + QPID_LOG_CAT(debug, protocol, id << " insufficient buffer for protocol header: " << size) + return 0; + } +} +bool ConnectionContext::useSasl() +{ + return !(mechanism == "none" || mechanism == "NONE" || mechanism == "None"); +} + +qpid::sys::Codec& ConnectionContext::getCodec() +{ + return *this; +} + +const qpid::messaging::ConnectionOptions* ConnectionContext::getOptions() +{ + return this; +} + +std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) +{ + sys::Monitor::ScopedLock l(lock); + size_t decoded = 0; + try { + if (sasl.get() && !sasl->authenticated()) { + decoded = sasl->decode(buffer, size); + if (!sasl->authenticated()) return decoded; + } + if (decoded < size) { + if (sasl.get() && sasl->getSecurityLayer()) decoded += sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded); + else decoded += decodePlain(buffer+decoded, size-decoded); + } + } catch (const AuthenticationFailure&) { + transport->close(); + } + return decoded; +} +std::size_t ConnectionContext::encode(char* buffer, std::size_t size) +{ + sys::Monitor::ScopedLock l(lock); + size_t encoded = 0; + try { + if (sasl.get() && sasl->canEncode()) { + encoded += sasl->encode(buffer, size); + if (!sasl->authenticated()) return encoded; + } + if (encoded < size) { + if (sasl.get() && sasl->getSecurityLayer()) encoded += sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded); + else encoded += encodePlain(buffer+encoded, size-encoded); + } + } catch (const AuthenticationFailure&) { + transport->close(); + } + return encoded; +} +bool ConnectionContext::canEncode() +{ + sys::Monitor::ScopedLock l(lock); + if (sasl.get()) { + try { + if (sasl->canEncode()) return true; + else if (!sasl->authenticated()) return false; + else if (sasl->getSecurityLayer()) return sasl->getSecurityLayer()->canEncode(); + } catch (const AuthenticationFailure&) { + transport->close(); + return false; + } + } + return canEncodePlain(); +} + +namespace { +const std::string CLIENT_PROCESS_NAME("qpid.client_process"); +const std::string CLIENT_PID("qpid.client_pid"); +const std::string CLIENT_PPID("qpid.client_ppid"); +} +void ConnectionContext::setProperties() +{ + PnData data(pn_connection_properties(connection)); + pn_data_put_map(data.data); + pn_data_enter(data.data); + data.putSymbol(CLIENT_PROCESS_NAME); + data.putSymbol(sys::SystemInfo::getProcessName()); + data.putSymbol(CLIENT_PID); + data.put(int32_t(sys::SystemInfo::getProcessId())); + data.putSymbol(CLIENT_PPID); + data.put(int32_t(sys::SystemInfo::getParentProcessId())); + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) + { + data.putSymbol(i->first); + data.put(i->second); + } + pn_data_exit(data.data); +} + +const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings() +{ + return transport ? transport->getSecuritySettings() : 0; +} + +void ConnectionContext::open() +{ + sys::Monitor::ScopedLock l(lock); + if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); + if (!driver) driver = DriverImpl::getDefault(); + QPID_LOG(info, "Starting connection to " << fullUrl); + autoconnect(); +} + + +namespace { +double FOREVER(std::numeric_limits<double>::max()); +bool expired(const sys::AbsTime& start, double timeout) +{ + if (timeout == 0) return true; + if (timeout == FOREVER) return false; + qpid::sys::Duration used(start, qpid::sys::now()); + qpid::sys::Duration allowed((int64_t)(timeout*qpid::sys::TIME_SEC)); + return allowed < used; +} +const std::string COLON(":"); +} + +void throwConnectFail(const Url& url, const std::string& msg) { + throw qpid::messaging::TransportFailure( + Msg() << "Connect failed to " << url << ": " << msg); +} + +void ConnectionContext::autoconnect() +{ + qpid::sys::AbsTime started(qpid::sys::now()); + for (double i = minReconnectInterval; !tryConnectUrl(fullUrl); i = std::min(i*2, maxReconnectInterval)) { + if (!ConnectionOptions::reconnect) throwConnectFail(fullUrl, "Reconnect disabled"); + if (limit >= 0 && retries++ >= limit) throwConnectFail(fullUrl, "Exceeded retries"); + if (expired(started, timeout)) throwConnectFail(fullUrl, "Exceeded timeout"); + QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds to" + << fullUrl); + qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds. + } + retries = 0; +} + +void ConnectionContext::reconnect(const Url& url) { + QPID_LOG(notice, "Reconnecting to " << url); + sys::Monitor::ScopedLock l(lock); + if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); + if (!driver) driver = DriverImpl::getDefault(); + reset(); + if (!tryConnectUrl(url)) throwConnectFail(url, "Failed to reconnect"); + QPID_LOG(notice, "Reconnected to " << currentUrl); +} + +void ConnectionContext::reconnect(const std::string& url) { reconnect(Url(url)); } + +void ConnectionContext::reconnect() { reconnect(fullUrl); } + +void ConnectionContext::waitNoReconnect() { + if (!checkDisconnected()) { + lock.wait(); + checkDisconnected(); + } +} + +// Try to connect to a URL, i.e. try to connect to each of its addresses in turn +// till one succeeds or they all fail. +// @return true if we connect successfully +bool ConnectionContext::tryConnectUrl(const Url& url) +{ + if (url.getUser().size()) username = url.getUser(); + if (url.getPass().size()) password = url.getPass(); + + for (Url::const_iterator i = url.begin(); i != url.end(); ++i) { + QPID_LOG(info, "Connecting to " << *i); + if (tryConnectAddr(*i) && tryOpenAddr(*i)) { + QPID_LOG(info, "Connected to " << *i); + return true; + } + } + return false; +} + +// Try to open an AMQP protocol connection on an address, after we have already +// established a transport connect (see tryConnectAddr below) +// @return true if the AMQP connection is succesfully opened. +bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) { + currentUrl = Url(addr); + if (sasl.get()) { + wakeupDriver(); + while (!sasl->authenticated() && state != DISCONNECTED) { + QPID_LOG(debug, id << " Waiting to be authenticated..."); + waitNoReconnect(); + } + if (state == DISCONNECTED) return false; + QPID_LOG(debug, id << " Authenticated"); + } + + QPID_LOG(debug, id << " Opening..."); + pn_connection_open(connection); + wakeupDriver(); //want to write + while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) && + state != DISCONNECTED) + waitNoReconnect(); + if (state == DISCONNECTED) return false; + if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { + throw qpid::messaging::ConnectionError("Failed to open connection"); + } + + // Connection open - check for idle timeout from the remote and start a + // periodic tick to monitor for idle connections + pn_timestamp_t remote = pn_transport_get_remote_idle_timeout(engine); + pn_timestamp_t local = pn_transport_get_idle_timeout(engine); + uint64_t shortest = ((remote && local) + ? std::min(remote, local) + : (remote) ? remote : local); + if (shortest) { + // send an idle frame at least twice before timeout + shortest = (shortest + 1)/2; + qpid::sys::Duration d(shortest * qpid::sys::TIME_MSEC); + ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(d, driver->getTimer(), *this)); + driver->getTimer().add(ticker); + QPID_LOG(debug, id << " AMQP 1.0 idle-timeout set:" + << " local=" << pn_transport_get_idle_timeout(engine) + << " remote=" << pn_transport_get_remote_idle_timeout(engine)); + } + + QPID_LOG(debug, id << " Opened"); + + return restartSessions(); +} + +std::string ConnectionContext::getUrl() const +{ + sys::Monitor::ScopedLock l(lock); + return (state == CONNECTED) ? currentUrl.str() : std::string(); +} + +// Try to establish a transport connect to an individual address (typically a +// TCP host:port) +// @return true if we succeed in connecting. +bool ConnectionContext::tryConnectAddr(const qpid::Address& address) +{ + transport = driver->getTransport(address.protocol, *this); + id = boost::lexical_cast<std::string>(address); + if (useSasl()) { + sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, address.host)); + } + state = CONNECTING; + try { + QPID_LOG(debug, id << " Connecting ..."); + transport->connect(address.host, boost::lexical_cast<std::string>(address.port)); + bool waiting(true); + while (waiting) { + switch (state) { + case CONNECTED: + QPID_LOG(debug, id << " Connected"); + return true; + case CONNECTING: + lock.wait(); + break; + case DISCONNECTED: + waiting = false; + break; + } + } + } catch (const std::exception& e) { + QPID_LOG(info, id << " Error while connecting: " << e.what()); + state = DISCONNECTED; + } + transport = boost::shared_ptr<Transport>(); + return false; +} + +bool ConnectionContext::restartSessions() +{ + try { + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + restartSession(i->second); + } + return true; + } catch (const qpid::TransportFailure& e) { + QPID_LOG(debug, "Connection Failed to re-initialize sessions: " << e.what()); + return false; + } +} + +void ConnectionContext::initSecurityLayer(qpid::sys::SecurityLayer& s) +{ + s.init(&codecAdapter); +} + +ConnectionContext::CodecAdapter::CodecAdapter(ConnectionContext& c) : context(c) {} +std::size_t ConnectionContext::CodecAdapter::decode(const char* buffer, std::size_t size) +{ + return context.decodePlain(buffer, size); +} +std::size_t ConnectionContext::CodecAdapter::encode(char* buffer, std::size_t size) +{ + return context.encodePlain(buffer, size); +} +bool ConnectionContext::CodecAdapter::canEncode() +{ + return context.canEncodePlain(); +} + +void ConnectionContext::startTxSession(boost::shared_ptr<SessionContext> session) { + try { + QPID_LOG(debug, id << " attaching transaction for " << session->getName()); + boost::shared_ptr<Transaction> tx(new Transaction(session->session)); + session->transaction = tx; + { + sys::Monitor::ScopedLock l(lock); + attach(session, boost::shared_ptr<SenderContext>(tx)); + } + tx->declare(boost::bind(&ConnectionContext::send, this, _1, _2, _3, _4, _5), session); + } catch (const Exception& e) { + throw TransactionError(Msg() << "Cannot start transaction: " << e.what()); + } +} + +void ConnectionContext::discharge(boost::shared_ptr<SessionContext> session, bool fail) { + { + sys::Monitor::ScopedLock l(lock); + checkClosed(session); + if (!session->transaction) + throw TransactionError("No Transaction"); + Transaction::SendFunction sendFn = boost::bind( + &ConnectionContext::sendLH, this, _1, _2, _3, _4, _5, boost::ref(l)); + syncLH(session, boost::ref(l)); // Sync to make sure all tx transfers have been received. + session->transaction->discharge(sendFn, session, fail); + session->transaction->declare(sendFn, session); + } +} + +void ConnectionContext::commit(boost::shared_ptr<SessionContext> session) { + discharge(session, false); +} + +void ConnectionContext::rollback(boost::shared_ptr<SessionContext> session) { + discharge(session, true); +} + + +// setup the transport and connection objects: +void ConnectionContext::configureConnection() +{ + pn_connection_set_container(connection, identifier.c_str()); + setProperties(); + if (heartbeat) { + // fail an idle connection at 2 x heartbeat (in msecs) + pn_transport_set_idle_timeout(engine, heartbeat*2*1000); + } + + bool enableTrace(false); + QPID_LOG_TEST_CAT(trace, protocol, enableTrace); + if (enableTrace) { + pn_transport_trace(engine, PN_TRACE_FRM); + set_tracer(engine, this); + } + + int err = pn_transport_bind(engine, connection); + if (err) + QPID_LOG(error, id << " Error binding connection and transport: " << err); +} + + +// check for failures of the transport: +bool ConnectionContext::checkTransportError(std::string& text) +{ + std::stringstream info; + +#ifdef USE_PROTON_TRANSPORT_CONDITION + pn_condition_t* tcondition = pn_transport_condition(engine); + if (pn_condition_is_set(tcondition)) + info << get_error_string(tcondition, "transport error", ": "); +#else + pn_error_t* terror = pn_transport_error(engine); + if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]"; +#endif + + text = info.str(); + return !text.empty(); +} +}}} // namespace qpid::messaging::amqp |