summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-05 09:47:07 +0000
committerGordon Sim <gsim@apache.org>2007-07-05 09:47:07 +0000
commitb6a70a78410d771ebbf5c22160d2e012e8b5f59a (patch)
tree4f073d8737e44c1c8beeb2af1cecd3dd85445386
parentc8523699bbc6c5a58972070b5a53638686d0a1a9 (diff)
downloadqpid-python-b6a70a78410d771ebbf5c22160d2e012e8b5f59a.tar.gz
Fix for QPID-534. Get now detects closure correctly. Also fixed broker to allow channel.close-ok (and fixed client to send it).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@553441 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerChannel.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/BasicMessageChannel.cpp23
-rw-r--r--qpid/cpp/src/qpid/client/BasicMessageChannel.h2
-rw-r--r--qpid/cpp/src/qpid/client/ClientChannel.cpp46
-rw-r--r--qpid/cpp/src/qpid/client/ClientChannel.h13
-rw-r--r--qpid/cpp/src/qpid/client/ClientConnection.cpp6
-rw-r--r--qpid/cpp/src/qpid/client/MessageChannel.h5
-rw-r--r--qpid/cpp/src/qpid/client/MessageMessageChannel.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/MessageMessageChannel.h2
-rw-r--r--qpid/cpp/src/qpid/framing/ChannelAdapter.h1
-rw-r--r--qpid/cpp/src/tests/Makefile.am3
-rw-r--r--qpid/cpp/src/tests/exception_test.cpp67
12 files changed, 148 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
index 86768f0d88..3d9eab4433 100644
--- a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -401,9 +401,11 @@ void Channel::handleMethodInContext(
{
try{
if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
- std::stringstream out;
- out << "Attempt to use unopened channel: " << getId();
- throw ConnectionException(504, out.str());
+ if (!method->isA<ChannelCloseOkBody>()) {
+ std::stringstream out;
+ out << "Attempt to use unopened channel: " << getId();
+ throw ConnectionException(504, out.str());
+ }
} else {
method->invoke(*adapter, context);
}
diff --git a/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp b/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp
index 91849c735e..60368268c0 100644
--- a/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp
+++ b/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp
@@ -100,34 +100,32 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
c = i->second;
consumers.erase(i);
}
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) {
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ }
channel.sendAndReceiveSync<BasicCancelOkBody>(
synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
}
void BasicMessageChannel::close(){
- ConsumerMap consumersCopy;
- {
- Mutex::ScopedLock l(lock);
- consumersCopy = consumers;
- consumers.clear();
- }
destGet.shutdown();
destDispatch.shutdown();
- for (ConsumerMap::iterator i=consumersCopy.begin();
- i != consumersCopy.end(); ++i)
+}
+
+void BasicMessageChannel::cancelAll(){
+ Mutex::ScopedLock l(lock);
+ for (ConsumerMap::iterator i = consumers.begin(); i != consumers.end(); i++)
{
Consumer& c = i->second;
- if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
- && c.lastDeliveryTag > 0)
+ if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
{
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
}
+ channel.send(new BasicCancelBody(channel.version, i->first, true));
}
+ consumers.clear();
}
-
bool BasicMessageChannel::get(
Message& msg, const Queue& queue, AckMode ackMode)
{
@@ -324,6 +322,7 @@ void BasicMessageChannel::run() {
// Orderly shutdown.
}
catch (const Exception& e) {
+ std::cout << "Error caught by dispatch thread: " << e.what() << std::endl;
// FIXME aconway 2007-02-20: Report exception to user.
QPID_LOG(error, e.what());
}
diff --git a/qpid/cpp/src/qpid/client/BasicMessageChannel.h b/qpid/cpp/src/qpid/client/BasicMessageChannel.h
index 13e1cf1e00..99838321ae 100644
--- a/qpid/cpp/src/qpid/client/BasicMessageChannel.h
+++ b/qpid/cpp/src/qpid/client/BasicMessageChannel.h
@@ -61,6 +61,8 @@ class BasicMessageChannel : public MessageChannel
void close();
+ void cancelAll();
+
private:
struct Consumer{
diff --git a/qpid/cpp/src/qpid/client/ClientChannel.cpp b/qpid/cpp/src/qpid/client/ClientChannel.cpp
index 0cb0931155..ab6b9a41c3 100644
--- a/qpid/cpp/src/qpid/client/ClientChannel.cpp
+++ b/qpid/cpp/src/qpid/client/ClientChannel.cpp
@@ -40,7 +40,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
- connection(0), prefetch(_prefetch), transactional(_transactional)
+ connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
{
switch (mode) {
case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
@@ -50,7 +50,8 @@ Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
}
Channel::~Channel(){
- close();
+ closeInternal();
+ stop();
}
void Channel::open(ChannelId id, Connection& con)
@@ -119,7 +120,10 @@ void Channel::protocolInit(
}
}
-bool Channel::isOpen() const { return connection; }
+bool Channel::isOpen() const {
+ Mutex::ScopedLock l(lock);
+ return connection;
+}
void Channel::setQos() {
messaging->setQos();
@@ -187,7 +191,7 @@ void Channel::rollback(){
}
void Channel::handleMethodInContext(
- AMQMethodBody::shared_ptr method, const MethodContext&)
+AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
{
// Special case for consume OK as it is both an expected response
// and needs handling in this thread.
@@ -204,7 +208,7 @@ void Channel::handleMethodInContext(
switch (method->amqpClassId()) {
case MessageOkBody::CLASS_ID:
case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
- case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
+ case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
default: throw UnknownMethod();
}
@@ -216,9 +220,10 @@ void Channel::handleMethodInContext(
}
}
-void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
+void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) {
switch (method->amqpMethodId()) {
case ChannelCloseBody::METHOD_ID:
+ send(new ChannelCloseOkBody(version, ctxt.getRequestId()));
peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
return;
case ChannelFlowBody::METHOD_ID:
@@ -249,6 +254,7 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
}
void Channel::start(){
+ running = true;
dispatcher = Thread(*messaging);
}
@@ -260,6 +266,8 @@ void Channel::close(
if (isOpen()) {
try {
if (getId() != 0) {
+ if (code == 200) messaging->cancelAll();
+
sendAndReceive<ChannelCloseOkBody>(
make_shared_ptr(new ChannelCloseBody(
version, code, text, classId, methodId)));
@@ -272,23 +280,35 @@ void Channel::close(
throw;
}
}
+ stop();
}
// Channel closed by peer.
-void Channel::peerClose(ChannelCloseBody::shared_ptr) {
+void Channel::peerClose(ChannelCloseBody::shared_ptr reason) {
assert(isOpen());
+ //record reason:
+ errorCode = reason->getReplyCode();
+ errorText = reason->getReplyText();
closeInternal();
}
void Channel::closeInternal() {
- if (isOpen());
+ Mutex::ScopedLock l(lock);
+ if (connection);
{
- messaging->close();
connection = 0;
+ messaging->close();
// A 0 response means we are closed.
responses.signalResponse(AMQMethodBody::shared_ptr());
}
- dispatcher.join();
+}
+
+void Channel::stop() {
+ Mutex::ScopedLock l(stopLock);
+ if(running) {
+ dispatcher.join();
+ running = false;
+ }
}
AMQMethodBody::shared_ptr Channel::sendAndReceive(
@@ -321,7 +341,11 @@ void Channel::cancel(const std::string& tag, bool synch) {
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- return messaging->get(msg, queue, ackMode);
+ bool result = messaging->get(msg, queue, ackMode);
+ if (!isOpen()) {
+ throw ChannelException(errorCode, errorText);
+ }
+ return result;
}
void Channel::publish(const Message& msg, const Exchange& exchange,
diff --git a/qpid/cpp/src/qpid/client/ClientChannel.h b/qpid/cpp/src/qpid/client/ClientChannel.h
index 4faf778d28..cea1245e6a 100644
--- a/qpid/cpp/src/qpid/client/ClientChannel.h
+++ b/qpid/cpp/src/qpid/client/ClientChannel.h
@@ -27,6 +27,7 @@
#include "ClientMessage.h"
#include "ClientQueue.h"
#include "ResponseHandler.h"
+#include "qpid/Exception.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/sys/Thread.h"
#include "AckMode.h"
@@ -58,7 +59,7 @@ class Channel : public framing::ChannelAdapter
struct UnknownMethod {};
typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
- sys::Mutex lock;
+ mutable sys::Mutex lock;
boost::scoped_ptr<MessageChannel> messaging;
Connection* connection;
sys::Thread dispatcher;
@@ -68,12 +69,20 @@ class Channel : public framing::ChannelAdapter
const bool transactional;
framing::ProtocolVersion version;
+ uint16_t errorCode;
+ std::string errorText;
+
+ sys::Mutex stopLock;
+ bool running;
+
+ void stop();
+
void handleHeader(framing::AMQHeaderBody::shared_ptr body);
void handleContent(framing::AMQContentBody::shared_ptr body);
void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
void handleMethodInContext(
framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
- void handleChannel(framing::AMQMethodBody::shared_ptr method);
+ void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt);
void handleConnection(framing::AMQMethodBody::shared_ptr method);
void setQos();
diff --git a/qpid/cpp/src/qpid/client/ClientConnection.cpp b/qpid/cpp/src/qpid/client/ClientConnection.cpp
index bddadb0800..102de555fd 100644
--- a/qpid/cpp/src/qpid/client/ClientConnection.cpp
+++ b/qpid/cpp/src/qpid/client/ClientConnection.cpp
@@ -25,6 +25,8 @@
#include "Connection.h"
#include "ClientChannel.h"
#include "ClientMessage.h"
+#include "qpid/log/Logger.h"
+#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
#include "qpid/QpidError.h"
#include <iostream>
@@ -49,6 +51,9 @@ Connection::Connection(
isOpen(false), debug(_debug)
{
setConnector(defaultConnector);
+ qpid::log::Options o;
+ o.trace = debug;
+ qpid::log::Logger::instance().configure(o, "qpid-c++-client");
}
Connection::~Connection(){}
@@ -143,6 +148,7 @@ void Connection::received(AMQFrame& frame){
try{
channel->getHandlers().in->handle(frame);
}catch(const qpid::QpidError& e){
+ std::cout << "Caught error while handling " << frame << ": " << e.what() <<std::endl;
channelException(
*channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e);
}
diff --git a/qpid/cpp/src/qpid/client/MessageChannel.h b/qpid/cpp/src/qpid/client/MessageChannel.h
index a830a47986..abf0f8270a 100644
--- a/qpid/cpp/src/qpid/client/MessageChannel.h
+++ b/qpid/cpp/src/qpid/client/MessageChannel.h
@@ -83,8 +83,11 @@ class MessageChannel : public sys::Runnable
/** Send channel's QOS settings */
virtual void setQos() = 0;
- /** Channel is closing */
+ /** Channel has closed */
virtual void close() = 0;
+
+ /** Cancel all consumers */
+ virtual void cancelAll() = 0;
};
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp b/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp
index c73f501ec5..2a8f7a01c1 100644
--- a/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp
+++ b/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp
@@ -109,6 +109,8 @@ void MessageMessageChannel::close(){
// incoming.shutdown();
}
+void MessageMessageChannel::cancelAll(){
+}
/** Destination ID for the current get.
* Must not clash with a generated consumer ID.
diff --git a/qpid/cpp/src/qpid/client/MessageMessageChannel.h b/qpid/cpp/src/qpid/client/MessageMessageChannel.h
index 12c4786b81..44b64b3d80 100644
--- a/qpid/cpp/src/qpid/client/MessageMessageChannel.h
+++ b/qpid/cpp/src/qpid/client/MessageMessageChannel.h
@@ -62,6 +62,8 @@ class MessageMessageChannel : public MessageChannel
void close();
+ void cancelAll();
+
private:
typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination>
Destinations;
diff --git a/qpid/cpp/src/qpid/framing/ChannelAdapter.h b/qpid/cpp/src/qpid/framing/ChannelAdapter.h
index 0597e5e372..5f92383ee3 100644
--- a/qpid/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/qpid/cpp/src/qpid/framing/ChannelAdapter.h
@@ -58,6 +58,7 @@ class ChannelAdapter : private BodyHandler {
*@param output Processed frames are forwarded to this handler.
*/
ChannelAdapter() : id(0) {}
+ virtual ~ChannelAdapter() {}
/** Initialize the channel adapter. */
void init(ChannelId, OutputHandler&, ProtocolVersion);
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 76f821798d..ee1a7317e0 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -95,6 +95,7 @@ unit_tests = \
testprogs= \
client_test \
+ exception_test \
echo_service \
topic_listener \
topic_publisher
@@ -103,7 +104,7 @@ check_PROGRAMS += $(testprogs) interop_runner
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test
-system_tests = client_test quick_topictest
+system_tests = client_test exception_test quick_topictest
TESTS += run-unit-tests start_broker $(system_tests) python_tests kill_broker
EXTRA_DIST += \
diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp
new file mode 100644
index 0000000000..fffae796dd
--- /dev/null
+++ b/qpid/cpp/src/tests/exception_test.cpp
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+
+#include "TestOptions.h"
+#include "qpid/QpidError.h"
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ClientMessage.h"
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+int main(int argc, char** argv)
+{
+ qpid::TestOptions opts;
+ opts.parse(argc, argv);
+
+ try {
+ Connection con(opts.trace);
+ con.open(opts.host, opts.port, opts.username, opts.password, opts.virtualhost);
+
+ Queue queue("I don't exist!");
+ Channel channel;
+ con.openChannel(channel);
+ channel.start();
+ //test handling of get (which is a bit odd)
+ try {
+ Message msg;
+ if (channel.get(msg, queue)) {
+ std::cout << "Received " << msg.getData() << " from " << queue.getName() << std::endl;
+ } else {
+ std::cout << "Queue " << queue.getName() << " was empty." << std::endl;
+ }
+ con.close();
+ return 1;
+ } catch (const qpid::ChannelException& e) {
+ std::cout << "get failed as expected: " << e.what() << std::endl;
+ }
+
+ con.close();
+ return 0;
+ } catch(const std::exception& e) {
+ std::cout << "got unexpected exception: " << e.what() << std::endl;
+ return 1;
+ }
+}