diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionContext.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateExchange.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ClusterFixture.h | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ais_check | 29 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 58 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/clustered_replication_test | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_cluster_tests | 32 |
13 files changed, 118 insertions, 79 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index ed9b6653c3..7e3090bf17 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -302,6 +302,18 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) return !blocked; } +namespace { +struct ConsumerName { + const SemanticState::ConsumerImpl& consumer; + ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {} +}; + +ostream& operator<<(ostream& o, const ConsumerName& pc) { + return o << pc.consumer.getName() << " on " + << pc.consumer.getParent().getSession().getSessionId(); +} +} + void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { uint32_t originalMsgCredit = msgCredit; @@ -312,7 +324,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) if (byteCredit != 0xFFFFFFFF) { byteCredit -= msg->getRequiredCredit(); } - QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent + QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); @@ -320,15 +332,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) { - if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { - QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent - << ", bytes: " << byteCredit << " msgs: " << msgCredit); - return false; - } else { - QPID_LOG(debug, "Credit available for '" << name << "' on " << parent - << " bytes: " << byteCredit << " msgs: " << msgCredit); - return true; - } + bool enoughCredit = msgCredit > 0 && + (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit()); + QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ") + << ConsumerName(*this) + << ", have bytes: " << byteCredit << " msgs: " << msgCredit + << ", need " << msg->getRequiredCredit() << " bytes"); + return enoughCredit; } SemanticState::ConsumerImpl::~ConsumerImpl() {} diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index da8383fc12..89fe7b83dd 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -129,6 +129,7 @@ class SemanticState : private boost::noncopyable { const framing::FieldTable& getArguments() const { return arguments; } SemanticState& getParent() { return *parent; } + const SemanticState& getParent() const { return *parent; } }; private: @@ -163,6 +164,7 @@ class SemanticState : private boost::noncopyable { ~SemanticState(); SessionContext& getSession() { return session; } + const SessionContext& getSession() const { return session; } ConsumerImpl& find(const std::string& destination); diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index cfdbd100c3..afbbb2cc22 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -28,7 +28,7 @@ #include "qpid/sys/OutputControl.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/OwnershipToken.h" - +#include "qpid/SessionId.h" #include <boost/noncopyable.hpp> @@ -45,6 +45,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl virtual framing::AMQP_ClientProxy& getProxy() = 0; virtual Broker& getBroker() = 0; virtual uint16_t getChannel() const = 0; + virtual const SessionId& getSessionId() const = 0; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 67fd4f4f38..eade93ddaa 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -118,6 +118,8 @@ class SessionState : public qpid::SessionState, bool processSendCredit(uint32_t msgs); + const SessionId& getSessionId() const { return getId(); } + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 8ead44a172..32541dceac 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -64,7 +64,8 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm proxy(ioHandler), nextIn(0), nextOut(0), - sendMsgCredit(0) + sendMsgCredit(0), + doClearDeliveryPropertiesExchange(true) { channel.next = connectionShared.get(); } @@ -396,11 +397,16 @@ void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); - // Client is not allowed to set the delivery-properties.exchange. - AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); - if (headerp && headerp->get<DeliveryProperties>()) - headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); - + // doClearDeliveryPropertiesExchange is set by cluster update client so + // it can send messages with delivery-properties.exchange set. + // + if (doClearDeliveryPropertiesExchange) { + // Normal client is not allowed to set the delivery-properties.exchange + // so clear it here. + AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); + if (headerp && headerp->get<DeliveryProperties>()) + headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); + } header.setFirstSegment(false); uint64_t data_length = content.getData().length(); if(data_length > 0){ diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h index 49d268c44d..0624bb8b3c 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -130,6 +130,8 @@ public: */ boost::shared_ptr<ConnectionImpl> getConnection(); + void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; } + private: enum State { INACTIVE, @@ -243,6 +245,8 @@ private: // Only keep track of message credit sys::Semaphore* sendMsgCredit; + bool doClearDeliveryPropertiesExchange; + friend class client::SessionHandler; }; diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 2e557f2ab6..d6df8bd5ac 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -209,9 +209,16 @@ class MessageUpdater { ClusterConnectionProxy(session).expiryId(*expiryId); } + // We can't send a broker::Message via the normal client API, + // and it would be expensive to copy it into a client::Message + // so we go a bit under the client API covers here. + // SessionBase_0_10Access sb(session); + // Disable client code that clears the delivery-properties.exchange + sb.get()->setDoClearDeliveryPropertiesExchange(false); framing::MessageTransferBody transfer( - framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); + framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, + message::ACQUIRE_MODE_PRE_ACQUIRED); sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ diff --git a/qpid/cpp/src/qpid/cluster/UpdateExchange.h b/qpid/cpp/src/qpid/cluster/UpdateExchange.h index 194a3d386d..00a92c7f1e 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateExchange.h +++ b/qpid/cpp/src/qpid/cluster/UpdateExchange.h @@ -30,7 +30,7 @@ namespace qpid { namespace cluster { /** - * A keyless exchange (like fanout exchange) that does not modify deliver-properties.exchange + * A keyless exchange (like fanout exchange) that does not modify delivery-properties.exchange * on messages. */ class UpdateExchange : public broker::FanOutExchange diff --git a/qpid/cpp/src/tests/ClusterFixture.h b/qpid/cpp/src/tests/ClusterFixture.h index 5952cc1736..1eee32b9a4 100644 --- a/qpid/cpp/src/tests/ClusterFixture.h +++ b/qpid/cpp/src/tests/ClusterFixture.h @@ -75,10 +75,10 @@ class ClusterFixture : public vector<uint16_t> { /** @param localIndex can be -1 meaning don't automatically start a local broker. * A local broker can be started with addLocal(). */ - ClusterFixture(size_t n, const Args& args, int localIndex=0); + ClusterFixture(size_t n, const Args& args, int localIndex=-1); /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */ - ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs, int localIndex); + ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs, int localIndex=-1); void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); // Add a broker. diff --git a/qpid/cpp/src/tests/ais_check b/qpid/cpp/src/tests/ais_check index d76c518808..1481cf5bab 100755 --- a/qpid/cpp/src/tests/ais_check +++ b/qpid/cpp/src/tests/ais_check @@ -21,29 +21,14 @@ srcdir=`dirname $0` # Check AIS requirements and run tests if found. -id -nG | grep '\<ais\>' >/dev/null || \ - NOGROUP="You are not a member of the ais group." -ps -u root | grep 'aisexec\|corosync' >/dev/null || \ - NOAISEXEC="The aisexec or corosync daemon is not running as root" - -if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then - cat <<EOF - - =========== WARNING: NOT RUNNING AIS TESTS ============== - - Tests that depend on the openais library (used for clustering) - will not be run because: - $NOGROUP - $NOAISEXEC - - ========================================================== - -EOF +ps -u root | grep 'aisexec\|corosync' >/dev/null || { + echo WARNING: Skipping cluster tests, the aisexec or corosync daemon is not running. exit 0; # A warning, not a failure. -fi +} -# Execute command with the ais group set. +# Execute command with the ais group set if user is a member. with_ais_group() { - id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group."; exit 1; } - echo $* | newgrp ais + if id -nG | grep '\<ais\>' >/dev/null; then sg -c "$*" + else "$@" + fi } diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 28fcdd13ad..bc7a666c95 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -85,6 +85,12 @@ void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { args += "--no-data-dir"; } +ClusterFixture::Args prepareArgs(const bool durableFlag = false) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + return args; +} + // Timeout for tests that wait for messages const sys::Duration TIMEOUT=sys::TIME_SEC/4; @@ -596,16 +602,19 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { } } -QPID_AUTO_TEST_CASE(testCatchupSharedState) { +// Test that message data and delivery properties are updated properly. +QPID_AUTO_TEST_CASE(testUpdateMessages) { ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - // Create some shared state. + // Create messages with different delivery properties c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q"); c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag), + arg::destination="amq.fanout"); while (c0.session.queueQuery("q").getMessageCount() != 2) sys::usleep(1000); // Wait for message to show up on broker 0. @@ -628,9 +637,12 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK(m.getDeliveryProperties().hasExchange()); BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), ""); BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bar"); + BOOST_CHECK(m.getDeliveryProperties().hasExchange()); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); // Add another broker, don't wait for join - should be stalled till ready. @@ -1100,6 +1112,44 @@ QPID_AUTO_TEST_CASE(testRelease) { } } -QPID_AUTO_TEST_SUITE_END() +// Browse for 1 message with byte credit, return true if a message was +// received false if not. +bool browseByteCredit(Client& c, const string& q, int n, Message& m) { + SubscriptionSettings browseSettings( + FlowControl(1, n, false), // 1 message, n bytes credit, no window + ACCEPT_MODE_NONE, + ACQUIRE_MODE_NOT_ACQUIRED, + 0 // No auto-ack. + ); + LocalQueue lq; + Subscription s = c.subs.subscribe(lq, q, browseSettings); + c.session.messageFlush(arg::destination=q, arg::sync=true); + c.session.sync(); + c.subs.getSubscription(q).cancel(); + return lq.get(m, 0); // No timeout, flush should push message thru. +} + +// Ensure cluster update preserves exact message size, use byte credt as test. +QPID_AUTO_TEST_CASE(testExactByteCredit) { + ClusterFixture cluster(1, prepareArgs(), -1); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=Message("MyMessage", "q")); + cluster.add(); + + int size=36; // Size of message on broker: headers+body + Client c1(cluster[1], "c1"); + Message m; + + // Ensure we get the message with exact credit. + BOOST_CHECK(browseByteCredit(c0, "q", size, m)); + BOOST_CHECK(browseByteCredit(c1, "q", size, m)); + // and not with one byte less. + BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m)); + BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m)); +} + + +QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/clustered_replication_test b/qpid/cpp/src/tests/clustered_replication_test index 4dcd113c47..4f13b4672c 100755 --- a/qpid/cpp/src/tests/clustered_replication_test +++ b/qpid/cpp/src/tests/clustered_replication_test @@ -30,10 +30,6 @@ fail() { echo $1 exit 1 } -with_ais_group() { - id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group." 1>&2; exit 1; } - echo $* | newgrp ais -} stop_brokers() { if [[ $PRIMARY1 ]] ; then diff --git a/qpid/cpp/src/tests/run_cluster_tests b/qpid/cpp/src/tests/run_cluster_tests index c89e657243..d1a58f9f6a 100755 --- a/qpid/cpp/src/tests/run_cluster_tests +++ b/qpid/cpp/src/tests/run_cluster_tests @@ -22,11 +22,11 @@ # Check that top_builddir and srcdir are set # If not, assume local run from test dir if [ -z ${top_builddir} -o -z ${srcdir} ]; then - srcdir=`pwd` + srcdir=`dirname $0` top_builddir=${srcdir}/../../ fi TEST_DIR=${top_builddir}/src/tests -. `dirname $0`/python_env.sh +. $srcdir/python_env.sh if test -z $1; then CLUSTER_TEST="$PYTHON_COMMANDS/qpid-python-test -m cluster_tests cluster_tests.ShortTests.\*" @@ -35,32 +35,8 @@ else echo "Running $1..." fi - # Check AIS requirements -id -nG | grep '\<ais\>' > /dev/null || NOGROUP="You are not a member of the ais group." -ps -u root | grep 'aisexec\|corosync' > /dev/null || NOAISEXEC="The aisexec or corosync daemon is not running as root." -if ! test -d ${PYTHON_DIR}; then - NO_PYTHON_DIR="PYTHON_DIR=\"${PYTHON_DIR}\" not found or does not exist." -fi - -if test -n "${NOGROUP}" -o -n "${NOAISEXEC}" -o -n "${NO_PYTHON_DIR}"; then - cat <<EOF - - ======== WARNING: PYTHON CLUSTER TESTS DISABLED =========== - - Tests that depend on the openais library (used for clustering) - and python will not be run because: - - ${NOGROUP} - ${NOAISEXEC} - ${NO_PYTHON_DIR} - - =========================================================== - -EOF - exit 0 -fi - +. $srcdir/ais_check # Check XML exchange requirements XML_LIB=$srcdir/../.libs/xml.so @@ -103,7 +79,7 @@ export TMP_DATA_DIR # Run the test -sg ais -c "${CLUSTER_TEST}" +with_ais_group ${CLUSTER_TEST} RETCODE=$? if test x${RETCODE} != x0; then |