diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/replication/ReplicationExchange.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/replication/constants.h | 1 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/replication_test | 49 |
6 files changed, 78 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index ee5831fed0..3c8c237b98 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1012,6 +1012,10 @@ void Queue::setPosition(SequenceNumber n) { sequence = n; } +SequenceNumber Queue::getPosition() { + return sequence; +} + int Queue::getEventMode() { return eventMode; } void Queue::setQueueEventManager(QueueEvents& mgr) diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 475766ae30..6703d06bbb 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -324,6 +324,9 @@ namespace qpid { * Used by cluster to replicate queues. */ void setPosition(framing::SequenceNumber pos); + /** return current position sequence number for the next message on the queue. + */ + framing::SequenceNumber getPosition(); int getEventMode(); void setQueueEventManager(QueueEvents&); QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); diff --git a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp index 1a3ce1c069..b7d52372f4 100644 --- a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -72,6 +72,7 @@ void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueu FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders(); headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE); + headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position); route(msg); } diff --git a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp index c0cc36efe3..053335e6ad 100644 --- a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -83,15 +83,27 @@ void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); Queue::shared_ptr queue = queues.find(queueName); if (queue) { - FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); - headers.erase(REPLICATION_TARGET_QUEUE); - headers.erase(REPLICATION_EVENT_SEQNO); - headers.erase(REPLICATION_EVENT_TYPE); - msg.deliverTo(queue); - QPID_LOG(debug, "Enqueued replicated message onto " << queueName); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes(); - mgmtExchange->inc_byteRoutes( msg.contentSize()); + + SequenceNumber seqno1(args->getAsInt(QUEUE_MESSAGE_POSITION)); + + if (queue->getPosition() > seqno1) // test queue.pos < seqnumber + { + QPID_LOG(error, "Cannot enqueue replicated message. Destination Queue " << queueName << " ahead of source queue"); + mgmtExchange->inc_msgDrops(); + mgmtExchange->inc_byteDrops(msg.contentSize()); + } else { + queue->setPosition(--seqno1); // note that queue will ++ before enqueue. + + FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); + headers.erase(REPLICATION_TARGET_QUEUE); + headers.erase(REPLICATION_EVENT_SEQNO); + headers.erase(REPLICATION_EVENT_TYPE); + msg.deliverTo(queue); + QPID_LOG(debug, "Enqueued replicated message onto " << queueName); + if (mgmtExchange != 0) { + mgmtExchange->inc_msgRoutes(); + mgmtExchange->inc_byteRoutes( msg.contentSize()); + } } } else { QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist"); diff --git a/qpid/cpp/src/qpid/replication/constants.h b/qpid/cpp/src/qpid/replication/constants.h index fb7085c570..c5ba7d3d6a 100644 --- a/qpid/cpp/src/qpid/replication/constants.h +++ b/qpid/cpp/src/qpid/replication/constants.h @@ -26,6 +26,7 @@ const std::string REPLICATION_EVENT_TYPE("qpid.replication.type"); const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno"); const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue"); const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message"); +const std::string QUEUE_MESSAGE_POSITION("qpid.replication.queue.position"); const int ENQUEUE(1); const int DEQUEUE(2); diff --git a/qpid/cpp/src/tests/replication_test b/qpid/cpp/src/tests/replication_test index 6e0c1c8d3b..3a0d94e0c4 100755 --- a/qpid/cpp/src/tests/replication_test +++ b/qpid/cpp/src/tests/replication_test @@ -98,7 +98,6 @@ if test -d ${PYTHON_DIR} && test -f ../.libs/replicating_listener.so && test -f ./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl ./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl - stop_brokers tail -5 queue-a-input.repl > queue-a-expected.repl tail -10 queue-b-input.repl > queue-b-expected.repl @@ -108,6 +107,54 @@ if test -d ${PYTHON_DIR} && test -f ../.libs/replicating_listener.so && test -f grep 'queue-d does not exist' replication-dest.log > /dev/null || echo "WARNING: Expected error to be logged!" + stop_brokers + + # now check offsets working (enqueue based on position being set, not queue abs position) + + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable info+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port + BROKER_A=`cat qpidd.port` + + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port + BROKER_B=`cat qpidd.port` + + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication + $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication + + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-e --generate-queue-events 2 + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 2 + + i=1 + while [ $i -le 10 ]; do + echo Message $i for A >> queue-e-input.repl + i=`expr $i + 1` + done + + ./sender --port $BROKER_A --routing-key queue-e --send-eos 1 < queue-e-input.repl + ./receiver --port $BROKER_A --queue queue-e --messages 10 > /dev/null + # now check offsets working + + ../qpidd -q --port $BROKER_B + unset BROKER_B + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port + BROKER_B=`cat qpidd.port` + + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication + $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication + # now send another 15 + i=11 + while [ $i -le 15 ]; do + echo Message $i for A >> queue-e1-input.repl + i=`expr $i + 1` + done + ./sender --port $BROKER_A --routing-key queue-e --send-eos 1 < queue-e1-input.repl + + ./receiver --port $BROKER_B --queue queue-e > queue-e-backup.repl + diff queue-e-backup.repl queue-e1-input.repl || FAIL=1 + + stop_brokers + if [ x$FAIL != x ]; then echo replication test failed: expectations not met! exit 1 |