diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-05 09:47:07 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-05 09:47:07 +0000 |
commit | b6a70a78410d771ebbf5c22160d2e012e8b5f59a (patch) | |
tree | 4f073d8737e44c1c8beeb2af1cecd3dd85445386 /qpid/cpp/src | |
parent | c8523699bbc6c5a58972070b5a53638686d0a1a9 (diff) | |
download | qpid-python-b6a70a78410d771ebbf5c22160d2e012e8b5f59a.tar.gz |
Fix for QPID-534. Get now detects closure correctly. Also fixed broker to allow channel.close-ok (and fixed client to send it).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@553441 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerChannel.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/BasicMessageChannel.cpp | 23 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/BasicMessageChannel.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ClientChannel.cpp | 46 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ClientChannel.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ClientConnection.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/MessageChannel.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/MessageMessageChannel.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/MessageMessageChannel.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/ChannelAdapter.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 3 | ||||
-rw-r--r-- | qpid/cpp/src/tests/exception_test.cpp | 67 |
12 files changed, 148 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp index 86768f0d88..3d9eab4433 100644 --- a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp @@ -401,9 +401,11 @@ void Channel::handleMethodInContext( { try{ if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { - std::stringstream out; - out << "Attempt to use unopened channel: " << getId(); - throw ConnectionException(504, out.str()); + if (!method->isA<ChannelCloseOkBody>()) { + std::stringstream out; + out << "Attempt to use unopened channel: " << getId(); + throw ConnectionException(504, out.str()); + } } else { method->invoke(*adapter, context); } diff --git a/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp b/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp index 91849c735e..60368268c0 100644 --- a/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp +++ b/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp @@ -100,34 +100,32 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) { c = i->second; consumers.erase(i); } - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) { channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + } channel.sendAndReceiveSync<BasicCancelOkBody>( synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch))); } void BasicMessageChannel::close(){ - ConsumerMap consumersCopy; - { - Mutex::ScopedLock l(lock); - consumersCopy = consumers; - consumers.clear(); - } destGet.shutdown(); destDispatch.shutdown(); - for (ConsumerMap::iterator i=consumersCopy.begin(); - i != consumersCopy.end(); ++i) +} + +void BasicMessageChannel::cancelAll(){ + Mutex::ScopedLock l(lock); + for (ConsumerMap::iterator i = consumers.begin(); i != consumers.end(); i++) { Consumer& c = i->second; - if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) - && c.lastDeliveryTag > 0) + if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) { channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); } + channel.send(new BasicCancelBody(channel.version, i->first, true)); } + consumers.clear(); } - bool BasicMessageChannel::get( Message& msg, const Queue& queue, AckMode ackMode) { @@ -324,6 +322,7 @@ void BasicMessageChannel::run() { // Orderly shutdown. } catch (const Exception& e) { + std::cout << "Error caught by dispatch thread: " << e.what() << std::endl; // FIXME aconway 2007-02-20: Report exception to user. QPID_LOG(error, e.what()); } diff --git a/qpid/cpp/src/qpid/client/BasicMessageChannel.h b/qpid/cpp/src/qpid/client/BasicMessageChannel.h index 13e1cf1e00..99838321ae 100644 --- a/qpid/cpp/src/qpid/client/BasicMessageChannel.h +++ b/qpid/cpp/src/qpid/client/BasicMessageChannel.h @@ -61,6 +61,8 @@ class BasicMessageChannel : public MessageChannel void close(); + void cancelAll(); + private: struct Consumer{ diff --git a/qpid/cpp/src/qpid/client/ClientChannel.cpp b/qpid/cpp/src/qpid/client/ClientChannel.cpp index 0cb0931155..ab6b9a41c3 100644 --- a/qpid/cpp/src/qpid/client/ClientChannel.cpp +++ b/qpid/cpp/src/qpid/client/ClientChannel.cpp @@ -40,7 +40,7 @@ using namespace qpid::framing; using namespace qpid::sys; Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) : - connection(0), prefetch(_prefetch), transactional(_transactional) + connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false) { switch (mode) { case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break; @@ -50,7 +50,8 @@ Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) : } Channel::~Channel(){ - close(); + closeInternal(); + stop(); } void Channel::open(ChannelId id, Connection& con) @@ -119,7 +120,10 @@ void Channel::protocolInit( } } -bool Channel::isOpen() const { return connection; } +bool Channel::isOpen() const { + Mutex::ScopedLock l(lock); + return connection; +} void Channel::setQos() { messaging->setQos(); @@ -187,7 +191,7 @@ void Channel::rollback(){ } void Channel::handleMethodInContext( - AMQMethodBody::shared_ptr method, const MethodContext&) +AMQMethodBody::shared_ptr method, const MethodContext& ctxt) { // Special case for consume OK as it is both an expected response // and needs handling in this thread. @@ -204,7 +208,7 @@ void Channel::handleMethodInContext( switch (method->amqpClassId()) { case MessageOkBody::CLASS_ID: case BasicGetOkBody::CLASS_ID: messaging->handle(method); break; - case ChannelCloseBody::CLASS_ID: handleChannel(method); break; + case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break; case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; default: throw UnknownMethod(); } @@ -216,9 +220,10 @@ void Channel::handleMethodInContext( } } -void Channel::handleChannel(AMQMethodBody::shared_ptr method) { +void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) { switch (method->amqpMethodId()) { case ChannelCloseBody::METHOD_ID: + send(new ChannelCloseOkBody(version, ctxt.getRequestId())); peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method)); return; case ChannelFlowBody::METHOD_ID: @@ -249,6 +254,7 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void Channel::start(){ + running = true; dispatcher = Thread(*messaging); } @@ -260,6 +266,8 @@ void Channel::close( if (isOpen()) { try { if (getId() != 0) { + if (code == 200) messaging->cancelAll(); + sendAndReceive<ChannelCloseOkBody>( make_shared_ptr(new ChannelCloseBody( version, code, text, classId, methodId))); @@ -272,23 +280,35 @@ void Channel::close( throw; } } + stop(); } // Channel closed by peer. -void Channel::peerClose(ChannelCloseBody::shared_ptr) { +void Channel::peerClose(ChannelCloseBody::shared_ptr reason) { assert(isOpen()); + //record reason: + errorCode = reason->getReplyCode(); + errorText = reason->getReplyText(); closeInternal(); } void Channel::closeInternal() { - if (isOpen()); + Mutex::ScopedLock l(lock); + if (connection); { - messaging->close(); connection = 0; + messaging->close(); // A 0 response means we are closed. responses.signalResponse(AMQMethodBody::shared_ptr()); } - dispatcher.join(); +} + +void Channel::stop() { + Mutex::ScopedLock l(stopLock); + if(running) { + dispatcher.join(); + running = false; + } } AMQMethodBody::shared_ptr Channel::sendAndReceive( @@ -321,7 +341,11 @@ void Channel::cancel(const std::string& tag, bool synch) { } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { - return messaging->get(msg, queue, ackMode); + bool result = messaging->get(msg, queue, ackMode); + if (!isOpen()) { + throw ChannelException(errorCode, errorText); + } + return result; } void Channel::publish(const Message& msg, const Exchange& exchange, diff --git a/qpid/cpp/src/qpid/client/ClientChannel.h b/qpid/cpp/src/qpid/client/ClientChannel.h index 4faf778d28..cea1245e6a 100644 --- a/qpid/cpp/src/qpid/client/ClientChannel.h +++ b/qpid/cpp/src/qpid/client/ClientChannel.h @@ -27,6 +27,7 @@ #include "ClientMessage.h" #include "ClientQueue.h" #include "ResponseHandler.h" +#include "qpid/Exception.h" #include "qpid/framing/ChannelAdapter.h" #include "qpid/sys/Thread.h" #include "AckMode.h" @@ -58,7 +59,7 @@ class Channel : public framing::ChannelAdapter struct UnknownMethod {}; typedef shared_ptr<framing::AMQMethodBody> MethodPtr; - sys::Mutex lock; + mutable sys::Mutex lock; boost::scoped_ptr<MessageChannel> messaging; Connection* connection; sys::Thread dispatcher; @@ -68,12 +69,20 @@ class Channel : public framing::ChannelAdapter const bool transactional; framing::ProtocolVersion version; + uint16_t errorCode; + std::string errorText; + + sys::Mutex stopLock; + bool running; + + void stop(); + void handleHeader(framing::AMQHeaderBody::shared_ptr body); void handleContent(framing::AMQContentBody::shared_ptr body); void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); void handleMethodInContext( framing::AMQMethodBody::shared_ptr, const framing::MethodContext&); - void handleChannel(framing::AMQMethodBody::shared_ptr method); + void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt); void handleConnection(framing::AMQMethodBody::shared_ptr method); void setQos(); diff --git a/qpid/cpp/src/qpid/client/ClientConnection.cpp b/qpid/cpp/src/qpid/client/ClientConnection.cpp index bddadb0800..102de555fd 100644 --- a/qpid/cpp/src/qpid/client/ClientConnection.cpp +++ b/qpid/cpp/src/qpid/client/ClientConnection.cpp @@ -25,6 +25,8 @@ #include "Connection.h" #include "ClientChannel.h" #include "ClientMessage.h" +#include "qpid/log/Logger.h" +#include "qpid/log/Options.h" #include "qpid/log/Statement.h" #include "qpid/QpidError.h" #include <iostream> @@ -49,6 +51,9 @@ Connection::Connection( isOpen(false), debug(_debug) { setConnector(defaultConnector); + qpid::log::Options o; + o.trace = debug; + qpid::log::Logger::instance().configure(o, "qpid-c++-client"); } Connection::~Connection(){} @@ -143,6 +148,7 @@ void Connection::received(AMQFrame& frame){ try{ channel->getHandlers().in->handle(frame); }catch(const qpid::QpidError& e){ + std::cout << "Caught error while handling " << frame << ": " << e.what() <<std::endl; channelException( *channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e); } diff --git a/qpid/cpp/src/qpid/client/MessageChannel.h b/qpid/cpp/src/qpid/client/MessageChannel.h index a830a47986..abf0f8270a 100644 --- a/qpid/cpp/src/qpid/client/MessageChannel.h +++ b/qpid/cpp/src/qpid/client/MessageChannel.h @@ -83,8 +83,11 @@ class MessageChannel : public sys::Runnable /** Send channel's QOS settings */ virtual void setQos() = 0; - /** Channel is closing */ + /** Channel has closed */ virtual void close() = 0; + + /** Cancel all consumers */ + virtual void cancelAll() = 0; }; }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp b/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp index c73f501ec5..2a8f7a01c1 100644 --- a/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp +++ b/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp @@ -109,6 +109,8 @@ void MessageMessageChannel::close(){ // incoming.shutdown(); } +void MessageMessageChannel::cancelAll(){ +} /** Destination ID for the current get. * Must not clash with a generated consumer ID. diff --git a/qpid/cpp/src/qpid/client/MessageMessageChannel.h b/qpid/cpp/src/qpid/client/MessageMessageChannel.h index 12c4786b81..44b64b3d80 100644 --- a/qpid/cpp/src/qpid/client/MessageMessageChannel.h +++ b/qpid/cpp/src/qpid/client/MessageMessageChannel.h @@ -62,6 +62,8 @@ class MessageMessageChannel : public MessageChannel void close(); + void cancelAll(); + private: typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination> Destinations; diff --git a/qpid/cpp/src/qpid/framing/ChannelAdapter.h b/qpid/cpp/src/qpid/framing/ChannelAdapter.h index 0597e5e372..5f92383ee3 100644 --- a/qpid/cpp/src/qpid/framing/ChannelAdapter.h +++ b/qpid/cpp/src/qpid/framing/ChannelAdapter.h @@ -58,6 +58,7 @@ class ChannelAdapter : private BodyHandler { *@param output Processed frames are forwarded to this handler. */ ChannelAdapter() : id(0) {} + virtual ~ChannelAdapter() {} /** Initialize the channel adapter. */ void init(ChannelId, OutputHandler&, ProtocolVersion); diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 76f821798d..ee1a7317e0 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -95,6 +95,7 @@ unit_tests = \ testprogs= \ client_test \ + exception_test \ echo_service \ topic_listener \ topic_publisher @@ -103,7 +104,7 @@ check_PROGRAMS += $(testprogs) interop_runner TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test -system_tests = client_test quick_topictest +system_tests = client_test exception_test quick_topictest TESTS += run-unit-tests start_broker $(system_tests) python_tests kill_broker EXTRA_DIST += \ diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp new file mode 100644 index 0000000000..fffae796dd --- /dev/null +++ b/qpid/cpp/src/tests/exception_test.cpp @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <iostream> + +#include "TestOptions.h" +#include "qpid/QpidError.h" +#include "qpid/client/ClientChannel.h" +#include "qpid/client/Connection.h" +#include "qpid/client/ClientMessage.h" + +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + +int main(int argc, char** argv) +{ + qpid::TestOptions opts; + opts.parse(argc, argv); + + try { + Connection con(opts.trace); + con.open(opts.host, opts.port, opts.username, opts.password, opts.virtualhost); + + Queue queue("I don't exist!"); + Channel channel; + con.openChannel(channel); + channel.start(); + //test handling of get (which is a bit odd) + try { + Message msg; + if (channel.get(msg, queue)) { + std::cout << "Received " << msg.getData() << " from " << queue.getName() << std::endl; + } else { + std::cout << "Queue " << queue.getName() << " was empty." << std::endl; + } + con.close(); + return 1; + } catch (const qpid::ChannelException& e) { + std::cout << "get failed as expected: " << e.what() << std::endl; + } + + con.close(); + return 0; + } catch(const std::exception& e) { + std::cout << "got unexpected exception: " << e.what() << std::endl; + return 1; + } +} |