diff options
author | Alan Conway <aconway@apache.org> | 2011-09-30 20:56:15 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-30 20:56:15 +0000 |
commit | de0cd1fa6022a5143435010cc76a10dfbe2706ad (patch) | |
tree | eddc56154ba93d2f7cfbeeef5403cc260e18b756 | |
parent | 603735047e6fb4ab6cc49cc384b4c09109f2f165 (diff) | |
download | qpid-python-de0cd1fa6022a5143435010cc76a10dfbe2706ad.tar.gz |
QPID-2920: Send messages frame by frame.
The sender picks a channel number unique within that sender. Messages
are sent over CPG frame-by-frame and assembled based on the sender and
channel number. Channel numbers can be re-used once the send is complete.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1177833 13f79535-47bb-0310-9956-ffa450edef68
19 files changed, 294 insertions, 40 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 29b609bacd..70bcc8e95a 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -126,6 +126,8 @@ cluster2_la_SOURCES = \ qpid/cluster/exp/hash.h \ qpid/cluster/exp/HandlerBase.cpp \ qpid/cluster/exp/HandlerBase.h \ + qpid/cluster/exp/MessageBuilders.cpp \ + qpid/cluster/exp/MessageBuilders.h \ qpid/cluster/exp/MessageHandler.cpp \ qpid/cluster/exp/MessageHandler.h \ qpid/cluster/exp/Multicaster.cpp \ @@ -138,6 +140,7 @@ cluster2_la_SOURCES = \ qpid/cluster/exp/QueueReplica.h \ qpid/cluster/exp/Settings.cpp \ qpid/cluster/exp/Settings.h \ + qpid/cluster/exp/UniqueIds.h \ qpid/cluster/exp/WiringHandler.cpp \ qpid/cluster/exp/WiringHandler.h diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h index 6b81c602bd..1b2b7e1587 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.h +++ b/qpid/cpp/src/qpid/cluster/Cpg.h @@ -113,6 +113,8 @@ class Cpg : public sys::IOHandle { MemberId self() const; int getFd(); + + std::string getName() const { return str(group); } private: diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index 484b64f25d..4a33d737d7 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -41,6 +41,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/Buffer.h" #include "qpid/log/Statement.h" +#include <boost/bind.hpp> namespace qpid { namespace cluster { @@ -84,15 +85,26 @@ BrokerContext::BrokerContext(Core& c) : core(c) {} BrokerContext::~BrokerContext() {} + +namespace { +void sendFrame(Multicaster& mcaster, const AMQFrame& frame, uint16_t channel) { + AMQFrame copy(frame); + copy.setChannel(channel); + mcaster.mcast(copy); +} +} + bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg) { if (!tssReplicate) return true; - // FIXME aconway 2010-10-20: replicate message in fragments - // (frames), using fixed size bufffers. - std::string data(msg->encodedSize(),char()); - framing::Buffer buf(&data[0], data.size()); - msg->encode(buf); - mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, queue.getName(), data)); + // FIXME aconway 2011-09-29: for async completion the + // UniqueIds::release must move to self-delivery so we can + // identify the same message. + UniqueIds<uint16_t>::Scope s(channels); + uint16_t channel = s.id; + mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, queue.getName(), channel)); + std::for_each(msg->getFrames().begin(), msg->getFrames().end(), + boost::bind(&sendFrame, boost::ref(mcaster(queue)), _1, channel)); return false; // Strict order, wait for CPG self-delivery to enqueue. } diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h index 825a975292..01909b136d 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h @@ -22,6 +22,7 @@ * */ +#include "UniqueIds.h" #include "qpid/broker/Cluster.h" #include "qpid/sys/AtomicValue.h" @@ -85,6 +86,7 @@ class BrokerContext : public broker::Cluster Multicaster& mcaster(const std::string&); Core& core; + UniqueIds<uint16_t> channels; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp index 6685cb6390..a7e514e675 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp @@ -62,26 +62,26 @@ void EventHandler::deliver( sender = MemberId(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); framing::AMQFrame frame; + // FIXME aconway 2011-09-29: don't decode own frame bodies. Ignore based on channel. while (buf.available()) { frame.decode(buf); - assert(frame.getBody()); - QPID_LOG(trace, "cluster deliver: " << PrettyId(sender, self) << " " - << *frame.getBody()); + QPID_LOG(trace, "cluster deliver on " << cpg.getName() << " from "<< PrettyId(sender, self) << ": " << frame); try { - invoke(*frame.getBody()); + handle(frame); } catch (const std::exception& e) { // Note: exceptions are assumed to be survivable, // fatal errors should log a message and call Core::fatal. QPID_LOG(error, e.what()); + // FIXME aconway 2011-09-29: error handling } } } -void EventHandler::invoke(const framing::AMQBody& body) { +void EventHandler::handle(const framing::AMQFrame& frame) { for (Handlers::iterator i = handlers.begin(); i != handlers.end(); ++i) - if ((*i)->invoke(body)) return; - QPID_LOG(error, "Cluster received unknown control: " << body ); - assert(0); // Error handling + if ((*i)->handle(frame)) return; + QPID_LOG(error, "Cluster received unknown frame: " << frame ); + assert(0); // FIXME aconway 2011-09-29: Error handling } struct PrintAddrs { diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h index 4af43fb76e..cc7caaac89 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h @@ -49,7 +49,6 @@ class EventHandler : public Cpg::Handler public: EventHandler(boost::shared_ptr<sys::Poller> poller, boost::function<void()> onError); - ~EventHandler(); /** Add a handler */ @@ -79,7 +78,7 @@ class EventHandler : public Cpg::Handler Cpg& getCpg() { return cpg; } private: - void invoke(const framing::AMQBody& body); + void handle(const framing::AMQFrame&); Cpg cpg; PollerDispatch dispatcher; diff --git a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h index f0c6650994..aae1538e6a 100644 --- a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h +++ b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h @@ -28,6 +28,7 @@ namespace qpid { namespace framing { class AMQBody; +class AMQFrame; } namespace cluster { @@ -42,7 +43,7 @@ class HandlerBase : public RefCounted HandlerBase(EventHandler&); virtual ~HandlerBase(); - virtual bool invoke(const framing::AMQBody& body) = 0; + virtual bool handle(const framing::AMQFrame&) = 0; virtual void left(const MemberId&) {} virtual void joined(const MemberId&) {} diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp new file mode 100644 index 0000000000..5d6e165d7a --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MessageBuilders.h" +#include "qpid/log/Statement.h" +#include "qpid/Exception.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/broker/Message.h" + +namespace qpid { +namespace cluster { + +MessageBuilders::MessageBuilders(broker::MessageStore* s) : store(s) {} + +void MessageBuilders::announce(MemberId sender, uint16_t channel, + const boost::shared_ptr<broker::Queue>& queue) +{ + ChannelId key(sender, channel); + if (map.find(key) != map.end()) + throw Exception( + QPID_MSG("MessageBuilder channel " << channel << " on " << sender + << " is already assigned.")); + map[key] = std::make_pair(queue, new broker::MessageBuilder(store)); +} + +bool MessageBuilders::handle( + MemberId sender, + const framing::AMQFrame& frame, + boost::shared_ptr<broker::Queue>& queueOut, + boost::intrusive_ptr<broker::Message>& messageOut) +{ + ChannelId key(sender, frame.getChannel()); + Map::iterator i = map.find(key); + if (i == map.end()) + throw Exception(QPID_MSG("MessageBuilder channel " << frame.getChannel() + << " on " << sender << " is not assigned.")); + boost::shared_ptr<broker::MessageBuilder> msgBuilder = i->second.second; + // Nasty bit of code pasted from broker::SessionState::handleContent. + // Should really be part of broker::MessageBuilder + if (frame.getBof() && frame.getBos()) //start of frameset + msgBuilder->start(0); + msgBuilder->handle(const_cast<framing::AMQFrame&>(frame)); + if (frame.getEof() && frame.getEos()) { //end of frameset + if (frame.getBof()) { + //i.e this is a just a command frame, add a dummy header + framing::AMQFrame header((framing::AMQHeaderBody())); + header.setBof(false); + header.setEof(false); + msgBuilder->getMessage()->getFrames().append(header); + } + queueOut = i->second.first; + messageOut = msgBuilder->getMessage(); + map.erase(key); + return true; + } + return false; +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h b/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h new file mode 100644 index 0000000000..d9c7339749 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h @@ -0,0 +1,73 @@ +#ifndef QPID_CLUSTER_EXP_MESSAGEBUILDERS_H +#define QPID_CLUSTER_EXP_MESSAGEBUILDERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/cluster/types.h" +#include "qpid/broker/MessageBuilder.h" +#include <map> + +namespace qpid { +namespace broker { +class Queue; +} +namespace framing { +class AMQFrame; +} + +namespace cluster { + +/** + * Build messages received by CPG delivery. + */ +class MessageBuilders +{ + public: + MessageBuilders(broker::MessageStore* store); + + /** Announce a message for queue arriving on channel from sender. */ + void announce(MemberId sender, uint16_t channel, + const boost::shared_ptr<broker::Queue>&); + + /** Add a frame to the message in progress. + *@param sender member that sent the frame. + *@param frame is the frame to add. + *@param queueOut set to the queue if message complete. + *@param messageOut set to message if message complete. + *@return True if the frame completes a message + */ + bool handle(MemberId sender, const framing::AMQFrame& frame, + boost::shared_ptr<broker::Queue>& queueOut, + boost::intrusive_ptr<broker::Message>& messageOut); + + private: + typedef std::pair<MemberId, uint16_t> ChannelId; + typedef std::pair<boost::shared_ptr<broker::Queue>, + boost::shared_ptr<broker::MessageBuilder> > QueueBuilder; + typedef std::map<ChannelId, QueueBuilder> Map; + Map map; + broker::MessageStore* store; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXP_MESSAGEBUILDERS_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index b73a3747c0..1608d0e6ec 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -31,22 +31,43 @@ #include "qpid/broker/Queue.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/Buffer.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Thread.h" #include "qpid/log/Statement.h" #include <boost/shared_ptr.hpp> namespace qpid { namespace cluster { + using namespace broker; +using namespace framing; MessageHandler::MessageHandler(EventHandler& e, Core& c) : HandlerBase(e), broker(c.getBroker()), - core(c) + core(c), + messageBuilders(&c.getBroker().getStore()) {} -bool MessageHandler::invoke(const framing::AMQBody& body) { - return framing::invoke(*this, body).wasHandled(); +bool MessageHandler::handle(const framing::AMQFrame& frame) { + assert(frame.getBody()); + const AMQBody& body = *frame.getBody(); + if (framing::invoke(*this, body).wasHandled()) return true; + // Test for message frame + if (body.type() == HEADER_BODY || body.type() == CONTENT_BODY || + (body.getMethod() && body.getMethod()->isA<MessageTransferBody>())) + { + boost::shared_ptr<broker::Queue> queue; + boost::intrusive_ptr<broker::Message> message; + if (messageBuilders.handle(sender(), frame, queue, message)) { + BrokerContext::ScopedSuppressReplication ssr; + queue->deliver(message); + } + // FIXME aconway 2011-09-29: async completion goes here. + // For own messages need to release the channel assigned by BrokerContext. + return true; + } + return false; } boost::shared_ptr<broker::Queue> MessageHandler::findQueue( @@ -57,17 +78,10 @@ boost::shared_ptr<broker::Queue> MessageHandler::findQueue( return queue; } -void MessageHandler::enqueue(const std::string& q, const std::string& message) { - +void MessageHandler::enqueue(const std::string& q, uint16_t channel) { boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed"); - // FIXME aconway 2010-10-28: decode message by frame in bounded-size buffers. // FIXME aconway 2011-09-28: don't re-decode my own messages - boost::intrusive_ptr<broker::Message> msg = new broker::Message(); - framing::Buffer buf(const_cast<char*>(&message[0]), message.size()); - msg->decodeHeader(buf); - msg->decodeContent(buf); - BrokerContext::ScopedSuppressReplication ssr; - queue->deliver(msg); + messageBuilders.announce(sender(), channel, queue); } // FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h index d97999b738..f1d3dc2726 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h @@ -25,6 +25,7 @@ // TODO aconway 2010-10-19: experimental cluster code. #include "HandlerBase.h" +#include "MessageBuilders.h" #include "qpid/framing/AMQP_AllOperations.h" #include <boost/intrusive_ptr.hpp> #include <map> @@ -54,9 +55,9 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler public: MessageHandler(EventHandler&, Core&); - bool invoke(const framing::AMQBody& body); + bool handle(const framing::AMQFrame&); - void enqueue(const std::string& queue, const std::string& message); + void enqueue(const std::string& queue, uint16_t channel); void acquire(const std::string& queue, uint32_t position); void dequeue(const std::string& queue, uint32_t position); void requeue(const std::string& queue, uint32_t position, bool redelivered); @@ -66,6 +67,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler broker::Broker& broker; Core& core; + MessageBuilders messageBuilders; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp index 9d8a00e217..84b2145691 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp @@ -52,7 +52,7 @@ Multicaster::Multicaster(Cpg& cpg_, } void Multicaster::mcast(const framing::AMQFrame& data) { - QPID_LOG(trace, "cluster multicast: " << data); + QPID_LOG(trace, "cluster multicast on " << cpg.getName() << ": " << data); BufferRef bufRef = buffers.get(data.encodedSize()); framing::Buffer buf(bufRef.begin(), bufRef.size()); data.encode(buf); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index c3ff94b897..1de1c1850c 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -108,7 +108,7 @@ void QueueContext::timeout() { queue.stopConsumers(); } -// Callback set up by queue.stopConsumers() called in connection thread. +// Callback set up by queue.stopConsumers() called in connection or timer thread. // Called when no threads are dispatching from the queue. void QueueContext::stopped() { sys::Mutex::ScopedLock l(lock); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp index 33f805ab82..50638b9cb3 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp @@ -36,8 +36,8 @@ namespace cluster { QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m, const Settings& s) : HandlerBase(eh), multicaster(m), consumeLock(s.getConsumeLock()) {} -bool QueueHandler::invoke(const framing::AMQBody& body) { - return framing::invoke(*this, body).wasHandled(); +bool QueueHandler::handle(const framing::AMQFrame& frame) { + return framing::invoke(*this, *frame.getBody()).wasHandled(); } void QueueHandler::subscribe(const std::string& queue) { diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h index 920c5de0b2..6bf6e66eec 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h @@ -56,7 +56,7 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler, public: QueueHandler(EventHandler&, Multicaster&, const Settings&); - bool invoke(const framing::AMQBody& body); + bool handle(const framing::AMQFrame& body); // Events void subscribe(const std::string& queue); diff --git a/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h b/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h new file mode 100644 index 0000000000..624fde7214 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h @@ -0,0 +1,70 @@ +#ifndef QPID_CLUSTER_EXP_UNIQUEIDS_H +#define QPID_CLUSTER_EXP_UNIQUEIDS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" +#include <set> + +namespace qpid { +namespace cluster { + +/** + * Assign ID numbers, ensuring that all the assigned IDs are unique + * T is the numeric type - actually any type with >, == and ++ will do. + */ +template <class T> class UniqueIds +{ + public: + /** Get an ID that is different from all other active IDs. + *@return the ID, which is now considered active. + */ + T get() { + sys::Mutex::ScopedLock l(lock); + T old = mark; + while (active.find(++mark) != active.end() && mark != old) + ; + assert(mark != old); // check wrap-around + active.insert(mark); + return mark; + } + /** Release an ID, so it is inactive and available for re-use */ + void release(T id) { + sys::Mutex::ScopedLock l(lock); + active.erase(id); + } + /** Allocate an ID, release automatically at end of scope */ + struct Scope { + UniqueIds& ids; + T id; + Scope(UniqueIds& ids_) : ids(ids_), id(ids.get()) {} + ~Scope() { ids.release(id); } + }; + + private: + sys::Mutex lock; + std::set<T> active; + T mark; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXP_UNIQUEIDS_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp index 79e9d657bc..b96ae0b30f 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp @@ -50,8 +50,8 @@ WiringHandler::WiringHandler(EventHandler& e, queueHandler(qh) {} -bool WiringHandler::invoke(const framing::AMQBody& body) { - return framing::invoke(*this, body).wasHandled(); +bool WiringHandler::handle(const framing::AMQFrame& frame) { + return framing::invoke(*this, *frame.getBody()).wasHandled(); } void WiringHandler::createQueue(const std::string& data) { diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h index cd964de997..7a07c7098e 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h @@ -53,7 +53,7 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler, public: WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&); - bool invoke(const framing::AMQBody& body); + bool handle(const framing::AMQFrame&); void createQueue(const std::string& data); void destroyQueue(const std::string& name); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 5fba6b4036..0d325c4d12 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -333,7 +333,7 @@ <class name="cluster-message" code="0x82"> <control name="enqueue" code="0x2"> <field name="queue" type="queue.name"/> - <field name="message" type="str32"/> + <field name="channel" type="uint16"/> </control> <control name="acquire" code="0x4"> |