summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-05-14 15:14:32 +0000
committerAlan Conway <aconway@apache.org>2009-05-14 15:14:32 +0000
commitf5ba512c1d74e1d777b8e926f5424638daa480ab (patch)
treee0e010c0ef2b47bed7d050ea0c47805c23285a9e /cpp
parent9a9d179a3b16703d2ae4f5d85cb9f87dc92495d5 (diff)
downloadqpid-python-f5ba512c1d74e1d777b8e926f5424638daa480ab.tar.gz
Fix for unpredictable enqueues by timer-triggered management code in a cluster.
ManagementAgent uses Broker::getClusterMessageHandler() (if non-0) to enqueue timer-triggered messages. Cluster provides handler that enqueues via cluster multicast. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@774809 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--cpp/src/qpid/broker/Broker.h17
-rw-r--r--cpp/src/qpid/broker/MessageHandler.h43
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp134
-rw-r--r--cpp/src/qpid/cluster/Cluster.h7
-rw-r--r--cpp/src/qpid/cluster/EventFrame.h1
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp30
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h3
9 files changed, 220 insertions, 19 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 63ca7009d9..263f64f528 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -391,6 +391,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Link.cpp \
qpid/broker/LinkRegistry.cpp \
qpid/broker/Message.cpp \
+ qpid/broker/MessageHandler.h \
qpid/broker/MessageAdapter.cpp \
qpid/broker/MessageBuilder.cpp \
qpid/broker/MessageStoreModule.cpp \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 749489fbfd..d917fe7017 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -154,6 +154,7 @@ Broker::Broker(const Broker::Options& conf) :
queueEvents(poller),
recovery(true),
expiryPolicy(new ExpiryPolicy),
+ clusterMessageHandler(0),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if (conf.enableMgmt) {
@@ -264,7 +265,7 @@ Broker::Broker(const Broker::Options& conf) :
queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
}
- //initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)):
+ // Initialize known broker urls (TODO: add support for urls SSL, RDMA, etc.)
if (conf.knownHosts.empty()) {
boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT);
if (factory) {
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 8f4621bb39..32e9ce2087 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -1,5 +1,5 @@
-#ifndef _Broker_
-#define _Broker_
+#ifndef QPID_BROKER_BROKER_H
+#define QPID_BROKER_BROKER_H
/*
*
@@ -68,6 +68,7 @@ struct Url;
namespace broker {
class ExpiryPolicy;
+class MessageHandler;
static const uint16_t DEFAULT_PORT=5672;
@@ -143,6 +144,7 @@ public:
std::string federationTag;
bool recovery;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ MessageHandler* clusterMessageHandler;
public:
@@ -236,10 +238,19 @@ public:
bool getRecovery() const { return recovery; }
management::ManagementAgent* getManagementAgent() { return managementAgent; }
+
+ /** Handler to route messages to queues with replication.
+ * Required for messages that are generated in a way the cluster
+ * cannot predict, e.g. as a result of a timer firing.
+ *
+ * @return 0 if not in a cluster.
+ */
+ MessageHandler* getClusterMessageHandler() { return clusterMessageHandler; }
+ void setClusterMessageHandler(MessageHandler& h) { clusterMessageHandler = &h; }
};
}}
-#endif /*!_Broker_*/
+#endif /*!QPID_BROKER_BROKER_H*/
diff --git a/cpp/src/qpid/broker/MessageHandler.h b/cpp/src/qpid/broker/MessageHandler.h
new file mode 100644
index 0000000000..b29823cc54
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageHandler.h
@@ -0,0 +1,43 @@
+#ifndef QPID_BROKER_MESSAGEHANDLER_H
+#define QPID_BROKER_MESSAGEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class Message;
+
+/**
+ * Handler for messages.
+ */
+class MessageHandler
+{
+ public:
+ virtual ~MessageHandler() {}
+ virtual void handle(const boost::intrusive_ptr<Message>&) = 0;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_MESSAGEHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 1f39fe9ae9..8a93773718 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -16,6 +16,74 @@
*
*/
+/** CLUSTER IMPLEMENTATION OVERVIEW
+ *
+ * The cluster works on the principle that if all members of the
+ * cluster receive identical input, they will all produce identical
+ * results. cluster::Connections intercept data received from clients
+ * and multicast it via CPG. The data is processed (passed to the
+ * broker::Connection) only when it is received from CPG in cluster
+ * order. Each cluster member has Connection objects for directly
+ * connected clients and "shadow" Connection objects for connections
+ * to other members.
+ *
+ * This assumes that all broker actions occur deterministically in
+ * response to data arriving on client connections. There are two
+ * situations where this assumption fails:
+ * - sending data in response to polling local connections for writabiliy.
+ * - taking actions based on a timer or timestamp comparison.
+ *
+ * IMPORTANT NOTE: any time code is added to the broker that uses timers,
+ * the cluster may need to be updated to take account of this.
+ *
+ *
+ * USE OF TIMESTAMPS IN THE BROKER
+ *
+ * The following are the current areas where broker uses timers or timestamps:
+ *
+ * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput.
+ * a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState
+ *
+ * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy.
+ *
+ * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore.
+ *
+ * - LinkRegistry: only cluster elder is ever active for links.
+ *
+ * - management::ManagementBroker: uses MessageHandler supplied by cluster
+ * to send messages to the broker via the cluster.
+ *
+ * - Dtx: not yet supported with cluster.
+ *
+ * cluster::ExpiryPolicy implements the strategy for message expiry.
+ *
+ * CLUSTER PROTOCOL OVERVIEW
+ *
+ * Messages sent to/from CPG are called Events.
+ *
+ * An Event carries a ConnectionId, which includes a MemberId and a
+ * connection number.
+ *
+ * Events are either
+ * - Connection events: non-0 connection number and are associated with a connection.
+ * - Cluster Events: 0 connection number, are not associated with a connectin.
+ *
+ * Events are further categorized as:
+ * - Control: carries method frame(s) that affect cluster behavior.
+ * - Data: carries raw data received from a client connection.
+ *
+ * The cluster defines extensions to the AMQP command set in ../../../xml/cluster.xml
+ * which defines two classes:
+ * - cluster: cluster control information.
+ * - cluster.connection: control information for a specific connection.
+ *
+ * The following combinations are legal:
+ * - Data frames carrying connection data.
+ * - Cluster control events carrying cluster commands.
+ * - Connection control events carrying cluster.connection commands.
+ * - Connection control events carrying non-cluster frames: frames sent to the client.
+ * e.g. flow-control frames generated on a timer.
+ */
#include "Cluster.h"
#include "ClusterSettings.h"
#include "Connection.h"
@@ -30,6 +98,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/framing/frame_functors.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
@@ -41,6 +110,15 @@
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
#include "qpid/framing/ClusterUpdateRequestBody.h"
+
+#include "qpid/framing/ConnectionStartOkBody.h"
+#include "qpid/framing/ConnectionTuneBody.h"
+#include "qpid/framing/ConnectionOpenBody.h"
+#include "qpid/framing/SessionAttachBody.h"
+#include "qpid/framing/SessionRequestTimeoutBody.h"
+#include "qpid/framing/SessionCommandPointBody.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
#include "qpid/management/IdAllocator.h"
@@ -57,6 +135,7 @@
#include <iterator>
#include <map>
#include <ostream>
+#include <sstream>
namespace qpid {
namespace cluster {
@@ -127,11 +206,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Failover exchange provides membership updates to clients.
failoverExchange.reset(new FailoverExchange(this));
broker.getExchanges().registerExchange(failoverExchange);
-
- // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
-
+ broker.setClusterMessageHandler(*this);
if (settings.quorum) quorum.init();
+
cpg.join(name);
// pump the CPG dispatch manually till we get initialized.
while (!initialized)
@@ -666,6 +744,7 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
}
void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+ if (state == LEFT) return;
// If we receive an errorCheck here, it's because we have processed past the point
// of the error so respond with ERROR_TYPE_NONE
assert(map.getFrameSeq() >= frameSeq);
@@ -674,4 +753,53 @@ void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock
ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
}
+size_t accumulateEncodedSize(size_t total, const AMQFrame& f) { return total + f.encodedSize(); }
+
+//
+// If the broker needs to send messages to itself in an
+// unpredictable context (e.g. management messages generated when
+// a timer expires) it uses "selfConnection"
+//
+// selfConnection behaves as a local client connection, with
+// respect to replication. However instead of mcasting data from a
+// client, data for the selfConnection is mcast directly from
+// Cluster::handle.
+//
+void Cluster::handle(const boost::intrusive_ptr<broker::Message>& msg) {
+ // NOTE: don't take the lock here. We don't need to as mcast is thread safe,
+ // and locking here can cause deadlock with management locks.
+ //
+
+ // Create self-connection on demand
+ if (selfConnection == ConnectionId()) {
+ QPID_LOG(debug, "Initialize self-connection");
+ ostringstream name;
+ name << "qpid.cluster-self." << self;
+ ConnectionPtr selfc = new Connection(*this, shadowOut, name.str(), self, false, false);
+ selfConnection = selfc->getId();
+ vector<AMQFrame> frames;
+ frames.push_back(AMQFrame((ConnectionStartOkBody())));
+ frames.push_back(AMQFrame((ConnectionTuneBody(ProtocolVersion(),32767,65535,0,120))));
+ frames.push_back(AMQFrame((ConnectionOpenBody())));
+ frames.push_back(AMQFrame((SessionAttachBody(ProtocolVersion(), name.str(), false))));
+ frames.push_back(AMQFrame(SessionRequestTimeoutBody(ProtocolVersion(), 0)));
+ frames.push_back(AMQFrame(SessionCommandPointBody(ProtocolVersion(), 0, 0)));
+ size_t size = accumulate(frames.begin(), frames.end(), 0, accumulateEncodedSize);
+ vector<char> store(size);
+ Buffer buf(store.data(), size);
+ for_each(frames.begin(), frames.end(), boost::bind(&AMQFrame::encode, _1, boost::ref(buf)));
+ assert(buf.available() == 0);
+ selfc->decode(store.data(), size); // Multicast
+ }
+
+ QPID_LOG(trace, "Message to self on " << selfConnection << ": " << *msg->getFrames().getMethod());
+ const FrameSet& frames = msg->getFrames();
+ size_t size = accumulate(frames.begin(), frames.end(), 0, accumulateEncodedSize);
+ Event e(DATA, selfConnection, size);
+ Buffer buf(e.getData(), e.getSize());
+ EncodeFrame encoder(buf);
+ msg->getFrames().map(encoder);
+ mcast.mcast(e);
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index bd401f3715..10d49484a8 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -38,6 +38,7 @@
#include "qmf/org/apache/qpid/cluster/Cluster.h"
#include "qpid/Url.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/MessageHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/sys/Monitor.h"
@@ -64,7 +65,7 @@ class EventFrame;
/**
* Connection to the cluster
*/
-class Cluster : private Cpg::Handler, public management::Manageable {
+class Cluster : private Cpg::Handler, public management::Manageable, public broker::MessageHandler {
public:
typedef boost::intrusive_ptr<Connection> ConnectionPtr;
typedef std::vector<ConnectionPtr> ConnectionVector;
@@ -113,6 +114,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
Decoder& getDecoder() { return decoder; }
ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
+
+ // Called in timer threads by management to replicate messages.
+ void handle(const boost::intrusive_ptr<broker::Message>&);
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -199,6 +203,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const std::string name;
Url myUrl;
const MemberId self;
+ ConnectionId selfConnection;
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
qpid::management::ManagementAgent* mAgent;
diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h
index e275aac7aa..752d32be17 100644
--- a/cpp/src/qpid/cluster/EventFrame.h
+++ b/cpp/src/qpid/cluster/EventFrame.h
@@ -43,6 +43,7 @@ struct EventFrame
bool isCluster() const { return connectionId.getNumber() == 0; }
bool isConnection() const { return connectionId.getNumber() != 0; }
+ bool isControl() const { return type == CONTROL; }
bool isLastInEvent() const { return readCredit; }
MemberId getMemberId() const { return connectionId.getMember(); }
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 8dce82ba84..3063c5f44c 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -25,6 +25,7 @@
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/log/Statement.h"
#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageHandler.h>
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/Time.h"
#include "qpid/broker/ConnectionState.h"
@@ -264,9 +265,11 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
}
void ManagementAgent::sendBuffer(Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- string routingKey)
+ uint32_t length,
+ qpid::broker::Exchange::shared_ptr exchange,
+ string routingKey,
+ bool isPredictable)
+
{
if (exchange.get() == 0)
return;
@@ -286,14 +289,21 @@ void ManagementAgent::sendBuffer(Buffer& buf,
msg->getFrames().append(method);
msg->getFrames().append(header);
- MessageProperties* props =
- msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ DeliveryProperties* delivery = msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ delivery->setRoutingKey(routingKey);
+
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
props->setContentLength(length);
msg->getFrames().append(content);
- DeliverableMessage deliverable (msg);
try {
- exchange->route(deliverable, routingKey, 0);
+ if (!isPredictable && broker->getClusterMessageHandler()) {
+ broker->getClusterMessageHandler()->handle(msg);
+ }
+ else {
+ DeliverableMessage deliverable (msg);
+ exchange->route(deliverable, routingKey, 0);
+ }
} catch(exception&) {}
}
@@ -347,7 +357,7 @@ void ManagementAgent::periodicProcessing (void)
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName();
- sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false);
}
if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
@@ -358,7 +368,7 @@ void ManagementAgent::periodicProcessing (void)
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName();
- sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false);
}
if (object->isDeleted())
@@ -387,7 +397,7 @@ void ManagementAgent::periodicProcessing (void)
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "console.heartbeat.1.0";
- sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false);
}
}
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index 2411e6c277..1216679f0e 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -204,7 +204,8 @@ private:
void sendBuffer (framing::Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
- std::string routingKey);
+ std::string routingKey,
+ bool isPredictable=true);
void moveNewObjectsLH();
bool authorizeAgentMessageLH(qpid::broker::Message& msg);