summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-12-11 22:50:02 +0000
committerAlan Conway <aconway@apache.org>2008-12-11 22:50:02 +0000
commite1d0293e7944c73995b52e2352d60ae8ba9ebc3d (patch)
tree2b14f861d8b6c46776cb00e7d1925343b740c88b /cpp/src
parentcc781622299a4de5af2fdde6bfc1e2eb42e1623a (diff)
downloadqpid-python-e1d0293e7944c73995b52e2352d60ae8ba9ebc3d.tar.gz
cluster: refactor multicast concerns into separate Multicaster class with separate locking.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@725853 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/cluster.mk3
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp72
-rw-r--r--cpp/src/qpid/cluster/Cluster.h62
-rw-r--r--cpp/src/qpid/cluster/ClusterLeaveException.h35
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--cpp/src/qpid/cluster/Connection.h1
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp81
-rw-r--r--cpp/src/qpid/cluster/Multicaster.h69
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp4
9 files changed, 237 insertions, 94 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index fe2342a416..5c44fa7c76 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -46,6 +46,9 @@ cluster_la_SOURCES = \
qpid/cluster/ClusterMap.cpp \
qpid/cluster/FailoverExchange.h \
qpid/cluster/FailoverExchange.cpp \
+ qpid/cluster/Multicaster.h \
+ qpid/cluster/Multicaster.cpp \
+ qpid/cluster/ClusterLeaveException.h \
qpid/cluster/Quorum.h
cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 222aa07548..aac5bc1dd8 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -90,20 +90,19 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
name(name_),
myUrl(url_),
myId(cpg.self()),
+ readMax(readMax_),
cpgDispatchHandle(
cpg,
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
- mcastQueue(boost::bind(&Cluster::sendMcast, this, _1), poller),
- mcastId(0),
+ mcast(cpg, poller),
mgmtObject(0),
+ deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
state(INIT),
lastSize(0),
- lastBroker(false),
- readMax(readMax_)
+ lastBroker(false)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
@@ -116,7 +115,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
failoverExchange.reset(new FailoverExchange(this));
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
- mcastQueue.start();
QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
if (quorum_) quorum.init();
cpg.join(name);
@@ -135,49 +133,6 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
-void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) {
- Event e(Event::control(body, id, seq));
- QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
- mcast(e);
-}
-
-void Cluster::mcastControl(const framing::AMQBody& body) {
- Event e(Event::control(body, ConnectionId(myId,0), ++mcastId));
- QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
- mcast(e);
-}
-
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) {
- Event e(DATA, connection, size, id);
- memcpy(e.getData(), data, size);
- {
- Lock l(lock);
- if (state <= CATCHUP && e.isConnection()) {
- // Stall outgoing connection events untill we are fully READY
- QPID_LOG(trace, *this << " MCAST deferred: " << e );
- mcastStallQueue.push_back(e);
- return;
- }
- }
- QPID_LOG(trace, *this << " MCAST " << e);
- mcast(e);
-}
-
-void Cluster::mcast(const Event& e) { mcastQueue.push(e); }
-
-void Cluster::sendMcast(PollableEventQueue::Queue& values) {
- try {
- PollableEventQueue::Queue::iterator i = values.begin();
- while (i != values.end() && i->mcast(cpg))
- ++i;
- values.erase(values.begin(), i);
- }
- catch (const std::exception& e) {
- QPID_LOG(critical, "Multicast failure: " << e.what());
- leave();
- }
-}
-
std::vector<Url> Cluster::getUrls() const {
Lock l(lock);
return getUrls(l);
@@ -315,7 +270,6 @@ ostream& operator<<(ostream& o, const AddrList& a) {
void Cluster::dispatch(sys::DispatchHandle& h) {
try {
cpg.dispatchAll();
- mcastQueue.start(); // In case it was stopped by flow control.
h.rewatch();
} catch (const std::exception& e) {
QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
@@ -361,7 +315,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
setClusterId(true);
+ // FIXME aconway 2008-12-11: Centralize transition to READY and associated actions eg mcast.release()
state = READY;
+ mcast.release();
QPID_LOG(notice, *this << " first in cluster");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
map = ClusterMap(myId, myUrl, true);
@@ -370,7 +326,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
else { // Joining established group.
state = NEWBIE;
QPID_LOG(info, *this << " joining cluster: " << map);
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()));
+ mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId);
}
}
else if (state >= READY && memberChange)
@@ -384,7 +340,7 @@ void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isNewbie(id)) {
state = OFFER;
QPID_LOG(info, *this << " send dump-offer to " << id);
- mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId));
+ mcast.mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), myId);
}
}
@@ -414,10 +370,10 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
memberUpdate(l);
if (state == CATCHUP && id == myId) {
state = READY;
+ mcast.release();
QPID_LOG(notice, *this << " caught up, active cluster member");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1));
- mcastStallQueue.clear();
+ mcast.release();
}
}
@@ -432,6 +388,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
}
else { // Another offer was first.
state = READY;
+ mcast.release();
QPID_LOG(info, *this << " cancelled dump offer to " << dumpee);
tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
}
@@ -461,6 +418,7 @@ void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) {
boost::bind(&Cluster::dumpOutError, this, _1)));
}
+// Called in dump thread.
void Cluster::dumpInDone(const ClusterMap& m) {
Lock l(lock);
dumpedMap = m;
@@ -471,8 +429,7 @@ void Cluster::checkDumpIn(Lock& ) {
if (state == LEFT) return;
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
- mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()));
- // Don't flush the mcast queue till we are READY, on self-deliver.
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
state = CATCHUP;
QPID_LOG(info, *this << " received dump, starting catch-up");
deliverQueue.start();
@@ -487,6 +444,7 @@ void Cluster::dumpOutDone() {
void Cluster::dumpOutDone(Lock& l) {
assert(state == DUMPER);
state = READY;
+ mcast.release();
QPID_LOG(info, *this << " sent dump");
deliverQueue.start();
tryMakeOffer(map.firstNewbie(), l); // Try another offer
@@ -523,7 +481,7 @@ void Cluster::stopClusterNode(Lock& l) {
void Cluster::stopFullCluster(Lock& ) {
QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcastControl(ClusterShutdownBody());
+ mcast.mcastControl(ClusterShutdownBody(), myId);
}
void Cluster::memberUpdate(Lock& l) {
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index feeb68fd4b..f962f4c72f 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -26,6 +26,7 @@
#include "ConnectionMap.h"
#include "FailoverExchange.h"
#include "Quorum.h"
+#include "Multicaster.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
@@ -55,8 +56,10 @@ namespace cluster {
class Connection;
/**
- * Connection to the cluster.
+ * Connection to the cluster
*
+ * Threading notes: 3 thread categories: connection, deliver, dump.
+ *
*/
class Cluster : private Cpg::Handler, public management::Manageable {
public:
@@ -70,29 +73,26 @@ class Cluster : private Cpg::Handler, public management::Manageable {
virtual ~Cluster();
- // Connection map
+ // Connection map - called in connection threads.
void insert(const ConnectionPtr&);
void erase(ConnectionId);
- // Send to the cluster
- void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t id);
- void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id);
-
- // URLs of current cluster members.
+ // URLs of current cluster members - called in connection threads.
std::vector<Url> getUrls() const;
boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
- // Leave the cluster
+ // Leave the cluster - called in any thread.
void leave();
- // Dump completedJo
+ // Dump completed - called in dump thread
void dumpInDone(const ClusterMap&);
MemberId getId() const;
broker::Broker& getBroker() const;
+ Multicaster& getMulticast() { return mcast; }
boost::function<bool ()> isQuorate;
- void checkQuorum();
+ void checkQuorum(); // called in connection threads.
size_t getReadMax() { return readMax; }
@@ -109,22 +109,17 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// The parameter makes it hard to forget since you have to have an instance of
// a Lock to call the unlocked functions.
- void mcastControl(const framing::AMQBody& controlBody);
- void mcast(const Event& e);
-
void leave(Lock&);
std::vector<Url> getUrls(Lock&) const;
- void sendMcast(PollableEventQueue::Queue& );
-
- // Called via CPG, deliverQueue or DumpClient threads.
+ // Make an offer if we can - called in deliver thread.
void tryMakeOffer(const MemberId&, Lock&);
// Called in main thread in ~Broker.
void brokerShutdown();
// Cluster controls implement XML methods from cluster.xml.
- // May be called in CPG thread via deliver() OR in deliverQueue thread.
+ // Called in deliver thread.
//
void dumpRequest(const MemberId&, const std::string&, Lock&);
void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&);
@@ -134,6 +129,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void delivered(PollableEventQueue::Queue&); // deliverQueue callback
void deliveredEvent(const Event&);
+ // Helper, called in deliver thread.
void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
// CPG callbacks, called in CPG IO thread.
@@ -177,25 +173,31 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void setClusterId(const framing::Uuid&);
- mutable sys::Monitor lock;
-
+ // Immutable members set on construction, never changed.
broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
const std::string name;
const Url myUrl;
const MemberId myId;
-
- ConnectionMap connections;
+ const size_t readMax;
+ framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- PollableEventQueue deliverQueue, mcastQueue;
- PlainEventQueue mcastStallQueue;
- uint32_t mcastId;
- framing::Uuid clusterId;
+
+ // Thread safe members
+ Multicaster mcast;
qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
+ PollableEventQueue deliverQueue;
+ ConnectionMap connections;
+ boost::shared_ptr<FailoverExchange> failoverExchange;
+ Quorum quorum;
+
+ // Remaining members are protected by lock.
+ mutable sys::Monitor lock;
+ // Local cluster state, cluster map
enum {
INIT, ///< Initial state, no CPG messages received.
NEWBIE, ///< Sent dump request, waiting for dump offer.
@@ -206,17 +208,13 @@ class Cluster : private Cpg::Handler, public management::Manageable {
DUMPER, ///< Offer accepted, sending a state dump.
LEFT ///< Final state, left the cluster.
} state;
-
ClusterMap map;
- sys::Thread dumpThread;
- boost::optional<ClusterMap> dumpedMap;
-
size_t lastSize;
bool lastBroker;
- boost::shared_ptr<FailoverExchange> failoverExchange;
- Quorum quorum;
- size_t readMax;
+ // Dump related
+ sys::Thread dumpThread;
+ boost::optional<ClusterMap> dumpedMap;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/cpp/src/qpid/cluster/ClusterLeaveException.h b/cpp/src/qpid/cluster/ClusterLeaveException.h
new file mode 100644
index 0000000000..e5bdbc560a
--- /dev/null
+++ b/cpp/src/qpid/cluster/ClusterLeaveException.h
@@ -0,0 +1,35 @@
+#ifndef QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H
+#define QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace cluster {
+
+struct ClusterLeaveException : public Exception
+{
+ ClusterLeaveException(const std::string& message=std::string()) : Exception(message) {}
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H*/
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index a422164c81..f0d38bf299 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -163,7 +163,7 @@ void Connection::closed() {
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
output.setOutputHandler(discardHandler);
- cluster.mcastControl(ClusterConnectionDeliverCloseBody(), self, ++mcastSeq);
+ cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
}
}
catch (const std::exception& e) {
@@ -193,7 +193,7 @@ size_t Connection::decode(const char* buffer, size_t size) {
}
else { // Multicast local connections.
assert(isLocal());
- cluster.mcastBuffer(buffer, size, self, ++mcastSeq);
+ cluster.getMulticast().mcastBuffer(buffer, size, self);
}
return size;
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index ce80f42414..29e42ce534 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -163,7 +163,6 @@ class Connection :
framing::FrameDecoder localDecoder;
framing::FrameDecoder mcastDecoder;
broker::Connection connection;
- framing::SequenceNumber mcastSeq;
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
new file mode 100644
index 0000000000..896f7c6a6e
--- /dev/null
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Multicaster.h"
+#include "Cpg.h"
+#include "ClusterLeaveException.h"
+#include "qpid/log/Statement.h"
+
+
+namespace qpid {
+namespace cluster {
+
+Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller) :
+ cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
+ holding(true)
+{
+ queue.start();
+}
+
+void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
+ mcast(Event::control(body, id));
+}
+
+void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
+ Event e(DATA, id, size);
+ memcpy(e.getData(), data, size);
+ mcast(e);
+}
+
+void Multicaster::mcast(const Event& e) {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (e.getType() == DATA && e.isConnection() && holding) {
+ holdingQueue.push_back(e);
+ QPID_LOG(trace, " MCAST held: " << e );
+ return;
+ }
+ }
+ queue.push(e);
+}
+
+void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
+ try {
+ PollableEventQueue::Queue::iterator i = values.begin();
+ while (i != values.end() && i->mcast(cpg)) {
+ QPID_LOG(trace, " MCAST " << *i);
+ ++i;
+ }
+ values.erase(values.begin(), i);
+ }
+ catch (const std::exception& e) {
+ throw ClusterLeaveException(e.what());
+ }
+}
+
+void Multicaster::release() {
+ sys::Mutex::ScopedLock l(lock);
+ holding = false;
+ std::for_each(holdingQueue.begin(), holdingQueue.end(), boost::bind(&Multicaster::mcast, this, _1));
+ holdingQueue.clear();
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h
new file mode 100644
index 0000000000..e7aff7fe7c
--- /dev/null
+++ b/cpp/src/qpid/cluster/Multicaster.h
@@ -0,0 +1,69 @@
+#ifndef QPID_CLUSTER_MULTICASTER_H
+#define QPID_CLUSTER_MULTICASTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Event.h"
+#include "qpid/sys/PollableQueue.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace sys {
+class Poller;
+}
+
+namespace cluster {
+
+class Cpg;
+
+/**
+ * Multicast to the cluster. Shared, thread safe object.
+ */
+class Multicaster
+{
+ public:
+ /** Starts in holding mode: connection data events are held, other events are mcast */
+ Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& );
+ void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&);
+ void mcastBuffer(const char*, size_t, const ConnectionId&);
+ void mcast(const Event& e);
+ /** End holding mode, held events are mcast */
+ void release();
+
+ private:
+ typedef sys::PollableQueue<Event> PollableEventQueue;
+ typedef std::deque<Event> PlainEventQueue;
+
+ void sendMcast(PollableEventQueue::Queue& );
+
+ sys::Mutex lock;
+ Cpg& cpg;
+ PollableEventQueue queue;
+ bool holding;
+ PlainEventQueue holdingQueue;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_MULTICASTER_H*/
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 0385ffc04e..ae2a040ef3 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -99,8 +99,8 @@ void OutputInterceptor::sendDoOutput() {
// Send it anyway to keep the doOutput chain going until we are sure there's no more output
// (in deliverDoOutput)
//
- // FIXME aconway 2008-10-16: use ++parent.mcastSeq as sequence no,not 0
- parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), parent.getId(), 0);
+ parent.getCluster().getMulticast().mcastControl(
+ ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), parent.getId());
QPID_LOG(trace, parent << "Send doOutput request for " << request);
}