diff options
author | Gordon Sim <gsim@apache.org> | 2008-11-14 16:40:22 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-11-14 16:40:22 +0000 |
commit | 072c135f13cd287224ccc9b754a6c2b83940b6cd (patch) | |
tree | 485163f392741b72485b8e7959ed66737fcd21b3 | |
parent | 23482177f452ebdf3023534988efe60d41e8c826 (diff) | |
download | qpid-python-072c135f13cd287224ccc9b754a6c2b83940b6cd.tar.gz |
Added some failover capable tests
Added grantCredit() method to subscription to allow simpler control of message delivery
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@714065 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/client/Subscription.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/Subscription.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionImpl.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionImpl.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 10 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 16 | ||||
-rw-r--r-- | cpp/src/tests/receiver.cpp | 120 | ||||
-rw-r--r-- | cpp/src/tests/sender.cpp | 100 | ||||
-rw-r--r-- | cpp/src/tests/txjob.cpp | 95 | ||||
-rw-r--r-- | cpp/src/tests/txshift.cpp | 185 |
10 files changed, 534 insertions, 12 deletions
diff --git a/cpp/src/qpid/client/Subscription.cpp b/cpp/src/qpid/client/Subscription.cpp index bf788c5f93..1f1b5ac6c6 100644 --- a/cpp/src/qpid/client/Subscription.cpp +++ b/cpp/src/qpid/client/Subscription.cpp @@ -22,6 +22,7 @@ #include "Subscription.h" #include "SubscriptionImpl.h" #include "HandlePrivate.h" +#include "qpid/framing/enum.h" namespace qpid { namespace client { @@ -42,7 +43,8 @@ void Subscription::release(const SequenceSet& messageIds) { impl->release(messag Session Subscription::getSession() const { return impl->getSession(); } SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); } void Subscription::cancel() { impl->cancel(); } - +void Subscription::grantMessageCredit(uint32_t value) { impl->grantCredit(framing::message::CREDIT_UNIT_MESSAGE, value); } +void Subscription::grantByteCredit(uint32_t value) { impl->grantCredit(framing::message::CREDIT_UNIT_BYTE, value); } }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Subscription.h b/cpp/src/qpid/client/Subscription.h index b25a64a4a2..6d9342bf09 100644 --- a/cpp/src/qpid/client/Subscription.h +++ b/cpp/src/qpid/client/Subscription.h @@ -101,6 +101,12 @@ class Subscription : public Handle<SubscriptionImpl> { /** Cancel the subscription. */ void cancel(); + + /** Grant the specified amount of message credit */ + void grantMessageCredit(uint32_t); + + /** Grant the specified amount of byte credit */ + void grantByteCredit(uint32_t); }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionImpl.cpp b/cpp/src/qpid/client/SubscriptionImpl.cpp index 0ccf5674fd..6319371a4e 100644 --- a/cpp/src/qpid/client/SubscriptionImpl.cpp +++ b/cpp/src/qpid/client/SubscriptionImpl.cpp @@ -62,6 +62,10 @@ void SubscriptionImpl::setFlowControl(const FlowControl& f) { s.sync(); } +void SubscriptionImpl::grantCredit(framing::message::CreditUnit unit, uint32_t value) { + async(manager.getSession()).messageFlow(name, unit, value); +} + void SubscriptionImpl::setAutoAck(size_t n) { Mutex::ScopedLock l(lock); settings.autoAck = n; @@ -103,7 +107,7 @@ void SubscriptionImpl::release(const SequenceSet& messageIds) { Session SubscriptionImpl::getSession() const { return manager.getSession(); } -SubscriptionManager&SubscriptionImpl:: getSubscriptionManager() const { return manager; } +SubscriptionManager& SubscriptionImpl::getSubscriptionManager() const { return manager; } void SubscriptionImpl::cancel() { manager.cancel(name); } diff --git a/cpp/src/qpid/client/SubscriptionImpl.h b/cpp/src/qpid/client/SubscriptionImpl.h index 0c51b598c8..c4c486daeb 100644 --- a/cpp/src/qpid/client/SubscriptionImpl.h +++ b/cpp/src/qpid/client/SubscriptionImpl.h @@ -25,6 +25,7 @@ #include "qpid/client/SubscriptionSettings.h" #include "qpid/client/Session.h" #include "qpid/client/MessageListener.h" +#include "qpid/framing/enum.h" #include "qpid/framing/SequenceSet.h" #include "qpid/sys/Mutex.h" #include "qpid/RefCounted.h" @@ -88,6 +89,9 @@ class SubscriptionImpl : public RefCounted, public MessageListener { /** Cancel the subscription. */ void cancel(); + /** Grant specified credit for this subscription **/ + void grantCredit(framing::message::CreditUnit unit, uint32_t value); + void received(Message&); private: diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 948126e271..1017480257 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -92,16 +92,6 @@ namespace client { * </li> * </ul> * - * - * <h2>Setting Accept Mode, Acquire Mode, Ack Policy</h2> - * - * <p>setAcceptMode()</p> - * <pre>subscriptions.setAcceptMode(true);</pre> - * <p>setAcquireMode()</p> - * <pre>subscriptions.setAcquireMode(false);</pre> - * - * - * */ class SubscriptionManager : public sys::Runnable { diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index d681cf58fd..b086739c52 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -134,6 +134,22 @@ check_PROGRAMS+=header_test header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h header_test_LDADD=$(lib_client) +check_PROGRAMS+=txshift +txshift_SOURCES=txshift.cpp TestOptions.h ConnectionOptions.h +txshift_LDADD=$(lib_client) + +check_PROGRAMS+=txjob +txjob_SOURCES=txjob.cpp TestOptions.h ConnectionOptions.h +txjob_LDADD=$(lib_client) + +check_PROGRAMS+=receiver +receiver_SOURCES=receiver.cpp TestOptions.h ConnectionOptions.h +receiver_LDADD=$(lib_client) + +check_PROGRAMS+=sender +sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h +sender_LDADD=$(lib_client) + TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest diff --git a/cpp/src/tests/receiver.cpp b/cpp/src/tests/receiver.cpp new file mode 100644 index 0000000000..3a4ac3649d --- /dev/null +++ b/cpp/src/tests/receiver.cpp @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/FailoverManager.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/SubscriptionManager.h> +#include <qpid/client/SubscriptionManager.h> +#include "TestOptions.h" + +#include <iostream> +#include <fstream> + + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + +struct Args : public qpid::TestOptions +{ + string queue; + uint messages; + bool ignoreDuplicates; + + Args() : queue("test-queue"), messages(0), ignoreDuplicates(false) + { + addOptions() + ("queue", qpid::optValue(queue, "QUEUE NAME"), "Queue from which to request messages") + ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") + ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)"); + } +}; + +const string EOS("eos"); + +class Receiver : public MessageListener, public FailoverManager::Command +{ + public: + Receiver(const string& queue, uint messages, bool ignoreDuplicates); + void received(Message& message); + void execute(AsyncSession& session, bool isRetry); + private: + const string queue; + const uint count; + const bool skipDups; + Subscription subscription; + uint processed; + uint lastSn; + + bool isDuplicate(Message& message); +}; + +Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates) : + queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0) {} + +void Receiver::received(Message & message) +{ + if (!(skipDups && isDuplicate(message))) { + bool eos = message.getData() == EOS; + if (!eos) std::cout << message.getData() << std::endl; + if (eos || ++processed == count) subscription.cancel(); + } +} + +bool Receiver::isDuplicate(Message& message) +{ + uint sn = message.getHeaders().getAsInt("sn"); + if (lastSn < sn) { + lastSn = sn; + return false; + } else { + return true; + } +} + +void Receiver::execute(AsyncSession& session, bool /*isRetry*/) +{ + SubscriptionManager subs(session); + subscription = subs.subscribe(*this, queue); + subs.run(); +} + +int main(int argc, char ** argv) +{ + Args opts; + try { + opts.parse(argc, argv); + FailoverManager connection(opts.con); + Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates); + connection.execute(receiver); + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cerr << "Failure: " << error.what() << std::endl; + } + return 1; +} + + + diff --git a/cpp/src/tests/sender.cpp b/cpp/src/tests/sender.cpp new file mode 100644 index 0000000000..2da1990041 --- /dev/null +++ b/cpp/src/tests/sender.cpp @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/FailoverManager.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> +#include <qpid/client/MessageReplayTracker.h> +#include <qpid/Exception.h> +#include "TestOptions.h" + +#include <iostream> + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + +struct Args : public qpid::TestOptions +{ + string destination; + string key; + bool sendEos; + + Args() : key("test-queue"), sendEos(false) + { + addOptions() + ("exchange", qpid::optValue(destination, "EXCHANGE"), "Exchange to send messages to") + ("routing-key", qpid::optValue(key, "KEY"), "Routing key to add to messages") + ("send-eos", qpid::optValue(sendEos), "Send EOS message to mark end of input"); + } +}; + +const string EOS("eos"); + +class Sender : public FailoverManager::Command +{ + public: + Sender(const std::string& destination, const std::string& key, bool sendEos); + void execute(AsyncSession& session, bool isRetry); + private: + MessageReplayTracker sender; + Message message; + const bool sendEos; + uint sent; +}; + +Sender::Sender(const std::string& destination, const std::string& key, bool eos) : + sender(10), message(destination, key), sendEos(eos), sent(0) {} + +void Sender::execute(AsyncSession& session, bool isRetry) +{ + if (isRetry) sender.replay(session); + else sender.init(session); + string data; + while (std::cin >> data) { + message.setData(data); + message.getHeaders().setInt("sn", ++sent); + sender.send(message); + } + if (sendEos) { + message.setData(EOS); + sender.send(message); + } +} + +int main(int argc, char ** argv) +{ + Args opts; + try { + opts.parse(argc, argv); + FailoverManager connection(opts.con); + Sender sender(opts.destination, opts.key, opts.sendEos); + connection.execute(sender); + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << "Failed: " << error.what() << std::endl; + } + return 1; +} diff --git a/cpp/src/tests/txjob.cpp b/cpp/src/tests/txjob.cpp new file mode 100644 index 0000000000..336f77014d --- /dev/null +++ b/cpp/src/tests/txjob.cpp @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <iostream> +#include <boost/bind.hpp> +#include <boost/ptr_container/ptr_vector.hpp> + +#include "TestOptions.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/FailoverManager.h" +#include "qpid/client/Message.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/sys/Thread.h" + +using namespace qpid::client; +using namespace qpid::sys; + +struct Args : public qpid::TestOptions +{ + string workQueue; + string source; + string dest; + uint messages; + uint jobs; + bool quit; + bool declareQueues; + + Args() : workQueue("txshift-control"), source("txshift-1"), dest("txshift-2"), messages(0), jobs(0), + quit(false), declareQueues(false) + { + addOptions() + ("messages", qpid::optValue(messages, "N"), "Number of messages to shift") + ("jobs", qpid::optValue(jobs, "N"), "Number of shift jobs to request") + ("source", qpid::optValue(source, "QUEUE NAME"), "source queue from which messages will be shifted") + ("dest", qpid::optValue(dest, "QUEUE NAME"), "dest queue to which messages will be shifted") + ("work-queue", qpid::optValue(workQueue, "QUEUE NAME"), "work queue from which to take instructions") + ("add-quit", qpid::optValue(quit), "add a 'quit' instruction to the queue (after any other jobs)") + ("declare-queues", qpid::optValue(declareQueues), "issue a declare for all queues"); + } +}; + +//TODO: might be nice to make this capable of failover as well at some +//point; for now its just for the setup phase. +int main(int argc, char** argv) +{ + Args opts; + try { + opts.parse(argc, argv); + Connection connection; + connection.open(opts.con); + Session session = connection.newSession(); + if (opts.declareQueues) { + session.queueDeclare(arg::queue=opts.workQueue); + session.queueDeclare(arg::queue=opts.source); + session.queueDeclare(arg::queue=opts.dest); + } + for (uint i = 0; i < opts.jobs; ++i) { + Message job("transfer", opts.workQueue); + job.getHeaders().setString("src", opts.source); + job.getHeaders().setString("dest", opts.dest); + job.getHeaders().setInt("count", opts.messages); + async(session).messageTransfer(arg::content=job); + } + + if (opts.quit) { + async(session).messageTransfer(arg::content=Message("quit", opts.workQueue)); + } + + session.sync(); + session.close(); + + return 0; + } catch(const std::exception& e) { + std::cout << e.what() << std::endl; + return 1; + } +} diff --git a/cpp/src/tests/txshift.cpp b/cpp/src/tests/txshift.cpp new file mode 100644 index 0000000000..5db08d7a53 --- /dev/null +++ b/cpp/src/tests/txshift.cpp @@ -0,0 +1,185 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <iostream> +#include <boost/bind.hpp> +#include <boost/ptr_container/ptr_vector.hpp> + +#include "TestOptions.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/FailoverManager.h" +#include "qpid/client/Message.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Thread.h" + +using namespace qpid::client; +using namespace qpid::sys; + +struct Args : public qpid::TestOptions +{ + string workQueue; + size_t workers; + + Args() : workQueue("txshift-control"), workers(1) + { + addOptions() + ("workers", qpid::optValue(workers, "N"), "Number of separate worker sessions to start") + ("work-queue", qpid::optValue(workQueue, "NAME"), "work queue from which to take instructions"); + } +}; + +struct Transfer : MessageListener +{ + std::string control; + std::string source; + std::string destination; + uint expected; + uint transfered; + SubscriptionSettings controlSettings; + Subscription controlSubscription; + SubscriptionSettings sourceSettings; + Subscription sourceSubscription; + + Transfer(const std::string control_) : control(control_), expected(0), transfered(0) {} + + void subscribeToSource(SubscriptionManager& manager) + { + sourceSettings.autoAck = 0;//will accept once at the end of the batch + sourceSettings.flowControl = FlowControl::messageCredit(expected); + sourceSubscription = manager.subscribe(*this, source, sourceSettings); + QPID_LOG(info, "Subscribed to source: " << source << " expecting: " << expected); + } + + void subscribeToControl(SubscriptionManager& manager) + { + controlSettings.flowControl = FlowControl::messageCredit(1); + controlSubscription = manager.subscribe(*this, control, controlSettings); + QPID_LOG(info, "Subscribed to job queue"); + } + + void received(Message& message) + { + QPID_LOG(debug, "received: " << message.getData() << " for " << message.getDestination()); + if (message.getDestination() == source) { + receivedFromSource(message); + } else if (message.getDestination() == control) { + receivedFromControl(message); + } else { + QPID_LOG(error, "Unexpected message: " << message.getData() << " to " << message.getDestination()); + } + } + + void receivedFromSource(Message& message) + { + QPID_LOG(debug, "transfering " << (transfered+1) << " of " << expected); + message.getDeliveryProperties().setRoutingKey(destination); + async(sourceSubscription.getSession()).messageTransfer(arg::content=message); + if (++transfered == expected) { + QPID_LOG(info, "completed job: " << transfered << " messages shifted from " << + source << " to " << destination); + sourceSubscription.accept(sourceSubscription.getUnaccepted()); + sourceSubscription.getSession().txCommit(); + sourceSubscription.cancel(); + //grant credit to allow broker to send us another control message + controlSubscription.grantMessageCredit(1); + } + } + + void receivedFromControl(Message& message) + { + if (message.getData() == "transfer") { + source = message.getHeaders().getAsString("src"); + destination = message.getHeaders().getAsString("dest"); + expected = message.getHeaders().getAsInt("count"); + transfered = 0; + QPID_LOG(info, "received transfer request: " << expected << " messages to be shifted from " << + source << " to " << destination); + subscribeToSource(controlSubscription.getSubscriptionManager()); + } else if (message.getData() == "quit") { + QPID_LOG(info, "received quit request"); + controlSubscription.cancel(); + } else { + std::cerr << "Rejecting invalid message: " << message.getData() << std::endl; + controlSubscription.getSession().messageReject(SequenceSet(message.getId())); + } + } + +}; + +struct Worker : FailoverManager::Command, Runnable +{ + FailoverManager& connection; + Transfer transfer; + Thread runner; + + Worker(FailoverManager& c, const std::string& controlQueue) : connection(c), transfer(controlQueue) {} + + void run() + { + connection.execute(*this); + } + + void start() + { + runner = Thread(this); + } + + void join() + { + runner.join(); + } + + void execute(AsyncSession& session, bool isRetry) + { + if (isRetry) QPID_LOG(info, "Retrying..."); + session.txSelect(); + SubscriptionManager subs(session); + transfer.subscribeToControl(subs); + subs.run(); + } +}; + +int main(int argc, char** argv) +{ + Args opts; + try { + opts.parse(argc, argv); + FailoverManager connection(opts.con); + connection.connect(); + if (opts.workers == 1) { + Worker worker(connection, opts.workQueue); + worker.run(); + } else { + boost::ptr_vector<Worker> workers; + for (size_t i = 0; i < opts.workers; i++) { + workers.push_back(new Worker(connection, opts.workQueue)); + } + for_each(workers.begin(), workers.end(), boost::bind(&Worker::start, _1)); + for_each(workers.begin(), workers.end(), boost::bind(&Worker::join, _1)); + } + + return 0; + } catch(const std::exception& e) { + std::cout << e.what() << std::endl; + return 1; + } +} |