diff options
Diffstat (limited to 'cpp/src/qpid/client/MessageMessageChannel.cpp')
-rw-r--r-- | cpp/src/qpid/client/MessageMessageChannel.cpp | 431 |
1 files changed, 0 insertions, 431 deletions
diff --git a/cpp/src/qpid/client/MessageMessageChannel.cpp b/cpp/src/qpid/client/MessageMessageChannel.cpp deleted file mode 100644 index 2a8f7a01c1..0000000000 --- a/cpp/src/qpid/client/MessageMessageChannel.cpp +++ /dev/null @@ -1,431 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include <boost/format.hpp> -#include "MessageMessageChannel.h" -#include "qpid/framing/AMQMethodBody.h" -#include "ClientChannel.h" -#include "ReturnedMessageHandler.h" -#include "MessageListener.h" -#include "qpid/framing/FieldTable.h" -#include "Connection.h" -#include "qpid/shared_ptr.h" -#include <boost/bind.hpp> - -namespace qpid { -namespace client { - -using namespace std; -using namespace sys; -using namespace framing; - -MessageMessageChannel::MessageMessageChannel(Channel& ch) - : channel(ch), tagCount(0) {} - -string MessageMessageChannel::newTag() { - Mutex::ScopedLock l(lock); - return (boost::format("__tag%d")%++tagCount).str(); -} - -void MessageMessageChannel::consume( - Queue& queue, std::string& tag, MessageListener* /*listener*/, - AckMode ackMode, bool noLocal, bool /*synch*/, const FieldTable* fields) -{ - if (tag.empty()) - tag = newTag(); - channel.sendAndReceive<MessageOkBody>( - make_shared_ptr(new MessageConsumeBody( - channel.getVersion(), 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, fields ? *fields : FieldTable()))); - -// // FIXME aconway 2007-02-20: Race condition! -// // We could receive the first message for the consumer -// // before we create the consumer below. -// // Move consumer creation to handler for MessageConsumeOkBody -// { -// Mutex::ScopedLock l(lock); -// ConsumerMap::iterator i = consumers.find(tag); -// if (i != consumers.end()) -// THROW_QPID_ERROR(CLIENT_ERROR, -// "Consumer already exists with tag="+tag); -// Consumer& c = consumers[tag]; -// c.listener = listener; -// c.ackMode = ackMode; -// c.lastDeliveryTag = 0; -// } -} - - -void MessageMessageChannel::cancel(const std::string& /*tag*/, bool /*synch*/) { - // FIXME aconway 2007-02-23: -// Consumer c; -// { -// Mutex::ScopedLock l(lock); -// ConsumerMap::iterator i = consumers.find(tag); -// if (i == consumers.end()) -// return; -// c = i->second; -// consumers.erase(i); -// } -// if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) -// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true)); -// channel.sendAndReceiveSync<MessageCancelOkBody>( -// synch, new MessageCancelBody(channel.version, tag, !synch)); -} - -void MessageMessageChannel::close(){ - // FIXME aconway 2007-02-23: -// ConsumerMap consumersCopy; -// { -// Mutex::ScopedLock l(lock); -// consumersCopy = consumers; -// consumers.clear(); -// } -// for (ConsumerMap::iterator i=consumersCopy.begin(); -// i != consumersCopy.end(); ++i) -// { -// Consumer& c = i->second; -// if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) -// && c.lastDeliveryTag > 0) -// { -// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true)); -// } -// } -// incoming.shutdown(); -} - -void MessageMessageChannel::cancelAll(){ -} - -/** Destination ID for the current get. - * Must not clash with a generated consumer ID. - * TODO aconway 2007-03-06: support multiple outstanding gets? - */ -const string getDestinationId("__get__"); - -/** - * A destination that provides a Correlator::Action to handle - * MessageEmpty responses. - */ -struct MessageGetDestination : public IncomingMessage::WaitableDestination -{ - void response(shared_ptr<AMQResponseBody> response) { - if (response->amqpClassId() == MessageOkBody::CLASS_ID) { - switch (response->amqpMethodId()) { - case MessageOkBody::METHOD_ID: - // Nothing to do, wait for transfer. - return; - case MessageEmptyBody::METHOD_ID: - empty(); // Wake up waiter with empty queue. - return; - } - } - throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response"); - } - - Correlator::Action action() { - return boost::bind(&MessageGetDestination::response, this, _1); - } -}; - -bool MessageMessageChannel::get( - Message& msg, const Queue& queue, AckMode ackMode) -{ - Mutex::ScopedLock l(lock); - std::string destName=newTag(); - MessageGetDestination dest; - incoming.addDestination(destName, dest); - channel.send( - make_shared_ptr( - new MessageGetBody( - channel.version, 0, queue.getName(), destName, ackMode)), - dest.action()); - return dest.wait(msg); -} - - -/** Convert a message to a transfer command. */ -MessageTransferBody::shared_ptr makeTransfer( - ProtocolVersion version, - const Message& msg, const string& destination, - const std::string& routingKey, bool mandatory, bool immediate) -{ - return MessageTransferBody::shared_ptr( - new MessageTransferBody( - version, - 0, // FIXME aconway 2007-04-03: ticket. - destination, - msg.isRedelivered(), - immediate, - 0, // FIXME aconway 2007-02-23: ttl - msg.getPriority(), - msg.getTimestamp(), - static_cast<uint8_t>(msg.getDeliveryMode()), - 0, // FIXME aconway 2007-04-03: Expiration - string(), // Exchange: for broker use only. - routingKey, - msg.getMessageId(), - msg.getCorrelationId(), - msg.getReplyTo(), - msg.getContentType(), - msg.getContentEncoding(), - msg.getUserId(), - msg.getAppId(), - string(), // FIXME aconway 2007-04-03: TransactionId - string(), //FIXME aconway 2007-04-03: SecurityToken - msg.getHeaders(), - Content(INLINE, msg.getData()), - mandatory - )); -} - -// FIXME aconway 2007-04-05: Generated code should provide this. -/** - * Calculate the size of a frame containing the given body type - * if all variable-lengths parts are empty. - */ -template <class T> size_t overhead() { - static AMQFrame frame( - ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion()))); - return frame.size(); -} - -void MessageMessageChannel::publish( - const Message& msg, const Exchange& exchange, - const std::string& routingKey, bool mandatory, bool immediate) -{ - MessageTransferBody::shared_ptr transfer = makeTransfer( - channel.getVersion(), - msg, exchange.getName(), routingKey, mandatory, immediate); - // Frame itself uses 8 bytes. - u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8; - if (transfer->size() <= frameMax) { - channel.sendAndReceive<MessageOkBody>(transfer); - } - else { - std::string ref = newTag(); - std::string data = transfer->getBody().getValue(); - size_t chunk = - channel.connection->getMaxFrameSize() - - (overhead<MessageAppendBody>() + ref.size()); - // TODO aconway 2007-04-05: cast around lack of generated setters - const_cast<Content&>(transfer->getBody()) = Content(REFERENCE,ref); - channel.send( - make_shared_ptr(new MessageOpenBody(channel.version, ref))); - channel.send(transfer); - const char* p = data.data(); - const char* end = data.data()+data.size(); - while (p+chunk <= end) { - channel.send( - make_shared_ptr( - new MessageAppendBody(channel.version, ref, std::string(p, chunk)))); - p += chunk; - } - if (p < end) { - channel.send( - make_shared_ptr( - new MessageAppendBody(channel.version, ref, std::string(p, end-p)))); - } - channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref))); - } -} - -void copy(Message& msg, MessageTransferBody& transfer) { - // FIXME aconway 2007-04-05: Verify all required fields - // are copied. - msg.setContentType(transfer.getContentType()); - msg.setContentEncoding(transfer.getContentEncoding()); - msg.setHeaders(transfer.getApplicationHeaders()); - msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode())); - msg.setPriority(transfer.getPriority()); - msg.setCorrelationId(transfer.getCorrelationId()); - msg.setReplyTo(transfer.getReplyTo()); - // FIXME aconway 2007-04-05: TTL/Expiration - msg.setMessageId(transfer.getMessageId()); - msg.setTimestamp(transfer.getTimestamp()); - msg.setUserId(transfer.getUserId()); - msg.setAppId(transfer.getAppId()); - msg.setDestination(transfer.getDestination()); - msg.setRedelivered(transfer.getRedelivered()); - msg.setDeliveryTag(0); // No meaning in 0-9 - if (transfer.getBody().isInline()) - msg.setData(transfer.getBody().getValue()); -} - -void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { - assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID); - switch(method->amqpMethodId()) { - case MessageAppendBody::METHOD_ID: { - MessageAppendBody::shared_ptr append = - shared_polymorphic_downcast<MessageAppendBody>(method); - incoming.appendReference(append->getReference(), append->getBytes()); - break; - } - case MessageOpenBody::METHOD_ID: { - MessageOpenBody::shared_ptr open = - shared_polymorphic_downcast<MessageOpenBody>(method); - incoming.openReference(open->getReference()); - break; - } - - case MessageCloseBody::METHOD_ID: { - MessageCloseBody::shared_ptr close = - shared_polymorphic_downcast<MessageCloseBody>(method); - incoming.closeReference(close->getReference()); - break; - } - - case MessageTransferBody::METHOD_ID: { - MessageTransferBody::shared_ptr transfer= - shared_polymorphic_downcast<MessageTransferBody>(method); - if (transfer->getBody().isInline()) { - Message msg; - copy(msg, *transfer); - // Deliver it. - incoming.getDestination(transfer->getDestination()).message(msg); - } - else { - Message& msg=incoming.createMessage( - transfer->getDestination(), transfer->getBody().getValue()); - copy(msg, *transfer); - // Will be delivered when reference closes. - } - break; - } - - case MessageEmptyBody::METHOD_ID: - case MessageOkBody::METHOD_ID: - // Nothing to do - break; - - // FIXME aconway 2007-04-03: TODO - case MessageCancelBody::METHOD_ID: - case MessageCheckpointBody::METHOD_ID: - case MessageOffsetBody::METHOD_ID: - case MessageQosBody::METHOD_ID: - case MessageRecoverBody::METHOD_ID: - case MessageRejectBody::METHOD_ID: - case MessageResumeBody::METHOD_ID: - break; - default: - throw Channel::UnknownMethod(); - } -} - -void MessageMessageChannel::handle(AMQHeaderBody::shared_ptr ){ - throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported"); -} - -void MessageMessageChannel::handle(AMQContentBody::shared_ptr ){ - throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported"); -} - -// FIXME aconway 2007-02-23: -// void MessageMessageChannel::deliver(IncomingMessage::Destination& consumer, Message& msg){ -// //record delivery tag: -// consumer.lastDeliveryTag = msg.getDeliveryTag(); - -// //allow registered listener to handle the message -// consumer.listener->received(msg); - -// if(channel.isOpen()){ -// bool multiple(false); -// switch(consumer.ackMode){ -// case LAZY_ACK: -// multiple = true; -// if(++(consumer.count) < channel.getPrefetch()) -// break; -// //else drop-through -// case AUTO_ACK: -// consumer.lastDeliveryTag = 0; -// channel.send( -// new MessageAckBody( -// channel.version, msg.getDeliveryTag(), multiple)); -// case NO_ACK: // Nothing to do -// case CLIENT_ACK: // User code must ack. -// break; -// // TODO aconway 2007-02-22: Provide a way for user -// // to ack! -// } -// } - -// //as it stands, transactionality is entirely orthogonal to ack -// //mode, though the acks will not be processed by the broker under -// //a transaction until it commits. -// } - - -void MessageMessageChannel::run() { - // FIXME aconway 2007-02-23: -// while(channel.isOpen()) { -// try { -// Message msg = incoming.waitDispatch(); -// if(msg.getMethod()->isA<MessageReturnBody>()) { -// ReturnedMessageHandler* handler=0; -// { -// Mutex::ScopedLock l(lock); -// handler=returnsHandler; -// } -// if(handler == 0) { -// // TODO aconway 2007-02-20: proper logging. -// QPID_LOG(warn, "No handler for message."); -// } -// else -// handler->returned(msg); -// } -// else { -// MessageDeliverBody::shared_ptr deliverBody = -// boost::shared_polymorphic_downcast<MessageDeliverBody>( -// msg.getMethod()); -// std::string tag = deliverBody->getConsumerTag(); -// Consumer consumer; -// { -// Mutex::ScopedLock l(lock); -// ConsumerMap::iterator i = consumers.find(tag); -// if(i == consumers.end()) -// THROW_QPID_ERROR(PROTOCOL_ERROR+504, -// "Unknown consumer tag=" + tag); -// consumer = i->second; -// } -// deliver(consumer, msg); -// } -// } -// catch (const ShutdownException&) { -// /* Orderly shutdown */ -// } -// catch (const Exception& e) { -// QPID_LOG(error, e.what()); -// } -// } -} - -void MessageMessageChannel::setReturnedMessageHandler( - ReturnedMessageHandler* ) -{ - throw QPID_ERROR(INTERNAL_ERROR, "Message class does not support returns"); -} - -void MessageMessageChannel::setQos(){ - channel.sendAndReceive<MessageOkBody>( - make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false))); - if(channel.isTransactional()) - channel.sendAndReceive<TxSelectOkBody>( - make_shared_ptr(new TxSelectBody(channel.version))); -} - -}} // namespace qpid::client |