diff options
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 17 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 2 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 9 | ||||
-rw-r--r-- | cpp/lib/client/Connection.cpp | 11 | ||||
-rw-r--r-- | cpp/lib/client/Connection.h | 3 | ||||
-rw-r--r-- | cpp/lib/client/Connector.cpp | 25 | ||||
-rw-r--r-- | cpp/lib/client/Connector.h | 3 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/Socket.cpp | 5 | ||||
-rw-r--r-- | cpp/tests/BasicP2PTest.h | 2 | ||||
-rw-r--r-- | cpp/tests/ChannelTest.cpp | 32 |
10 files changed, 87 insertions, 22 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 65aa50d3ac..d8fbdc467c 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -43,7 +43,8 @@ Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, accumulatedAck(0), store(_store), messageBuilder(this, _store, _stagingThreshold), - version(_version){ + version(_version), + flowActive(true){ outstanding.reset(); } @@ -142,7 +143,7 @@ Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local - if(ackExpected && !parent->checkPrefetch(msg)){ + if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){ blocked = true; }else{ blocked = false; @@ -257,3 +258,15 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version); } + +void Channel::flow(bool active){ + Mutex::ScopedLock locker(deliveryLock); + bool requestDelivery(!flowActive && active); + flowActive = active; + if (requestDelivery) { + //there may be messages that can be now be delivered + for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ + j->second->requestDispatch(); + } + } +} diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 888ca3c051..be40a25013 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -89,6 +89,7 @@ namespace qpid { MessageBuilder messageBuilder;//builder for in-progress message Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to qpid::framing::ProtocolVersion version; // version used for this channel + bool flowActive; virtual void complete(Message::shared_ptr& msg); void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); @@ -118,6 +119,7 @@ namespace qpid { void handlePublish(Message* msg, Exchange::shared_ptr exchange); void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header); void handleContent(qpid::framing::AMQContentBody::shared_ptr content); + void flow(bool active); }; struct InvalidAckException{}; diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index 49767f4c71..b91bee5a4b 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -233,12 +233,11 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin } } -void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){ - +void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){ + parent->getChannel(channel)->flow(active); + parent->client->getChannel().flowOk(channel, active); } -void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){ - -} +void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/){ diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index ad8aa1d0dd..f7897aa4df 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -30,9 +30,11 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::sys; -u_int16_t Connection::channelIdCounter; - -Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true), +Connection::Connection( bool _debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : + debug(_debug), + channelIdCounter(0), + max_frame_size(_max_frame_size), + closed(true), version(_version->getMajor(),_version->getMinor()) { connector = new Connector(version, debug, _max_frame_size); @@ -96,7 +98,7 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui }else{ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); } - + closed = false; } void Connection::close(){ @@ -108,6 +110,7 @@ void Connection::close(){ sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); connector->close(); + closed = true; } } diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 05d139e99c..bbf8c03b0b 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -68,7 +68,8 @@ namespace client { typedef std::map<int, Channel*>::iterator iterator; - static u_int16_t channelIdCounter; + const bool debug; + u_int16_t channelIdCounter; std::string host; int port; diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp index b34e66fd94..c57b3d6dc4 100644 --- a/cpp/lib/client/Connector.cpp +++ b/cpp/lib/client/Connector.cpp @@ -57,9 +57,10 @@ void Connector::init(ProtocolInitiation* header){ } void Connector::close(){ - closed = true; - socket.close(); - receiver.join(); + if (markClosed()) { + socket.close(); + receiver.join(); + } } void Connector::setInputHandler(InputHandler* handler){ @@ -101,14 +102,24 @@ void Connector::writeToSocket(char* data, size_t available){ } void Connector::handleClosed(){ - closed = true; - socket.close(); - if(shutdownHandler) shutdownHandler->shutdown(); + if (markClosed()) { + socket.close(); + if(shutdownHandler) shutdownHandler->shutdown(); + } +} + +bool Connector::markClosed(){ + if (closed) { + return false; + } else { + closed = true; + return true; + } } void Connector::checkIdle(ssize_t status){ if(timeoutHandler){ - Time t = now() * TIME_MSEC; + Time t = now() * TIME_MSEC; if(status == Socket::SOCKET_TIMEOUT) { if(idleIn && (t - lastIn > idleIn)){ timeoutHandler->idleIn(); diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index f9e50f3216..eccb931e6c 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -44,7 +44,7 @@ namespace client { const int send_buffer_size; qpid::framing::ProtocolVersion version; - bool closed; + volatile bool closed; int64_t lastIn; int64_t lastOut; @@ -73,6 +73,7 @@ namespace client { void run(); void handleClosed(); + bool markClosed(); public: Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024); diff --git a/cpp/lib/common/sys/apr/Socket.cpp b/cpp/lib/common/sys/apr/Socket.cpp index 336eb4996a..5a5dc2a615 100644 --- a/cpp/lib/common/sys/apr/Socket.cpp +++ b/cpp/lib/common/sys/apr/Socket.cpp @@ -24,6 +24,7 @@ #include <apr/APRBase.h> #include <apr/APRPool.h> +#include <iostream> using namespace qpid::sys; @@ -55,6 +56,7 @@ void Socket::connect(const std::string& host, int port) { void Socket::close() { if (socket == 0) return; + CHECK_APR_SUCCESS(apr_socket_shutdown(socket, APR_SHUTDOWN_READWRITE)); CHECK_APR_SUCCESS(apr_socket_close(socket)); socket = 0; } @@ -76,8 +78,9 @@ ssize_t Socket::recv(void* data, size_t size) apr_status_t status = apr_socket_recv(socket, reinterpret_cast<char*>(data), &received); if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; + if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF; CHECK_APR_SUCCESS(status); - return received; + return received; } diff --git a/cpp/tests/BasicP2PTest.h b/cpp/tests/BasicP2PTest.h index 989344bb47..8b5d0e7a8c 100644 --- a/cpp/tests/BasicP2PTest.h +++ b/cpp/tests/BasicP2PTest.h @@ -71,7 +71,7 @@ public: std::string queue = params.getString("P2P_QUEUE_AND_KEY_NAME"); int messages = params.getInt("P2P_NUM_MESSAGES"); if (role == "SENDER") { - worker = std::auto_ptr<Worker>(new Sender(options, Exchange::DEFAULT_EXCHANGE, queue, messages)); + worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_DIRECT_EXCHANGE, queue, messages)); } else if(role == "RECEIVER"){ worker = std::auto_ptr<Worker>(new Receiver(options, queue, messages)); } else { diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index f0860b8a28..cc0a90bad9 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -53,6 +53,7 @@ class ChannelTest : public CppUnit::TestCase CPPUNIT_TEST(testDeliveryAndRecovery); CPPUNIT_TEST(testStaging); CPPUNIT_TEST(testQueuePolicy); + CPPUNIT_TEST(testFlow); CPPUNIT_TEST_SUITE_END(); class MockMessageStore : public NullMessageStore @@ -303,6 +304,37 @@ class ChannelTest : public CppUnit::TestCase store.check(); } + + void testFlow(){ + DummyHandler handler; + Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000); + + const string data("abcdefghijklmn"); + + Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); + addContent(msg, data); + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner(0); + string tag("no_ack"); + channel.consume(tag, queue, false, false, owner); + channel.flow(false); + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 0, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount()); + channel.flow(true); + CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); + BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); + AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); + CPPUNIT_ASSERT(deliver); + CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + } + Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize) { Message* msg = new Message(0, exchange, routingKey, false, false); |