diff options
author | Alan Conway <aconway@apache.org> | 2009-01-29 21:47:21 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-01-29 21:47:21 +0000 |
commit | 9aa6074f06c87be8834d2562d91faac30a5ee5f5 (patch) | |
tree | a3dd9bffa2f087be9ea1af60a16644e3f2bb3608 | |
parent | cd751dab9b74348b7dc6cc3a1816ce1ddb42af86 (diff) | |
download | qpid-python-9aa6074f06c87be8834d2562d91faac30a5ee5f5.tar.gz |
Better error messages for not-attached exceptions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@739031 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 61 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 18 |
6 files changed, 38 insertions, 55 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 2f6b59e901..f58d59cbd7 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -34,6 +34,8 @@ namespace amqp_0_10 { using namespace framing; using namespace std; +#define CHECK_ATTACHED(MSG) if (!getState()) throw NotAttachedException(QPID_MSG(MSG << ": channel " << channel.get() << " is not attached")) + SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch) : channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {} @@ -61,14 +63,6 @@ session::DetachCode convert(uint8_t code) { } // namespace -void SessionHandler::checkAttached() { - if (!getState()) - throw NotAttachedException( - QPID_MSG("Channel " << channel.get() << " is not attached")); - assert(getInHandler()); - assert(channel.next); -} - void SessionHandler::invoke(const AMQMethodBody& m) { framing::invoke(*this, m); } @@ -82,7 +76,7 @@ void SessionHandler::handleIn(AMQFrame& f) { else if (isSessionControl(m)) invoke(*m); else { - checkAttached(); + CHECK_ATTACHED("receiving " << f); if (!receiveReady) throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data")); if (!getState()->receiverRecord(f)) @@ -126,7 +120,7 @@ bool isCommand(const AMQFrame& f) { } // namespace void SessionHandler::handleOut(AMQFrame& f) { - checkAttached(); + CHECK_ATTACHED("sending " << f); if (!sendReady) throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data")); getState()->senderRecord(f); @@ -137,14 +131,6 @@ void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); } -void SessionHandler::checkName(const std::string& name) { - checkAttached(); - if (name != getState()->getId().getName()) - throw InvalidArgumentException( - QPID_MSG("Incorrect session name: " << name - << ", expecting: " << getState()->getId().getName())); -} - void SessionHandler::attach(const std::string& name_, bool force) { // Save the name for possible session-busy exception. Session-busy // can be thrown before we have attached the handler to a valid @@ -164,18 +150,27 @@ void SessionHandler::attach(const std::string& name_, bool force) { sendCommandPoint(getState()->senderGetCommandPoint()); } +#define CHECK_NAME(NAME, MSG) do { \ + CHECK_ATTACHED(MSG); \ + if (NAME != getState()->getId().getName()) \ + throw InvalidArgumentException( \ + QPID_MSG(MSG << ": incorrect session name: " << NAME \ + << ", expecting: " << getState()->getId().getName())); \ + } while(0) + + void SessionHandler::attached(const std::string& name) { - checkName(name); + CHECK_NAME(name, "session.attached"); } void SessionHandler::detach(const std::string& name) { - checkName(name); + CHECK_NAME(name, "session.detach"); peer.detached(name, session::DETACH_CODE_NORMAL); handleDetach(); } void SessionHandler::detached(const std::string& name, uint8_t code) { - checkName(name); + CHECK_NAME(name, "session.detached"); ignoring = false; if (code != session::DETACH_CODE_NORMAL) channelException(convert(code), "session.detached from peer."); @@ -189,18 +184,18 @@ void SessionHandler::handleDetach() { } void SessionHandler::requestTimeout(uint32_t t) { - checkAttached(); + CHECK_ATTACHED("session.request-timeout"); getState()->setTimeout(t); peer.timeout(t); } void SessionHandler::timeout(uint32_t t) { - checkAttached(); + CHECK_ATTACHED("session.request-timeout"); getState()->setTimeout(t); } void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) { - checkAttached(); + CHECK_ATTACHED("session.command-point"); getState()->receiverSetCommandPoint(SessionPoint(id, offset)); if (!receiveReady) { receiveReady = true; @@ -209,7 +204,7 @@ void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) { } void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) { - checkAttached(); + CHECK_ATTACHED("session.expected"); if (getState()->hasState()) { // Replay if (commands.empty()) throw IllegalStateException( QPID_MSG(getState()->getId() << ": has state but client is attaching as new session.")); @@ -225,14 +220,14 @@ void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragme } void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) { - checkAttached(); + CHECK_ATTACHED("session.confirmed"); // Ignore non-contiguous confirmations. if (!commands.empty() && commands.front() >= getState()->senderGetReplayPoint()) getState()->senderConfirmed(commands.rangesBegin()->last()); } void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) { - checkAttached(); + CHECK_ATTACHED("session.completed"); getState()->senderCompleted(commands); if (getState()->senderNeedKnownCompleted() || timelyReply) { peer.knownCompleted(commands); @@ -241,12 +236,12 @@ void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) { } void SessionHandler::knownCompleted(const SequenceSet& commands) { - checkAttached(); + CHECK_ATTACHED("session.known-completed"); getState()->receiverKnownCompleted(commands); } void SessionHandler::flush(bool expected, bool confirmed, bool completed) { - checkAttached(); + CHECK_ATTACHED("session.flush"); if (expected) { SequenceSet expectSet; if (getState()->hasState()) @@ -270,19 +265,19 @@ void SessionHandler::gap(const SequenceSet& /*commands*/) { void SessionHandler::sendDetach() { - checkAttached(); + CHECK_ATTACHED("session.sendDetach"); ignoring = true; peer.detach(getState()->getId().getName()); } void SessionHandler::sendCompletion() { - checkAttached(); + CHECK_ATTACHED("session.send-completion"); const SequenceSet& c = getState()->receiverGetUnknownComplete(); peer.completed(c, getState()->receiverNeedKnownCompleted()); } void SessionHandler::sendAttach(bool force) { - checkAttached(); + CHECK_ATTACHED("session.send-attach"); QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId()); peer.attach(getState()->getId().getName(), force); if (getState()->hasState()) @@ -306,7 +301,7 @@ void SessionHandler::markReadyToSend() { } void SessionHandler::sendTimeout(uint32_t t) { - checkAttached(); + CHECK_ATTACHED("session.send-timeout"); peer.requestTimeout(t); } diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h index 967e89c984..d7af7dd6c7 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -98,9 +98,6 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, virtual void handleIn(framing::AMQFrame&); virtual void handleOut(framing::AMQFrame&); - void checkAttached(); - void checkName(const std::string& name); - framing::ChannelHandler channel; framing::AMQP_AllProxy::Session peer; bool ignoring; diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 84102fb015..2c4de478f6 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -90,7 +90,7 @@ void SessionHandler::readyToSend() { // void SessionHandler::attached(const std::string& name) { if (session.get()) { - checkName(name); + amqp_0_10::SessionHandler::attached(name); } else { SessionId id(connection.getUserId(), name); SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index d970523534..cccb1fa098 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -65,12 +65,6 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = ::qmf::org::apache::qpid::cluster; -/**@file - Threading notes: - - Public functions may be called in local connection IO threads. - see .h. -*/ - struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; MemberId member; diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 0d99bebdd4..6e91ca8f64 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -58,9 +58,6 @@ class Connection; /** * Connection to the cluster - * - * Threading notes: 3 thread categories: connection, deliver, update. - * */ class Cluster : private Cpg::Handler, public management::Manageable { public: diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index c58133f453..f2580cb777 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -104,7 +104,7 @@ static const char UPDATE_CHARS[] = "\000qpid-update"; const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS)); void UpdateClient::update() { - QPID_LOG(debug, updaterId << " updateing state to " << updateeId << " at " << updateeUrl); + QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); @@ -144,7 +144,7 @@ template <class T> std::string encode(const T& t) { } // namespace void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { - QPID_LOG(debug, updaterId << " updateing exchange " << ex->getName()); + QPID_LOG(debug, updaterId << " updating exchange " << ex->getName()); ClusterConnectionProxy proxy(session); proxy.exchange(encode(*ex)); } @@ -187,7 +187,7 @@ class MessageUpdater { void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) { - QPID_LOG(debug, updaterId << " updateing queue " << q->getName()); + QPID_LOG(debug, updaterId << " updating queue " << q->getName()); ClusterConnectionProxy proxy(session); proxy.queue(encode(*q)); MessageUpdater updater(q->getName(), session); @@ -201,7 +201,7 @@ void UpdateClient::updateBinding(const std::string& queue, const QueueBinding& b } void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { - QPID_LOG(debug, updaterId << " updateing connection " << *updateConnection); + QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); shadowConnection = catchUpConnection(); broker::Connection& bc = updateConnection->getBrokerConnection(); @@ -216,7 +216,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda } void UpdateClient::updateSession(broker::SessionHandler& sh) { - QPID_LOG(debug, updaterId << " updateing session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " + QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " << sh.getSession()->getId()); broker::SessionState* ss = sh.getSession(); if (!ss) return; // no session. @@ -230,10 +230,10 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { // Re-create session state on remote connection. // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33. - QPID_LOG(debug, updaterId << " updateing consumers."); + QPID_LOG(debug, updaterId << " updating consumers."); ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this)); - QPID_LOG(debug, updaterId << " updateing unacknowledged messages."); + QPID_LOG(debug, updaterId << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1)); @@ -267,7 +267,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { } void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) { - QPID_LOG(debug, updaterId << " updateing consumer " << ci->getName() << " on " << shadowSession.getId()); + QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), @@ -354,7 +354,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { }; void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, updaterId << " updateing TX transaction state."); + QPID_LOG(debug, updaterId << " updating TX transaction state."); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); |