diff options
26 files changed, 405 insertions, 98 deletions
diff --git a/cpp/rubygen/templates/structs.rb b/cpp/rubygen/templates/structs.rb index 69405f05f5..422fd8c207 100644 --- a/cpp/rubygen/templates/structs.rb +++ b/cpp/rubygen/templates/structs.rb @@ -43,7 +43,7 @@ class StructGen < CppGen end def printable_form(f) - if (f.cpptype.name == "u_int8_t") + if (f.cpptype.name == "uint8_t") return "(int) " + f.cppname else return f.cppname diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index a1d8e38372..217d3c3b4c 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -212,6 +212,7 @@ libqpidclient_la_SOURCES = \ qpid/client/ClientQueue.cpp \ qpid/client/ConnectionImpl.cpp \ qpid/client/Connector.cpp \ + qpid/client/Demux.cpp \ qpid/client/MessageListener.cpp \ qpid/client/Correlator.cpp \ qpid/client/CompletionTracker.cpp \ @@ -297,7 +298,9 @@ nobase_include_HEADERS = \ qpid/client/ConnectionImpl.h \ qpid/client/Connector.h \ qpid/client/Completion.h \ + qpid/client/Demux.h \ qpid/client/MessageListener.h \ + qpid/client/MessageQueue.h \ qpid/client/BlockingQueue.h \ qpid/client/Correlator.h \ qpid/client/CompletionTracker.h \ diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 1a13a31a5e..d96622cd4f 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -126,7 +126,7 @@ bool Queue::acquire(const QueuedMessage& msg) { void Queue::requestDispatch(Consumer* c, bool sync){ if (!c || c->preAcquires()) { if (sync) { - serializer.dispatch(); + dispatch(); } else { serializer.execute(dispatchCallback); } diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp index 650182c807..c98fdd6291 100644 --- a/cpp/src/qpid/broker/Session.cpp +++ b/cpp/src/qpid/broker/Session.cpp @@ -251,8 +251,8 @@ Session::ConsumerImpl::ConsumerImpl(Session* _parent, acquire(_acquire), blocked(false), windowing(true), - msgCredit(0xFFFFFFFF), - byteCredit(0xFFFFFFFF) {} + msgCredit(0), + byteCredit(0) {} bool Session::ConsumerImpl::deliver(QueuedMessage& msg) { diff --git a/cpp/src/qpid/client/BlockingQueue.h b/cpp/src/qpid/client/BlockingQueue.h index 7081b76b68..a9d8ec2857 100644 --- a/cpp/src/qpid/client/BlockingQueue.h +++ b/cpp/src/qpid/client/BlockingQueue.h @@ -62,7 +62,7 @@ public: } } - void push(T t) + void push(const T& t) { sys::Monitor::ScopedLock l(lock); bool wasEmpty = queue.empty(); @@ -78,6 +78,12 @@ public: closed = true; lock.notifyAll(); } + + bool empty() + { + sys::Monitor::ScopedLock l(lock); + return queue.empty(); + } }; }} diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp index 754b0544c6..c9b7a68f38 100644 --- a/cpp/src/qpid/client/ChannelHandler.cpp +++ b/cpp/src/qpid/client/ChannelHandler.cpp @@ -36,7 +36,9 @@ void ChannelHandler::incoming(AMQFrame& frame) ChannelCloseBody* closeBody= dynamic_cast<ChannelCloseBody*>(body->getMethod()); if (closeBody) { - setState(CLOSED); + setState(CLOSED_BY_PEER); + code = closeBody->getReplyCode(); + text = closeBody->getReplyText(); if (onClose) { onClose(closeBody->getReplyCode(), closeBody->getReplyText()); } @@ -65,8 +67,10 @@ void ChannelHandler::outgoing(AMQFrame& frame) if (getState() == OPEN) { frame.setChannel(id); out(frame); - } else { + } else if (getState() == CLOSED) { throw Exception("Channel not open"); + } else if (getState() == CLOSED_BY_PEER) { + throw ChannelException(code, text); } } @@ -80,7 +84,7 @@ void ChannelHandler::open(uint16_t _id) std::set<int> states; states.insert(OPEN); - states.insert(CLOSED); + states.insert(CLOSED_BY_PEER); waitFor(states); if (getState() != OPEN) { throw Exception("Failed to open channel."); diff --git a/cpp/src/qpid/client/ChannelHandler.h b/cpp/src/qpid/client/ChannelHandler.h index 556e13a4f8..24c24e49c4 100644 --- a/cpp/src/qpid/client/ChannelHandler.h +++ b/cpp/src/qpid/client/ChannelHandler.h @@ -30,9 +30,12 @@ namespace client { class ChannelHandler : private StateManager, public ChainableFrameHandler { - enum STATES {OPENING, OPEN, CLOSING, CLOSED}; + enum STATES {OPENING, OPEN, CLOSING, CLOSED, CLOSED_BY_PEER}; framing::ProtocolVersion version; uint16_t id; + + uint16_t code; + std::string text; void handleMethod(framing::AMQMethodBody* method); diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index a014fd90c5..87062e1470 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -26,8 +26,10 @@ #include "ClientMessage.h" #include "qpid/QpidError.h" #include "Connection.h" +#include "Demux.h" #include "FutureResponse.h" #include "MessageListener.h" +#include "MessageQueue.h" #include <boost/format.hpp> #include <boost/bind.hpp> #include "qpid/framing/all_method_bodies.h" @@ -72,6 +74,9 @@ void Channel::open(const Session& s) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); active = true; session = s; + if(isTransactional()) { + session.txSelect(); + } } bool Channel::isOpen() const { @@ -79,17 +84,8 @@ bool Channel::isOpen() const { return active; } -void Channel::setQos() { - session.basicQos((prefetchCount=getPrefetch(), global=false)); - if(isTransactional()) { - //I think this is wrong! should only send TxSelect once... - session.txSelect(); - } -} - -void Channel::setPrefetch(uint16_t _prefetch){ +void Channel::setPrefetch(uint32_t _prefetch){ prefetch = _prefetch; - setQos(); } void Channel::declareExchange(Exchange& _exchange, bool synch){ @@ -157,6 +153,9 @@ void Channel::consume( session.messageSubscribe(0, _queue.getName(), tag, noLocal, confirmMode, 0/*pre-acquire*/, false, fields ? *fields : FieldTable()); + //allocate some credit: + session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); + session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF); } void Channel::cancel(const std::string& tag, bool synch) { @@ -173,21 +172,29 @@ void Channel::cancel(const std::string& tag, bool synch) { session.messageCancel(tag); } -bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { - Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK); - session.execution().sendFlushRequest(); - if (response.isA<BasicGetEmptyBody>()) { +bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { + string tag = "get-handler"; + ScopedDivert handler(tag, session.execution().getDemux()); + Demux::Queue& incoming = handler.getQueue(); + + session.messageSubscribe((destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1))); + session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); + session.messageFlow(tag, 0/*MESSAGES*/, 1); + Completion status = session.messageFlush(tag); + status.sync(); + session.messageCancel(tag); + + if (incoming.empty()) { return false; } else { - FrameSet::shared_ptr content = gets.pop(); - msg.populate(*content); + msg.populate(*(incoming.pop())); return true; } } void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, - bool mandatory, bool /*immediate TODO-restore immediate?*/) { + bool mandatory, bool /*?TODO-restore immediate?*/) { msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); @@ -224,14 +231,23 @@ void Channel::join() { void Channel::dispatch(FrameSet& content, const std::string& destination) { - ConsumerMap::iterator i = consumers.find(destination); - if (i != consumers.end()) { + MessageListener* listener(0); + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(destination); + if (i != consumers.end()) { + Message msg; + msg.populate(content); + listener = i->second.listener; + } + } + if (listener) { Message msg; msg.populate(content); - i->second.listener->received(msg); + listener->received(msg); } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); - } + } } void Channel::run() { @@ -239,12 +255,8 @@ void Channel::run() { while (true) { FrameSet::shared_ptr content = session.get(); //need to dispatch this to the relevant listener: - if (content->isA<BasicDeliverBody>()) { - dispatch(*content, content->as<BasicDeliverBody>()->getConsumerTag()); - } else if (content->isA<MessageTransferBody>()) { + if (content->isA<MessageTransferBody>()) { dispatch(*content, content->as<MessageTransferBody>()->getDestination()); - } else if (content->isA<BasicGetOkBody>()) { - gets.push(content); } else { QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod()); } diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index 9e5e3a2e70..b33af65d21 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -71,7 +71,7 @@ class Channel : private sys::Runnable mutable sys::Mutex lock; sys::Thread dispatcher; - uint16_t prefetch; + uint32_t prefetch; const bool transactional; framing::ProtocolVersion version; @@ -88,7 +88,6 @@ class Channel : private sys::Runnable void stop(); - void setQos(); void open(const Session& session); void closeInternal(); void join(); @@ -110,7 +109,7 @@ class Channel : private sys::Runnable * messages the channel is willing to have sent to it * asynchronously */ - Channel(bool transactional = false, u_int16_t prefetch = 500); + Channel(bool transactional = false, u_int16_t prefetch = 0); ~Channel(); @@ -204,9 +203,9 @@ class Channel : private sys::Runnable /** * Change the prefetch in use. */ - void setPrefetch(uint16_t prefetch); + void setPrefetch(uint32_t prefetch); - uint16_t getPrefetch() { return prefetch; } + uint32_t getPrefetch() { return prefetch; } /** * Start message dispatching on a new thread diff --git a/cpp/src/qpid/client/Demux.cpp b/cpp/src/qpid/client/Demux.cpp new file mode 100644 index 0000000000..d85ad92003 --- /dev/null +++ b/cpp/src/qpid/client/Demux.cpp @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Demux.h" +#include "qpid/Exception.h" +#include "qpid/framing/MessageTransferBody.h" + +#include <iostream> + +namespace qpid { +namespace client { + +ByTransferDest::ByTransferDest(const std::string& d) : dest(d) {} +bool ByTransferDest::operator()(const framing::FrameSet& frameset) const +{ + return frameset.isA<framing::MessageTransferBody>() && + frameset.as<framing::MessageTransferBody>()->getDestination() == dest; +} + +ScopedDivert::ScopedDivert(const std::string& _dest, Demux& _demuxer) : dest(_dest), demuxer(_demuxer) +{ + queue = &(demuxer.add(dest, ByTransferDest(dest))); +} + +ScopedDivert::~ScopedDivert() +{ + demuxer.remove(dest); +} + +Demux::Queue& ScopedDivert::getQueue() +{ + return *queue; +} + +void Demux::handle(framing::FrameSet::shared_ptr frameset) +{ + sys::Mutex::ScopedLock l(lock); + bool matched = false; + for (iterator i = records.begin(); i != records.end() && !matched; i++) { + if (i->condition && i->condition(*frameset)) { + matched = true; + i->queue->push(frameset); + } + } + if (!matched) { + defaultQueue.push(frameset); + } +} + +void Demux::close() +{ + sys::Mutex::ScopedLock l(lock); + for (iterator i = records.begin(); i != records.end(); i++) { + i->queue->close(); + } + defaultQueue.close(); +} + +Demux::Queue& Demux::add(const std::string& name, Condition condition) +{ + sys::Mutex::ScopedLock l(lock); + iterator i = std::find_if(records.begin(), records.end(), Find(name)); + if (i == records.end()) { + Record r(name, condition); + records.push_back(r); + return *(r.queue); + } else { + throw Exception("Queue already exists for " + name); + } +} + +void Demux::remove(const std::string& name) +{ + sys::Mutex::ScopedLock l(lock); + records.remove_if(Find(name)); +} + +Demux::Queue& Demux::get(const std::string& name) +{ + sys::Mutex::ScopedLock l(lock); + iterator i = std::find_if(records.begin(), records.end(), Find(name)); + if (i == records.end()) { + throw Exception("No queue for " + name); + } + return *(i->queue); +} + +Demux::Queue& Demux::getDefault() +{ + return defaultQueue; +} + +Demux::Find::Find(const std::string& n) : name(n) {} + +bool Demux::Find::operator()(const Record& record) const +{ + return record.name == name; +} + +}} + diff --git a/cpp/src/qpid/client/Demux.h b/cpp/src/qpid/client/Demux.h new file mode 100644 index 0000000000..0f261f70ba --- /dev/null +++ b/cpp/src/qpid/client/Demux.h @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <list> +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> +#include "qpid/framing/FrameSet.h" +#include "qpid/sys/Mutex.h" +#include "BlockingQueue.h" + +#ifndef _Demux_ +#define _Demux_ + +namespace qpid { +namespace client { + +class ByTransferDest +{ + const std::string dest; +public: + ByTransferDest(const std::string& dest); + bool operator()(const framing::FrameSet& frameset) const; +}; + +class Demux +{ +public: + typedef boost::function<bool(const framing::FrameSet&)> Condition; + typedef BlockingQueue<framing::FrameSet::shared_ptr> Queue; + + void handle(framing::FrameSet::shared_ptr); + void close(); + + Queue& add(const std::string& name, Condition); + void remove(const std::string& name); + Queue& get(const std::string& name); + Queue& getDefault(); +private: + typedef boost::shared_ptr<Queue> QueuePtr; + struct Record + { + const std::string name; + Condition condition; + QueuePtr queue; + + Record(const std::string& n, Condition c) : name(n), condition(c), queue(new Queue()) {} + }; + + sys::Mutex lock; + std::list<Record> records; + Queue defaultQueue; + + typedef std::list<Record>::iterator iterator; + + struct Find + { + const std::string name; + Find(const std::string& name); + bool operator()(const Record& record) const; + }; +}; + +class ScopedDivert +{ + const std::string dest; + Demux& demuxer; + Demux::Queue* queue; +public: + ScopedDivert(const std::string& dest, Demux& demuxer); + ~ScopedDivert(); + Demux::Queue& getQueue(); +}; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h index 1e8c48734d..809dcc7592 100644 --- a/cpp/src/qpid/client/Execution.h +++ b/cpp/src/qpid/client/Execution.h @@ -22,6 +22,7 @@ #define _Execution_ #include "qpid/framing/SequenceNumber.h" +#include "Demux.h" namespace qpid { namespace client { @@ -33,6 +34,7 @@ public: virtual void sendSyncRequest() = 0; virtual void sendFlushRequest() = 0; virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0; + virtual Demux& getDemux() = 0; }; }} diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index c2b5e45928..8ea2cc64e6 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -68,7 +68,7 @@ void ExecutionHandler::handle(AMQFrame& frame) } arriving->append(frame); if (arriving->isComplete()) { - received.push(arriving); + demux.handle(arriving); arriving.reset(); } } else { diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index 3078f6bc3a..88424b555a 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -27,10 +27,10 @@ #include "qpid/framing/FrameSet.h" #include "qpid/framing/MethodContent.h" #include "qpid/framing/SequenceNumber.h" -#include "BlockingQueue.h" #include "ChainableFrameHandler.h" #include "CompletionTracker.h" #include "Correlator.h" +#include "Demux.h" #include "Execution.h" namespace qpid { @@ -46,7 +46,7 @@ class ExecutionHandler : framing::FrameSet::shared_ptr arriving; Correlator correlation; CompletionTracker completion; - BlockingQueue<framing::FrameSet::shared_ptr> received; + Demux demux; framing::ProtocolVersion version; uint64_t maxFrameSize; framing::AccumulatedAck completionStatus; @@ -79,7 +79,7 @@ public: void setMaxFrameSize(uint64_t size) { maxFrameSize = size; } Correlator& getCorrelator() { return correlation; } CompletionTracker& getCompletionTracker() { return completion; } - BlockingQueue<framing::FrameSet::shared_ptr>& getReceived() { return received; } + Demux& getDemux() { return demux; } }; }} diff --git a/cpp/src/qpid/client/MessageQueue.h b/cpp/src/qpid/client/MessageQueue.h new file mode 100644 index 0000000000..1f5c492910 --- /dev/null +++ b/cpp/src/qpid/client/MessageQueue.h @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _MessageQueue_ +#define _MessageQueue_ +#include <iostream> +#include "BlockingQueue.h" +#include "MessageListener.h" + +namespace qpid { +namespace client { + + /** + * A MessageListener implementation that simply queues up + * messages. + * + * \ingroup clientapi + */ + class MessageQueue : public MessageListener, public BlockingQueue<Message> + { + std::queue<Message> messages; + public: + void received(Message& msg) + { + std::cout << "Adding message to queue: " << msg.getData() << std::endl; + push(msg); + } + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 8dfe42989b..3595479642 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -50,7 +50,7 @@ ExecutionHandler& SessionCore::getExecution() FrameSet::shared_ptr SessionCore::get() { - return l3.getReceived().pop(); + return l3.getDemux().getDefault().pop(); } void SessionCore::setSync(bool s) @@ -71,7 +71,7 @@ void SessionCore::close() void SessionCore::stop() { - l3.getReceived().close(); + l3.getDemux().close(); l3.getCompletionTracker().close(); } @@ -98,6 +98,8 @@ void SessionCore::checkClosed() Future SessionCore::send(const AMQBody& command) { + checkClosed(); + Future f; //any result/response listeners must be set before the command is sent if (command.getMethod()->resultExpected()) { @@ -120,6 +122,7 @@ Future SessionCore::send(const AMQBody& command) Future SessionCore::send(const AMQBody& command, const MethodContent& content) { + checkClosed(); //content bearing methods don't currently have responses or //results, if that changes should follow procedure for the other //send method impl: diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 9f68716104..7c58708974 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -65,6 +65,8 @@ class ClientSessionTest : public CppUnit::TestCase session.queueDeclare_(queue=queueName, exclusive=true, autoDelete=true); //subcribe to the queue with confirm_mode = 1: session.messageSubscribe_(queue=queueName, destination=dest, acquireMode=1); + session.messageFlow((destination=dest, unit=0, value=1));//messages + session.messageFlow((destination=dest, unit=1, value=0xFFFFFFFF));//bytes //publish a message: TransferContent _content(data); _content.getDeliveryProperties().setRoutingKey("my-queue"); diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index 5b2fb593e1..878afee3c5 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -1,3 +1,2 @@ tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate -tests_0-10.basic.BasicTests.test_get diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index 28c07ba43a..c2e3024a7e 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -259,8 +259,17 @@ class TestBase(unittest.TestCase): else: self.uniqueTag += 1 consumer_tag = "tag" + str(self.uniqueTag) self.channel.message_subscribe(queue=queueName, destination=consumer_tag) + self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) return self.client.queue(consumer_tag) + def subscribe(self, channel=None, **keys): + channel = channel or self.channel + consumer_tag = keys["destination"] + channel.message_subscribe(**keys) + channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + def assertEmpty(self, queue): """Assert that the queue is empty""" try: diff --git a/python/tests_0-10/alternate-exchange.py b/python/tests_0-10/alternate-exchange.py index d6ac62ccfe..a749d733b0 100644 --- a/python/tests_0-10/alternate-exchange.py +++ b/python/tests_0-10/alternate-exchange.py @@ -39,13 +39,13 @@ class AlternateExchangeTests(TestBase): #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages channel.queue_declare(queue="returns", exclusive=True) channel.queue_bind(queue="returns", exchange="secondary") - channel.message_subscribe(destination="a", queue="returns") + self.subscribe(destination="a", queue="returns") returned = self.client.queue("a") #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages channel.queue_declare(queue="processed", exclusive=True) channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key") - channel.message_subscribe(destination="b", queue="processed") + self.subscribe(destination="b", queue="processed") processed = self.client.queue("b") #publish to the primary exchange @@ -73,7 +73,7 @@ class AlternateExchangeTests(TestBase): channel.exchange_declare(exchange="dlq", type="fanout") channel.queue_declare(queue="deleted", exclusive=True) channel.queue_bind(exchange="dlq", queue="deleted") - channel.message_subscribe(destination="dlq", queue="deleted") + self.subscribe(destination="dlq", queue="deleted") dlq = self.client.queue("dlq") #create a queue using the dlq as its alternate exchange: @@ -103,7 +103,7 @@ class AlternateExchangeTests(TestBase): channel.exchange_declare(exchange="dlq", type="fanout") channel.queue_declare(queue="immediate", exclusive=True) channel.queue_bind(exchange="dlq", queue="immediate") - channel.message_subscribe(destination="dlq", queue="immediate") + self.subscribe(destination="dlq", queue="immediate") dlq = self.client.queue("dlq") #create a queue using the dlq as its alternate exchange: diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py index 0eb71287ec..0df7eb09fa 100644 --- a/python/tests_0-10/broker.py +++ b/python/tests_0-10/broker.py @@ -35,7 +35,7 @@ class BrokerTests(TestBase): # No ack consumer ctag = "tag1" - ch.message_subscribe(queue = "myqueue", destination = ctag, confirm_mode = 0) + self.subscribe(ch, queue = "myqueue", destination = ctag) body = "test no-ack" ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"})) msg = self.client.queue(ctag).get(timeout = 5) @@ -44,7 +44,9 @@ class BrokerTests(TestBase): # Acknowledging consumer self.queue_declare(ch, queue = "otherqueue") ctag = "tag2" - ch.message_subscribe(queue = "otherqueue", destination = ctag, confirm_mode = 1) + self.subscribe(ch, queue = "otherqueue", destination = ctag, confirm_mode = 1) + ch.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF) + ch.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF) body = "test ack" ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"})) msg = self.client.queue(ctag).get(timeout = 5) @@ -60,7 +62,7 @@ class BrokerTests(TestBase): self.queue_declare(channel, queue="test-queue") channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") consumer_tag = "tag1" - channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0) + self.subscribe(queue="test-queue", destination=consumer_tag) queue = self.client.queue(consumer_tag) body = "Immediate Delivery" @@ -84,7 +86,7 @@ class BrokerTests(TestBase): channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) consumer_tag = "tag1" - channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0) + self.subscribe(queue="test-queue", destination=consumer_tag) queue = self.client.queue(consumer_tag) msg = queue.get(timeout=5) self.assert_(msg.content.body == body) @@ -111,7 +113,7 @@ class BrokerTests(TestBase): def test_channel_flow(self): channel = self.channel channel.queue_declare(queue="flow_test_queue", exclusive=True) - channel.message_subscribe(destination="my-tag", queue="flow_test_queue") + self.subscribe(destination="my-tag", queue="flow_test_queue") incoming = self.client.queue("my-tag") channel.channel_flow(active=False) diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index 29a4d3bf0d..b5645cb596 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -366,7 +366,7 @@ class DtxTests(TestBase): #check the second message is available, but not the first self.assertMessageCount(1, "tx-queue") - channel.message_subscribe(queue="tx-queue", destination="results", confirm_mode=1) + self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1) msg = self.client.queue("results").get(timeout=1) self.assertEqual("two", msg.content['message_id']) channel.message_cancel(destination="results") @@ -602,5 +602,7 @@ class DtxTests(TestBase): def assertMessageId(self, expected, queue): self.channel.message_subscribe(queue=queue, destination="results") + self.channel.message_flow(destination="results", unit=0, value=1) + self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF) self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id']) self.channel.message_cancel(destination="results") diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py index e3e2c3b095..9dbe73e3cb 100644 --- a/python/tests_0-10/example.py +++ b/python/tests_0-10/example.py @@ -69,6 +69,8 @@ class ExampleTest (TestBase): # field that is filled if the reply includes content. In this case the # interesting field is the consumer_tag. channel.message_subscribe(queue="test-queue", destination="consumer_tag") + channel.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF) + channel.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF) # We can use the Client.queue(...) method to access the queue # corresponding to our consumer_tag. diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index 8089709314..ba26dda309 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -34,8 +34,8 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-1a", exclusive=True) channel.queue_declare(queue="test-queue-1b", exclusive=True) #establish two consumers one of which excludes delivery of locally sent messages - channel.message_subscribe(destination="local_included", queue="test-queue-1a") - channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) + self.subscribe(destination="local_included", queue="test-queue-1a") + self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) #send a message channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) @@ -61,9 +61,9 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-2", exclusive=True) #check that an exclusive consumer prevents other consumer being created: - channel.message_subscribe(destination="first", queue="test-queue-2", exclusive=True) + self.subscribe(destination="first", queue="test-queue-2", exclusive=True) try: - channel.message_subscribe(destination="second", queue="test-queue-2") + self.subscribe(destination="second", queue="test-queue-2") self.fail("Expected consume request to fail due to previous exclusive consumer") except Closed, e: self.assertChannelException(403, e.args[0]) @@ -73,9 +73,9 @@ class MessageTests(TestBase): channel.channel_open() #check that an exclusive consumer cannot be created if a consumer already exists: - channel.message_subscribe(destination="first", queue="test-queue-2") + self.subscribe(channel, destination="first", queue="test-queue-2") try: - channel.message_subscribe(destination="second", queue="test-queue-2", exclusive=True) + self.subscribe(destination="second", queue="test-queue-2", exclusive=True) self.fail("Expected exclusive consume request to fail due to previous consumer") except Closed, e: self.assertChannelException(403, e.args[0]) @@ -87,7 +87,7 @@ class MessageTests(TestBase): channel = self.channel try: #queue specified but doesn't exist: - channel.message_subscribe(queue="invalid-queue") + self.subscribe(queue="invalid-queue", destination="") self.fail("Expected failure when consuming from non-existent queue") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -96,7 +96,7 @@ class MessageTests(TestBase): channel.channel_open() try: #queue not specified and none previously declared for channel: - channel.message_subscribe(queue="") + self.subscribe(channel, queue="", destination="") self.fail("Expected failure when consuming from unspecified queue") except Closed, e: self.assertConnectionException(530, e.args[0]) @@ -110,9 +110,9 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-3", exclusive=True) #check that attempts to use duplicate tags are detected and prevented: - channel.message_subscribe(destination="first", queue="test-queue-3") + self.subscribe(destination="first", queue="test-queue-3") try: - channel.message_subscribe(destination="first", queue="test-queue-3") + self.subscribe(destination="first", queue="test-queue-3") self.fail("Expected consume request to fail due to non-unique tag") except Closed, e: self.assertConnectionException(530, e.args[0]) @@ -124,7 +124,7 @@ class MessageTests(TestBase): channel = self.channel #setup, declare a queue: channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.message_subscribe(destination="my-consumer", queue="test-queue-4") + self.subscribe(destination="my-consumer", queue="test-queue-4") channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One")) #cancel should stop messages being delivered @@ -150,7 +150,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue="test-ack-queue", exclusive=True) - channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) + self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One")) @@ -194,7 +194,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue="test-requeue", exclusive=True) - channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) + self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One")) @@ -225,7 +225,7 @@ class MessageTests(TestBase): #requeue unacked messages (Three and Five) channel.message_recover(requeue=True) - channel.message_subscribe(queue="test-requeue", destination="consumer_tag") + self.subscribe(queue="test-requeue", destination="consumer_tag") queue2 = self.client.queue("consumer_tag") msg3b = queue2.get(timeout=1) @@ -256,7 +256,7 @@ class MessageTests(TestBase): #setup: declare queue and subscribe channel = self.channel channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.message_subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) + subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") #set prefetch to 5: @@ -298,7 +298,7 @@ class MessageTests(TestBase): #setup: declare queue and subscribe channel = self.channel channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.message_subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) + subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") #set prefetch to 50 bytes (each message is 9 or 10 bytes): @@ -345,13 +345,13 @@ class MessageTests(TestBase): channel.queue_declare(queue = "r", exclusive=True) channel.queue_bind(queue = "r", exchange = "amq.fanout") - channel.message_subscribe(queue = "q", destination = "consumer", confirm_mode = 1) + self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1) channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah")) msg = self.client.queue("consumer").get(timeout = 1) self.assertEquals(msg.content.body, "blah, blah") channel.message_reject([msg.command_id, msg.command_id]) - channel.message_subscribe(queue = "r", destination = "checker") + self.subscribe(queue = "r", destination = "checker") msg = self.client.queue("checker").get(timeout = 1) self.assertEquals(msg.content.body, "blah, blah") @@ -365,8 +365,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c") channel.message_flow_mode(mode = 0, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) @@ -397,8 +395,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c") channel.message_flow_mode(mode = 0, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) @@ -431,8 +427,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) channel.message_flow_mode(mode = 1, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) @@ -465,8 +459,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) channel.message_flow_mode(mode = 1, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) @@ -506,8 +498,8 @@ class MessageTests(TestBase): for i in range(1, 6): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) - channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) - channel.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) + self.subscribe(queue = "q", destination = "a", acquire_mode = 1) + self.subscribe(queue = "q", destination = "b", acquire_mode = 1) for i in range(6, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) @@ -532,7 +524,7 @@ class MessageTests(TestBase): channel.queue_declare(queue = "q", exclusive=True) channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me")) - channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1) + self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1) msg = self.client.queue("a").get(timeout = 1) channel.message_acquire([msg.command_id, msg.command_id]) msg.complete() @@ -548,7 +540,7 @@ class MessageTests(TestBase): channel.queue_declare(queue = "q", exclusive=True) channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me")) - channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1) + self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1) msg = self.client.queue("a").get(timeout = 1) channel.message_cancel(destination = "a") channel.message_release([msg.command_id, msg.command_id]) diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index 05fa1aebc6..e3438116c8 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -49,7 +49,7 @@ class QueueTests(TestBase): #send a further message and consume it, ensuring that the other messages are really gone channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"})) - channel.message_subscribe(queue="test-queue", destination="tag") + self.subscribe(queue="test-queue", destination="tag") queue = self.client.queue("tag") msg = queue.get(timeout=1) self.assertEqual("four", msg.content.body) @@ -169,8 +169,8 @@ class QueueTests(TestBase): channel.queue_declare(queue="queue-1", exclusive="True") channel.queue_declare(queue="queue-2", exclusive="True") - channel.message_subscribe(queue="queue-1", destination="queue-1") - channel.message_subscribe(queue="queue-2", destination="queue-2") + self.subscribe(queue="queue-1", destination="queue-1") + self.subscribe(queue="queue-2", destination="queue-2") queue1 = self.client.queue("queue-1") queue2 = self.client.queue("queue-2") @@ -257,7 +257,7 @@ class QueueTests(TestBase): channel.channel_open() #empty queue: - channel.message_subscribe(destination="consumer_tag", queue="delete-me-2") + self.subscribe(channel, destination="consumer_tag", queue="delete-me-2") queue = self.client.queue("consumer_tag") msg = queue.get(timeout=1) self.assertEqual("message", msg.content.body) @@ -282,7 +282,7 @@ class QueueTests(TestBase): #create a queue and register a consumer: channel.queue_declare(queue="delete-me-3") channel.queue_declare(queue="delete-me-3", passive="True") - channel.message_subscribe(destination="consumer_tag", queue="delete-me-3") + self.subscribe(destination="consumer_tag", queue="delete-me-3") #need new channel now: channel2 = self.client.channel(2) diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py index 7c50de4ee2..2415a88fb2 100644 --- a/python/tests_0-10/tx.py +++ b/python/tests_0-10/tx.py @@ -41,13 +41,13 @@ class TxTests(TestBase): channel = self.channel channel.tx_select() - channel.message_subscribe(queue="tx-commit-a", destination="qa", confirm_mode=1) + self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1) queue_a = self.client.queue("qa") - channel.message_subscribe(queue="tx-commit-b", destination="qb", confirm_mode=1) + self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1) queue_b = self.client.queue("qb") - channel.message_subscribe(queue="tx-commit-c", destination="qc", confirm_mode=1) + self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1) queue_c = self.client.queue("qc") #check results @@ -176,7 +176,7 @@ class TxTests(TestBase): channel.tx_select() #consume and ack messages - channel.message_subscribe(queue=name_a, destination="sub_a", confirm_mode=1) + self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1) queue_a = self.client.queue("sub_a") for i in range(1, 5): msg = queue_a.get(timeout=1) @@ -184,13 +184,13 @@ class TxTests(TestBase): msg.complete() - channel.message_subscribe(queue=name_b, destination="sub_b", confirm_mode=1) + self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1) queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.content.body) msg.complete() - sub_c = channel.message_subscribe(queue=name_c, destination="sub_c", confirm_mode=1) + sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1) queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.content.body) |