diff options
author | Alan Conway <aconway@apache.org> | 2007-02-21 19:25:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-02-21 19:25:45 +0000 |
commit | 876d0b94c37f252b08c81656386100fad18a8a46 (patch) | |
tree | 4840b0d697d4629fd5c518507b58fceb7de1578a /cpp/lib | |
parent | c36fb4454be5ce4311aa5f5d0e5683db713c5545 (diff) | |
download | qpid-python-876d0b94c37f252b08c81656386100fad18a8a46.tar.gz |
Thread safety fixes for race conditions on incoming messages.
* cpp/lib/client/MessageListener.h: const correctness.
* cpp/tests/*: MessageListener const change.
* cpp/lib/broker/Content.h: Removed out-of-date FIXME comments.
* cpp/lib/client/ClientChannel.h/ .cpp():
- added locking for consumers map and other member access.
- refactored implementations of Basic get, deliver, return:
most logic now encapsulted in IncomingMessage class.
- fix channel close problems.
* cpp/lib/client/ClientMessage.h/.cpp:
- const correctness & API convenience fixes.
- getMethod/setMethod/getHeader: for new IncomingMessage
* cpp/lib/client/Connection.h/.cpp:
- Fixes to channel closure.
* cpp/lib/client/IncomingMessage.h/.cpp:
- Encapsulate *all* incoming message handling for client.
- Moved handling of BasicGetOk to IncomingMessage to fix race.
- Thread safety fixes.
* cpp/lib/client/ResponseHandler.h/.cpp:
- added getResponse for ClientChannel.
* cpp/lib/common/Exception.h:
- added missing throwSelf implementations.
- added ShutdownException as general purpose shut-down indicator.
- added EmptyException as general purpose "empty" indicator.
* cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp:
- Condition variable abstraction extracted from Monitor for situations
where a single lock is associated with multiple conditions.
* cpp/tests/ClientChannelTest.cpp:
- Test incoming message transfer, get, consume etc.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510161 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r-- | cpp/lib/broker/Content.h | 4 | ||||
-rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 302 | ||||
-rw-r--r-- | cpp/lib/client/ClientChannel.h | 21 | ||||
-rw-r--r-- | cpp/lib/client/ClientMessage.cpp | 37 | ||||
-rw-r--r-- | cpp/lib/client/ClientMessage.h | 152 | ||||
-rw-r--r-- | cpp/lib/client/Connection.cpp | 15 | ||||
-rw-r--r-- | cpp/lib/client/IncomingMessage.cpp | 152 | ||||
-rw-r--r-- | cpp/lib/client/IncomingMessage.h | 117 | ||||
-rw-r--r-- | cpp/lib/client/ResponseHandler.cpp | 14 | ||||
-rw-r--r-- | cpp/lib/client/ResponseHandler.h | 2 | ||||
-rw-r--r-- | cpp/lib/common/Exception.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/common/Exception.h | 16 | ||||
-rw-r--r-- | cpp/lib/common/ExceptionHolder.h | 14 | ||||
-rw-r--r-- | cpp/lib/common/framing/ChannelAdapter.h | 2 | ||||
-rw-r--r-- | cpp/lib/common/sys/Condition.h | 128 | ||||
-rw-r--r-- | cpp/lib/common/sys/Monitor.h | 84 | ||||
-rw-r--r-- | cpp/lib/common/sys/Mutex.h | 3 |
17 files changed, 636 insertions, 431 deletions
diff --git a/cpp/lib/broker/Content.h b/cpp/lib/broker/Content.h index 1b884536f0..08f205f9c4 100644 --- a/cpp/lib/broker/Content.h +++ b/cpp/lib/broker/Content.h @@ -42,8 +42,6 @@ class Content{ virtual ~Content(){} /** Add a block of data to the content */ - // FIXME aconway 2007-02-07: - // virtual void add(const DataBlock& data) = 0; virtual void add(framing::AMQContentBody::shared_ptr data) = 0; /** Total size of content in bytes */ @@ -54,8 +52,6 @@ class Content{ * Subdivide blocks if necessary to ensure each block is * <= framesize bytes long. */ - // FIXME aconway 2007-02-07: - // virtual void send(SendFn send, u_int32_t framesize) = 0; virtual void send(framing::ChannelAdapter& channel, u_int32_t framesize) = 0; //FIXME aconway 2007-02-07: This is inconsistently implemented diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index 42e5cf3054..a8fa219c16 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include <iostream> #include <ClientChannel.h> #include <sys/Monitor.h> #include <ClientMessage.h> @@ -29,17 +30,14 @@ // handling of errors that should close the connection or the channel. // Make sure the user thread receives a connection in each case. // - -using namespace boost; //to use dynamic_pointer_cast +using namespace std; +using namespace boost; using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; -const std::string Channel::OK("OK"); - Channel::Channel(bool _transactional, u_int16_t _prefetch) : connection(0), - incoming(0), prefetch(_prefetch), transactional(_transactional) { } @@ -106,8 +104,8 @@ void Channel::protocolInit( ConnectionRedirectBody::shared_ptr redirect( shared_polymorphic_downcast<ConnectionRedirectBody>( responses.getResponse())); - std::cout << "Received redirection to " << redirect->getHost() - << std::endl; + cout << "Received redirection to " << redirect->getHost() + << endl; } else { THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); } @@ -183,11 +181,11 @@ void Channel::consume( Queue& queue, std::string& tag, MessageListener* listener, int ackMode, bool noLocal, bool synch, const FieldTable* fields) { - string q = queue.getName(); sendAndReceiveSync<BasicConsumeOkBody>( synch, new BasicConsumeBody( - version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch, + version, 0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, fields ? *fields : FieldTable())); if (synch) { BasicConsumeOkBody::shared_ptr response = @@ -195,90 +193,78 @@ void Channel::consume( responses.getResponse()); tag = response->getConsumerTag(); } - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.lastDeliveryTag = 0; + // FIXME aconway 2007-02-20: Race condition! + // We could receive the first message for the consumer + // before we create the consumer below. + // Move consumer creation to handler for BasicConsumeOkBody + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + THROW_QPID_ERROR(CLIENT_ERROR, + "Consumer already exists with tag="+tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.lastDeliveryTag = 0; + } } void Channel::cancel(const std::string& tag, bool synch) { - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) { - Consumer& c = i->second; - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - send(new BasicAckBody(version, c.lastDeliveryTag, true)); - sendAndReceiveSync<BasicCancelOkBody>( - synch, new BasicCancelBody(version, tag, !synch)); - consumers.erase(tag); + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); } + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + send(new BasicAckBody(version, c.lastDeliveryTag, true)); + sendAndReceiveSync<BasicCancelOkBody>( + synch, new BasicCancelBody(version, tag, !synch)); } void Channel::cancelAll(){ - while(!consumers.empty()) { - Consumer c = consumers.begin()->second; - consumers.erase(consumers.begin()); + ConsumerMap consumersCopy; + { + Mutex::ScopedLock l(lock); + consumersCopy = consumers; + consumers.clear(); + } + for (ConsumerMap::iterator i=consumersCopy.begin(); + i != consumersCopy.end(); ++i) + { + Consumer& c = i->second; if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) && c.lastDeliveryTag > 0) { - // Let exceptions propagate, if one fails no point - // trying the rest. NB no memory leaks if we do, - // ConsumerMap holds values, not pointers. - // send(new BasicAckBody(version, c.lastDeliveryTag, true)); } } } -void Channel::retrieve(Message& msg){ - Monitor::ScopedLock l(retrievalMonitor); - while(retrieved == 0){ - retrievalMonitor.wait(); - } - - msg.header = retrieved->getHeader(); - msg.deliveryTag = retrieved->getDeliveryTag(); - msg.data = retrieved->getData(); - delete retrieved; - retrieved = 0; -} - bool Channel::get(Message& msg, const Queue& queue, int ackMode) { - string name = queue.getName(); - responses.expect(); - send(new BasicGetBody(version, 0, name, ackMode)); - responses.waitForResponse(); - AMQMethodBody::shared_ptr response = responses.getResponse(); - if(response->isA<BasicGetOkBody>()) { - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - // FIXME aconway 2007-01-26: close the connection? the channel? - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response)); - } - retrieve(msg); - return true; - }if(response->isA<BasicGetEmptyBody>()){ - return false; - }else{ - // FIXME aconway 2007-01-26: must close the connection. - THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame"); - } + // Expect a message starting with a BasicGetOk + incoming.startGet(); + send(new BasicGetBody(version, 0, queue.getName(), ackMode)); + return incoming.waitGet(msg); } -void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ - // FIXME aconway 2007-01-30: Rework for message class. - - string e = exchange.getName(); +void Channel::publish( + const Message& msg, const Exchange& exchange, + const std::string& routingKey, bool mandatory, bool immediate) +{ + // FIXME aconway 2007-01-30: Rework for 0-9 message class. + const string e = exchange.getName(); string key = routingKey; send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); //break msg up into header frame and content frame(s) and send these - string data = msg.getData(); - msg.header->setContentSize(data.length()); send(msg.header); - + string data = msg.getData(); u_int64_t data_length = data.length(); if(data_length > 0){ u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes @@ -312,30 +298,30 @@ void Channel::handleMethodInContext( { //channel.flow, channel.close, basic.deliver, basic.return or a //response to a synchronous request - if(responses.isWaiting()){ + if(responses.isWaiting()) { responses.signalResponse(body); - }else if(body->isA<BasicDeliverBody>()) { - if(incoming != 0){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); - } - }else if(body->isA<BasicReturnBody>()){ - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); - } - }else if(body->isA<ChannelCloseBody>()){ + return; + } + + if(body->isA<BasicDeliverBody>() + || body->isA<BasicReturnBody>() + || body->isA<BasicGetOkBody>() + || body->isA<BasicGetEmptyBody>()) + + { + incoming.add(body); + return; + } + else if(body->isA<ChannelCloseBody>()) { peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body)); - }else if(body->isA<ChannelFlowBody>()){ - // TODO aconway 2007-01-24: - }else if(body->isA<ConnectionCloseBody>()){ + } + else if(body->isA<ChannelFlowBody>()){ + // TODO aconway 2007-01-24: not implemented yet. + } + else if(body->isA<ConnectionCloseBody>()){ connection->close(); - }else{ + } + else { connection->close( 504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); @@ -343,31 +329,13 @@ void Channel::handleMethodInContext( } void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got header before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver."); - }else{ - incoming->setHeader(body); - if(incoming->isComplete()){ - enqueue(); - } - } + incoming.add(body); } void Channel::handleContent(AMQContentBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got content before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver."); - }else{ - incoming->addContent(body); - if(incoming->isComplete()){ - enqueue(); - } - } + incoming.add(body); } - + void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); } @@ -376,35 +344,6 @@ void Channel::start(){ dispatcher = Thread(this); } -void Channel::run(){ - dispatch(); -} - -void Channel::enqueue(){ - Monitor::ScopedLock l(retrievalMonitor); - if(incoming->isResponse()){ - retrieved = incoming; - retrievalMonitor.notify(); - }else{ - messages.push(incoming); - dispatchMonitor.notify(); - } - incoming = 0; -} - -IncomingMessage* Channel::dequeue(){ - Monitor::ScopedLock l(dispatchMonitor); - while(messages.empty() && isOpen()){ - dispatchMonitor.wait(); - } - IncomingMessage* msg = 0; - if(!messages.empty()){ - msg = messages.front(); - messages.pop(); - } - return msg; -} - void Channel::deliver(Consumer& consumer, Message& msg){ //record delivery tag: consumer.lastDeliveryTag = msg.getDeliveryTag(); @@ -412,8 +351,6 @@ void Channel::deliver(Consumer& consumer, Message& msg){ //allow registered listener to handle the message consumer.listener->received(msg); - //if the handler calls close on the channel or connection while - //handling this message, then consumer will now have been deleted. if(isOpen()){ bool multiple(false); switch(consumer.ackMode){ @@ -432,35 +369,53 @@ void Channel::deliver(Consumer& consumer, Message& msg){ //a transaction until it commits. } -void Channel::dispatch(){ - while(isOpen()){ - IncomingMessage* incomingMsg = dequeue(); - if(incomingMsg){ - //Note: msg is currently only valid for duration of this call - Message msg(incomingMsg->getHeader()); - msg.data = incomingMsg->getData(); - if(incomingMsg->isReturn()){ - if(returnsHandler == 0){ - //print warning to log/console - std::cout << "Message returned: " << msg.getData() << std::endl; - }else{ - returnsHandler->returned(msg); +void Channel::run() { + while(isOpen()) { + try { + Message msg = incoming.waitDispatch(); + if(msg.getMethod()->isA<BasicReturnBody>()) { + ReturnedMessageHandler* handler=0; + { + Mutex::ScopedLock l(lock); + handler=returnsHandler; } - }else{ - msg.deliveryTag = incomingMsg->getDeliveryTag(); - std::string tag = incomingMsg->getConsumerTag(); - - if(consumers.find(tag) == consumers.end()) - std::cout << "Unknown consumer: " << tag << std::endl; - else - deliver(consumers[tag], msg); + if(handler == 0) { + // TODO aconway 2007-02-20: proper logging. + cout << "Message returned: " << msg.getData() << endl; + } + else + handler->returned(msg); + } + else { + BasicDeliverBody::shared_ptr deliverBody = + boost::shared_polymorphic_downcast<BasicDeliverBody>( + msg.getMethod()); + std::string tag = deliverBody->getConsumerTag(); + Consumer consumer; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if(i == consumers.end()) + THROW_QPID_ERROR(PROTOCOL_ERROR+504, + "Unknown consumer tag=" + tag); + consumer = i->second; + } + deliver(consumer, msg); } - delete incomingMsg; + } + catch (const ShutdownException&) { + /* Orderly shutdown */ + } + catch (const Exception& e) { + // FIXME aconway 2007-02-20: Report exception to user. + cout << "client::Channel::run() terminated by: " << e.toString() + << "(" << typeid(e).name() << ")" << endl; } } } void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ + Mutex::ScopedLock l(lock); returnsHandler = handler; } @@ -469,13 +424,17 @@ void Channel::close( u_int16_t code, const std::string& text, ClassId classId, MethodId methodId) { - if (getId() != 0 && isOpen()) { + if (isOpen()) { try { - sendAndReceive<ChannelCloseOkBody>( - new ChannelCloseBody(version, code, text, classId, methodId)); - cancelAll(); + if (getId() != 0) { + sendAndReceive<ChannelCloseOkBody>( + new ChannelCloseBody( + version, code, text, classId, methodId)); + } + static_cast<ConnectionForChannel*>(connection)->erase(getId()); closeInternal(); } catch (...) { + static_cast<ConnectionForChannel*>(connection)->erase(getId()); closeInternal(); throw; } @@ -491,14 +450,13 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) { } void Channel::closeInternal() { - assert(isOpen()); + if (isOpen()); { - Monitor::ScopedLock l(dispatchMonitor); - static_cast<ConnectionForChannel*>(connection)->erase(getId()); + cancelAll(); + incoming.shutdown(); connection = 0; // A 0 response means we are closed. responses.signalResponse(AMQMethodBody::shared_ptr()); - dispatchMonitor.notify(); } dispatcher.join(); } diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index ed67fd8f6b..9c422305b0 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -89,19 +89,12 @@ class Channel : public framing::ChannelAdapter, u_int64_t lastDeliveryTag; }; typedef std::map<std::string, Consumer> ConsumerMap; - typedef std::queue<boost::shared_ptr<framing::AMQMethodBody> > IncomingMethods; - static const std::string OK; - + sys::Mutex lock; Connection* connection; sys::Thread dispatcher; - IncomingMethods incomingMethods; - IncomingMessage* incoming; + IncomingMessage incoming; ResponseHandler responses; - std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume - IncomingMessage* retrieved;//holds response to basic.get - sys::Monitor dispatchMonitor; - sys::Monitor retrievalMonitor; ConsumerMap consumers; ReturnedMessageHandler* returnsHandler; @@ -109,10 +102,7 @@ class Channel : public framing::ChannelAdapter, const bool transactional; framing::ProtocolVersion version; - void enqueue(); void retrieve(Message& msg); - IncomingMessage* dequeue(); - void dispatch(); void deliver(Consumer& consumer, Message& msg); void handleHeader(framing::AMQHeaderBody::shared_ptr body); @@ -307,7 +297,8 @@ class Channel : public framing::ChannelAdapter, * receive this message on publication, the message will be * returned (see setReturnedMessageHandler()). */ - void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, + void publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, bool mandatory = false, bool immediate = false); /** @@ -352,8 +343,8 @@ class Channel : public framing::ChannelAdapter, * Closing a channel that is not open has no effect. */ void close( - framing::ReplyCode = 200, const std::string& =OK, - framing::ClassId = 0, framing::MethodId = 0); + framing::ReplyCode = 200, const std::string& ="OK", + framing::ClassId = 0, framing::MethodId = 0); /** * Set a handler for this channel that will process any diff --git a/cpp/lib/client/ClientMessage.cpp b/cpp/lib/client/ClientMessage.cpp index 8b08f7e535..bd4adb78f7 100644 --- a/cpp/lib/client/ClientMessage.cpp +++ b/cpp/lib/client/ClientMessage.cpp @@ -19,7 +19,6 @@ * */ #include <ClientMessage.h> - using namespace qpid::client; using namespace qpid::framing; @@ -40,63 +39,63 @@ Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ Message::~Message(){ } -BasicHeaderProperties* Message::getHeaderProperties(){ +BasicHeaderProperties* Message::getHeaderProperties() const { return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); } -const std::string& Message::getContentType(){ +const std::string& Message::getContentType() const { return getHeaderProperties()->getContentType(); } -const std::string& Message::getContentEncoding(){ +const std::string& Message::getContentEncoding() const { return getHeaderProperties()->getContentEncoding(); } -FieldTable& Message::getHeaders(){ +FieldTable& Message::getHeaders() const { return getHeaderProperties()->getHeaders(); } -u_int8_t Message::getDeliveryMode(){ +u_int8_t Message::getDeliveryMode() const { return getHeaderProperties()->getDeliveryMode(); } -u_int8_t Message::getPriority(){ +u_int8_t Message::getPriority() const { return getHeaderProperties()->getPriority(); } -const std::string& Message::getCorrelationId(){ +const std::string& Message::getCorrelationId() const { return getHeaderProperties()->getCorrelationId(); } -const std::string& Message::getReplyTo(){ +const std::string& Message::getReplyTo() const { return getHeaderProperties()->getReplyTo(); } -const std::string& Message::getExpiration(){ +const std::string& Message::getExpiration() const { return getHeaderProperties()->getExpiration(); } -const std::string& Message::getMessageId(){ +const std::string& Message::getMessageId() const { return getHeaderProperties()->getMessageId(); } -u_int64_t Message::getTimestamp(){ +u_int64_t Message::getTimestamp() const { return getHeaderProperties()->getTimestamp(); } -const std::string& Message::getType(){ +const std::string& Message::getType() const { return getHeaderProperties()->getType(); } -const std::string& Message::getUserId(){ +const std::string& Message::getUserId() const { return getHeaderProperties()->getUserId(); } -const std::string& Message::getAppId(){ +const std::string& Message::getAppId() const { return getHeaderProperties()->getAppId(); } -const std::string& Message::getClusterId(){ +const std::string& Message::getClusterId() const { return getHeaderProperties()->getClusterId(); } @@ -155,3 +154,9 @@ void Message::setAppId(const std::string& appId){ void Message::setClusterId(const std::string& clusterId){ getHeaderProperties()->setClusterId(clusterId); } + + +u_int64_t Message::getDeliveryTag() const { + BasicDeliverBody* deliver=dynamic_cast<BasicDeliverBody*>(method.get()); + return deliver ? deliver->getDeliveryTag() : 0; +} diff --git a/cpp/lib/client/ClientMessage.h b/cpp/lib/client/ClientMessage.h index 148f9240c8..8661f6b791 100644 --- a/cpp/lib/client/ClientMessage.h +++ b/cpp/lib/client/ClientMessage.h @@ -25,89 +25,99 @@ #include <framing/amqp_framing.h> namespace qpid { + namespace client { +class IncomingMessage; - /** - * A representation of messages for sent or recived through the - * client api. - * - * \ingroup clientapi - */ - class Message{ - qpid::framing::AMQHeaderBody::shared_ptr header; - std::string data; - bool redelivered; - u_int64_t deliveryTag; +/** + * A representation of messages for sent or recived through the + * client api. + * + * \ingroup clientapi + */ +class Message { + framing::AMQMethodBody::shared_ptr method; + framing::AMQHeaderBody::shared_ptr header; + std::string data; + bool redelivered; - qpid::framing::BasicHeaderProperties* getHeaderProperties(); - Message(qpid::framing::AMQHeaderBody::shared_ptr& header); + // FIXME aconway 2007-02-20: const incorrect, needs const return type. + framing::BasicHeaderProperties* getHeaderProperties() const; + Message(qpid::framing::AMQHeaderBody::shared_ptr& header); - public: - Message(const std::string& data=std::string()); - ~Message(); + public: + Message(const std::string& data=std::string()); + ~Message(); - /** - * Allows the application to access the content of messages - * received. - * - * @return a string representing the data of the message - */ - std::string getData() const { return data; } + /** + * Allows the application to access the content of messages + * received. + * + * @return a string representing the data of the message + */ + std::string getData() const { return data; } - /** - * Allows the application to set the content of messages to be - * sent. - * - * @param data a string representing the data of the message - */ - void setData(const std::string& _data); + /** + * Allows the application to set the content of messages to be + * sent. + * + * @param data a string representing the data of the message + */ + void setData(const std::string& _data); - /** - * @return true if this message was delivered previously (to - * any consumer) but was not acknowledged. - */ - inline bool isRedelivered(){ return redelivered; } - inline void setRedelivered(bool _redelivered){ redelivered = _redelivered; } + /** + * @return true if this message was delivered previously (to + * any consumer) but was not acknowledged. + */ + bool isRedelivered(){ return redelivered; } + void setRedelivered(bool _redelivered){ redelivered = _redelivered; } - inline u_int64_t getDeliveryTag(){ return deliveryTag; } + u_int64_t getDeliveryTag() const; - const std::string& getContentType(); - const std::string& getContentEncoding(); - qpid::framing::FieldTable& getHeaders(); - u_int8_t getDeliveryMode(); - u_int8_t getPriority(); - const std::string& getCorrelationId(); - const std::string& getReplyTo(); - const std::string& getExpiration(); - const std::string& getMessageId(); - u_int64_t getTimestamp(); - const std::string& getType(); - const std::string& getUserId(); - const std::string& getAppId(); - const std::string& getClusterId(); + const std::string& getContentType() const; + const std::string& getContentEncoding() const; + qpid::framing::FieldTable& getHeaders() const; + u_int8_t getDeliveryMode() const; + u_int8_t getPriority() const; + const std::string& getCorrelationId() const; + const std::string& getReplyTo() const; + const std::string& getExpiration() const; + const std::string& getMessageId() const; + u_int64_t getTimestamp() const; + const std::string& getType() const; + const std::string& getUserId() const; + const std::string& getAppId() const; + const std::string& getClusterId() const; - void setContentType(const std::string& type); - void setContentEncoding(const std::string& encoding); - void setHeaders(const qpid::framing::FieldTable& headers); - /** - * Sets the delivery mode. 1 = non-durable, 2 = durable. - */ - void setDeliveryMode(u_int8_t mode); - void setPriority(u_int8_t priority); - void setCorrelationId(const std::string& correlationId); - void setReplyTo(const std::string& replyTo); - void setExpiration(const std::string& expiration); - void setMessageId(const std::string& messageId); - void setTimestamp(u_int64_t timestamp); - void setType(const std::string& type); - void setUserId(const std::string& userId); - void setAppId(const std::string& appId); - void setClusterId(const std::string& clusterId); + void setContentType(const std::string& type); + void setContentEncoding(const std::string& encoding); + void setHeaders(const qpid::framing::FieldTable& headers); + /** + * Sets the delivery mode. 1 = non-durable, 2 = durable. + */ + void setDeliveryMode(u_int8_t mode); + void setPriority(u_int8_t priority); + void setCorrelationId(const std::string& correlationId); + void setReplyTo(const std::string& replyTo); + void setExpiration(const std::string& expiration); + void setMessageId(const std::string& messageId); + void setTimestamp(u_int64_t timestamp); + void setType(const std::string& type); + void setUserId(const std::string& userId); + void setAppId(const std::string& appId); + void setClusterId(const std::string& clusterId); + /** Get the method used to deliver this message */ + boost::shared_ptr<framing::AMQMethodBody> getMethod() const + { return method; } + + void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; } + boost::shared_ptr<framing::AMQHeaderBody> getHeader(); - // TODO aconway 2007-02-15: remove friendships. - friend class Channel; - }; + // TODO aconway 2007-02-15: remove friendships. + friend class IncomingMessage; + friend class Channel; +}; }} diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 5b97ca8e5d..566c8fc573 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -18,7 +18,9 @@ * under the License. * */ +#include <algorithm> #include <boost/format.hpp> +#include <boost/bind.hpp> #include <Connection.h> #include <ClientChannel.h> @@ -27,7 +29,6 @@ #include <iostream> #include <sstream> #include <MethodBodyInstances.h> -#include <boost/bind.hpp> #include <functional> using namespace qpid::framing; @@ -83,15 +84,17 @@ void Connection::close( { if(isOpen) { // TODO aconway 2007-01-29: Exception handling - could end up - // partly closed. + // partly closed with threads left unjoined. isOpen = false; channel0.sendAndReceive<ConnectionCloseOkBody>( new ConnectionCloseBody( getVersion(), code, msg, classId, methodId)); - while(!channels.empty()) { - channels.begin()->second->close(); - channels.erase(channels.begin()); - } + + using boost::bind; + for_each(channels.begin(), channels.end(), + bind(&Channel::closeInternal, + bind(&ChannelMap::value_type::second, _1))); + channels.clear(); connector->close(); } } diff --git a/cpp/lib/client/IncomingMessage.cpp b/cpp/lib/client/IncomingMessage.cpp index c1f6ca880f..07f94ceb64 100644 --- a/cpp/lib/client/IncomingMessage.cpp +++ b/cpp/lib/client/IncomingMessage.cpp @@ -19,58 +19,154 @@ * */ #include <IncomingMessage.h> +#include "framing/AMQHeaderBody.h" +#include "framing/AMQContentBody.h" +#include "BasicGetOkBody.h" +#include "BasicReturnBody.h" +#include "BasicDeliverBody.h" #include <QpidError.h> #include <iostream> -using namespace qpid::client; -using namespace qpid::framing; +namespace qpid { +namespace client { -IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){} -IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){} -IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){} +using namespace sys; +using namespace framing; -IncomingMessage::~IncomingMessage(){ +struct IncomingMessage::Guard: public Mutex::ScopedLock { + Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) { + im->shutdownError.throwIf(); + } +}; + +IncomingMessage::IncomingMessage() { reset(); } + +void IncomingMessage::reset() { + state = &IncomingMessage::expectRequest; + endFn= &IncomingMessage::endRequest; + buildMessage = Message(); +} + +void IncomingMessage::startGet() { + Guard g(this); + if (state != &IncomingMessage::expectRequest) { + endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress.")); + } + else { + state = &IncomingMessage::expectGetOk; + endFn = &IncomingMessage::endGet; + getError.reset(); + getState = GETTING; + } } -void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){ - this->header = _header; +bool IncomingMessage::waitGet(Message& msg) { + Guard g(this); + while (getState == GETTING && !shutdownError && !getError) + getReady.wait(lock); + shutdownError.throwIf(); + getError.throwIf(); + msg = getMessage; + return getState==GOT; } -void IncomingMessage::addContent(AMQContentBody::shared_ptr content){ - data.append(content->getData()); +Message IncomingMessage::waitDispatch() { + Guard g(this); + while(dispatchQueue.empty() && !shutdownError) + dispatchReady.wait(lock); + shutdownError.throwIf(); + + Message msg(dispatchQueue.front()); + dispatchQueue.pop(); + return msg; } -bool IncomingMessage::isComplete(){ - return header != 0 && header->getContentSize() == data.size(); +void IncomingMessage::add(BodyPtr body) { + Guard g(this); + shutdownError.throwIf(); + // Call the current state function. + (this->*state)(body); } -bool IncomingMessage::isReturn(){ - return returned; +void IncomingMessage::shutdown() { + Mutex::ScopedLock l(lock); + shutdownError.reset(new ShutdownException()); + getReady.notify(); + dispatchReady.notify(); } -bool IncomingMessage::isDelivery(){ - return delivered; +bool IncomingMessage::isShutdown() const { + Mutex::ScopedLock l(lock); + return shutdownError; } -bool IncomingMessage::isResponse(){ - return response; +// Common check for all the expect functions. Called in network thread. +template<class T> +boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) { + boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body); + if (!ptr) + throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); + return ptr; } -const string& IncomingMessage::getConsumerTag(){ - if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery"); - return delivered->getConsumerTag(); +void IncomingMessage::expectGetOk(BodyPtr body) { + if (dynamic_cast<BasicGetOkBody*>(body.get())) + state = &IncomingMessage::expectHeader; + else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) { + getState = EMPTY; + endGet(); + } + else + throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); } -u_int64_t IncomingMessage::getDeliveryTag(){ - if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery"); - return delivered->getDeliveryTag(); +void IncomingMessage::expectHeader(BodyPtr body) { + AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body); + buildMessage.header = header; + state = &IncomingMessage::expectContent; + checkComplete(); } -AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){ - return header; +void IncomingMessage::expectContent(BodyPtr body) { + AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body); + buildMessage.setData(buildMessage.getData() + content->getData()); + checkComplete(); +} + +void IncomingMessage::checkComplete() { + size_t declaredSize = buildMessage.header->getContentSize(); + size_t currentSize = buildMessage.getData().size(); + if (declaredSize == currentSize) + (this->*endFn)(0); + else if (declaredSize < currentSize) + (this->*endFn)(new QPID_ERROR( + PROTOCOL_ERROR, "Message content exceeds declared size.")); +} + +void IncomingMessage::expectRequest(BodyPtr body) { + AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body); + buildMessage.setMethod(method); + state = &IncomingMessage::expectHeader; +} + +void IncomingMessage::endGet(Exception* ex) { + getError.reset(ex); + if (getState == GETTING) { + getMessage = buildMessage; + getState = GOT; + } + reset(); + getReady.notify(); } -std::string IncomingMessage::getData() const { - return data; +void IncomingMessage::endRequest(Exception* ex) { + ExceptionHolder eh(ex); + if (!eh) { + dispatchQueue.push(buildMessage); + reset(); + dispatchReady.notify(); + } + eh.throwIf(); } +}} // namespace qpid::client diff --git a/cpp/lib/client/IncomingMessage.h b/cpp/lib/client/IncomingMessage.h index a2aa4d8441..2d7c8723c5 100644 --- a/cpp/lib/client/IncomingMessage.h +++ b/cpp/lib/client/IncomingMessage.h @@ -1,3 +1,6 @@ +#ifndef _IncomingMessage_ +#define _IncomingMessage_ + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,43 +22,97 @@ * */ #include <string> -#include <vector> +#include <queue> #include <framing/amqp_framing.h> +#include "ExceptionHolder.h" +#include "ClientMessage.h" +#include "sys/Mutex.h" +#include "sys/Condition.h" -#ifndef _IncomingMessage_ -#define _IncomingMessage_ +namespace qpid { -#include <ClientMessage.h> +namespace framing { +class AMQBody; +} -namespace qpid { namespace client { +/** + * Accumulates incoming message frames into messages. + * Client-initiated messages (basic.get) are initiated and made + * available to the user thread one at a time. + * + * Broker initiated messages (basic.return, basic.deliver) are + * queued for handling by the user dispatch thread. + */ +class IncomingMessage { + public: + typedef boost::shared_ptr<framing::AMQBody> BodyPtr; + IncomingMessage(); + + /** Expect a new message starting with getOk. Called in user thread.*/ + void startGet(); - class IncomingMessage{ - //content will be preceded by one of these method frames - qpid::framing::BasicDeliverBody::shared_ptr delivered; - qpid::framing::BasicReturnBody::shared_ptr returned; - qpid::framing::BasicGetOkBody::shared_ptr response; - qpid::framing::AMQHeaderBody::shared_ptr header; - std::string data; - public: - IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro); - IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro); - IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro); - ~IncomingMessage(); - void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); - void addContent(qpid::framing::AMQContentBody::shared_ptr content); - bool isComplete(); - bool isReturn(); - bool isDelivery(); - bool isResponse(); - const std::string& getConsumerTag();//only relevant if isDelivery() - qpid::framing::AMQHeaderBody::shared_ptr& getHeader(); - u_int64_t getDeliveryTag(); - std::string getData() const; - }; + /** Wait for the message to complete, return the message. + * Called in user thread. + *@raises QpidError if there was an error. + */ + bool waitGet(Message&); -} -} + /** Wait for the next broker-initiated message. */ + Message waitDispatch(); + + /** Add a frame body to the message. Called in network thread. */ + void add(BodyPtr); + + /** Shut down: all further calls to any function throw ex. */ + void shutdown(); + + /** Check if shutdown */ + bool isShutdown() const; + + private: + + typedef void (IncomingMessage::* ExpectFn)(BodyPtr); + typedef void (IncomingMessage::* EndFn)(Exception*); + typedef std::queue<Message> MessageQueue; + struct Guard; + friend struct Guard; + + void reset(); + template <class T> boost::shared_ptr<T> expectCheck(BodyPtr); + + // State functions - a state machine where each state is + // a member function that processes a frame body. + void expectGetOk(BodyPtr); + void expectHeader(BodyPtr); + void expectContent(BodyPtr); + void expectRequest(BodyPtr); + + // End functions. + void endGet(Exception* ex = 0); + void endRequest(Exception* ex); + + // Check for complete message. + void checkComplete(); + + mutable sys::Mutex lock; + ExpectFn state; + EndFn endFn; + Message buildMessage; + ExceptionHolder shutdownError; + + // For basic.get messages. + sys::Condition getReady; + ExceptionHolder getError; + Message getMessage; + enum { GETTING, GOT, EMPTY } getState; + + // For broker-initiated messages + sys::Condition dispatchReady; + MessageQueue dispatchQueue; +}; + +}} #endif diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp index ea48fa2386..4498de41ae 100644 --- a/cpp/lib/client/ResponseHandler.cpp +++ b/cpp/lib/client/ResponseHandler.cpp @@ -59,11 +59,8 @@ void ResponseHandler::receive(ClassId c, MethodId m) { Monitor::ScopedLock l(monitor); while (waiting) monitor.wait(); - if (!response) { - THROW_QPID_ERROR( - PROTOCOL_ERROR, "Channel closed unexpectedly."); - } - if(!validate(response->amqpClassId(), response->amqpMethodId())) { + getResponse(); // Check for closed. + if(!validate(response->amqpClassId(), response->amqpMethodId())) { THROW_QPID_ERROR( PROTOCOL_ERROR, boost::format("Expected class:method %d:%d, got %d:%d") @@ -71,6 +68,13 @@ void ResponseHandler::receive(ClassId c, MethodId m) { } } +framing::AMQMethodBody::shared_ptr ResponseHandler::getResponse() { + if (!response) + THROW_QPID_ERROR( + PROTOCOL_ERROR, "Channel closed unexpectedly."); + return response; +} + RequestId ResponseHandler::getRequestId() { assert(response->getRequestId()); return response->getRequestId(); diff --git a/cpp/lib/client/ResponseHandler.h b/cpp/lib/client/ResponseHandler.h index af0c250eb1..d28048c3d3 100644 --- a/cpp/lib/client/ResponseHandler.h +++ b/cpp/lib/client/ResponseHandler.h @@ -42,7 +42,7 @@ class ResponseHandler{ ~ResponseHandler(); bool isWaiting(){ return waiting; } - framing::AMQMethodBody::shared_ptr getResponse(){ return response;} + framing::AMQMethodBody::shared_ptr getResponse(); void waitForResponse(); void signalResponse(framing::AMQMethodBody::shared_ptr response); diff --git a/cpp/lib/common/Exception.cpp b/cpp/lib/common/Exception.cpp index 0161518011..f7d91498e0 100644 --- a/cpp/lib/common/Exception.cpp +++ b/cpp/lib/common/Exception.cpp @@ -39,4 +39,8 @@ Exception* Exception::clone() const throw() { return new Exception(*this); } void Exception::throwSelf() const { throw *this; } +ShutdownException::ShutdownException() : Exception("Shut down.") {} + +EmptyException::EmptyException() : Exception("Empty.") {} + } // namespace qpid diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h index a07e85e7e3..b8cd68cf8c 100644 --- a/cpp/lib/common/Exception.h +++ b/cpp/lib/common/Exception.h @@ -66,6 +66,7 @@ struct ChannelException : public Exception { template <class T> ChannelException(framing::ReplyCode code_, const T& message) : Exception(message), code(code_) {} + void throwSelf() const { throw *this; } }; struct ConnectionException : public Exception { @@ -73,8 +74,23 @@ struct ConnectionException : public Exception { template <class T> ConnectionException(framing::ReplyCode code_, const T& message) : Exception(message), code(code_) {} + void throwSelf() const { throw *this; } }; +/** + * Exception used to indicate that a thread should shut down. + * Does not indicate an error that should be signalled to the user. + */ +struct ShutdownException : public Exception { + ShutdownException(); + void throwSelf() const { throw *this; } +}; + +/** Exception to indicate empty queue or other empty state */ +struct EmptyException : public Exception { + EmptyException(); + void throwSelf() const { throw *this; } +}; } diff --git a/cpp/lib/common/ExceptionHolder.h b/cpp/lib/common/ExceptionHolder.h index 1e5a30e27c..2769455aba 100644 --- a/cpp/lib/common/ExceptionHolder.h +++ b/cpp/lib/common/ExceptionHolder.h @@ -19,11 +19,15 @@ * */ +#include <assert.h> #include <Exception.h> #include <boost/shared_ptr.hpp> namespace qpid { +// FIXME aconway 2007-02-20: Not necessary, a simple +// Exception::shared_ptr will do the job. Remove +// /** * Holder for a heap-allocated exc eption that can be stack allocated * and thrown safely. @@ -49,11 +53,11 @@ class ExceptionHolder : public Exception, public boost::shared_ptr<Exception> ~ExceptionHolder() throw() {} - const char* what() const throw() { return (*this)->what(); } - std::string toString() const throw() { return (*this)->toString(); } - virtual Exception* clone() const throw() { return (*this)->clone(); } - virtual void throwSelf() const { (*this)->throwSelf(); } - virtual void throwIf() const { if (*this) (*this)->throwSelf(); } + const char* what() const throw() { return get()->what(); } + std::string toString() const throw() { return get()->toString(); } + Exception* clone() const throw() { return get()->clone(); } + void throwIf() const { if (get()) get()->throwSelf(); } + void throwSelf() const { assert(get()); get()->throwSelf(); } }; } // namespace qpid diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h index 36362a417a..f6e3986eed 100644 --- a/cpp/lib/common/framing/ChannelAdapter.h +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -34,6 +34,8 @@ namespace framing { class MethodContext; +// FIXME aconway 2007-02-20: Rename as ChannelBase or just Channel. + /** * Base class for client and broker channels. * diff --git a/cpp/lib/common/sys/Condition.h b/cpp/lib/common/sys/Condition.h new file mode 100644 index 0000000000..9d70af5b84 --- /dev/null +++ b/cpp/lib/common/sys/Condition.h @@ -0,0 +1,128 @@ +#ifndef _sys_Condition_h +#define _sys_Condition_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/errno.h> +#include <boost/noncopyable.hpp> +#include <sys/Mutex.h> +#include <sys/Time.h> + +#ifdef USE_APR +# include <apr_thread_cond.h> +#endif + +namespace qpid { +namespace sys { + +/** + * A condition variable for thread synchronization. + */ +class Condition +{ + public: + inline Condition(); + inline ~Condition(); + inline void wait(Mutex&); + inline bool wait(Mutex&, const Time& absoluteTime); + inline void notify(); + inline void notifyAll(); + + private: +#ifdef USE_APR + apr_thread_cond_t* condition; +#else + pthread_cond_t condition; +#endif +}; + + +// APR ================================================================ +#ifdef USE_APR + +Condition::Condition() { + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); +} + +Condition::~Condition() { + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +} + +void Condition::wait(Mutex& mutex) { + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex.mutex)); +} + +bool Condition::wait(Mutex& mutex, const Time& absoluteTime){ + // APR uses microseconds. + apr_status_t status = + apr_thread_cond_timedwait( + condition, mutex.mutex, absoluteTime/TIME_USEC); + if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status); + return status == 0; +} + +void Condition::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void Condition::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + +#else +// POSIX ================================================================ + +Condition::Condition() { + QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); +} + +Condition::~Condition() { + QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); +} + +void Condition::wait(Mutex& mutex) { + QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex)); +} + +bool Condition::wait(Mutex& mutex, const Time& absoluteTime){ + struct timespec ts; + toTimespec(ts, absoluteTime); + int status = pthread_cond_timedwait(&condition, &mutex.mutex, &ts); + if (status != 0) { + if (status == ETIMEDOUT) return false; + throw QPID_POSIX_ERROR(status); + } + return true; +} + +void Condition::notify(){ + QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); +} + +void Condition::notifyAll(){ + QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); +} +#endif /*USE_APR*/ + + +}} +#endif /*!_sys_Condition_h*/ diff --git a/cpp/lib/common/sys/Monitor.h b/cpp/lib/common/sys/Monitor.h index e58931e699..a3bbd3c5aa 100644 --- a/cpp/lib/common/sys/Monitor.h +++ b/cpp/lib/common/sys/Monitor.h @@ -23,9 +23,7 @@ */ #include <sys/errno.h> -#include <boost/noncopyable.hpp> -#include <sys/Mutex.h> -#include <sys/Time.h> +#include <sys/Condition.h> #ifdef USE_APR # include <apr_thread_cond.h> @@ -37,91 +35,21 @@ namespace sys { /** * A monitor is a condition variable and a mutex */ -class Monitor : public Mutex -{ +class Monitor : public Mutex, public Condition { public: - inline Monitor(); - inline ~Monitor(); + using Condition::wait; inline void wait(); inline bool wait(const Time& absoluteTime); - inline void notify(); - inline void notifyAll(); - - private: -#ifdef USE_APR - apr_thread_cond_t* condition; -#else - pthread_cond_t condition; -#endif }; -// APR ================================================================ -#ifdef USE_APR - -Monitor::Monitor() { - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); -} - -Monitor::~Monitor() { - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); -} - -void Monitor::wait() { - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); -} - -bool Monitor::wait(const Time& absoluteTime){ - // APR uses microseconds. - apr_status_t status = - apr_thread_cond_timedwait(condition, mutex, absoluteTime/TIME_USEC); - if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status); - return status == 0; -} - -void Monitor::notify(){ - CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); -} - -void Monitor::notifyAll(){ - CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); -} - -#else -// POSIX ================================================================ - -Monitor::Monitor() { - QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); -} - -Monitor::~Monitor() { - QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); -} - void Monitor::wait() { - QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex)); -} - -bool Monitor::wait(const Time& absoluteTime){ - struct timespec ts; - toTimespec(ts, absoluteTime); - int status = pthread_cond_timedwait(&condition, &mutex, &ts); - if (status != 0) { - if (status == ETIMEDOUT) return false; - throw QPID_POSIX_ERROR(status); - } - return true; + Condition::wait(*this); } -void Monitor::notify(){ - QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); +bool Monitor::wait(const Time& absoluteTime) { + return Condition::wait(*this, absoluteTime); } -void Monitor::notifyAll(){ - QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); -} -#endif /*USE_APR*/ - - }} #endif /*!_sys_Monitor_h*/ diff --git a/cpp/lib/common/sys/Mutex.h b/cpp/lib/common/sys/Mutex.h index 87d537fb9e..9db9be0981 100644 --- a/cpp/lib/common/sys/Mutex.h +++ b/cpp/lib/common/sys/Mutex.h @@ -32,6 +32,8 @@ namespace qpid { namespace sys { +class Condition; + /** * Scoped lock template: calls lock() in ctor, unlock() in dtor. * L can be any class with lock() and unlock() functions. @@ -76,6 +78,7 @@ class Mutex : private boost::noncopyable { #else pthread_mutex_t mutex; #endif + friend class Condition; }; #ifdef USE_APR |