diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 25 |
1 files changed, 17 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 82ed8bf8c9..7432fbbc33 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -57,12 +57,12 @@ * - management::ManagementBroker: uses MessageHandler supplied by cluster * to send messages to the broker via the cluster. * - * - Dtx: not yet supported with cluster. - * - * cluster::ExpiryPolicy implements the strategy for message expiry. + * cluster::ExpiryPolicy uses cluster time. * * ClusterTimer implements periodic timed events in the cluster context. - * Used for periodic management events. + * Used for: + * - periodic management events. + * - DTX transaction timeouts. * * <h1>CLUSTER PROTOCOL OVERVIEW</h1> * @@ -199,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1128070; +const uint32_t Cluster::CLUSTER_VERSION = 1159329; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -526,7 +526,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { - QPID_LOG(trace, *this << " DLVR: " << e); + QPID_LOG_IF(trace, loggable(e.frame), *this << " DLVR: " << e); ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); @@ -535,14 +535,15 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { map.incrementFrameSeq(); ConnectionPtr connection = getConnection(e, l); if (connection) { - QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); + QPID_LOG_IF(trace, loggable(e.frame), + *this << " DLVR " << map.getFrameSeq() << ": " << e); connection->deliveredFrame(e); } else throw Exception(QPID_MSG("Unknown connection: " << e)); } else // Drop connection frames while state < CATCHUP - QPID_LOG(trace, *this << " DROP (joining): " << e); + QPID_LOG_IF(trace, loggable(e.frame), *this << " DROP (joining): " << e); } // Called in deliverFrameQueue thread @@ -1219,4 +1220,12 @@ bool Cluster::deferDeliveryImpl(const std::string& queue, return true; } +bool Cluster::loggable(const AMQFrame& f) { + const AMQMethodBody* method = (f.getMethod()); + if (!method) return true; // Not a method + bool isClock = method->amqpClassId() == ClusterClockBody::CLASS_ID + && method->amqpMethodId() == ClusterClockBody::METHOD_ID; + return !isClock; +} + }} // namespace qpid::cluster |