summaryrefslogtreecommitdiff
path: root/cpp/lib/client/Basic.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/client/Basic.cpp')
-rw-r--r--cpp/lib/client/Basic.cpp255
1 files changed, 0 insertions, 255 deletions
diff --git a/cpp/lib/client/Basic.cpp b/cpp/lib/client/Basic.cpp
deleted file mode 100644
index 4a1cf249a8..0000000000
--- a/cpp/lib/client/Basic.cpp
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include "Basic.h"
-#include "AMQMethodBody.h"
-#include "ClientChannel.h"
-#include "ReturnedMessageHandler.h"
-#include "MessageListener.h"
-#include "framing/FieldTable.h"
-#include "Connection.h"
-
-using namespace std;
-
-namespace qpid {
-namespace client {
-
-using namespace sys;
-using namespace framing;
-
-Basic::Basic(Channel& ch) : channel(ch), returnsHandler(0) {}
-
-void Basic::consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
-{
- channel.sendAndReceiveSync<BasicConsumeOkBody>(
- synch,
- new BasicConsumeBody(
- channel.version, 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
- if (synch) {
- BasicConsumeOkBody::shared_ptr response =
- boost::shared_polymorphic_downcast<BasicConsumeOkBody>(
- channel.responses.getResponse());
- tag = response->getConsumerTag();
- }
- // FIXME aconway 2007-02-20: Race condition!
- // We could receive the first message for the consumer
- // before we create the consumer below.
- // Move consumer creation to handler for BasicConsumeOkBody
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
- THROW_QPID_ERROR(CLIENT_ERROR,
- "Consumer already exists with tag="+tag);
- Consumer& c = consumers[tag];
- c.listener = listener;
- c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
- }
-}
-
-
-void Basic::cancel(const std::string& tag, bool synch) {
- Consumer c;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i == consumers.end())
- return;
- c = i->second;
- consumers.erase(i);
- }
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
- channel.sendAndReceiveSync<BasicCancelOkBody>(
- synch, new BasicCancelBody(channel.version, tag, !synch));
-}
-
-void Basic::cancelAll(){
- ConsumerMap consumersCopy;
- {
- Mutex::ScopedLock l(lock);
- consumersCopy = consumers;
- consumers.clear();
- }
- for (ConsumerMap::iterator i=consumersCopy.begin();
- i != consumersCopy.end(); ++i)
- {
- Consumer& c = i->second;
- if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
- && c.lastDeliveryTag > 0)
- {
- channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
- }
- }
-}
-
-
-
-bool Basic::get(Message& msg, const Queue& queue, AckMode ackMode) {
- // Expect a message starting with a BasicGetOk
- incoming.startGet();
- channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
- return incoming.waitGet(msg);
-}
-
-
-void Basic::publish(
- const Message& msg, const Exchange& exchange,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- const string e = exchange.getName();
- string key = routingKey;
- channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate));
- //break msg up into header frame and content frame(s) and send these
- channel.send(msg.getHeader());
- string data = msg.getData();
- uint64_t data_length = data.length();
- if(data_length > 0){
- //frame itself uses 8 bytes
- uint32_t frag_size = channel.connection->getMaxFrameSize() - 8;
- if(data_length < frag_size){
- channel.send(new AMQContentBody(data));
- }else{
- uint32_t offset = 0;
- uint32_t remaining = data_length - offset;
- while (remaining > 0) {
- uint32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
- channel.send(new AMQContentBody(frag));
-
- offset += length;
- remaining = data_length - offset;
- }
- }
- }
-}
-
-void Basic::handle(boost::shared_ptr<AMQMethodBody> method) {
- assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
- switch(method->amqpMethodId()) {
- case BasicDeliverBody::METHOD_ID:
- case BasicReturnBody::METHOD_ID:
- case BasicGetOkBody::METHOD_ID:
- case BasicGetEmptyBody::METHOD_ID:
- incoming.add(method);
- return;
- }
- throw Channel::UnknownMethod();
-}
-
-void Basic::deliver(Consumer& consumer, Message& msg){
- //record delivery tag:
- consumer.lastDeliveryTag = msg.getDeliveryTag();
-
- //allow registered listener to handle the message
- consumer.listener->received(msg);
-
- if(channel.isOpen()){
- bool multiple(false);
- switch(consumer.ackMode){
- case LAZY_ACK:
- multiple = true;
- if(++(consumer.count) < channel.getPrefetch())
- break;
- //else drop-through
- case AUTO_ACK:
- consumer.lastDeliveryTag = 0;
- channel.send(
- new BasicAckBody(
- channel.version, msg.getDeliveryTag(), multiple));
- case NO_ACK: // Nothing to do
- case CLIENT_ACK: // User code must ack.
- break;
- // TODO aconway 2007-02-22: Provide a way for user
- // to ack!
- }
- }
-
- //as it stands, transactionality is entirely orthogonal to ack
- //mode, though the acks will not be processed by the broker under
- //a transaction until it commits.
-}
-
-
-void Basic::run() {
- while(channel.isOpen()) {
- try {
- Message msg = incoming.waitDispatch();
- if(msg.getMethod()->isA<BasicReturnBody>()) {
- ReturnedMessageHandler* handler=0;
- {
- Mutex::ScopedLock l(lock);
- handler=returnsHandler;
- }
- if(handler == 0) {
- // TODO aconway 2007-02-20: proper logging.
- cout << "Message returned: " << msg.getData() << endl;
- }
- else
- handler->returned(msg);
- }
- else {
- BasicDeliverBody::shared_ptr deliverBody =
- boost::shared_polymorphic_downcast<BasicDeliverBody>(
- msg.getMethod());
- std::string tag = deliverBody->getConsumerTag();
- Consumer consumer;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if(i == consumers.end())
- THROW_QPID_ERROR(PROTOCOL_ERROR+504,
- "Unknown consumer tag=" + tag);
- consumer = i->second;
- }
- deliver(consumer, msg);
- }
- }
- catch (const ShutdownException&) {
- /* Orderly shutdown */
- }
- catch (const Exception& e) {
- // FIXME aconway 2007-02-20: Report exception to user.
- cout << "client::Basic::run() terminated by: " << e.toString()
- << "(" << typeid(e).name() << ")" << endl;
- }
- }
-}
-
-void Basic::setReturnedMessageHandler(ReturnedMessageHandler* handler){
- Mutex::ScopedLock l(lock);
- returnsHandler = handler;
-}
-
-void Basic::setQos(){
- channel.sendAndReceive<BasicQosOkBody>(
- new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
- if(channel.isTransactional())
- channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
-}
-
-
-// TODO aconway 2007-02-22: NOTES:
-// Move incoming to BasicChannel - check for uses.
-
-}} // namespace qpid::client