summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-05 19:29:09 +0000
committerAlan Conway <aconway@apache.org>2008-08-05 19:29:09 +0000
commitb5f8cf1bd9b5652e2691d6bc5b9b1c3228f53d68 (patch)
tree3c74eaf22c916844bea3b130a8ee6bd2635261d7 /cpp
parentbd47fd629bb2356df93af70b174a6a070f3a58cc (diff)
downloadqpid-python-b5f8cf1bd9b5652e2691d6bc5b9b1c3228f53d68.tar.gz
Fix Cluster::send encode race.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@682885 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Daemon.cpp2
-rw-r--r--cpp/src/qpid/client/Connector.cpp4
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp7
-rw-r--r--cpp/src/qpid/cluster/Cluster.h1
-rw-r--r--cpp/src/tests/cluster_test.cpp27
5 files changed, 30 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/Daemon.cpp b/cpp/src/qpid/broker/Daemon.cpp
index 89d0e20a2b..c311730f76 100644
--- a/cpp/src/qpid/broker/Daemon.cpp
+++ b/cpp/src/qpid/broker/Daemon.cpp
@@ -121,7 +121,7 @@ uint16_t Daemon::wait(int timeout) { // parent waits for child.
FD_ZERO(&fds);
FD_SET(pipeFds[0], &fds);
int n=select(FD_SETSIZE, &fds, 0, 0, &tv);
- if(n==0) throw ErrnoException("Timed out waiting for daemon");
+ if(n==0) throw Exception("Timed out waiting for daemon");
if(n<0) throw ErrnoException("Error waiting for daemon");
uint16_t port = 0;
/*
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 524155b929..f4f414bc63 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -155,7 +155,6 @@ void Connector::Writer::handle(framing::AMQFrame& frame) {
frames.push_back(frame);
if (frame.getEof()) {//or if we already have a buffers worth
lastEof = frames.size();
- QPID_LOG(debug, "Requesting write: lastEof=" << lastEof);
aio->notifyPendingWrite();
}
QPID_LOG(trace, "SENT " << identifier << ": " << frame);
@@ -163,8 +162,6 @@ void Connector::Writer::handle(framing::AMQFrame& frame) {
void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
assert(buffer);
- QPID_LOG(trace, "Write buffer " << encode.getPosition()
- << " bytes " << framesEncoded << " frames ");
framesEncoded = 0;
buffer->dataStart = 0;
@@ -193,7 +190,6 @@ void Connector::Writer::write(sys::AsynchIO&) {
frame.encode(encode);
++framesEncoded;
bytesWritten += size;
- QPID_LOG(debug, "Wrote frame: lastEof=" << lastEof << ", i=" << i);
}
frames.erase(frames.begin(), frames.begin()+lastEof);
lastEof = 0;
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d18fd452e4..1d81a50e1c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -109,8 +109,7 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) {
void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- // FIXME aconway 2008-07-03: More efficient buffer management.
- // Cache coded form of decoded frames for re-encoding?
+ char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
Buffer buf(buffer);
frame.encode(buf);
encodePtr(buf, connection);
@@ -161,7 +160,8 @@ void Cluster::deliver(
try {
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
- frame.decode(buf);
+ if (!frame.decode(buf)) // Not enough data.
+ throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: cluster error handling.
ConnectionInterceptor* connection;
decodePtr(buf, connection);
QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
@@ -170,7 +170,6 @@ void Cluster::deliver(
QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
return;
}
-
if (connection && from != self) // Look up shadow for remote connections
connection = getShadowConnection(from, connection);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 7ce0354a80..f5a695de24 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -131,7 +131,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted
Id self;
ShadowConnectionMap shadowConnectionMap;
ShadowConnectionOutputHandler shadowOut;
- char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 567896d44d..9a907fc476 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -244,7 +244,32 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) {
BOOST_CHECK(c1.subs.get(msg, "q"));
BOOST_CHECK_EQUAL("bar", msg.getData());
- // Queue should be empty on all queues.
+ // Queue should be empty on all cluster members.
+ BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
+}
+
+QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
+ ClusterFixture cluster(3);
+ // First start a subscription.
+ Client c0(cluster[0]);
+ c0.session.queueDeclare("q");
+ c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
+ // Now send messages
+ Client c1(cluster[1]);
+ c1.session.messageTransfer(arg::content=TransferContent("foo", "q"));
+ c1.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+
+ // Check they arrived
+ Message m;
+ BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+ BOOST_CHECK_EQUAL("foo", m.getData());
+ BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+ BOOST_CHECK_EQUAL("bar", m.getData());
+
+ // Queue should be empty on all cluster members.
+ Client c2(cluster[2]);
BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());