summaryrefslogtreecommitdiff
path: root/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp')
-rw-r--r--trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp433
1 files changed, 0 insertions, 433 deletions
diff --git a/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
deleted file mode 100644
index c7dff05dd7..0000000000
--- a/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ /dev/null
@@ -1,433 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/client/amqp0_10/SessionImpl.h"
-#include "qpid/client/amqp0_10/ConnectionImpl.h"
-#include "qpid/client/amqp0_10/ReceiverImpl.h"
-#include "qpid/client/amqp0_10/SenderImpl.h"
-#include "qpid/client/amqp0_10/MessageSource.h"
-#include "qpid/client/amqp0_10/MessageSink.h"
-#include "qpid/client/PrivateImplRef.h"
-#include "qpid/Exception.h"
-#include "qpid/log/Statement.h"
-#include "qpid/messaging/Address.h"
-#include "qpid/messaging/Connection.h"
-#include "qpid/messaging/Message.h"
-#include "qpid/messaging/MessageImpl.h"
-#include "qpid/messaging/Sender.h"
-#include "qpid/messaging/Receiver.h"
-#include "qpid/messaging/Session.h"
-#include "qpid/framing/reply_exceptions.h"
-#include <boost/format.hpp>
-#include <boost/function.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-using qpid::messaging::KeyError;
-using qpid::messaging::MessageImplAccess;
-using qpid::messaging::Sender;
-using qpid::messaging::Receiver;
-using qpid::messaging::VariantMap;
-
-namespace qpid {
-namespace client {
-namespace amqp0_10 {
-
-SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(c), transactional(t) {}
-
-
-void SessionImpl::sync()
-{
- retry<Sync>();
-}
-
-void SessionImpl::flush()
-{
- retry<Flush>();
-}
-
-void SessionImpl::commit()
-{
- if (!execute<Commit>()) {
- throw Exception();//TODO: what type?
- }
-}
-
-void SessionImpl::rollback()
-{
- //If the session fails during this operation, the transaction will
- //be rolled back anyway.
- execute<Rollback>();
-}
-
-void SessionImpl::acknowledge()
-{
- //Should probably throw an exception on failure here, or indicate
- //it through a return type at least. Failure means that the
- //message may be redelivered; i.e. the application cannot delete
- //any state necessary for preventing reprocessing of the message
- execute<Acknowledge>();
-}
-
-void SessionImpl::reject(qpid::messaging::Message& m)
-{
- //Possibly want to somehow indicate failure here as well. Less
- //clear need as compared to acknowledge however.
- execute1<Reject>(m);
-}
-
-void SessionImpl::close()
-{
- //cancel all the senders and receivers (get copy of names and then
- //make the calls to avoid modifying maps while iterating over
- //them):
- std::vector<std::string> s;
- std::vector<std::string> r;
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- for (Senders::const_iterator i = senders.begin(); i != senders.end(); ++i) s.push_back(i->first);
- for (Receivers::const_iterator i = receivers.begin(); i != receivers.end(); ++i) r.push_back(i->first);
- }
- for (std::vector<std::string>::const_iterator i = s.begin(); i != s.end(); ++i) getSender(*i).cancel();
- for (std::vector<std::string>::const_iterator i = r.begin(); i != r.end(); ++i) getReceiver(*i).cancel();
-
-
- connection.closed(*this);
- session.close();
-}
-
-template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
-{
- return boost::dynamic_pointer_cast<S>(qpid::client::PrivateImplRef<T>::get(t));
-}
-
-template <class T> void getFreeKey(std::string& key, T& map)
-{
- std::string name = key;
- int count = 1;
- for (typename T::const_iterator i = map.find(name); i != map.end(); i = map.find(name)) {
- name = (boost::format("%1%_%2%") % key % ++count).str();
- }
- key = name;
-}
-
-
-void SessionImpl::setSession(qpid::client::Session s)
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- session = s;
- incoming.setSession(session);
- if (transactional) session.txSelect();
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
- getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
- }
- for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
- getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
- }
-}
-
-struct SessionImpl::CreateReceiver : Command
-{
- qpid::messaging::Receiver result;
- const qpid::messaging::Address& address;
-
- CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) :
- Command(i), address(a) {}
- void operator()() { result = impl.createReceiverImpl(address); }
-};
-
-Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address)
-{
- return get1<CreateReceiver, Receiver>(address);
-}
-
-Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address)
-{
- std::string name = address.getName();
- getFreeKey(name, receivers);
- Receiver receiver(new ReceiverImpl(*this, name, address));
- getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver);
- receivers[name] = receiver;
- return receiver;
-}
-
-struct SessionImpl::CreateSender : Command
-{
- qpid::messaging::Sender result;
- const qpid::messaging::Address& address;
-
- CreateSender(SessionImpl& i, const qpid::messaging::Address& a) :
- Command(i), address(a) {}
- void operator()() { result = impl.createSenderImpl(address); }
-};
-
-Sender SessionImpl::createSender(const qpid::messaging::Address& address)
-{
- return get1<CreateSender, Sender>(address);
-}
-
-Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address)
-{
- std::string name = address.getName();
- getFreeKey(name, senders);
- Sender sender(new SenderImpl(*this, name, address));
- getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver);
- senders[name] = sender;
- return sender;
-}
-
-Sender SessionImpl::getSender(const std::string& name) const
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- Senders::const_iterator i = senders.find(name);
- if (i == senders.end()) {
- throw KeyError(name);
- } else {
- return i->second;
- }
-}
-
-Receiver SessionImpl::getReceiver(const std::string& name) const
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- Receivers::const_iterator i = receivers.find(name);
- if (i == receivers.end()) {
- throw KeyError(name);
- } else {
- return i->second;
- }
-}
-
-SessionImpl& SessionImpl::convert(qpid::messaging::Session& s)
-{
- boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s);
- if (!impl) {
- throw qpid::Exception(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl"));
- }
- return *impl;
-}
-
-namespace {
-
-struct IncomingMessageHandler : IncomingMessages::Handler
-{
- typedef boost::function1<bool, IncomingMessages::MessageTransfer&> Callback;
- Callback callback;
-
- IncomingMessageHandler(Callback c) : callback(c) {}
-
- bool accept(IncomingMessages::MessageTransfer& transfer)
- {
- return callback(transfer);
- }
-};
-
-}
-
-
-bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer)
-{
- Receivers::const_iterator i = receivers.find(transfer.getDestination());
- if (i == receivers.end()) {
- QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination());
- return false;
- } else {
- *receiver = i->second;
- return true;
- }
-}
-
-bool SessionImpl::accept(ReceiverImpl* receiver,
- qpid::messaging::Message* message,
- IncomingMessages::MessageTransfer& transfer)
-{
- if (receiver->getName() == transfer.getDestination()) {
- transfer.retrieve(message);
- receiver->received(*message);
- return true;
- } else {
- return false;
- }
-}
-
-bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout)
-{
- return incoming.get(handler, timeout);
-}
-
-bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout)
-{
- IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1));
- return getIncoming(handler, timeout);
-}
-
-bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout)
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- while (true) {
- try {
- std::string destination;
- if (incoming.getNextDestination(destination, timeout)) {
- Receivers::const_iterator i = receivers.find(destination);
- if (i == receivers.end()) {
- throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination));
- } else {
- receiver = i->second;
- }
- return true;
- } else {
- return false;
- }
- } catch (TransportFailure&) {
- reconnect();
- }
- }
-}
-
-qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::sys::Duration timeout)
-{
- qpid::messaging::Receiver receiver;
- if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable();
- if (!receiver) throw qpid::Exception("Bad receiver returned!");
- return receiver;
-}
-
-uint32_t SessionImpl::available()
-{
- return get1<Available, uint32_t>((const std::string*) 0);
-}
-uint32_t SessionImpl::available(const std::string& destination)
-{
- return get1<Available, uint32_t>(&destination);
-}
-
-struct SessionImpl::Available : Command
-{
- const std::string* destination;
- uint32_t result;
-
- Available(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
- void operator()() { result = impl.availableImpl(destination); }
-};
-
-uint32_t SessionImpl::availableImpl(const std::string* destination)
-{
- if (destination) {
- return incoming.available(*destination);
- } else {
- return incoming.available();
- }
-}
-
-uint32_t SessionImpl::pendingAck()
-{
- return get1<PendingAck, uint32_t>((const std::string*) 0);
-}
-
-uint32_t SessionImpl::pendingAck(const std::string& destination)
-{
- return get1<PendingAck, uint32_t>(&destination);
-}
-
-struct SessionImpl::PendingAck : Command
-{
- const std::string* destination;
- uint32_t result;
-
- PendingAck(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
- void operator()() { result = impl.pendingAckImpl(destination); }
-};
-
-uint32_t SessionImpl::pendingAckImpl(const std::string* destination)
-{
- if (destination) {
- return incoming.pendingAccept(*destination);
- } else {
- return incoming.pendingAccept();
- }
-}
-
-void SessionImpl::syncImpl()
-{
- session.sync();
-}
-
-void SessionImpl::flushImpl()
-{
- session.flush();
-}
-
-
-void SessionImpl::commitImpl()
-{
- incoming.accept();
- session.txCommit();
-}
-
-void SessionImpl::rollbackImpl()
-{
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
- getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
- }
- //ensure that stop has been processed and all previously sent
- //messages are available for release:
- session.sync();
- incoming.releaseAll();
- session.txRollback();
-
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
- getImplPtr<Receiver, ReceiverImpl>(i->second)->start();
- }
-}
-
-void SessionImpl::acknowledgeImpl()
-{
- incoming.accept();
-}
-
-void SessionImpl::rejectImpl(qpid::messaging::Message& m)
-{
- SequenceSet set;
- set.add(MessageImplAccess::get(m).getInternalId());
- session.messageReject(set);
-}
-
-void SessionImpl::receiverCancelled(const std::string& name)
-{
- receivers.erase(name);
- session.sync();
- incoming.releasePending(name);
-}
-
-void SessionImpl::senderCancelled(const std::string& name)
-{
- senders.erase(name);
-}
-
-void SessionImpl::reconnect()
-{
- connection.reconnect();
-}
-
-qpid::messaging::Connection SessionImpl::getConnection() const
-{
- return qpid::messaging::Connection(&connection);
-}
-
-}}} // namespace qpid::client::amqp0_10