/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/client/amqp0_10/SessionImpl.h" #include "qpid/client/amqp0_10/ConnectionImpl.h" #include "qpid/client/amqp0_10/ReceiverImpl.h" #include "qpid/client/amqp0_10/SenderImpl.h" #include "qpid/client/amqp0_10/MessageSource.h" #include "qpid/client/amqp0_10/MessageSink.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/SessionImpl.h" #include "qpid/messaging/PrivateImplRef.h" #include "qpid/Exception.h" #include "qpid/log/Statement.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Connection.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Session.h" #include #include #include using qpid::messaging::KeyError; using qpid::messaging::NoMessageAvailable; using qpid::messaging::MessagingException; using qpid::messaging::TransactionAborted; using qpid::messaging::SessionError; using qpid::messaging::MessageImplAccess; using qpid::messaging::Sender; using qpid::messaging::Receiver; namespace qpid { namespace client { namespace amqp0_10 { typedef qpid::sys::Mutex::ScopedLock ScopedLock; typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock; SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} void SessionImpl::checkError() { qpid::client::SessionBase_0_10Access s(session); s.get()->assertOpen(); } bool SessionImpl::hasError() { qpid::client::SessionBase_0_10Access s(session); return s.get()->hasError(); } void SessionImpl::sync(bool block) { if (block) retry(); else execute(); } void SessionImpl::commit() { if (!execute()) { throw TransactionAborted("Transaction aborted due to transport failure"); } } void SessionImpl::rollback() { //If the session fails during this operation, the transaction will //be rolled back anyway. execute(); } void SessionImpl::acknowledge(bool sync_) { //Should probably throw an exception on failure here, or indicate //it through a return type at least. Failure means that the //message may be redelivered; i.e. the application cannot delete //any state necessary for preventing reprocessing of the message execute(); sync(sync_); } void SessionImpl::reject(qpid::messaging::Message& m) { //Possibly want to somehow indicate failure here as well. Less //clear need as compared to acknowledge however. execute1(m); } void SessionImpl::release(qpid::messaging::Message& m) { execute1(m); } void SessionImpl::acknowledge(qpid::messaging::Message& m) { //Should probably throw an exception on failure here, or indicate //it through a return type at least. Failure means that the //message may be redelivered; i.e. the application cannot delete //any state necessary for preventing reprocessing of the message execute1(m); } void SessionImpl::close() { if (hasError()) { ScopedLock l(lock); senders.clear(); receivers.clear(); } else { while (true) { Sender s; { ScopedLock l(lock); if (senders.empty()) break; s = senders.begin()->second; } s.close(); // outside the lock, will call senderCancelled } while (true) { Receiver r; { ScopedLock l(lock); if (receivers.empty()) break; r = receivers.begin()->second; } r.close(); // outside the lock, will call receiverCancelled } } connection->closed(*this); if (!hasError()) session.close(); } template boost::intrusive_ptr getImplPtr(T& t) { return boost::dynamic_pointer_cast(qpid::messaging::PrivateImplRef::get(t)); } template void getFreeKey(std::string& key, T& map) { std::string name = key; int count = 1; for (typename T::const_iterator i = map.find(name); i != map.end(); i = map.find(name)) { name = (boost::format("%1%_%2%") % key % ++count).str(); } key = name; } void SessionImpl::setSession(qpid::client::Session s) { ScopedLock l(lock); session = s; incoming.setSession(session); if (transactional) session.txSelect(); for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { getImplPtr(i->second)->init(session, resolver); } for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { getImplPtr(i->second)->init(session, resolver); } session.sync(); } struct SessionImpl::CreateReceiver : Command { qpid::messaging::Receiver result; const qpid::messaging::Address& address; CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) : Command(i), address(a) {} void operator()() { result = impl.createReceiverImpl(address); } }; Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address) { return get1(address); } Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address) { ScopedLock l(lock); std::string name = address.getName(); getFreeKey(name, receivers); Receiver receiver(new ReceiverImpl(*this, name, address)); getImplPtr(receiver)->init(session, resolver); receivers[name] = receiver; return receiver; } struct SessionImpl::CreateSender : Command { qpid::messaging::Sender result; const qpid::messaging::Address& address; CreateSender(SessionImpl& i, const qpid::messaging::Address& a) : Command(i), address(a) {} void operator()() { result = impl.createSenderImpl(address); } }; Sender SessionImpl::createSender(const qpid::messaging::Address& address) { return get1(address); } Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address) { ScopedLock l(lock); std::string name = address.getName(); getFreeKey(name, senders); Sender sender(new SenderImpl(*this, name, address)); getImplPtr(sender)->init(session, resolver); senders[name] = sender; return sender; } Sender SessionImpl::getSender(const std::string& name) const { qpid::sys::Mutex::ScopedLock l(lock); Senders::const_iterator i = senders.find(name); if (i == senders.end()) { throw KeyError(name); } else { return i->second; } } Receiver SessionImpl::getReceiver(const std::string& name) const { qpid::sys::Mutex::ScopedLock l(lock); Receivers::const_iterator i = receivers.find(name); if (i == receivers.end()) { throw KeyError(name); } else { return i->second; } } SessionImpl& SessionImpl::convert(qpid::messaging::Session& s) { boost::intrusive_ptr impl = getImplPtr(s); if (!impl) { throw SessionError(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl")); } return *impl; } namespace { struct IncomingMessageHandler : IncomingMessages::Handler { typedef boost::function1 Callback; Callback callback; IncomingMessageHandler(Callback c) : callback(c) {} bool accept(IncomingMessages::MessageTransfer& transfer) { return callback(transfer); } }; } bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer) { ScopedLock l(lock); Receivers::const_iterator i = receivers.find(transfer.getDestination()); if (i == receivers.end()) { QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination()); return false; } else { *receiver = i->second; return true; } } bool SessionImpl::accept(ReceiverImpl* receiver, qpid::messaging::Message* message, IncomingMessages::MessageTransfer& transfer) { if (receiver->getName() == transfer.getDestination()) { transfer.retrieve(message); receiver->received(*message); return true; } else { return false; } } qpid::sys::Duration adjust(qpid::messaging::Duration timeout) { uint64_t ms = timeout.getMilliseconds(); if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) { return ms * qpid::sys::TIME_MSEC; } else { return qpid::sys::TIME_INFINITE; } } bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout) { return incoming.get(handler, adjust(timeout)); } bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout) { IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1)); return getIncoming(handler, timeout); } bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout) { while (true) { try { std::string destination; if (incoming.getNextDestination(destination, adjust(timeout))) { qpid::sys::Mutex::ScopedLock l(lock); Receivers::const_iterator i = receivers.find(destination); if (i == receivers.end()) { throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination)); } else { receiver = i->second; } return true; } else { return false; } } catch (TransportFailure&) { reconnect(); } catch (const qpid::framing::ResourceLimitExceededException& e) { if (backoff()) return false; else throw qpid::messaging::TargetCapacityExceeded(e.what()); } catch (const qpid::framing::UnauthorizedAccessException& e) { throw qpid::messaging::UnauthorizedAccess(e.what()); } catch (const qpid::SessionException& e) { throw qpid::messaging::SessionError(e.what()); } catch (const qpid::ConnectionException& e) { throw qpid::messaging::ConnectionError(e.what()); } catch (const qpid::ChannelException& e) { throw qpid::messaging::MessagingException(e.what()); } } } qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration timeout) { qpid::messaging::Receiver receiver; if (!nextReceiver(receiver, timeout)) throw NoMessageAvailable(); if (!receiver) throw SessionError("Bad receiver returned!"); return receiver; } uint32_t SessionImpl::getReceivable() { return get1((const std::string*) 0); } uint32_t SessionImpl::getReceivable(const std::string& destination) { return get1(&destination); } struct SessionImpl::Receivable : Command { const std::string* destination; uint32_t result; Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} void operator()() { result = impl.getReceivableImpl(destination); } }; uint32_t SessionImpl::getReceivableImpl(const std::string* destination) { ScopedLock l(lock); if (destination) { return incoming.available(*destination); } else { return incoming.available(); } } uint32_t SessionImpl::getUnsettledAcks() { return get1((const std::string*) 0); } uint32_t SessionImpl::getUnsettledAcks(const std::string& destination) { return get1(&destination); } struct SessionImpl::UnsettledAcks : Command { const std::string* destination; uint32_t result; UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} void operator()() { result = impl.getUnsettledAcksImpl(destination); } }; uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination) { ScopedLock l(lock); if (destination) { return incoming.pendingAccept(*destination); } else { return incoming.pendingAccept(); } } void SessionImpl::syncImpl(bool block) { if (block) session.sync(); else session.flush(); //cleanup unconfirmed accept records: incoming.pendingAccept(); } void SessionImpl::commitImpl() { ScopedLock l(lock); incoming.accept(); session.txCommit(); } void SessionImpl::rollbackImpl() { ScopedLock l(lock); for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { getImplPtr(i->second)->stop(); } //ensure that stop has been processed and all previously sent //messages are available for release: session.sync(); incoming.releaseAll(); session.txRollback(); for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { getImplPtr(i->second)->start(); } } void SessionImpl::acknowledgeImpl() { ScopedLock l(lock); if (!transactional) incoming.accept(); } void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m) { ScopedLock l(lock); if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId()); } void SessionImpl::rejectImpl(qpid::messaging::Message& m) { SequenceSet set; set.add(MessageImplAccess::get(m).getInternalId()); session.messageReject(set); } void SessionImpl::releaseImpl(qpid::messaging::Message& m) { SequenceSet set; set.add(MessageImplAccess::get(m).getInternalId()); session.messageRelease(set); } void SessionImpl::receiverCancelled(const std::string& name) { ScopedLock l(lock); receivers.erase(name); session.sync(); incoming.releasePending(name); } void SessionImpl::releasePending(const std::string& name) { ScopedLock l(lock); incoming.releasePending(name); } void SessionImpl::senderCancelled(const std::string& name) { ScopedLock l(lock); senders.erase(name); } void SessionImpl::reconnect() { connection->open(); } bool SessionImpl::backoff() { return connection->backoff(); } qpid::messaging::Connection SessionImpl::getConnection() const { return qpid::messaging::Connection(connection.get()); } }}} // namespace qpid::client::amqp0_10