summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/Channel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/Channel.cpp')
-rw-r--r--cpp/src/qpid/client/Channel.cpp428
1 files changed, 0 insertions, 428 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
deleted file mode 100644
index ab4ea5b787..0000000000
--- a/cpp/src/qpid/client/Channel.cpp
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <qpid/client/Channel.h>
-#include <qpid/sys/Monitor.h>
-#include <qpid/client/Message.h>
-#include <qpid/QpidError.h>
-#include <qpid/client/MethodBodyInstances.h>
-
-using namespace boost; //to use dynamic_pointer_cast
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-Channel::Channel(bool _transactional, u_int16_t _prefetch) :
- id(0),
- con(0),
- out(0),
- incoming(0),
- closed(true),
- prefetch(_prefetch),
- transactional(_transactional),
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- version(8, 0)
-{ }
-
-Channel::~Channel(){
- stop();
-}
-
-void Channel::setPrefetch(u_int16_t _prefetch){
- prefetch = _prefetch;
- if(con != 0 && out != 0){
- setQos();
- }
-}
-
-void Channel::setQos(){
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- sendAndReceive(new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
- if(transactional){
- sendAndReceive(new AMQFrame(id, new TxSelectBody(version)), method_bodies.tx_select_ok);
- }
-}
-
-void Channel::declareExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- string type = exchange.getType();
- FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.exchange_declare_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::deleteExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.exchange_delete_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::declareQueue(Queue& queue, bool synch){
- string name = queue.getName();
- FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false,
- queue.isExclusive(),
- queue.isAutoDelete(), !synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_declare_ok);
- if(queue.getName().length() == 0){
- QueueDeclareOkBody::shared_ptr response =
- dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
- queue.setName(response->getQueue());
- }
- }else{
- out->send(frame);
- }
-}
-
-void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
- //ticket, queue, ifunused, ifempty, nowait
- string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_delete_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
- string e = exchange.getName();
- string q = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, q, e, key,!synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_bind_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode, bool noLocal, bool synch){
-
- string q = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(version, 0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.basic_consume_ok);
- BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
- tag = response->getConsumerTag();
- }else{
- out->send(frame);
- }
- Consumer* c = new Consumer();
- c->listener = listener;
- c->ackMode = ackMode;
- c->lastDeliveryTag = 0;
- consumers[tag] = c;
-}
-
-void Channel::cancel(std::string& tag, bool synch){
- Consumer* c = consumers[tag];
- if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true)));
- }
-
- AMQFrame* frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.basic_cancel_ok);
- }else{
- out->send(frame);
- }
- consumers.erase(tag);
- if(c != 0){
- delete c;
- }
-}
-
-void Channel::cancelAll(){
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
- Consumer* c = i->second;
- if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
- }
- consumers.erase(i);
- delete c;
- }
-}
-
-void Channel::retrieve(Message& msg){
- Monitor::ScopedLock l(retrievalMonitor);
- while(retrieved == 0){
- retrievalMonitor.wait();
- }
-
- msg.header = retrieved->getHeader();
- msg.deliveryTag = retrieved->getDeliveryTag();
- retrieved->getData(msg.data);
- delete retrieved;
- retrieved = 0;
-}
-
-bool Channel::get(Message& msg, const Queue& queue, int ackMode){
- string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode));
- responses.expect();
- out->send(frame);
- responses.waitForResponse();
- AMQMethodBody::shared_ptr response = responses.getResponse();
- if(method_bodies.basic_get_ok.match(response.get())){
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
- }
- retrieve(msg);
- return true;
- }if(method_bodies.basic_get_empty.match(response.get())){
- return false;
- }else{
- THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
- }
-}
-
-
-void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
- string e = exchange.getName();
- string key = routingKey;
-
- out->send(new AMQFrame(id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
- //break msg up into header frame and content frame(s) and send these
- string data = msg.getData();
- msg.header->setContentSize(data.length());
- AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
- out->send(new AMQFrame(id, body));
-
- u_int64_t data_length = data.length();
- if(data_length > 0){
- u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
- if(data_length < frag_size){
- out->send(new AMQFrame(id, new AMQContentBody(data)));
- }else{
- u_int32_t offset = 0;
- u_int32_t remaining = data_length - offset;
- while (remaining > 0) {
- u_int32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
- out->send(new AMQFrame(id, new AMQContentBody(frag)));
-
- offset += length;
- remaining = data_length - offset;
- }
- }
- }
-}
-
-void Channel::commit(){
- AMQFrame* frame = new AMQFrame(id, new TxCommitBody(version));
- sendAndReceive(frame, method_bodies.tx_commit_ok);
-}
-
-void Channel::rollback(){
- AMQFrame* frame = new AMQFrame(id, new TxRollbackBody(version));
- sendAndReceive(frame, method_bodies.tx_rollback_ok);
-}
-
-void Channel::handleMethod(AMQMethodBody::shared_ptr body){
- //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
- if(responses.isWaiting()){
- responses.signalResponse(body);
- }else if(method_bodies.basic_deliver.match(body.get())){
- if(incoming != 0){
- std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
- }
- }else if(method_bodies.basic_return.match(body.get())){
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
- }
- }else if(method_bodies.channel_close.match(body.get())){
- con->removeChannel(this);
- //need to signal application that channel has been closed through exception
-
- }else if(method_bodies.channel_flow.match(body.get())){
-
- }else{
- //signal error
- std::cout << "Unhandled method: " << *body << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
- }
-}
-
-void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
- }else{
- incoming->setHeader(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
-}
-
-void Channel::handleContent(AMQContentBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
- }else{
- incoming->addContent(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
-}
-
-void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
-}
-
-void Channel::start(){
- dispatcher = Thread(this);
-}
-
-void Channel::stop(){
- {
- Monitor::ScopedLock l(dispatchMonitor);
- closed = true;
- dispatchMonitor.notify();
- }
- dispatcher.join();
-}
-
-void Channel::run(){
- dispatch();
-}
-
-void Channel::enqueue(){
- if(incoming->isResponse()){
- Monitor::ScopedLock l(retrievalMonitor);
- retrieved = incoming;
- retrievalMonitor.notify();
- }else{
- Monitor::ScopedLock l(dispatchMonitor);
- messages.push(incoming);
- dispatchMonitor.notify();
- }
- incoming = 0;
-}
-
-IncomingMessage* Channel::dequeue(){
- Monitor::ScopedLock l(dispatchMonitor);
- while(messages.empty() && !closed){
- dispatchMonitor.wait();
- }
- IncomingMessage* msg = 0;
- if(!messages.empty()){
- msg = messages.front();
- messages.pop();
- }
- return msg;
-}
-
-void Channel::deliver(Consumer* consumer, Message& msg){
- //record delivery tag:
- consumer->lastDeliveryTag = msg.getDeliveryTag();
-
- //allow registered listener to handle the message
- consumer->listener->received(msg);
-
- //if the handler calls close on the channel or connection while
- //handling this message, then consumer will now have been deleted.
- if(!closed){
- bool multiple(false);
- switch(consumer->ackMode){
- case LAZY_ACK:
- multiple = true;
- if(++(consumer->count) < prefetch) break;
- //else drop-through
- case AUTO_ACK:
- out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple)));
- consumer->lastDeliveryTag = 0;
- }
- }
-
- //as it stands, transactionality is entirely orthogonal to ack
- //mode, though the acks will not be processed by the broker under
- //a transaction until it commits.
-}
-
-void Channel::dispatch(){
- while(!closed){
- IncomingMessage* incomingMsg = dequeue();
- if(incomingMsg){
- //Note: msg is currently only valid for duration of this call
- Message msg(incomingMsg->getHeader());
- incomingMsg->getData(msg.data);
- if(incomingMsg->isReturn()){
- if(returnsHandler == 0){
- //print warning to log/console
- std::cout << "Message returned: " << msg.getData() << std::endl;
- }else{
- returnsHandler->returned(msg);
- }
- }else{
- msg.deliveryTag = incomingMsg->getDeliveryTag();
- std::string tag = incomingMsg->getConsumerTag();
-
- if(consumers[tag] == 0){
- //signal error
- std::cout << "Unknown consumer: " << tag << std::endl;
- }else{
- deliver(consumers[tag], msg);
- }
- }
- delete incomingMsg;
- }
- }
-}
-
-void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
- returnsHandler = handler;
-}
-
-void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
- out->send(frame);
- responses.receive(body);
-}
-
-void Channel::close(){
- if(con != 0){
- con->closeChannel(this);
- }
-}