summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-01-29 21:47:21 +0000
committerAlan Conway <aconway@apache.org>2009-01-29 21:47:21 +0000
commit9aa6074f06c87be8834d2562d91faac30a5ee5f5 (patch)
treea3dd9bffa2f087be9ea1af60a16644e3f2bb3608
parentcd751dab9b74348b7dc6cc3a1816ce1ddb42af86 (diff)
downloadqpid-python-9aa6074f06c87be8834d2562d91faac30a5ee5f5.tar.gz
Better error messages for not-attached exceptions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@739031 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp61
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp6
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp18
6 files changed, 38 insertions, 55 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 2f6b59e901..f58d59cbd7 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -34,6 +34,8 @@ namespace amqp_0_10 {
using namespace framing;
using namespace std;
+#define CHECK_ATTACHED(MSG) if (!getState()) throw NotAttachedException(QPID_MSG(MSG << ": channel " << channel.get() << " is not attached"))
+
SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch)
: channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {}
@@ -61,14 +63,6 @@ session::DetachCode convert(uint8_t code) {
} // namespace
-void SessionHandler::checkAttached() {
- if (!getState())
- throw NotAttachedException(
- QPID_MSG("Channel " << channel.get() << " is not attached"));
- assert(getInHandler());
- assert(channel.next);
-}
-
void SessionHandler::invoke(const AMQMethodBody& m) {
framing::invoke(*this, m);
}
@@ -82,7 +76,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
else if (isSessionControl(m))
invoke(*m);
else {
- checkAttached();
+ CHECK_ATTACHED("receiving " << f);
if (!receiveReady)
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));
if (!getState()->receiverRecord(f))
@@ -126,7 +120,7 @@ bool isCommand(const AMQFrame& f) {
} // namespace
void SessionHandler::handleOut(AMQFrame& f) {
- checkAttached();
+ CHECK_ATTACHED("sending " << f);
if (!sendReady)
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data"));
getState()->senderRecord(f);
@@ -137,14 +131,6 @@ void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f);
}
-void SessionHandler::checkName(const std::string& name) {
- checkAttached();
- if (name != getState()->getId().getName())
- throw InvalidArgumentException(
- QPID_MSG("Incorrect session name: " << name
- << ", expecting: " << getState()->getId().getName()));
-}
-
void SessionHandler::attach(const std::string& name_, bool force) {
// Save the name for possible session-busy exception. Session-busy
// can be thrown before we have attached the handler to a valid
@@ -164,18 +150,27 @@ void SessionHandler::attach(const std::string& name_, bool force) {
sendCommandPoint(getState()->senderGetCommandPoint());
}
+#define CHECK_NAME(NAME, MSG) do { \
+ CHECK_ATTACHED(MSG); \
+ if (NAME != getState()->getId().getName()) \
+ throw InvalidArgumentException( \
+ QPID_MSG(MSG << ": incorrect session name: " << NAME \
+ << ", expecting: " << getState()->getId().getName())); \
+ } while(0)
+
+
void SessionHandler::attached(const std::string& name) {
- checkName(name);
+ CHECK_NAME(name, "session.attached");
}
void SessionHandler::detach(const std::string& name) {
- checkName(name);
+ CHECK_NAME(name, "session.detach");
peer.detached(name, session::DETACH_CODE_NORMAL);
handleDetach();
}
void SessionHandler::detached(const std::string& name, uint8_t code) {
- checkName(name);
+ CHECK_NAME(name, "session.detached");
ignoring = false;
if (code != session::DETACH_CODE_NORMAL)
channelException(convert(code), "session.detached from peer.");
@@ -189,18 +184,18 @@ void SessionHandler::handleDetach() {
}
void SessionHandler::requestTimeout(uint32_t t) {
- checkAttached();
+ CHECK_ATTACHED("session.request-timeout");
getState()->setTimeout(t);
peer.timeout(t);
}
void SessionHandler::timeout(uint32_t t) {
- checkAttached();
+ CHECK_ATTACHED("session.request-timeout");
getState()->setTimeout(t);
}
void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) {
- checkAttached();
+ CHECK_ATTACHED("session.command-point");
getState()->receiverSetCommandPoint(SessionPoint(id, offset));
if (!receiveReady) {
receiveReady = true;
@@ -209,7 +204,7 @@ void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) {
}
void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) {
- checkAttached();
+ CHECK_ATTACHED("session.expected");
if (getState()->hasState()) { // Replay
if (commands.empty()) throw IllegalStateException(
QPID_MSG(getState()->getId() << ": has state but client is attaching as new session."));
@@ -225,14 +220,14 @@ void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragme
}
void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) {
- checkAttached();
+ CHECK_ATTACHED("session.confirmed");
// Ignore non-contiguous confirmations.
if (!commands.empty() && commands.front() >= getState()->senderGetReplayPoint())
getState()->senderConfirmed(commands.rangesBegin()->last());
}
void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) {
- checkAttached();
+ CHECK_ATTACHED("session.completed");
getState()->senderCompleted(commands);
if (getState()->senderNeedKnownCompleted() || timelyReply) {
peer.knownCompleted(commands);
@@ -241,12 +236,12 @@ void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) {
}
void SessionHandler::knownCompleted(const SequenceSet& commands) {
- checkAttached();
+ CHECK_ATTACHED("session.known-completed");
getState()->receiverKnownCompleted(commands);
}
void SessionHandler::flush(bool expected, bool confirmed, bool completed) {
- checkAttached();
+ CHECK_ATTACHED("session.flush");
if (expected) {
SequenceSet expectSet;
if (getState()->hasState())
@@ -270,19 +265,19 @@ void SessionHandler::gap(const SequenceSet& /*commands*/) {
void SessionHandler::sendDetach()
{
- checkAttached();
+ CHECK_ATTACHED("session.sendDetach");
ignoring = true;
peer.detach(getState()->getId().getName());
}
void SessionHandler::sendCompletion() {
- checkAttached();
+ CHECK_ATTACHED("session.send-completion");
const SequenceSet& c = getState()->receiverGetUnknownComplete();
peer.completed(c, getState()->receiverNeedKnownCompleted());
}
void SessionHandler::sendAttach(bool force) {
- checkAttached();
+ CHECK_ATTACHED("session.send-attach");
QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId());
peer.attach(getState()->getId().getName(), force);
if (getState()->hasState())
@@ -306,7 +301,7 @@ void SessionHandler::markReadyToSend() {
}
void SessionHandler::sendTimeout(uint32_t t) {
- checkAttached();
+ CHECK_ATTACHED("session.send-timeout");
peer.requestTimeout(t);
}
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
index 967e89c984..d7af7dd6c7 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
@@ -98,9 +98,6 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
virtual void handleIn(framing::AMQFrame&);
virtual void handleOut(framing::AMQFrame&);
- void checkAttached();
- void checkName(const std::string& name);
-
framing::ChannelHandler channel;
framing::AMQP_AllProxy::Session peer;
bool ignoring;
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 84102fb015..2c4de478f6 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -90,7 +90,7 @@ void SessionHandler::readyToSend() {
//
void SessionHandler::attached(const std::string& name) {
if (session.get()) {
- checkName(name);
+ amqp_0_10::SessionHandler::attached(name);
} else {
SessionId id(connection.getUserId(), name);
SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index d970523534..cccb1fa098 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -65,12 +65,6 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = ::qmf::org::apache::qpid::cluster;
-/**@file
- Threading notes:
- - Public functions may be called in local connection IO threads.
- see .h.
-*/
-
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
MemberId member;
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 0d99bebdd4..6e91ca8f64 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -58,9 +58,6 @@ class Connection;
/**
* Connection to the cluster
- *
- * Threading notes: 3 thread categories: connection, deliver, update.
- *
*/
class Cluster : private Cpg::Handler, public management::Manageable {
public:
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index c58133f453..f2580cb777 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -104,7 +104,7 @@ static const char UPDATE_CHARS[] = "\000qpid-update";
const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS));
void UpdateClient::update() {
- QPID_LOG(debug, updaterId << " updateing state to " << updateeId << " at " << updateeUrl);
+ QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
@@ -144,7 +144,7 @@ template <class T> std::string encode(const T& t) {
} // namespace
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
- QPID_LOG(debug, updaterId << " updateing exchange " << ex->getName());
+ QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
ClusterConnectionProxy proxy(session);
proxy.exchange(encode(*ex));
}
@@ -187,7 +187,7 @@ class MessageUpdater {
void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
- QPID_LOG(debug, updaterId << " updateing queue " << q->getName());
+ QPID_LOG(debug, updaterId << " updating queue " << q->getName());
ClusterConnectionProxy proxy(session);
proxy.queue(encode(*q));
MessageUpdater updater(q->getName(), session);
@@ -201,7 +201,7 @@ void UpdateClient::updateBinding(const std::string& queue, const QueueBinding& b
}
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
- QPID_LOG(debug, updaterId << " updateing connection " << *updateConnection);
+ QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
shadowConnection = catchUpConnection();
broker::Connection& bc = updateConnection->getBrokerConnection();
@@ -216,7 +216,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
}
void UpdateClient::updateSession(broker::SessionHandler& sh) {
- QPID_LOG(debug, updaterId << " updateing session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
+ QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
<< sh.getSession()->getId());
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
@@ -230,10 +230,10 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
// Re-create session state on remote connection.
// Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
- QPID_LOG(debug, updaterId << " updateing consumers.");
+ QPID_LOG(debug, updaterId << " updating consumers.");
ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
- QPID_LOG(debug, updaterId << " updateing unacknowledged messages.");
+ QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1));
@@ -267,7 +267,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
}
void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) {
- QPID_LOG(debug, updaterId << " updateing consumer " << ci->getName() << " on " << shadowSession.getId());
+ QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
@@ -354,7 +354,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
};
void UpdateClient::updateTxState(broker::SemanticState& s) {
- QPID_LOG(debug, updaterId << " updateing TX transaction state.");
+ QPID_LOG(debug, updaterId << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();