summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp157
1 files changed, 115 insertions, 42 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 792f9f65f4..394749aad2 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -24,6 +24,8 @@
#include "Cluster.h"
#include "UpdateReceiver.h"
#include "qpid/assert.h"
+#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/TxBuffer.h"
@@ -97,7 +99,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
external,
isLink,
isCatchUp ? ++catchUpId : 0,
- isCatchUp), // isCatchUp => shadow
+ // The first catch-up connection is not considered a shadow
+ // as it needs to be authenticated.
+ isCatchUp && self.second > 1),
expectProtocolHeader(isLink),
mcastFrameHandler(cluster.getMulticast(), self),
updateIn(c.getUpdateReceiver()),
@@ -114,7 +118,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
if (!updateIn.nextShadowMgmtId.empty())
connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
updateIn.nextShadowMgmtId.clear();
- }
+ }
init();
QPID_LOG(debug, cluster << " local connection " << *this);
}
@@ -167,7 +171,7 @@ void Connection::announce(
AMQFrame frame;
while (frame.decode(buf))
connection->received(frame);
- connection->setUserId(username);
+ connection->setUserId(username);
}
// Do managment actions now that the connection is replicated.
connection->raiseConnectEvent();
@@ -194,7 +198,7 @@ void Connection::received(framing::AMQFrame& f) {
<< *this << ": " << f);
return;
}
- QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
+ QPID_LOG_IF(trace, Cluster::loggable(f), cluster << " RECV " << *this << ": " << f);
if (isLocal()) { // Local catch-up connection.
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
@@ -214,16 +218,9 @@ void Connection::received(framing::AMQFrame& f) {
}
}
-bool Connection::checkUnsupported(const AMQBody& body) {
- std::string message;
- if (body.getMethod()) {
- switch (body.getMethod()->amqpClassId()) {
- case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
- }
- }
- if (!message.empty())
- connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
- return !message.empty();
+bool Connection::checkUnsupported(const AMQBody&) {
+ // Throw an exception for unsupported commands. Currently all are supported.
+ return false;
}
struct GiveReadCreditOnExit {
@@ -424,7 +421,8 @@ void Connection::sessionState(
const SequenceNumber& expected,
const SequenceNumber& received,
const SequenceSet& unknownCompleted,
- const SequenceSet& receivedIncomplete)
+ const SequenceSet& receivedIncomplete,
+ bool dtxSelected)
{
sessionState().setState(
replayStart,
@@ -434,7 +432,9 @@ void Connection::sessionState(
received,
unknownCompleted,
receivedIncomplete);
- QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+ if (dtxSelected) semanticState().selectDtx();
+ QPID_LOG(debug, cluster << " received session state update for "
+ << sessionState().getId());
// The output tasks will be added later in the update process.
connection->getOutputTasks().removeAll();
}
@@ -464,11 +464,24 @@ void Connection::shadowReady(
output.setSendMax(sendMax);
}
+void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) {
+ broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+ broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid);
+ broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index];
+ if (bufRef.suspended)
+ bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer;
+ else
+ bufRef.semanticState->setDtxBuffer(buffer);
+}
+
+// Marks the end of the update.
void Connection::membership(const FieldTable& joiners, const FieldTable& members,
const framing::SequenceNumber& frameSeq)
{
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
updateIn.consumerNumbering.clear();
+ for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(),
+ boost::bind(&Connection::setDtxBuffer, this, _1));
closeUpdated();
cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
}
@@ -536,8 +549,16 @@ void Connection::deliveryRecord(const string& qname,
} else { // Message at original position in original queue
queue->find(position, m);
}
- if (!m.payload)
- throw Exception(QPID_MSG("deliveryRecord no update message"));
+ // FIXME aconway 2011-08-19: removed:
+ // if (!m.payload)
+ // throw Exception(QPID_MSG("deliveryRecord no update message"));
+ //
+ // It seems this could happen legitimately in the case one
+ // session browses message M, then another session acquires
+ // it. In that case the browsers delivery record is !acquired
+ // but the message is not on its original Queue. In that case
+ // we'll get a deliveryRecord with no payload for the browser.
+ //
}
broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
@@ -545,7 +566,11 @@ void Connection::deliveryRecord(const string& qname,
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
- semanticState().record(dr); // Part of the session's unacked list.
+
+ if (dtxBuffer) // Record for next dtx-ack
+ dtxAckRecords.push_back(dr);
+ else
+ semanticState().record(dr); // Record on session's unacked list.
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -561,29 +586,29 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri
namespace {
- // find a StatefulQueueObserver that matches a given identifier
- class ObserverFinder {
- const std::string id;
- boost::shared_ptr<broker::QueueObserver> target;
- ObserverFinder(const ObserverFinder&) {}
- public:
- ObserverFinder(const std::string& _id) : id(_id) {}
- broker::StatefulQueueObserver *getObserver()
- {
- if (target)
- return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
- return 0;
- }
- void operator() (boost::shared_ptr<broker::QueueObserver> o)
- {
- if (!target) {
- broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
- if (p && p->getId() == id) {
- target = o;
- }
+// find a StatefulQueueObserver that matches a given identifier
+class ObserverFinder {
+ const std::string id;
+ boost::shared_ptr<broker::QueueObserver> target;
+ ObserverFinder(const ObserverFinder&) {}
+ public:
+ ObserverFinder(const std::string& _id) : id(_id) {}
+ broker::StatefulQueueObserver *getObserver()
+ {
+ if (target)
+ return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
+ return 0;
+ }
+ void operator() (boost::shared_ptr<broker::QueueObserver> o)
+ {
+ if (!target) {
+ broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+ if (p && p->getId() == id) {
+ target = o;
}
}
- };
+ }
+};
}
@@ -615,6 +640,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
void Connection::txStart() {
txBuffer.reset(new broker::TxBuffer());
}
+
void Connection::txAccept(const framing::SequenceSet& acked) {
txBuffer->enlist(boost::shared_ptr<broker::TxAccept>(
new broker::TxAccept(acked, semanticState().getUnacked())));
@@ -630,8 +656,10 @@ void Connection::txEnqueue(const std::string& queue) {
new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
}
-void Connection::txPublish(const framing::Array& queues, bool delivered) {
- boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
+void Connection::txPublish(const framing::Array& queues, bool delivered)
+{
+ boost::shared_ptr<broker::TxPublish> txPub(
+ new broker::TxPublish(getUpdateMessage().payload));
for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
txPub->deliverTo(findQueue((*i)->get<std::string>()));
txPub->delivered = delivered;
@@ -646,6 +674,51 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
semanticState().setAccumulatedAck(s);
}
+void Connection::dtxStart(const std::string& xid,
+ bool ended,
+ bool suspended,
+ bool failed,
+ bool expired)
+{
+ dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired));
+ txBuffer = dtxBuffer;
+}
+
+void Connection::dtxEnd() {
+ broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+ std::string xid = dtxBuffer->getXid();
+ if (mgr.exists(xid))
+ mgr.join(xid, dtxBuffer);
+ else
+ mgr.start(xid, dtxBuffer);
+ dtxBuffer.reset();
+ txBuffer.reset();
+}
+
+// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords
+void Connection::dtxAck() {
+ dtxBuffer->enlist(
+ boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords)));
+ dtxAckRecords.clear();
+}
+
+void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) {
+ // Save the association between DtxBuffers and the session so we
+ // can set the DtxBuffers at the end of the update when the
+ // DtxManager has been replicated.
+ updateIn.dtxBuffers.push_back(
+ UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState()));
+}
+
+// Sent at end of work record.
+void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout)
+{
+ broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+ if (timeout) mgr.setTimeout(xid, timeout);
+ if (prepared) mgr.prepare(xid);
+}
+
+
void Connection::exchange(const std::string& encoded) {
Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);