diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-04-02 11:40:48 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-04-02 11:40:48 +0000 |
commit | 16e203a0d32df9829bcf4fb738ef89fc94404155 (patch) | |
tree | b5dbb15f4a238ca377236ce16140443e20ed3e4a /cpp/lib/client/BasicMessageChannel.cpp | |
parent | fb410c63d08e87019b3d2a8d85820ae809758f62 (diff) | |
download | qpid-python-16e203a0d32df9829bcf4fb738ef89fc94404155.tar.gz |
Fix for the most disruptive items in QPID-243.
* All #include lines now use '""' rather than '<>' where appropriate.
* #include lines within the qpid project use relative includes so that
the same path will work in /usr/include when installed as part of the
client libraries.
* All the source code has now been rearranged to be under src in a directory
analogous to the namespace of the classes in it.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@524769 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/BasicMessageChannel.cpp')
-rw-r--r-- | cpp/lib/client/BasicMessageChannel.cpp | 395 |
1 files changed, 0 insertions, 395 deletions
diff --git a/cpp/lib/client/BasicMessageChannel.cpp b/cpp/lib/client/BasicMessageChannel.cpp deleted file mode 100644 index d6929965ee..0000000000 --- a/cpp/lib/client/BasicMessageChannel.cpp +++ /dev/null @@ -1,395 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "BasicMessageChannel.h" -#include "AMQMethodBody.h" -#include "ClientChannel.h" -#include "ReturnedMessageHandler.h" -#include "MessageListener.h" -#include "framing/FieldTable.h" -#include "Connection.h" -#include <queue> -#include <iostream> -#include <boost/format.hpp> -#include <boost/variant.hpp> - -namespace qpid { -namespace client { - -using namespace std; -using namespace sys; -using namespace framing; -using boost::format; - -namespace { - -// Destination name constants -const std::string BASIC_GET("__basic_get__"); -const std::string BASIC_RETURN("__basic_return__"); - -// Reference name constant -const std::string BASIC_REF("__basic_reference__"); -} - -class BasicMessageChannel::WaitableDestination : - public IncomingMessage::Destination -{ - public: - WaitableDestination() : shutdownFlag(false) {} - void message(const Message& msg) { - Mutex::ScopedLock l(monitor); - queue.push(msg); - monitor.notify(); - } - - void empty() { - Mutex::ScopedLock l(monitor); - queue.push(Empty()); - monitor.notify(); - } - - bool wait(Message& msgOut) { - Mutex::ScopedLock l(monitor); - while (queue.empty() && !shutdownFlag) - monitor.wait(); - if (shutdownFlag) - return false; - Message* msg = boost::get<Message>(&queue.front()); - bool success = msg; - if (success) - msgOut=*msg; - queue.pop(); - if (!queue.empty()) - monitor.notify(); // Wake another waiter. - return success; - } - - void shutdown() { - Mutex::ScopedLock l(monitor); - shutdownFlag = true; - monitor.notifyAll(); - } - - private: - struct Empty {}; - typedef boost::variant<Message,Empty> Item; - sys::Monitor monitor; - std::queue<Item> queue; - bool shutdownFlag; -}; - - -BasicMessageChannel::BasicMessageChannel(Channel& ch) - : channel(ch), returnsHandler(0), - destGet(new WaitableDestination()), - destDispatch(new WaitableDestination()) -{ - incoming.addDestination(BASIC_RETURN, *destDispatch); -} - -void BasicMessageChannel::consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) -{ - { - // Note we create a consumer even if tag="". In that case - // It will be renamed when we handle BasicConsumeOkBody. - // - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - THROW_QPID_ERROR(CLIENT_ERROR, - "Consumer already exists with tag="+tag); - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.lastDeliveryTag = 0; - } - - // FIXME aconway 2007-03-23: get processed in both. - - // BasicConsumeOkBody is really processed in handle(), here - // we just pick up the tag to return to the user. - // - // We can't process it here because messages for the consumer may - // already be arriving. - // - BasicConsumeOkBody::shared_ptr ok = - channel.sendAndReceiveSync<BasicConsumeOkBody>( - synch, - new BasicConsumeBody( - channel.version, 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable())); - tag = ok->getConsumerTag(); -} - - -void BasicMessageChannel::cancel(const std::string& tag, bool synch) { - Consumer c; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i == consumers.end()) - return; - c = i->second; - consumers.erase(i); - } - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); - channel.sendAndReceiveSync<BasicCancelOkBody>( - synch, new BasicCancelBody(channel.version, tag, !synch)); -} - -void BasicMessageChannel::close(){ - ConsumerMap consumersCopy; - { - Mutex::ScopedLock l(lock); - consumersCopy = consumers; - consumers.clear(); - } - destGet->shutdown(); - destDispatch->shutdown(); - for (ConsumerMap::iterator i=consumersCopy.begin(); - i != consumersCopy.end(); ++i) - { - Consumer& c = i->second; - if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) - && c.lastDeliveryTag > 0) - { - channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); - } - } -} - - -bool BasicMessageChannel::get( - Message& msg, const Queue& queue, AckMode ackMode) -{ - // Prepare for incoming response - incoming.addDestination(BASIC_GET, *destGet); - channel.send( - new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); - bool got = destGet->wait(msg); - return got; -} - -void BasicMessageChannel::publish( - const Message& msg, const Exchange& exchange, - const std::string& routingKey, bool mandatory, bool immediate) -{ - const string e = exchange.getName(); - string key = routingKey; - - // Make a header for the message - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - BasicHeaderProperties::copy( - *static_cast<BasicHeaderProperties*>(header->getProperties()), msg); - header->setContentSize(msg.getData().size()); - - channel.send( - new BasicPublishBody( - channel.version, 0, e, key, mandatory, immediate)); - channel.send(header); - string data = msg.getData(); - u_int64_t data_length = data.length(); - if(data_length > 0){ - //frame itself uses 8 bytes - u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8; - if(data_length < frag_size){ - channel.send(new AMQContentBody(data)); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); - channel.send(new AMQContentBody(frag)); - - offset += length; - remaining = data_length - offset; - } - } - } -} - -void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { - assert(method->amqpClassId() ==BasicGetBody::CLASS_ID); - switch(method->amqpMethodId()) { - case BasicGetOkBody::METHOD_ID: { - incoming.openReference(BASIC_REF); - incoming.createMessage(BASIC_GET, BASIC_REF); - return; - } - case BasicGetEmptyBody::METHOD_ID: { - incoming.getDestination(BASIC_GET).empty(); - incoming.removeDestination(BASIC_GET); - return; - } - case BasicDeliverBody::METHOD_ID: { - BasicDeliverBody::shared_ptr deliver= - boost::shared_polymorphic_downcast<BasicDeliverBody>(method); - incoming.openReference(BASIC_REF); - Message& msg = incoming.createMessage( - deliver->getConsumerTag(), BASIC_REF); - msg.setDestination(deliver->getConsumerTag()); - msg.setDeliveryTag(deliver->getDeliveryTag()); - msg.setRedelivered(deliver->getRedelivered()); - return; - } - case BasicReturnBody::METHOD_ID: { - incoming.openReference(BASIC_REF); - incoming.createMessage(BASIC_RETURN, BASIC_REF); - return; - } - case BasicConsumeOkBody::METHOD_ID: { - Mutex::ScopedLock l(lock); - BasicConsumeOkBody::shared_ptr consumeOk = - boost::shared_polymorphic_downcast<BasicConsumeOkBody>(method); - std::string tag = consumeOk->getConsumerTag(); - ConsumerMap::iterator i = consumers.find(std::string()); - if (i != consumers.end()) { - // Need to rename the un-named consumer. - if (consumers.find(tag) == consumers.end()) { - consumers[tag] = i->second; - consumers.erase(i); - } - else // Tag already exists. - throw ChannelException(404, "Tag already exists: "+tag); - } - // FIXME aconway 2007-03-23: Integrate consumer & destination - // maps. - incoming.addDestination(tag, *destDispatch); - return; - } - } - throw Channel::UnknownMethod(); -} - -void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr header) { - BasicHeaderProperties* props = - boost::polymorphic_downcast<BasicHeaderProperties*>( - header->getProperties()); - IncomingMessage::Reference& ref = incoming.getReference(BASIC_REF); - assert (ref.messages.size() == 1); - ref.messages.front().BasicHeaderProperties::operator=(*props); - incoming_size = header->getContentSize(); - if (incoming_size==0) - incoming.closeReference(BASIC_REF); -} - -void BasicMessageChannel::handle(AMQContentBody::shared_ptr content){ - incoming.appendReference(BASIC_REF, content->getData()); - size_t size = incoming.getReference(BASIC_REF).data.size(); - if (size >= incoming_size) { - incoming.closeReference(BASIC_REF); - if (size > incoming_size) - throw ChannelException(502, "Content exceeded declared size"); - } -} - -void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){ - //record delivery tag: - consumer.lastDeliveryTag = msg.getDeliveryTag(); - - //allow registered listener to handle the message - consumer.listener->received(msg); - - if(channel.isOpen()){ - bool multiple(false); - switch(consumer.ackMode){ - case LAZY_ACK: - multiple = true; - if(++(consumer.count) < channel.getPrefetch()) - break; - //else drop-through - case AUTO_ACK: - consumer.lastDeliveryTag = 0; - channel.send( - new BasicAckBody( - channel.version, - msg.getDeliveryTag(), - multiple)); - case NO_ACK: // Nothing to do - case CLIENT_ACK: // User code must ack. - break; - // TODO aconway 2007-02-22: Provide a way for user - // to ack! - } - } - - //as it stands, transactionality is entirely orthogonal to ack - //mode, though the acks will not be processed by the broker under - //a transaction until it commits. -} - - -void BasicMessageChannel::run() { - while(channel.isOpen()) { - try { - Message msg; - bool gotMessge = destDispatch->wait(msg); - if (gotMessge) { - if(msg.getDestination() == BASIC_RETURN) { - ReturnedMessageHandler* handler=0; - { - Mutex::ScopedLock l(lock); - handler=returnsHandler; - } - if(handler != 0) - handler->returned(msg); - } - else { - Consumer consumer; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find( - msg.getDestination()); - if(i == consumers.end()) - THROW_QPID_ERROR(PROTOCOL_ERROR+504, - "Unknown consumer tag=" + - msg.getDestination()); - consumer = i->second; - } - deliver(consumer, msg); - } - } - } - catch (const ShutdownException&) { - /* Orderly shutdown */ - } - catch (const Exception& e) { - // FIXME aconway 2007-02-20: Report exception to user. - cout << "client::BasicMessageChannel::run() terminated by: " - << e.toString() << endl; - } - } -} - -void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ - Mutex::ScopedLock l(lock); - returnsHandler = handler; -} - -void BasicMessageChannel::setQos(){ - channel.sendAndReceive<BasicQosOkBody>( - new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)); - if(channel.isTransactional()) - channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version)); -} - -}} // namespace qpid::client |