diff options
author | Alan Conway <aconway@apache.org> | 2009-05-15 15:12:05 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-05-15 15:12:05 +0000 |
commit | e5a0aff72c3117114d2572c3e3d6e77238b2263b (patch) | |
tree | 761f1ea0a3a4632b648da8c380a53b55533da631 /cpp | |
parent | 90f49326a937bc0c767b99c922e2bcf29058ef36 (diff) | |
download | qpid-python-e5a0aff72c3117114d2572c3e3d6e77238b2263b.tar.gz |
Undo change from r774809.
This fix is incorrect. The timer will go off in each member, and each
one will send a response message which is replicated, resulting in a
response from each member being enqueued rather than a single
response.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@775182 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandler.h | 43 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 66 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/EventFrame.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 3 |
9 files changed, 19 insertions, 152 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 263f64f528..63ca7009d9 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -391,7 +391,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Link.cpp \ qpid/broker/LinkRegistry.cpp \ qpid/broker/Message.cpp \ - qpid/broker/MessageHandler.h \ qpid/broker/MessageAdapter.cpp \ qpid/broker/MessageBuilder.cpp \ qpid/broker/MessageStoreModule.cpp \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index d917fe7017..749489fbfd 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -154,7 +154,6 @@ Broker::Broker(const Broker::Options& conf) : queueEvents(poller), recovery(true), expiryPolicy(new ExpiryPolicy), - clusterMessageHandler(0), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (conf.enableMgmt) { @@ -265,7 +264,7 @@ Broker::Broker(const Broker::Options& conf) : queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC); } - // Initialize known broker urls (TODO: add support for urls SSL, RDMA, etc.) + //initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)): if (conf.knownHosts.empty()) { boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT); if (factory) { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 32e9ce2087..8f4621bb39 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -1,5 +1,5 @@ -#ifndef QPID_BROKER_BROKER_H -#define QPID_BROKER_BROKER_H +#ifndef _Broker_ +#define _Broker_ /* * @@ -68,7 +68,6 @@ struct Url; namespace broker { class ExpiryPolicy; -class MessageHandler; static const uint16_t DEFAULT_PORT=5672; @@ -144,7 +143,6 @@ public: std::string federationTag; bool recovery; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - MessageHandler* clusterMessageHandler; public: @@ -238,19 +236,10 @@ public: bool getRecovery() const { return recovery; } management::ManagementAgent* getManagementAgent() { return managementAgent; } - - /** Handler to route messages to queues with replication. - * Required for messages that are generated in a way the cluster - * cannot predict, e.g. as a result of a timer firing. - * - * @return 0 if not in a cluster. - */ - MessageHandler* getClusterMessageHandler() { return clusterMessageHandler; } - void setClusterMessageHandler(MessageHandler& h) { clusterMessageHandler = &h; } }; }} -#endif /*!QPID_BROKER_BROKER_H*/ +#endif /*!_Broker_*/ diff --git a/cpp/src/qpid/broker/MessageHandler.h b/cpp/src/qpid/broker/MessageHandler.h deleted file mode 100644 index b29823cc54..0000000000 --- a/cpp/src/qpid/broker/MessageHandler.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef QPID_BROKER_MESSAGEHANDLER_H -#define QPID_BROKER_MESSAGEHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/intrusive_ptr.hpp> - -namespace qpid { -namespace broker { - -class Message; - -/** - * Handler for messages. - */ -class MessageHandler -{ - public: - virtual ~MessageHandler() {} - virtual void handle(const boost::intrusive_ptr<Message>&) = 0; -}; -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_MESSAGEHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 8a93773718..5f51bb9dad 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -98,7 +98,6 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionState.h" -#include "qpid/framing/frame_functors.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" @@ -110,15 +109,6 @@ #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" #include "qpid/framing/ClusterUpdateRequestBody.h" - -#include "qpid/framing/ConnectionStartOkBody.h" -#include "qpid/framing/ConnectionTuneBody.h" -#include "qpid/framing/ConnectionOpenBody.h" -#include "qpid/framing/SessionAttachBody.h" -#include "qpid/framing/SessionRequestTimeoutBody.h" -#include "qpid/framing/SessionCommandPointBody.h" -#include "qpid/framing/AMQP_ClientProxy.h" - #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" #include "qpid/management/IdAllocator.h" @@ -135,7 +125,6 @@ #include <iterator> #include <map> #include <ostream> -#include <sstream> namespace qpid { namespace cluster { @@ -206,10 +195,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Failover exchange provides membership updates to clients. failoverExchange.reset(new FailoverExchange(this)); broker.getExchanges().registerExchange(failoverExchange); + + // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange. broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); - broker.setClusterMessageHandler(*this); - if (settings.quorum) quorum.init(); + if (settings.quorum) quorum.init(); cpg.join(name); // pump the CPG dispatch manually till we get initialized. while (!initialized) @@ -744,7 +734,6 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { } void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) { - if (state == LEFT) return; // If we receive an errorCheck here, it's because we have processed past the point // of the error so respond with ERROR_TYPE_NONE assert(map.getFrameSeq() >= frameSeq); @@ -753,53 +742,4 @@ void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self); } -size_t accumulateEncodedSize(size_t total, const AMQFrame& f) { return total + f.encodedSize(); } - -// -// If the broker needs to send messages to itself in an -// unpredictable context (e.g. management messages generated when -// a timer expires) it uses "selfConnection" -// -// selfConnection behaves as a local client connection, with -// respect to replication. However instead of mcasting data from a -// client, data for the selfConnection is mcast directly from -// Cluster::handle. -// -void Cluster::handle(const boost::intrusive_ptr<broker::Message>& msg) { - // NOTE: don't take the lock here. We don't need to as mcast is thread safe, - // and locking here can cause deadlock with management locks. - // - - // Create self-connection on demand - if (selfConnection == ConnectionId()) { - QPID_LOG(debug, "Initialize self-connection"); - ostringstream name; - name << "qpid.cluster-self." << self; - ConnectionPtr selfc = new Connection(*this, shadowOut, name.str(), self, false, false); - selfConnection = selfc->getId(); - vector<AMQFrame> frames; - frames.push_back(AMQFrame((ConnectionStartOkBody()))); - frames.push_back(AMQFrame((ConnectionTuneBody(ProtocolVersion(),32767,65535,0,120)))); - frames.push_back(AMQFrame((ConnectionOpenBody()))); - frames.push_back(AMQFrame((SessionAttachBody(ProtocolVersion(), name.str(), false)))); - frames.push_back(AMQFrame(SessionRequestTimeoutBody(ProtocolVersion(), 0))); - frames.push_back(AMQFrame(SessionCommandPointBody(ProtocolVersion(), 0, 0))); - size_t size = accumulate(frames.begin(), frames.end(), 0, accumulateEncodedSize); - vector<char> store(size); - Buffer buf(store.data(), size); - for_each(frames.begin(), frames.end(), boost::bind(&AMQFrame::encode, _1, boost::ref(buf))); - assert(buf.available() == 0); - selfc->decode(store.data(), size); // Multicast - } - - QPID_LOG(trace, "Message to self on " << selfConnection << ": " << *msg->getFrames().getMethod()); - const FrameSet& frames = msg->getFrames(); - size_t size = accumulate(frames.begin(), frames.end(), 0, accumulateEncodedSize); - Event e(DATA, selfConnection, size); - Buffer buf(e.getData(), e.getSize()); - EncodeFrame encoder(buf); - msg->getFrames().map(encoder); - mcast.mcast(e); -} - }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 10d49484a8..bd401f3715 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -38,7 +38,6 @@ #include "qmf/org/apache/qpid/cluster/Cluster.h" #include "qpid/Url.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/MessageHandler.h" #include "qpid/management/Manageable.h" #include "qpid/sys/Monitor.h" @@ -65,7 +64,7 @@ class EventFrame; /** * Connection to the cluster */ -class Cluster : private Cpg::Handler, public management::Manageable, public broker::MessageHandler { +class Cluster : private Cpg::Handler, public management::Manageable { public: typedef boost::intrusive_ptr<Connection> ConnectionPtr; typedef std::vector<ConnectionPtr> ConnectionVector; @@ -114,9 +113,6 @@ class Cluster : private Cpg::Handler, public management::Manageable, public brok Decoder& getDecoder() { return decoder; } ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; } - - // Called in timer threads by management to replicate messages. - void handle(const boost::intrusive_ptr<broker::Message>&); private: typedef sys::Monitor::ScopedLock Lock; @@ -203,7 +199,6 @@ class Cluster : private Cpg::Handler, public management::Manageable, public brok const std::string name; Url myUrl; const MemberId self; - ConnectionId selfConnection; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; qpid::management::ManagementAgent* mAgent; diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h index 752d32be17..e275aac7aa 100644 --- a/cpp/src/qpid/cluster/EventFrame.h +++ b/cpp/src/qpid/cluster/EventFrame.h @@ -43,7 +43,6 @@ struct EventFrame bool isCluster() const { return connectionId.getNumber() == 0; } bool isConnection() const { return connectionId.getNumber() != 0; } - bool isControl() const { return type == CONTROL; } bool isLastInEvent() const { return readCredit; } MemberId getMemberId() const { return connectionId.getMember(); } diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 3063c5f44c..8dce82ba84 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -25,7 +25,6 @@ #include "qpid/broker/DeliverableMessage.h" #include "qpid/log/Statement.h" #include <qpid/broker/Message.h> -#include <qpid/broker/MessageHandler.h> #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" #include "qpid/broker/ConnectionState.h" @@ -265,11 +264,9 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) } void ManagementAgent::sendBuffer(Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - string routingKey, - bool isPredictable) - + uint32_t length, + qpid::broker::Exchange::shared_ptr exchange, + string routingKey) { if (exchange.get() == 0) return; @@ -289,21 +286,14 @@ void ManagementAgent::sendBuffer(Buffer& buf, msg->getFrames().append(method); msg->getFrames().append(header); - DeliveryProperties* delivery = msg->getFrames().getHeaders()->get<DeliveryProperties>(true); - delivery->setRoutingKey(routingKey); - - MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + MessageProperties* props = + msg->getFrames().getHeaders()->get<MessageProperties>(true); props->setContentLength(length); msg->getFrames().append(content); + DeliverableMessage deliverable (msg); try { - if (!isPredictable && broker->getClusterMessageHandler()) { - broker->getClusterMessageHandler()->handle(msg); - } - else { - DeliverableMessage deliverable (msg); - exchange->route(deliverable, routingKey, 0); - } + exchange->route(deliverable, routingKey, 0); } catch(exception&) {} } @@ -357,7 +347,7 @@ void ManagementAgent::periodicProcessing (void) contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { @@ -368,7 +358,7 @@ void ManagementAgent::periodicProcessing (void) contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->isDeleted()) @@ -397,7 +387,7 @@ void ManagementAgent::periodicProcessing (void) contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "console.heartbeat.1.0"; - sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } } diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 1216679f0e..2411e6c277 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -204,8 +204,7 @@ private: void sendBuffer (framing::Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, - std::string routingKey, - bool isPredictable=true); + std::string routingKey); void moveNewObjectsLH(); bool authorizeAgentMessageLH(qpid::broker::Message& msg); |