diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 50 |
1 files changed, 30 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 8037559c3d..cc6c8a3f30 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -46,18 +46,13 @@ namespace qpid { namespace ha { using namespace broker; using namespace framing; +using namespace framing::execution; using namespace std; using std::exception; using sys::Mutex; const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -namespace { -const string QPID_HA(QPID_HA_PREFIX); -const std::string TYPE_NAME(QPID_HA+"queue-replicator"); -} - - std::string QueueReplicator::replicatorName(const std::string& queueName) { return QUEUE_REPLICATOR_PREFIX + queueName; } @@ -68,20 +63,21 @@ bool QueueReplicator::isReplicatorName(const std::string& name) { class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { public: - ErrorListener(const std::string& prefix) : logPrefix(prefix) {} - void connectionException(framing::connection::CloseCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Connection error: " << msg); - } - void channelException(framing::session::DetachCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Channel error: " << msg); - } - void executionException(framing::execution::ErrorCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Execution error: " << msg); + ErrorListener(const boost::shared_ptr<QueueReplicator>& qr) + : queueReplicator(qr), logPrefix(qr->logPrefix) {} + + void connectionException(framing::connection::CloseCode, const std::string&) {} + void channelException(framing::session::DetachCode, const std::string&) {} + void executionException(framing::execution::ErrorCode, const std::string&) {} + + void incomingExecutionException(ErrorCode e, const std::string& msg) { + queueReplicator->incomingExecutionException(e, msg); } void detach() { QPID_LOG(debug, logPrefix << "Session detached"); } private: + boost::shared_ptr<QueueReplicator> queueReplicator; std::string logPrefix; }; @@ -128,6 +124,8 @@ QueueReplicator::QueueReplicator(HaBroker& hb, boost::bind(&QueueReplicator::idEvent, this, _1, _2); } +QueueReplicator::~QueueReplicator() {} + // This must be called immediately after the constructor. // It has to be separate so we can call shared_from_this(). void QueueReplicator::activate() { @@ -161,7 +159,7 @@ void QueueReplicator::activate() { ); bridge = result.first; bridge->setErrorListener( - boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix))); + boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this()))); // Enable callback to destroy() queue->addObserver( @@ -173,8 +171,6 @@ void QueueReplicator::disconnect() { sessionHandler = 0; } -QueueReplicator::~QueueReplicator() {} - // Called from Queue::destroyed() void QueueReplicator::destroy() { boost::shared_ptr<Bridge> bridge2; // To call outside of lock @@ -200,7 +196,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa AMQP_ServerProxy peer(sessionHandler->out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); FieldTable arguments; - arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); + arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType()); arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize? arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable()); arguments.setString(ReplicatingSubscription::QPID_ID_SET, @@ -289,12 +285,26 @@ ReplicationId QueueReplicator::getMaxId() { return maxId; } +void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) { + if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) { + // If the queue is destroyed at the same time we are subscribing, we may + // get a not-found or resource-deleted exception before the + // BrokerReplicator gets the queue-delete event. Shut down the bridge by + // calling destroy(), we can let the BrokerReplicator delete the queue + // when the queue-delete arrives. + QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg); + destroy(); + } + else + QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg); +} + // Unused Exchange methods. bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; } bool QueueReplicator::hasBindings() { return false; } -std::string QueueReplicator::getType() const { return TYPE_NAME; } +std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; } }} // namespace qpid::broker |