summaryrefslogtreecommitdiff
path: root/cpp/src/client/BasicMessageChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/client/BasicMessageChannel.cpp')
-rw-r--r--cpp/src/client/BasicMessageChannel.cpp395
1 files changed, 395 insertions, 0 deletions
diff --git a/cpp/src/client/BasicMessageChannel.cpp b/cpp/src/client/BasicMessageChannel.cpp
new file mode 100644
index 0000000000..26c3fe543c
--- /dev/null
+++ b/cpp/src/client/BasicMessageChannel.cpp
@@ -0,0 +1,395 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "BasicMessageChannel.h"
+#include "../framing/AMQMethodBody.h"
+#include "ClientChannel.h"
+#include "ReturnedMessageHandler.h"
+#include "MessageListener.h"
+#include "../framing/FieldTable.h"
+#include "Connection.h"
+#include <queue>
+#include <iostream>
+#include <boost/format.hpp>
+#include <boost/variant.hpp>
+
+namespace qpid {
+namespace client {
+
+using namespace std;
+using namespace sys;
+using namespace framing;
+using boost::format;
+
+namespace {
+
+// Destination name constants
+const std::string BASIC_GET("__basic_get__");
+const std::string BASIC_RETURN("__basic_return__");
+
+// Reference name constant
+const std::string BASIC_REF("__basic_reference__");
+}
+
+class BasicMessageChannel::WaitableDestination :
+ public IncomingMessage::Destination
+{
+ public:
+ WaitableDestination() : shutdownFlag(false) {}
+ void message(const Message& msg) {
+ Mutex::ScopedLock l(monitor);
+ queue.push(msg);
+ monitor.notify();
+ }
+
+ void empty() {
+ Mutex::ScopedLock l(monitor);
+ queue.push(Empty());
+ monitor.notify();
+ }
+
+ bool wait(Message& msgOut) {
+ Mutex::ScopedLock l(monitor);
+ while (queue.empty() && !shutdownFlag)
+ monitor.wait();
+ if (shutdownFlag)
+ return false;
+ Message* msg = boost::get<Message>(&queue.front());
+ bool success = msg;
+ if (success)
+ msgOut=*msg;
+ queue.pop();
+ if (!queue.empty())
+ monitor.notify(); // Wake another waiter.
+ return success;
+ }
+
+ void shutdown() {
+ Mutex::ScopedLock l(monitor);
+ shutdownFlag = true;
+ monitor.notifyAll();
+ }
+
+ private:
+ struct Empty {};
+ typedef boost::variant<Message,Empty> Item;
+ sys::Monitor monitor;
+ std::queue<Item> queue;
+ bool shutdownFlag;
+};
+
+
+BasicMessageChannel::BasicMessageChannel(Channel& ch)
+ : channel(ch), returnsHandler(0),
+ destGet(new WaitableDestination()),
+ destDispatch(new WaitableDestination())
+{
+ incoming.addDestination(BASIC_RETURN, *destDispatch);
+}
+
+void BasicMessageChannel::consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
+{
+ {
+ // Note we create a consumer even if tag="". In that case
+ // It will be renamed when we handle BasicConsumeOkBody.
+ //
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ THROW_QPID_ERROR(CLIENT_ERROR,
+ "Consumer already exists with tag="+tag);
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+ }
+
+ // FIXME aconway 2007-03-23: get processed in both.
+
+ // BasicConsumeOkBody is really processed in handle(), here
+ // we just pick up the tag to return to the user.
+ //
+ // We can't process it here because messages for the consumer may
+ // already be arriving.
+ //
+ BasicConsumeOkBody::shared_ptr ok =
+ channel.sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ new BasicConsumeBody(
+ channel.version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable()));
+ tag = ok->getConsumerTag();
+}
+
+
+void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
+ Consumer c;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i == consumers.end())
+ return;
+ c = i->second;
+ consumers.erase(i);
+ }
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ channel.sendAndReceiveSync<BasicCancelOkBody>(
+ synch, new BasicCancelBody(channel.version, tag, !synch));
+}
+
+void BasicMessageChannel::close(){
+ ConsumerMap consumersCopy;
+ {
+ Mutex::ScopedLock l(lock);
+ consumersCopy = consumers;
+ consumers.clear();
+ }
+ destGet->shutdown();
+ destDispatch->shutdown();
+ for (ConsumerMap::iterator i=consumersCopy.begin();
+ i != consumersCopy.end(); ++i)
+ {
+ Consumer& c = i->second;
+ if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+ && c.lastDeliveryTag > 0)
+ {
+ channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ }
+ }
+}
+
+
+bool BasicMessageChannel::get(
+ Message& msg, const Queue& queue, AckMode ackMode)
+{
+ // Prepare for incoming response
+ incoming.addDestination(BASIC_GET, *destGet);
+ channel.send(
+ new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
+ bool got = destGet->wait(msg);
+ return got;
+}
+
+void BasicMessageChannel::publish(
+ const Message& msg, const Exchange& exchange,
+ const std::string& routingKey, bool mandatory, bool immediate)
+{
+ const string e = exchange.getName();
+ string key = routingKey;
+
+ // Make a header for the message
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ BasicHeaderProperties::copy(
+ *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
+ header->setContentSize(msg.getData().size());
+
+ channel.send(
+ new BasicPublishBody(
+ channel.version, 0, e, key, mandatory, immediate));
+ channel.send(header);
+ string data = msg.getData();
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ //frame itself uses 8 bytes
+ u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
+ if(data_length < frag_size){
+ channel.send(new AMQContentBody(data));
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ channel.send(new AMQContentBody(frag));
+
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+}
+
+void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
+ assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
+ switch(method->amqpMethodId()) {
+ case BasicGetOkBody::METHOD_ID: {
+ incoming.openReference(BASIC_REF);
+ incoming.createMessage(BASIC_GET, BASIC_REF);
+ return;
+ }
+ case BasicGetEmptyBody::METHOD_ID: {
+ incoming.getDestination(BASIC_GET).empty();
+ incoming.removeDestination(BASIC_GET);
+ return;
+ }
+ case BasicDeliverBody::METHOD_ID: {
+ BasicDeliverBody::shared_ptr deliver=
+ boost::shared_polymorphic_downcast<BasicDeliverBody>(method);
+ incoming.openReference(BASIC_REF);
+ Message& msg = incoming.createMessage(
+ deliver->getConsumerTag(), BASIC_REF);
+ msg.setDestination(deliver->getConsumerTag());
+ msg.setDeliveryTag(deliver->getDeliveryTag());
+ msg.setRedelivered(deliver->getRedelivered());
+ return;
+ }
+ case BasicReturnBody::METHOD_ID: {
+ incoming.openReference(BASIC_REF);
+ incoming.createMessage(BASIC_RETURN, BASIC_REF);
+ return;
+ }
+ case BasicConsumeOkBody::METHOD_ID: {
+ Mutex::ScopedLock l(lock);
+ BasicConsumeOkBody::shared_ptr consumeOk =
+ boost::shared_polymorphic_downcast<BasicConsumeOkBody>(method);
+ std::string tag = consumeOk->getConsumerTag();
+ ConsumerMap::iterator i = consumers.find(std::string());
+ if (i != consumers.end()) {
+ // Need to rename the un-named consumer.
+ if (consumers.find(tag) == consumers.end()) {
+ consumers[tag] = i->second;
+ consumers.erase(i);
+ }
+ else // Tag already exists.
+ throw ChannelException(404, "Tag already exists: "+tag);
+ }
+ // FIXME aconway 2007-03-23: Integrate consumer & destination
+ // maps.
+ incoming.addDestination(tag, *destDispatch);
+ return;
+ }
+ }
+ throw Channel::UnknownMethod();
+}
+
+void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr header) {
+ BasicHeaderProperties* props =
+ boost::polymorphic_downcast<BasicHeaderProperties*>(
+ header->getProperties());
+ IncomingMessage::Reference& ref = incoming.getReference(BASIC_REF);
+ assert (ref.messages.size() == 1);
+ ref.messages.front().BasicHeaderProperties::operator=(*props);
+ incoming_size = header->getContentSize();
+ if (incoming_size==0)
+ incoming.closeReference(BASIC_REF);
+}
+
+void BasicMessageChannel::handle(AMQContentBody::shared_ptr content){
+ incoming.appendReference(BASIC_REF, content->getData());
+ size_t size = incoming.getReference(BASIC_REF).data.size();
+ if (size >= incoming_size) {
+ incoming.closeReference(BASIC_REF);
+ if (size > incoming_size)
+ throw ChannelException(502, "Content exceeded declared size");
+ }
+}
+
+void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
+ //record delivery tag:
+ consumer.lastDeliveryTag = msg.getDeliveryTag();
+
+ //allow registered listener to handle the message
+ consumer.listener->received(msg);
+
+ if(channel.isOpen()){
+ bool multiple(false);
+ switch(consumer.ackMode){
+ case LAZY_ACK:
+ multiple = true;
+ if(++(consumer.count) < channel.getPrefetch())
+ break;
+ //else drop-through
+ case AUTO_ACK:
+ consumer.lastDeliveryTag = 0;
+ channel.send(
+ new BasicAckBody(
+ channel.version,
+ msg.getDeliveryTag(),
+ multiple));
+ case NO_ACK: // Nothing to do
+ case CLIENT_ACK: // User code must ack.
+ break;
+ // TODO aconway 2007-02-22: Provide a way for user
+ // to ack!
+ }
+ }
+
+ //as it stands, transactionality is entirely orthogonal to ack
+ //mode, though the acks will not be processed by the broker under
+ //a transaction until it commits.
+}
+
+
+void BasicMessageChannel::run() {
+ while(channel.isOpen()) {
+ try {
+ Message msg;
+ bool gotMessge = destDispatch->wait(msg);
+ if (gotMessge) {
+ if(msg.getDestination() == BASIC_RETURN) {
+ ReturnedMessageHandler* handler=0;
+ {
+ Mutex::ScopedLock l(lock);
+ handler=returnsHandler;
+ }
+ if(handler != 0)
+ handler->returned(msg);
+ }
+ else {
+ Consumer consumer;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(
+ msg.getDestination());
+ if(i == consumers.end())
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+ "Unknown consumer tag=" +
+ msg.getDestination());
+ consumer = i->second;
+ }
+ deliver(consumer, msg);
+ }
+ }
+ }
+ catch (const ShutdownException&) {
+ /* Orderly shutdown */
+ }
+ catch (const Exception& e) {
+ // FIXME aconway 2007-02-20: Report exception to user.
+ cout << "client::BasicMessageChannel::run() terminated by: "
+ << e.toString() << endl;
+ }
+ }
+}
+
+void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+ Mutex::ScopedLock l(lock);
+ returnsHandler = handler;
+}
+
+void BasicMessageChannel::setQos(){
+ channel.sendAndReceive<BasicQosOkBody>(
+ new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
+ if(channel.isTransactional())
+ channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
+}
+
+}} // namespace qpid::client