summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-30 20:56:15 +0000
committerAlan Conway <aconway@apache.org>2011-09-30 20:56:15 +0000
commitde0cd1fa6022a5143435010cc76a10dfbe2706ad (patch)
treeeddc56154ba93d2f7cfbeeef5403cc260e18b756
parent603735047e6fb4ab6cc49cc384b4c09109f2f165 (diff)
downloadqpid-python-de0cd1fa6022a5143435010cc76a10dfbe2706ad.tar.gz
QPID-2920: Send messages frame by frame.
The sender picks a channel number unique within that sender. Messages are sent over CPG frame-by-frame and assembled based on the sender and channel number. Channel numbers can be re-used once the send is complete. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1177833 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/cluster.mk3
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp24
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp16
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/HandlerBase.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp76
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h73
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp38
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/UniqueIds.h70
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.h2
-rw-r--r--qpid/cpp/xml/cluster.xml2
19 files changed, 294 insertions, 40 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 29b609bacd..70bcc8e95a 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -126,6 +126,8 @@ cluster2_la_SOURCES = \
qpid/cluster/exp/hash.h \
qpid/cluster/exp/HandlerBase.cpp \
qpid/cluster/exp/HandlerBase.h \
+ qpid/cluster/exp/MessageBuilders.cpp \
+ qpid/cluster/exp/MessageBuilders.h \
qpid/cluster/exp/MessageHandler.cpp \
qpid/cluster/exp/MessageHandler.h \
qpid/cluster/exp/Multicaster.cpp \
@@ -138,6 +140,7 @@ cluster2_la_SOURCES = \
qpid/cluster/exp/QueueReplica.h \
qpid/cluster/exp/Settings.cpp \
qpid/cluster/exp/Settings.h \
+ qpid/cluster/exp/UniqueIds.h \
qpid/cluster/exp/WiringHandler.cpp \
qpid/cluster/exp/WiringHandler.h
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h
index 6b81c602bd..1b2b7e1587 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.h
+++ b/qpid/cpp/src/qpid/cluster/Cpg.h
@@ -113,6 +113,8 @@ class Cpg : public sys::IOHandle {
MemberId self() const;
int getFd();
+
+ std::string getName() const { return str(group); }
private:
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 484b64f25d..4a33d737d7 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -41,6 +41,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/Buffer.h"
#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
namespace qpid {
namespace cluster {
@@ -84,15 +85,26 @@ BrokerContext::BrokerContext(Core& c) : core(c) {}
BrokerContext::~BrokerContext() {}
+
+namespace {
+void sendFrame(Multicaster& mcaster, const AMQFrame& frame, uint16_t channel) {
+ AMQFrame copy(frame);
+ copy.setChannel(channel);
+ mcaster.mcast(copy);
+}
+}
+
bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
{
if (!tssReplicate) return true;
- // FIXME aconway 2010-10-20: replicate message in fragments
- // (frames), using fixed size bufffers.
- std::string data(msg->encodedSize(),char());
- framing::Buffer buf(&data[0], data.size());
- msg->encode(buf);
- mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, queue.getName(), data));
+ // FIXME aconway 2011-09-29: for async completion the
+ // UniqueIds::release must move to self-delivery so we can
+ // identify the same message.
+ UniqueIds<uint16_t>::Scope s(channels);
+ uint16_t channel = s.id;
+ mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, queue.getName(), channel));
+ std::for_each(msg->getFrames().begin(), msg->getFrames().end(),
+ boost::bind(&sendFrame, boost::ref(mcaster(queue)), _1, channel));
return false; // Strict order, wait for CPG self-delivery to enqueue.
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
index 825a975292..01909b136d 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
@@ -22,6 +22,7 @@
*
*/
+#include "UniqueIds.h"
#include "qpid/broker/Cluster.h"
#include "qpid/sys/AtomicValue.h"
@@ -85,6 +86,7 @@ class BrokerContext : public broker::Cluster
Multicaster& mcaster(const std::string&);
Core& core;
+ UniqueIds<uint16_t> channels;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
index 6685cb6390..a7e514e675 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
@@ -62,26 +62,26 @@ void EventHandler::deliver(
sender = MemberId(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
framing::AMQFrame frame;
+ // FIXME aconway 2011-09-29: don't decode own frame bodies. Ignore based on channel.
while (buf.available()) {
frame.decode(buf);
- assert(frame.getBody());
- QPID_LOG(trace, "cluster deliver: " << PrettyId(sender, self) << " "
- << *frame.getBody());
+ QPID_LOG(trace, "cluster deliver on " << cpg.getName() << " from "<< PrettyId(sender, self) << ": " << frame);
try {
- invoke(*frame.getBody());
+ handle(frame);
} catch (const std::exception& e) {
// Note: exceptions are assumed to be survivable,
// fatal errors should log a message and call Core::fatal.
QPID_LOG(error, e.what());
+ // FIXME aconway 2011-09-29: error handling
}
}
}
-void EventHandler::invoke(const framing::AMQBody& body) {
+void EventHandler::handle(const framing::AMQFrame& frame) {
for (Handlers::iterator i = handlers.begin(); i != handlers.end(); ++i)
- if ((*i)->invoke(body)) return;
- QPID_LOG(error, "Cluster received unknown control: " << body );
- assert(0); // Error handling
+ if ((*i)->handle(frame)) return;
+ QPID_LOG(error, "Cluster received unknown frame: " << frame );
+ assert(0); // FIXME aconway 2011-09-29: Error handling
}
struct PrintAddrs {
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
index 4af43fb76e..cc7caaac89 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
@@ -49,7 +49,6 @@ class EventHandler : public Cpg::Handler
public:
EventHandler(boost::shared_ptr<sys::Poller> poller,
boost::function<void()> onError);
-
~EventHandler();
/** Add a handler */
@@ -79,7 +78,7 @@ class EventHandler : public Cpg::Handler
Cpg& getCpg() { return cpg; }
private:
- void invoke(const framing::AMQBody& body);
+ void handle(const framing::AMQFrame&);
Cpg cpg;
PollerDispatch dispatcher;
diff --git a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
index f0c6650994..aae1538e6a 100644
--- a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
+++ b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
@@ -28,6 +28,7 @@ namespace qpid {
namespace framing {
class AMQBody;
+class AMQFrame;
}
namespace cluster {
@@ -42,7 +43,7 @@ class HandlerBase : public RefCounted
HandlerBase(EventHandler&);
virtual ~HandlerBase();
- virtual bool invoke(const framing::AMQBody& body) = 0;
+ virtual bool handle(const framing::AMQFrame&) = 0;
virtual void left(const MemberId&) {}
virtual void joined(const MemberId&) {}
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp
new file mode 100644
index 0000000000..5d6e165d7a
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageBuilders.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/broker/Message.h"
+
+namespace qpid {
+namespace cluster {
+
+MessageBuilders::MessageBuilders(broker::MessageStore* s) : store(s) {}
+
+void MessageBuilders::announce(MemberId sender, uint16_t channel,
+ const boost::shared_ptr<broker::Queue>& queue)
+{
+ ChannelId key(sender, channel);
+ if (map.find(key) != map.end())
+ throw Exception(
+ QPID_MSG("MessageBuilder channel " << channel << " on " << sender
+ << " is already assigned."));
+ map[key] = std::make_pair(queue, new broker::MessageBuilder(store));
+}
+
+bool MessageBuilders::handle(
+ MemberId sender,
+ const framing::AMQFrame& frame,
+ boost::shared_ptr<broker::Queue>& queueOut,
+ boost::intrusive_ptr<broker::Message>& messageOut)
+{
+ ChannelId key(sender, frame.getChannel());
+ Map::iterator i = map.find(key);
+ if (i == map.end())
+ throw Exception(QPID_MSG("MessageBuilder channel " << frame.getChannel()
+ << " on " << sender << " is not assigned."));
+ boost::shared_ptr<broker::MessageBuilder> msgBuilder = i->second.second;
+ // Nasty bit of code pasted from broker::SessionState::handleContent.
+ // Should really be part of broker::MessageBuilder
+ if (frame.getBof() && frame.getBos()) //start of frameset
+ msgBuilder->start(0);
+ msgBuilder->handle(const_cast<framing::AMQFrame&>(frame));
+ if (frame.getEof() && frame.getEos()) { //end of frameset
+ if (frame.getBof()) {
+ //i.e this is a just a command frame, add a dummy header
+ framing::AMQFrame header((framing::AMQHeaderBody()));
+ header.setBof(false);
+ header.setEof(false);
+ msgBuilder->getMessage()->getFrames().append(header);
+ }
+ queueOut = i->second.first;
+ messageOut = msgBuilder->getMessage();
+ map.erase(key);
+ return true;
+ }
+ return false;
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h b/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h
new file mode 100644
index 0000000000..d9c7339749
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h
@@ -0,0 +1,73 @@
+#ifndef QPID_CLUSTER_EXP_MESSAGEBUILDERS_H
+#define QPID_CLUSTER_EXP_MESSAGEBUILDERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/cluster/types.h"
+#include "qpid/broker/MessageBuilder.h"
+#include <map>
+
+namespace qpid {
+namespace broker {
+class Queue;
+}
+namespace framing {
+class AMQFrame;
+}
+
+namespace cluster {
+
+/**
+ * Build messages received by CPG delivery.
+ */
+class MessageBuilders
+{
+ public:
+ MessageBuilders(broker::MessageStore* store);
+
+ /** Announce a message for queue arriving on channel from sender. */
+ void announce(MemberId sender, uint16_t channel,
+ const boost::shared_ptr<broker::Queue>&);
+
+ /** Add a frame to the message in progress.
+ *@param sender member that sent the frame.
+ *@param frame is the frame to add.
+ *@param queueOut set to the queue if message complete.
+ *@param messageOut set to message if message complete.
+ *@return True if the frame completes a message
+ */
+ bool handle(MemberId sender, const framing::AMQFrame& frame,
+ boost::shared_ptr<broker::Queue>& queueOut,
+ boost::intrusive_ptr<broker::Message>& messageOut);
+
+ private:
+ typedef std::pair<MemberId, uint16_t> ChannelId;
+ typedef std::pair<boost::shared_ptr<broker::Queue>,
+ boost::shared_ptr<broker::MessageBuilder> > QueueBuilder;
+ typedef std::map<ChannelId, QueueBuilder> Map;
+ Map map;
+ broker::MessageStore* store;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXP_MESSAGEBUILDERS_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index b73a3747c0..1608d0e6ec 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -31,22 +31,43 @@
#include "qpid/broker/Queue.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/Buffer.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/Thread.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace cluster {
+
using namespace broker;
+using namespace framing;
MessageHandler::MessageHandler(EventHandler& e, Core& c) :
HandlerBase(e),
broker(c.getBroker()),
- core(c)
+ core(c),
+ messageBuilders(&c.getBroker().getStore())
{}
-bool MessageHandler::invoke(const framing::AMQBody& body) {
- return framing::invoke(*this, body).wasHandled();
+bool MessageHandler::handle(const framing::AMQFrame& frame) {
+ assert(frame.getBody());
+ const AMQBody& body = *frame.getBody();
+ if (framing::invoke(*this, body).wasHandled()) return true;
+ // Test for message frame
+ if (body.type() == HEADER_BODY || body.type() == CONTENT_BODY ||
+ (body.getMethod() && body.getMethod()->isA<MessageTransferBody>()))
+ {
+ boost::shared_ptr<broker::Queue> queue;
+ boost::intrusive_ptr<broker::Message> message;
+ if (messageBuilders.handle(sender(), frame, queue, message)) {
+ BrokerContext::ScopedSuppressReplication ssr;
+ queue->deliver(message);
+ }
+ // FIXME aconway 2011-09-29: async completion goes here.
+ // For own messages need to release the channel assigned by BrokerContext.
+ return true;
+ }
+ return false;
}
boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
@@ -57,17 +78,10 @@ boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
return queue;
}
-void MessageHandler::enqueue(const std::string& q, const std::string& message) {
-
+void MessageHandler::enqueue(const std::string& q, uint16_t channel) {
boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
- // FIXME aconway 2010-10-28: decode message by frame in bounded-size buffers.
// FIXME aconway 2011-09-28: don't re-decode my own messages
- boost::intrusive_ptr<broker::Message> msg = new broker::Message();
- framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
- msg->decodeHeader(buf);
- msg->decodeContent(buf);
- BrokerContext::ScopedSuppressReplication ssr;
- queue->deliver(msg);
+ messageBuilders.announce(sender(), channel, queue);
}
// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
index d97999b738..f1d3dc2726 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
@@ -25,6 +25,7 @@
// TODO aconway 2010-10-19: experimental cluster code.
#include "HandlerBase.h"
+#include "MessageBuilders.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include <boost/intrusive_ptr.hpp>
#include <map>
@@ -54,9 +55,9 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
public:
MessageHandler(EventHandler&, Core&);
- bool invoke(const framing::AMQBody& body);
+ bool handle(const framing::AMQFrame&);
- void enqueue(const std::string& queue, const std::string& message);
+ void enqueue(const std::string& queue, uint16_t channel);
void acquire(const std::string& queue, uint32_t position);
void dequeue(const std::string& queue, uint32_t position);
void requeue(const std::string& queue, uint32_t position, bool redelivered);
@@ -66,6 +67,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
broker::Broker& broker;
Core& core;
+ MessageBuilders messageBuilders;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
index 9d8a00e217..84b2145691 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
@@ -52,7 +52,7 @@ Multicaster::Multicaster(Cpg& cpg_,
}
void Multicaster::mcast(const framing::AMQFrame& data) {
- QPID_LOG(trace, "cluster multicast: " << data);
+ QPID_LOG(trace, "cluster multicast on " << cpg.getName() << ": " << data);
BufferRef bufRef = buffers.get(data.encodedSize());
framing::Buffer buf(bufRef.begin(), bufRef.size());
data.encode(buf);
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index c3ff94b897..1de1c1850c 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -108,7 +108,7 @@ void QueueContext::timeout() {
queue.stopConsumers();
}
-// Callback set up by queue.stopConsumers() called in connection thread.
+// Callback set up by queue.stopConsumers() called in connection or timer thread.
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
sys::Mutex::ScopedLock l(lock);
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
index 33f805ab82..50638b9cb3 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
@@ -36,8 +36,8 @@ namespace cluster {
QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m, const Settings& s)
: HandlerBase(eh), multicaster(m), consumeLock(s.getConsumeLock()) {}
-bool QueueHandler::invoke(const framing::AMQBody& body) {
- return framing::invoke(*this, body).wasHandled();
+bool QueueHandler::handle(const framing::AMQFrame& frame) {
+ return framing::invoke(*this, *frame.getBody()).wasHandled();
}
void QueueHandler::subscribe(const std::string& queue) {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
index 920c5de0b2..6bf6e66eec 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
@@ -56,7 +56,7 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler,
public:
QueueHandler(EventHandler&, Multicaster&, const Settings&);
- bool invoke(const framing::AMQBody& body);
+ bool handle(const framing::AMQFrame& body);
// Events
void subscribe(const std::string& queue);
diff --git a/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h b/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h
new file mode 100644
index 0000000000..624fde7214
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h
@@ -0,0 +1,70 @@
+#ifndef QPID_CLUSTER_EXP_UNIQUEIDS_H
+#define QPID_CLUSTER_EXP_UNIQUEIDS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <set>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Assign ID numbers, ensuring that all the assigned IDs are unique
+ * T is the numeric type - actually any type with >, == and ++ will do.
+ */
+template <class T> class UniqueIds
+{
+ public:
+ /** Get an ID that is different from all other active IDs.
+ *@return the ID, which is now considered active.
+ */
+ T get() {
+ sys::Mutex::ScopedLock l(lock);
+ T old = mark;
+ while (active.find(++mark) != active.end() && mark != old)
+ ;
+ assert(mark != old); // check wrap-around
+ active.insert(mark);
+ return mark;
+ }
+ /** Release an ID, so it is inactive and available for re-use */
+ void release(T id) {
+ sys::Mutex::ScopedLock l(lock);
+ active.erase(id);
+ }
+ /** Allocate an ID, release automatically at end of scope */
+ struct Scope {
+ UniqueIds& ids;
+ T id;
+ Scope(UniqueIds& ids_) : ids(ids_), id(ids.get()) {}
+ ~Scope() { ids.release(id); }
+ };
+
+ private:
+ sys::Mutex lock;
+ std::set<T> active;
+ T mark;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXP_UNIQUEIDS_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
index 79e9d657bc..b96ae0b30f 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
@@ -50,8 +50,8 @@ WiringHandler::WiringHandler(EventHandler& e,
queueHandler(qh)
{}
-bool WiringHandler::invoke(const framing::AMQBody& body) {
- return framing::invoke(*this, body).wasHandled();
+bool WiringHandler::handle(const framing::AMQFrame& frame) {
+ return framing::invoke(*this, *frame.getBody()).wasHandled();
}
void WiringHandler::createQueue(const std::string& data) {
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
index cd964de997..7a07c7098e 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
@@ -53,7 +53,7 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler,
public:
WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&);
- bool invoke(const framing::AMQBody& body);
+ bool handle(const framing::AMQFrame&);
void createQueue(const std::string& data);
void destroyQueue(const std::string& name);
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 5fba6b4036..0d325c4d12 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -333,7 +333,7 @@
<class name="cluster-message" code="0x82">
<control name="enqueue" code="0x2">
<field name="queue" type="queue.name"/>
- <field name="message" type="str32"/>
+ <field name="channel" type="uint16"/>
</control>
<control name="acquire" code="0x4">