diff options
author | Alan Conway <aconway@apache.org> | 2008-08-05 19:29:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-05 19:29:09 +0000 |
commit | b5f8cf1bd9b5652e2691d6bc5b9b1c3228f53d68 (patch) | |
tree | 3c74eaf22c916844bea3b130a8ee6bd2635261d7 /cpp | |
parent | bd47fd629bb2356df93af70b174a6a070f3a58cc (diff) | |
download | qpid-python-b5f8cf1bd9b5652e2691d6bc5b9b1c3228f53d68.tar.gz |
Fix Cluster::send encode race.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@682885 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Daemon.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 27 |
5 files changed, 30 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/Daemon.cpp b/cpp/src/qpid/broker/Daemon.cpp index 89d0e20a2b..c311730f76 100644 --- a/cpp/src/qpid/broker/Daemon.cpp +++ b/cpp/src/qpid/broker/Daemon.cpp @@ -121,7 +121,7 @@ uint16_t Daemon::wait(int timeout) { // parent waits for child. FD_ZERO(&fds); FD_SET(pipeFds[0], &fds); int n=select(FD_SETSIZE, &fds, 0, 0, &tv); - if(n==0) throw ErrnoException("Timed out waiting for daemon"); + if(n==0) throw Exception("Timed out waiting for daemon"); if(n<0) throw ErrnoException("Error waiting for daemon"); uint16_t port = 0; /* diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 524155b929..f4f414bc63 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -155,7 +155,6 @@ void Connector::Writer::handle(framing::AMQFrame& frame) { frames.push_back(frame); if (frame.getEof()) {//or if we already have a buffers worth lastEof = frames.size(); - QPID_LOG(debug, "Requesting write: lastEof=" << lastEof); aio->notifyPendingWrite(); } QPID_LOG(trace, "SENT " << identifier << ": " << frame); @@ -163,8 +162,6 @@ void Connector::Writer::handle(framing::AMQFrame& frame) { void Connector::Writer::writeOne(const Mutex::ScopedLock& l) { assert(buffer); - QPID_LOG(trace, "Write buffer " << encode.getPosition() - << " bytes " << framesEncoded << " frames "); framesEncoded = 0; buffer->dataStart = 0; @@ -193,7 +190,6 @@ void Connector::Writer::write(sys::AsynchIO&) { frame.encode(encode); ++framesEncoded; bytesWritten += size; - QPID_LOG(debug, "Wrote frame: lastEof=" << lastEof << ", i=" << i); } frames.erase(frames.begin(), frames.begin()+lastEof); lastEof = 0; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d18fd452e4..1d81a50e1c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -109,8 +109,7 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) { QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - // FIXME aconway 2008-07-03: More efficient buffer management. - // Cache coded form of decoded frames for re-encoding? + char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. Buffer buf(buffer); frame.encode(buf); encodePtr(buf, connection); @@ -161,7 +160,8 @@ void Cluster::deliver( try { Buffer buf(static_cast<char*>(msg), msg_len); AMQFrame frame; - frame.decode(buf); + if (!frame.decode(buf)) // Not enough data. + throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: cluster error handling. ConnectionInterceptor* connection; decodePtr(buf, connection); QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame); @@ -170,7 +170,6 @@ void Cluster::deliver( QPID_LOG(warning, "Unexpected DLVR, already left the cluster."); return; } - if (connection && from != self) // Look up shadow for remote connections connection = getShadowConnection(from, connection); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 7ce0354a80..f5a695de24 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -131,7 +131,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted Id self; ShadowConnectionMap shadowConnectionMap; ShadowConnectionOutputHandler shadowOut; - char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 567896d44d..9a907fc476 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -244,7 +244,32 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("bar", msg.getData()); - // Queue should be empty on all queues. + // Queue should be empty on all cluster members. + BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); + BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); + BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); +} + +QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { + ClusterFixture cluster(3); + // First start a subscription. + Client c0(cluster[0]); + c0.session.queueDeclare("q"); + c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); + // Now send messages + Client c1(cluster[1]); + c1.session.messageTransfer(arg::content=TransferContent("foo", "q")); + c1.session.messageTransfer(arg::content=TransferContent("bar", "q")); + + // Check they arrived + Message m; + BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC)); + BOOST_CHECK_EQUAL("foo", m.getData()); + BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC)); + BOOST_CHECK_EQUAL("bar", m.getData()); + + // Queue should be empty on all cluster members. + Client c2(cluster[2]); BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); |