diff options
Diffstat (limited to 'cpp')
25 files changed, 1080 insertions, 66 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 2a648e968c..7cd4a18c9e 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -35,7 +35,6 @@ endif if HAVE_LIBCPG dmodule_LTLIBRARIES += cluster.la - cluster_la_SOURCES = \ $(CMAN_SOURCES) \ qpid/cluster/Cluster.cpp \ @@ -99,6 +98,27 @@ cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing cluster_la_LDFLAGS = $(PLUGINLDFLAGS) +# Experimental new cluster plugin +dmodule_LTLIBRARIES += cluster2.la +cluster2_la_LIBADD = -lcpg libqpidbroker.la +cluster2_la_LDFLAGS = $(PLUGINLDFLAGS) +cluster2_la_SOURCES = \ + qpid/cluster/BrokerHandler.cpp \ + qpid/cluster/BrokerHandler.h \ + qpid/cluster/Cluster2Plugin.cpp \ + qpid/cluster/Core.cpp \ + qpid/cluster/Core.h \ + qpid/cluster/Cpg.cpp \ + qpid/cluster/Cpg.h \ + qpid/cluster/EventHandler.cpp \ + qpid/cluster/EventHandler.h \ + qpid/cluster/MessageHandler.cpp \ + qpid/cluster/MessageHandler.h \ + qpid/cluster/MessageId.cpp \ + qpid/cluster/MessageId.h \ + qpid/cluster/PollerDispatch.cpp \ + qpid/cluster/PollerDispatch.h + # The watchdog plugin and helper executable dmodule_LTLIBRARIES += watchdog.la watchdog_la_SOURCES = qpid/cluster/WatchDogPlugin.cpp diff --git a/cpp/src/qpid/broker/Cluster.h b/cpp/src/qpid/broker/Cluster.h index 91b52e8af1..4dabd98eab 100644 --- a/cpp/src/qpid/broker/Cluster.h +++ b/cpp/src/qpid/broker/Cluster.h @@ -54,8 +54,14 @@ class Cluster /** In Exchange::route, before the message is enqueued. */ virtual void routing(const boost::intrusive_ptr<Message>&) = 0; - /** A message is delivered to a queue. */ - virtual void enqueue(QueuedMessage&) = 0; + + /** A message is delivered to a queue. + * Called before actually pushing the message to the queue. + *@return If true the message should be pushed to the queue now. + * otherwise the cluster code will push the message when it is replicated. + */ + virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0; + /** In Exchange::route, after all enqueues for the message. */ virtual void routed(const boost::intrusive_ptr<Message>&) = 0; @@ -71,11 +77,12 @@ class Cluster /** A locally-acquired message is released by the consumer and re-queued. */ virtual void release(const QueuedMessage&) = 0; - /** A message is dropped from the queue, e.g. expired or replaced on an LVQ. - * This function does only local book-keeping, it does not multicast. - * It is reasonable to call with a queue lock held. + + /** A message is removed from the queue. It could have been + * accepted, rejected or dropped for other reasons e.g. expired or + * replaced on an LVQ. */ - virtual void dequeue(const QueuedMessage&) = 0; + virtual void drop(const QueuedMessage&) = 0; // Consumers diff --git a/cpp/src/qpid/broker/NullCluster.h b/cpp/src/qpid/broker/NullCluster.h index 4f3485eb40..0e11ceef27 100644 --- a/cpp/src/qpid/broker/NullCluster.h +++ b/cpp/src/qpid/broker/NullCluster.h @@ -38,14 +38,14 @@ class NullCluster : public Cluster // Messages virtual void routing(const boost::intrusive_ptr<Message>&) {} - virtual void enqueue(QueuedMessage&) {} + virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { return true; } virtual void routed(const boost::intrusive_ptr<Message>&) {} virtual void acquire(const QueuedMessage&) {} virtual void accept(const QueuedMessage&) {} virtual void reject(const QueuedMessage&) {} virtual void rejected(const QueuedMessage&) {} virtual void release(const QueuedMessage&) {} - virtual void dequeue(const QueuedMessage&) {} + virtual void drop(const QueuedMessage&) {} // Consumers diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index b05172f984..c530e9cd51 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -146,6 +146,10 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ // Check for deferred delivery in a cluster. if (broker && broker->deferDelivery(name, msg)) return; + // Same thing but for the new cluster interface. + if (broker && !broker->getCluster().enqueue(*this, msg)) + return; + if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -165,7 +169,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ }else { push(msg); } - mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -199,7 +202,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); - mgntEnqStats(msg); if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); @@ -642,6 +644,7 @@ void Queue::popMsg(QueuedMessage& qmsg) void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); + if (!isRecovery) mgntEnqStats(msg); QueuedMessage qm; QueueListeners::NotificationSet copy; { @@ -687,7 +690,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ } } copy.notify(); - if (broker) broker->getCluster().enqueue(qm); } QueuedMessage Queue::getFront() @@ -868,10 +870,9 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; - if (!ctxt) { - dequeued(msg); - } + if (!ctxt) dequeued(msg); } + if (!ctxt && broker) broker->getCluster().drop(msg); // Outside lock // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -888,6 +889,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { + if (broker) broker->getCluster().drop(msg); // Outside lock Mutex::ScopedLock locker(messageLock); dequeued(msg); if (mgmtObject != 0) { @@ -913,9 +915,8 @@ void Queue::popAndDequeue() */ void Queue::dequeued(const QueuedMessage& msg) { - // Note: Cluster::dequeued does only local book-keeping, no multicast + // Note: Cluster::drop does only local book-keeping, no multicast // So OK to call here with lock held. - if (broker) broker->getCluster().dequeue(msg); if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { diff --git a/cpp/src/qpid/cluster/BrokerHandler.cpp b/cpp/src/qpid/cluster/BrokerHandler.cpp new file mode 100644 index 0000000000..f0b930a221 --- /dev/null +++ b/cpp/src/qpid/cluster/BrokerHandler.cpp @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Core.h" +#include "BrokerHandler.h" +#include "qpid/framing/ClusterMessageRoutingBody.h" +#include "qpid/framing/ClusterMessageRoutedBody.h" +#include "qpid/framing/ClusterMessageEnqueueBody.h" +#include "qpid/sys/Thread.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/Buffer.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +using namespace framing; +using namespace broker; + +namespace { +// noReplicate means the current thread is handling a message +// received from the cluster so it should not be replciated. +QPID_TSS bool noReplicate = false; + +// Sequence number of the message currently being routed. +// 0 if we are not currently routing a message. +QPID_TSS SequenceNumber routeSeq = 0; +} + +BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() { + assert(!noReplicate); + noReplicate = true; +} + +BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() { + assert(noReplicate); + noReplicate = false; +} + +BrokerHandler::BrokerHandler(Core& c) : core(c) {} + +SequenceNumber BrokerHandler::nextSequenceNumber() { + SequenceNumber s = ++sequence; + if (!s) s = ++sequence; // Avoid 0 on wrap-around. + return s; +} + +void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { } + +bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg) +{ + if (noReplicate) return true; + if (!routeSeq) { // This is the first enqueue, so send the message + routeSeq = nextSequenceNumber(); + // FIXME aconway 2010-10-20: replicate message in fixed size buffers. + std::string data(msg->encodedSize(),char()); + framing::Buffer buf(&data[0], data.size()); + msg->encode(buf); + core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), routeSeq, data)); + core.getRoutingMap().put(routeSeq, msg); + } + core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), routeSeq, queue.getName())); + // TODO aconway 2010-10-21: configable option for strict (wait + // for CPG deliver to do local deliver) vs. loose (local deliver + // immediately). + return false; +} + +void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) { + if (routeSeq) { // we enqueued at least one message. + core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), routeSeq)); + // Note: routingMap is cleaned up on CPG delivery in MessageHandler. + routeSeq = 0; + } +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/BrokerHandler.h b/cpp/src/qpid/cluster/BrokerHandler.h new file mode 100644 index 0000000000..1a61d1fc11 --- /dev/null +++ b/cpp/src/qpid/cluster/BrokerHandler.h @@ -0,0 +1,86 @@ +#ifndef QPID_CLUSTER_BROKERHANDLER_H +#define QPID_CLUSTER_BROKERHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Cluster.h" +#include "qpid/sys/AtomicValue.h" + +namespace qpid { +namespace cluster { +class Core; + +// TODO aconway 2010-10-19: experimental cluster code. + +/** + * Implements broker::Cluster interface, handles events in broker code. + */ +class BrokerHandler : public broker::Cluster +{ + public: + /** Suppress replication while in scope. + * Used to prevent re-replication of messages received from the cluster. + */ + struct ScopedSuppressReplication { + ScopedSuppressReplication(); + ~ScopedSuppressReplication(); + }; + + BrokerHandler(Core&); + + // FIXME aconway 2010-10-20: implement all points. + + // Messages + + void routing(const boost::intrusive_ptr<broker::Message>&); + bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&); + void routed(const boost::intrusive_ptr<broker::Message>&); + void acquire(const broker::QueuedMessage&) {} + void accept(const broker::QueuedMessage&) {} + void reject(const broker::QueuedMessage&) {} + void rejected(const broker::QueuedMessage&) {} + void release(const broker::QueuedMessage&) {} + void drop(const broker::QueuedMessage&) {} + + // Consumers + + void consume(const broker::Queue&, size_t) {} + void cancel(const broker::Queue&, size_t) {} + + // Wiring + + void create(const broker::Queue&) {} + void destroy(const broker::Queue&) {} + void create(const broker::Exchange&) {} + void destroy(const broker::Exchange&) {} + void bind(const broker::Queue&, const broker::Exchange&, + const std::string&, const framing::FieldTable&) {} + + private: + SequenceNumber nextSequenceNumber(); + + Core& core; + sys::AtomicValue<SequenceNumber> sequence; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/Cluster2Plugin.cpp b/cpp/src/qpid/cluster/Cluster2Plugin.cpp new file mode 100644 index 0000000000..28b7dcec2e --- /dev/null +++ b/cpp/src/qpid/cluster/Cluster2Plugin.cpp @@ -0,0 +1,65 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <qpid/Options.h> +#include <qpid/broker/Broker.h> +#include "Core.h" + +namespace qpid { +namespace cluster { +using broker::Broker; + +// TODO aconway 2010-10-19: experimental new cluster code. + +/** + * Plugin for the cluster. + */ +struct Cluster2Plugin : public Plugin { + struct Opts : public Options { + Core::Settings& settings; + Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) { + addOptions() + ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join"); + // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h + } + }; + + Core::Settings settings; + Opts options; + Core* core; // Core deletes itself on shutdown. + + Cluster2Plugin() : options(settings), core(0) {} + + Options* getOptions() { return &options; } + + void earlyInitialize(Plugin::Target& target) { + if (settings.name.empty()) return; + Broker* broker = dynamic_cast<Broker*>(&target); + if (!broker) return; + core = new Core(settings, *broker); + } + + void initialize(Plugin::Target& target) { + Broker* broker = dynamic_cast<Broker*>(&target); + if (broker && core) core->initialize(); + } +}; + +static Cluster2Plugin instance; // Static initialization. + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Core.cpp b/cpp/src/qpid/cluster/Core.cpp new file mode 100644 index 0000000000..e4127fa443 --- /dev/null +++ b/cpp/src/qpid/cluster/Core.cpp @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Core.h" +#include "EventHandler.h" +#include "BrokerHandler.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/SignalHandler.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Buffer.h" +#include "qpid/log/Statement.h" +#include <sys/uio.h> // For iovec + +namespace qpid { +namespace cluster { + +Core::Core(const Settings& s, broker::Broker& b) : + broker(b), + eventHandler(new EventHandler(*this)) +{ + std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this)); + brokerHandler = bh.get(); + // BrokerHandler belongs to Broker + broker.setCluster(std::auto_ptr<broker::Cluster>(bh)); + // FIXME aconway 2010-10-20: ownership of BrokerHandler, shutdown issues. + eventHandler->getCpg().join(s.name); +} + +void Core::initialize() {} + +void Core::fatal() { + // FIXME aconway 2010-10-20: error handling + assert(0); + broker::SignalHandler::shutdown(); +} + +void Core::mcast(const framing::AMQBody& body) { + QPID_LOG(trace, "multicast: " << body); + // FIXME aconway 2010-10-20: use Multicaster, or bring in its features. + // here we multicast Frames rather than Events. + framing::AMQFrame f(body); + std::string data(f.encodedSize(), char()); + framing::Buffer buf(&data[0], data.size()); + f.encode(buf); + iovec iov = { buf.getPointer(), buf.getSize() }; + while (!eventHandler->getCpg().mcast(&iov, 1)) + ::usleep(1000); // FIXME aconway 2010-10-20: flow control +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Core.h b/cpp/src/qpid/cluster/Core.h new file mode 100644 index 0000000000..9976c1c906 --- /dev/null +++ b/cpp/src/qpid/cluster/Core.h @@ -0,0 +1,95 @@ +#ifndef QPID_CLUSTER_CORE_H +#define QPID_CLUSTER_CORE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <memory> + +#include "Cpg.h" +#include "MessageId.h" +#include "LockedMap.h" +#include <qpid/broker/QueuedMessage.h> + +// TODO aconway 2010-10-19: experimental cluster code. + +namespace qpid { + +namespace framing{ +class AMQBody; +} + +namespace broker { +class Broker; +} + +namespace cluster { +class EventHandler; +class BrokerHandler; + +/** + * Cluster core state machine. + * Holds together the various objects that implement cluster behavior, + * and holds state that is shared by multiple components. + * + * Thread safe: called from broker connection threads and CPG dispatch threads. + */ +class Core +{ + public: + /** Configuration settings */ + struct Settings { + std::string name; + }; + + typedef LockedMap<SequenceNumber, boost::intrusive_ptr<broker::Message> > + SequenceMessageMap; + + /** Constructed during Plugin::earlyInitialize() */ + Core(const Settings&, broker::Broker&); + + /** Called during Plugin::initialize() */ + void initialize(); + + /** Shut down broker due to fatal error. Caller should log a critical message */ + void fatal(); + + /** Multicast an event */ + void mcast(const framing::AMQBody&); + + broker::Broker& getBroker() { return broker; } + EventHandler& getEventHandler() { return *eventHandler; } + BrokerHandler& getBrokerHandler() { return *brokerHandler; } + + /** Map of messages that are currently being routed. + * Used to pass messages being routed from BrokerHandler to MessageHandler + */ + SequenceMessageMap& getRoutingMap() { return routingMap; } + private: + broker::Broker& broker; + std::auto_ptr<EventHandler> eventHandler; // Handles CPG events. + BrokerHandler* brokerHandler; // Handles broker events. + SequenceMessageMap routingMap; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CORE_H*/ diff --git a/cpp/src/qpid/cluster/EventHandler.cpp b/cpp/src/qpid/cluster/EventHandler.cpp new file mode 100644 index 0000000000..95ae285b06 --- /dev/null +++ b/cpp/src/qpid/cluster/EventHandler.cpp @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "MessageHandler.h" +#include "EventHandler.h" +#include "Core.h" +#include "types.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/broker/Broker.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +EventHandler::EventHandler(Core& c) : + core(c), + cpg(*this), // FIXME aconway 2010-10-20: belongs on Core. + dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)), + self(cpg.self()), + messageHandler(new MessageHandler(*this)) +{ + dispatcher.start(); // FIXME aconway 2010-10-20: later in initialization? +} + +EventHandler::~EventHandler() {} + +// Deliver CPG message. +void EventHandler::deliver( + cpg_handle_t /*handle*/, + const cpg_name* /*group*/, + uint32_t nodeid, + uint32_t pid, + void* msg, + int msg_len) +{ + sender = MemberId(nodeid, pid); + framing::Buffer buf(static_cast<char*>(msg), msg_len); + framing::AMQFrame frame; + while (buf.available()) { + frame.decode(buf); + assert(frame.getBody()); + QPID_LOG(trace, "cluster deliver: " << *frame.getBody()); + try { + invoke(*frame.getBody()); + } + catch (const std::exception& e) { + // Note: exceptions are assumed to be survivable, + // fatal errors should log a message and call Core::fatal. + QPID_LOG(error, e.what()); + } + } +} + +void EventHandler::invoke(const framing::AMQBody& body) { + if (framing::invoke(*messageHandler, body).wasHandled()) return; +} + +// CPG config-change callback. +void EventHandler::configChange ( + cpg_handle_t /*handle*/, + const cpg_name */*group*/, + const cpg_address */*members*/, int /*nMembers*/, + const cpg_address */*left*/, int /*nLeft*/, + const cpg_address */*joined*/, int /*nJoined*/) +{ + // FIXME aconway 2010-10-20: TODO +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/EventHandler.h b/cpp/src/qpid/cluster/EventHandler.h new file mode 100644 index 0000000000..5645c3980b --- /dev/null +++ b/cpp/src/qpid/cluster/EventHandler.h @@ -0,0 +1,85 @@ +#ifndef QPID_CLUSTER_EVENTHANDLER_H +#define QPID_CLUSTER_EVENTHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// TODO aconway 2010-10-19: experimental cluster code. + +#include "types.h" +#include "Cpg.h" +#include "PollerDispatch.h" + +namespace qpid { + +namespace framing { +class AMQBody; +} + +namespace cluster { +class Core; +class MessageHandler; + +/** + * Dispatch events received from CPG. + * Thread unsafe: only called in CPG deliver thread context. + */ +class EventHandler : public Cpg::Handler +{ + public: + EventHandler(Core&); + ~EventHandler(); + + void deliver( // CPG deliver callback. + cpg_handle_t /*handle*/, + const struct cpg_name *group, + uint32_t /*nodeid*/, + uint32_t /*pid*/, + void* /*msg*/, + int /*msg_len*/); + + void configChange( // CPG config change callback. + cpg_handle_t /*handle*/, + const struct cpg_name */*group*/, + const struct cpg_address */*members*/, int /*nMembers*/, + const struct cpg_address */*left*/, int /*nLeft*/, + const struct cpg_address */*joined*/, int /*nJoined*/ + ); + + + MemberId getSender() { return sender; } + MemberId getSelf() { return self; } + Core& getCore() { return core; } + Cpg& getCpg() { return cpg; } + + private: + void invoke(const framing::AMQBody& body); + + Core& core; + Cpg cpg; + PollerDispatch dispatcher; + MemberId sender; // sender of current event. + MemberId self; + std::auto_ptr<MessageHandler> messageHandler; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EVENTHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/LockedMap.h b/cpp/src/qpid/cluster/LockedMap.h new file mode 100644 index 0000000000..0736e7ac35 --- /dev/null +++ b/cpp/src/qpid/cluster/LockedMap.h @@ -0,0 +1,73 @@ +#ifndef QPID_CLUSTER_LOCKEDMAP_H +#define QPID_CLUSTER_LOCKEDMAP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" +#include <map> + +namespace qpid { +namespace cluster { + +/** + * A reader-writer locked thread safe map. + */ +template <class Key, class Value> +class LockedMap +{ + public: + /** Get value associated with key, returns Value() if none. */ + Value get(const Key& key) const { + sys::RWlock::ScopedRlock r(lock); + typename Map::const_iterator i = map.find(key); + if (i == map.end()) return Value(); + else return i->second; + } + + /** Associate value with key, overwriting any previous value for key. */ + void put(const Key& key, const Value& value) { + sys::RWlock::ScopedWlock w(lock); + map[key] = value; + } + + /** Associate value with key if there is not already a value associated with key. + * Returns true if the value was added. + */ + bool add(const Key& key, const Value& value) { + sys::RWlock::ScopedWlock w(lock); + return map.insert(key, value).second; + } + + /** Erase the value associated with key if any. Return true if a value was erased. */ + bool erase(const Key& key) { + sys::RWlock::ScopedWlock w(lock); + return map.erase(key); + } + + private: + typedef std::map<Key, Value> Map; + Map map; + mutable sys::RWlock lock; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_LOCKEDMAP_H*/ diff --git a/cpp/src/qpid/cluster/MessageHandler.cpp b/cpp/src/qpid/cluster/MessageHandler.cpp new file mode 100644 index 0000000000..fbbdad38a3 --- /dev/null +++ b/cpp/src/qpid/cluster/MessageHandler.cpp @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Core.h" +#include "MessageHandler.h" +#include "BrokerHandler.h" +#include "EventHandler.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/Buffer.h" +#include "qpid/sys/Thread.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace cluster { +using namespace broker; + +MessageHandler::MessageHandler(EventHandler& e) : + broker(e.getCore().getBroker()), + eventHandler(e), + brokerHandler(e.getCore().getBrokerHandler()) +{} + +MessageHandler::~MessageHandler() {} + +MemberId MessageHandler::sender() { return eventHandler.getSender(); } +MemberId MessageHandler::self() { return eventHandler.getSelf(); } + +void MessageHandler::routing(uint64_t sequence, const std::string& message) { + MessageId id(sender(), sequence); + boost::intrusive_ptr<Message> msg; + if (sender() == self()) + msg = eventHandler.getCore().getRoutingMap().get(sequence); + if (!msg) { + framing::Buffer buf(const_cast<char*>(&message[0]), message.size()); + msg = new Message; + msg->decodeHeader(buf); + msg->decodeContent(buf); + } + routingMap[id] = msg; +} + +void MessageHandler::enqueue(uint64_t sequence, const std::string& q) { + MessageId id(sender(), sequence); + boost::shared_ptr<Queue> queue = broker.getQueues().find(q); + if (!queue) throw Exception(QPID_MSG("Cluster message for unknown queue " << q)); + boost::intrusive_ptr<Message> msg = routingMap[id]; + if (!msg) throw Exception(QPID_MSG("Unknown cluster message for queue " << q)); + BrokerHandler::ScopedSuppressReplication ssr; + // TODO aconway 2010-10-21: configable option for strict (wait + // for CPG deliver to do local deliver) vs. loose (local deliver + // immediately). + queue->deliver(msg); +} + +void MessageHandler::routed(uint64_t sequence) { + MessageId id(sender(), sequence); + routingMap.erase(id); + eventHandler.getCore().getRoutingMap().erase(sequence); +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/MessageHandler.h b/cpp/src/qpid/cluster/MessageHandler.h new file mode 100644 index 0000000000..5c32bf474e --- /dev/null +++ b/cpp/src/qpid/cluster/MessageHandler.h @@ -0,0 +1,70 @@ +#ifndef QPID_CLUSTER_MESSAGEHANDLER_H +#define QPID_CLUSTER_MESSAGEHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// TODO aconway 2010-10-19: experimental cluster code. + +#include "qpid/framing/AMQP_AllOperations.h" +#include "MessageId.h" +#include <boost/intrusive_ptr.hpp> +#include <map> + +namespace qpid { + +namespace broker { +class Message; +class Broker; +} + +namespace cluster { +class EventHandler; +class BrokerHandler; + +/** + * Handler for message disposition events. + */ +class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler +{ + public: + MessageHandler(EventHandler&); + ~MessageHandler(); + + void routing(uint64_t sequence, const std::string& message); + void enqueue(uint64_t sequence, const std::string& queue); + void routed(uint64_t sequence); + + private: + typedef std::map<MessageId, boost::intrusive_ptr<broker::Message> > RoutingMap; + + MemberId sender(); + MemberId self(); + + broker::Broker& broker; + EventHandler& eventHandler; + BrokerHandler& brokerHandler; + RoutingMap routingMap; + +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/MessageId.cpp b/cpp/src/qpid/cluster/MessageId.cpp new file mode 100644 index 0000000000..fbd248ed69 --- /dev/null +++ b/cpp/src/qpid/cluster/MessageId.cpp @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MessageId.h" +#include <ostream> + +namespace qpid { +namespace cluster { + +bool operator<(const MessageId& a, const MessageId& b) { + return a.member < b.member || ((a.member == b.member) && a.sequence < b.sequence); +} + +std::ostream& operator<<(std::ostream& o, const MessageId& m) { + return o << m.member << ":" << m.sequence; +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/MessageId.h b/cpp/src/qpid/cluster/MessageId.h new file mode 100644 index 0000000000..16bf7ddd6d --- /dev/null +++ b/cpp/src/qpid/cluster/MessageId.h @@ -0,0 +1,52 @@ +#ifndef QPID_CLUSTER_MESSAGEID_H +#define QPID_CLUSTER_MESSAGEID_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include <iosfwd> + +namespace qpid { +namespace cluster { + +// TODO aconway 2010-10-20: experimental new cluster code. + +/** Sequence number used in message identifiers */ +typedef uint64_t SequenceNumber; + +/** + * Message identifier + */ +struct MessageId { + MemberId member; /// Member that created the message + SequenceNumber sequence; /// Sequence number assiged by member. + MessageId(MemberId m=MemberId(), SequenceNumber s=0) : member(m), sequence(s) {} +}; + +bool operator<(const MessageId&, const MessageId&); + +std::ostream& operator<<(std::ostream&, const MessageId&); + + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_MESSAGEID_H*/ diff --git a/cpp/src/qpid/cluster/PollerDispatch.cpp b/cpp/src/qpid/cluster/PollerDispatch.cpp index b8d94b95a5..43c171efe8 100644 --- a/cpp/src/qpid/cluster/PollerDispatch.cpp +++ b/cpp/src/qpid/cluster/PollerDispatch.cpp @@ -37,9 +37,11 @@ PollerDispatch::PollerDispatch(Cpg& c, boost::shared_ptr<sys::Poller> p, started(false) {} -PollerDispatch::~PollerDispatch() { - if (started) - dispatchHandle.stopWatch(); +PollerDispatch::~PollerDispatch() { stop(); } + +void PollerDispatch::stop() { + if (started) dispatchHandle.stopWatch(); + started = false; } void PollerDispatch::start() { @@ -54,6 +56,7 @@ void PollerDispatch::dispatch(sys::DispatchHandle& h) { h.rewatch(); } catch (const std::exception& e) { QPID_LOG(critical, "Error in cluster dispatch: " << e.what()); + stop(); onError(); } } diff --git a/cpp/src/qpid/cluster/PollerDispatch.h b/cpp/src/qpid/cluster/PollerDispatch.h index 63801e0de9..f16d5ece95 100644 --- a/cpp/src/qpid/cluster/PollerDispatch.h +++ b/cpp/src/qpid/cluster/PollerDispatch.h @@ -41,6 +41,7 @@ class PollerDispatch { ~PollerDispatch(); void start(); + void stop(); private: // Poller callbacks diff --git a/cpp/src/qpid/cluster/new-cluster-design.txt b/cpp/src/qpid/cluster/new-cluster-design.txt index 8ee740372d..abbbcd616c 100644 --- a/cpp/src/qpid/cluster/new-cluster-design.txt +++ b/cpp/src/qpid/cluster/new-cluster-design.txt @@ -328,8 +328,6 @@ and been in use (one of the key missing features). ** Misc outstanding issues & notes -Message IDs: need an efficient cluster-wide message ID. - Replicating wiring - Need async completion of wiring commands? - qpid.sequence_counter: need extra work to support in new design, do we care? diff --git a/cpp/src/qpid/cluster/new-cluster-plan.txt b/cpp/src/qpid/cluster/new-cluster-plan.txt index 57c1241607..4eeb030b1a 100644 --- a/cpp/src/qpid/cluster/new-cluster-plan.txt +++ b/cpp/src/qpid/cluster/new-cluster-plan.txt @@ -38,21 +38,9 @@ a note for later optimization/improvement. - acquire then kill broker: verify can be dequeued other members. - acquire then reject: verify goes on alt-exchange once only. -*** TODO broker::Cluster interface and call points. +*** DONE broker::Cluster interface and call points. -Initial draft is commited. - -Issues to review: - -queue API: internal classes like RingQueuePolicy use Queue::acuqire/dequeue -when messages are pushed. How to reconcile with queue ownership? - -rejecting messages: if there's an alternate exchange where do we do the -re-routing? On origin broker or on all brokers? - -Intercept points: on Queue vs. on DeliveryRecord, SemanticState etc. -Intercepting client actions on the queue vs. internal actions -(e.g. ring policy) +Initial interface commited. *** Main classes @@ -63,7 +51,7 @@ BrokerHandler: LocalMessageMap: - Holds local messages while they are being enqueued. -- thread safe: called by both BrokerHandler and DeliverHandler +- thread safe: called by both BrokerHandler and MessageHandler MessageHandler: - handles delivered mcast messages related to messages. @@ -77,7 +65,7 @@ QueueOwnerHandler: - maintains view of cluster state regarding queue ownership. cluster::Core: class to hold new cluster together (replaces cluster::Cluster) -- thread safe: manage state used by both DeliverHandler and BrokerHandler +- thread safe: manage state used by both MessageHandler and BrokerHandler The following code sketch illustrates only the "happy path" error handling is omitted. @@ -89,13 +77,15 @@ Types: - NodeId 64 bit CPG node-id, identifies member of the cluster. - struct MessageId { NodeId node; SequenceNumber seq; } +NOTE: Message ID's identify a QueuedMessage, i.e. a position on a queue. + Members: - atomic<SequenceNumber> sequence // sequence number for message IDs. - thread_local bool noReplicate // suppress replication. - thread_local bool isRouting // suppress operations while routing - QueuedMessage localMessage[SequenceNumber] // local messages being enqueued. -NOTE: localMessage is also modified by DeliverHandler. +NOTE: localMessage is also modified by MessageHandler. broker::Cluster intercept functions: @@ -150,7 +140,7 @@ dequeue(QueuedMessage) # FIXME revisit - move it out of the queue lock. cleanup(msg) -*** DeliverHandler and mcast messages +*** MessageHandler and mcast messages Types: - struct QueueEntry { QueuedMessage qmsg; NodeId acquired; } - struct QueueKey { MessageId id; QueueName q; } @@ -326,8 +316,9 @@ cancel(q,consumer,consumerCount) - Queue::cancel() - keep design modular, keep threading rules clear. ** TODO [#B] Large message replication. -Need to be able to multicast large messages in fragments - +Multicast should encode messages in fixed size buffers (64k)? +Can't assume we can send message in one chunk. +For 0-10 can use channel numbers & send whole frames packed into larger buffer. ** TODO [#B] Batch CPG multicast messages The new cluster design involves a lot of small multicast messages, they need to be batched into larger CPG messages for efficiency. @@ -437,3 +428,9 @@ Look for ways to capitalize on the similarity & simplify the code. In particular QueuedEvents (async replication) strongly resembles cluster replication, but over TCP rather than multicast. +** TODO [#C] Concurrency for enqueue events. +All enqueue events are being processed in the CPG deliver thread context which +serializes all the work. We only need ordering on a per queue basis, can we +enqueue in parallel on different queues and will that improve performance? +** TODO [#C] Handling immediate messages in a cluster +Include remote consumers in descision to deliver an immediate message? diff --git a/cpp/src/tests/BrokerClusterCalls.cpp b/cpp/src/tests/BrokerClusterCalls.cpp index 6cdd6fc9bf..f659702387 100644 --- a/cpp/src/tests/BrokerClusterCalls.cpp +++ b/cpp/src/tests/BrokerClusterCalls.cpp @@ -42,6 +42,7 @@ using namespace boost; using namespace boost::assign; using namespace qpid::messaging; using boost::format; +using boost::intrusive_ptr; namespace qpid { namespace tests { @@ -59,6 +60,9 @@ class DummyCluster : public broker::Cluster history += (format("%s(%s, %d, %s)") % op % qm.queue->getName() % qm.position % qm.payload->getFrames().getContent()).str(); } + void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) { + history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str(); + } void recordStr(const string& op, const string& name) { history += (format("%s(%s)") % op % name).str(); } @@ -70,7 +74,10 @@ class DummyCluster : public broker::Cluster history += (format("routing(%s)") % m->getFrames().getContent()).str(); } - virtual void enqueue(broker::QueuedMessage& qm) { recordQm("enqueue", qm); } + virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) { + recordMsg("enqueue", q, msg); + return true; + } virtual void routed(const boost::intrusive_ptr<broker::Message>& m) { history += (format("routed(%s)") % m->getFrames().getContent()).str(); @@ -91,9 +98,8 @@ class DummyCluster : public broker::Cluster virtual void release(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("release", qm); } - virtual void dequeue(const broker::QueuedMessage& qm) { - // Never ignore dequeue, used to avoid resource leaks. - recordQm("dequeue", qm); + virtual void drop(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("dequeue", qm); } // Consumers @@ -156,7 +162,7 @@ QPID_AUTO_TEST_CASE(testSimplePubSub) { sender.send(Message("a")); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); // Don't check size here as it is uncertain whether acquire has happened yet. @@ -221,7 +227,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { f.s.reject(m); BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)"); @@ -239,7 +245,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { bool received = receiver.fetch(m, Duration::IMMEDIATE); BOOST_CHECK(!received); // Timed out BOOST_CHECK_EQUAL(h.at(i++), "routing(t)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, t)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); BOOST_CHECK_EQUAL(h.size(), i); @@ -252,7 +258,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.size(), i); @@ -261,11 +267,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { sender.send(m); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - // FIXME: bug in Queue.cpp gives the incorrect position when - // dequeueing a replaced LVQ message. - // BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 2, a)"); // Should be 1 - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.size(), i); @@ -345,20 +347,19 @@ QPID_AUTO_TEST_CASE(testRingQueue) { BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(c)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(c)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(d)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 4, d)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(d)"); Receiver receiver = f.s.createReceiver("ring"); @@ -399,15 +400,16 @@ QPID_AUTO_TEST_CASE(testTransactions) { BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit ts.commit(); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, b)"); - BOOST_CHECK_EQUAL(h.size(), i); - // FIXME aconway 2010-10-18: As things stand the cluster is not // compatible with transactions - // - enqueues occur after routing is complete. + // - enqueues occur after routing is complete + // - no call to Cluster::enqueue, should be in Queue::process? // - no transaction context associated with messages in the Cluster interface. // - no call to Cluster::accept in Queue::dequeueCommitted + // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); + // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)"); + BOOST_CHECK_EQUAL(h.size(), i); + Receiver receiver = ts.createReceiver("q"); BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a"); diff --git a/cpp/src/tests/cluster2_tests.py b/cpp/src/tests/cluster2_tests.py new file mode 100755 index 0000000000..e3a19ae2a0 --- /dev/null +++ b/cpp/src/tests/cluster2_tests.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, signal, sys, time, imp, re, subprocess +from qpid import datatypes, messaging +from qpid.brokertest import * +from qpid.harness import Skipped +from qpid.messaging import Message +from qpid.messaging.exceptions import Empty +from threading import Thread, Lock +from logging import getLogger +from itertools import chain + +log = getLogger("qpid.cluster_tests") + +class Cluster2Tests(BrokerTest): + """Tests for new cluster code.""" + + def test_message_enqueue(self): + """Test basic replication of enqueued messages.""" + + cluster = self.cluster(2, cluster2=True, args=["--log-enable=trace+:cluster"]) + + sn0 = cluster[0].connect().session() + r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); + r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); + s0 = sn0.sender("amq.fanout"); + + sn1 = cluster[1].connect().session() + r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); + r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); + + + # Send messages on member 0 + content = ["a","b","c"] + for m in content: s0.send(Message(m)) + + # Browse on both members. + def check(content, receiver): + for c in content: self.assertEqual(c, receiver.fetch(1).content) + self.assertRaises(Empty, receiver.fetch, 0) + + check(content, r0p) + check(content, r0q) + check(content, r1p) + check(content, r1q) + + sn1.connection.close() + sn0.connection.close() diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests index e136d3810a..3971a39144 100755 --- a/cpp/src/tests/run_cluster_tests +++ b/cpp/src/tests/run_cluster_tests @@ -33,5 +33,5 @@ mkdir -p $OUTDIR CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail} CLUSTER_TESTS=${CLUSTER_TESTS:-$*} -with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 +with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests -m cluster2_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 rm -rf $OUTDIR diff --git a/cpp/src/tests/test_env.sh.in b/cpp/src/tests/test_env.sh.in index b5c3b0fa3d..96fe6b64f4 100644 --- a/cpp/src/tests/test_env.sh.in +++ b/cpp/src/tests/test_env.sh.in @@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/test_store.so exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; } exportmodule ACL_LIB acl.so exportmodule CLUSTER_LIB cluster.so +exportmodule CLUSTER2_LIB cluster2.so exportmodule REPLICATING_LISTENER_LIB replicating_listener.so exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so exportmodule SSLCONNECTOR_LIB sslconnector.so diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 9cbad82d61..a334e2d785 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -279,4 +279,26 @@ <field name="data" type="vbin32"/> </control> </class> + + + <!-- TODO aconway 2010-10-20: Experimental classes for new cluster. --> + + <!-- Message delivery and disposition --> + <class name="cluster-message" code="0x82"> + <!-- FIXME aconway 2010-10-19: create message in fragments --> + <control name="routing" code="0x1"> + <field name="sequence" type="uint64"/> + <field name="message" type="str32"/> + </control> + + <control name="enqueue" code="0x2"> + <field name="sequence" type="uint64"/> + <field name="queue" type="str8"/> + </control> + + <control name="routed" code="0x3"> + <field name="sequence" type="uint64"/> + </control> + + </class> </amqp> |