summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp50
1 files changed, 30 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 8037559c3d..cc6c8a3f30 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -46,18 +46,13 @@ namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
+using namespace framing::execution;
using namespace std;
using std::exception;
using sys::Mutex;
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-namespace {
-const string QPID_HA(QPID_HA_PREFIX);
-const std::string TYPE_NAME(QPID_HA+"queue-replicator");
-}
-
-
std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QUEUE_REPLICATOR_PREFIX + queueName;
}
@@ -68,20 +63,21 @@ bool QueueReplicator::isReplicatorName(const std::string& name) {
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
- ErrorListener(const std::string& prefix) : logPrefix(prefix) {}
- void connectionException(framing::connection::CloseCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Connection error: " << msg);
- }
- void channelException(framing::session::DetachCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Channel error: " << msg);
- }
- void executionException(framing::execution::ErrorCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Execution error: " << msg);
+ ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
+ : queueReplicator(qr), logPrefix(qr->logPrefix) {}
+
+ void connectionException(framing::connection::CloseCode, const std::string&) {}
+ void channelException(framing::session::DetachCode, const std::string&) {}
+ void executionException(framing::execution::ErrorCode, const std::string&) {}
+
+ void incomingExecutionException(ErrorCode e, const std::string& msg) {
+ queueReplicator->incomingExecutionException(e, msg);
}
void detach() {
QPID_LOG(debug, logPrefix << "Session detached");
}
private:
+ boost::shared_ptr<QueueReplicator> queueReplicator;
std::string logPrefix;
};
@@ -128,6 +124,8 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
boost::bind(&QueueReplicator::idEvent, this, _1, _2);
}
+QueueReplicator::~QueueReplicator() {}
+
// This must be called immediately after the constructor.
// It has to be separate so we can call shared_from_this().
void QueueReplicator::activate() {
@@ -161,7 +159,7 @@ void QueueReplicator::activate() {
);
bridge = result.first;
bridge->setErrorListener(
- boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
+ boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this())));
// Enable callback to destroy()
queue->addObserver(
@@ -173,8 +171,6 @@ void QueueReplicator::disconnect() {
sessionHandler = 0;
}
-QueueReplicator::~QueueReplicator() {}
-
// Called from Queue::destroyed()
void QueueReplicator::destroy() {
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
@@ -200,7 +196,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
AMQP_ServerProxy peer(sessionHandler->out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable arguments;
- arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
arguments.setString(ReplicatingSubscription::QPID_ID_SET,
@@ -289,12 +285,26 @@ ReplicationId QueueReplicator::getMaxId() {
return maxId;
}
+void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
+ if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
+ // If the queue is destroyed at the same time we are subscribing, we may
+ // get a not-found or resource-deleted exception before the
+ // BrokerReplicator gets the queue-delete event. Shut down the bridge by
+ // calling destroy(), we can let the BrokerReplicator delete the queue
+ // when the queue-delete arrives.
+ QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg);
+ destroy();
+ }
+ else
+ QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg);
+}
+
// Unused Exchange methods.
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
bool QueueReplicator::hasBindings() { return false; }
-std::string QueueReplicator::getType() const { return TYPE_NAME; }
+std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
}} // namespace qpid::broker