diff options
Diffstat (limited to 'cpp/lib/client/ClientChannel.cpp')
-rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 38 |
1 files changed, 19 insertions, 19 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index c7b8e39ae5..3d0b547b07 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -56,9 +56,9 @@ void Channel::setPrefetch(u_int16_t _prefetch){ void Channel::setQos(){ // AMQP version management change - kpvdr 2006-11-20 // TODO: Make this class version-aware and link these hard-wired numbers to that version - sendAndReceive(new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok); + sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok); if(transactional){ - sendAndReceive(new AMQFrame(id, new TxSelectBody(version)), method_bodies.tx_select_ok); + sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok); } } @@ -66,7 +66,7 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); string type = exchange.getType(); FieldTable args; - AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args)); + AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args)); if(synch){ sendAndReceive(frame, method_bodies.exchange_declare_ok); }else{ @@ -76,7 +76,7 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ void Channel::deleteExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); - AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch)); + AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch)); if(synch){ sendAndReceive(frame, method_bodies.exchange_delete_ok); }else{ @@ -87,7 +87,7 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){ void Channel::declareQueue(Queue& queue, bool synch){ string name = queue.getName(); FieldTable args; - AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false, + AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false, false, queue.isExclusive(), queue.isAutoDelete(), !synch, args)); if(synch){ @@ -105,7 +105,7 @@ void Channel::declareQueue(Queue& queue, bool synch){ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ //ticket, queue, ifunused, ifempty, nowait string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); + AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); if(synch){ sendAndReceive(frame, method_bodies.queue_delete_ok); }else{ @@ -116,7 +116,7 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch) void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, q, e, key,!synch, args)); + AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args)); if(synch){ sendAndReceive(frame, method_bodies.queue_bind_ok); }else{ @@ -130,7 +130,7 @@ void Channel::consume( { string q = queue.getName(); AMQFrame* frame = - new AMQFrame( + new AMQFrame(version, id, new BasicConsumeBody( version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch, @@ -152,10 +152,10 @@ void Channel::consume( void Channel::cancel(std::string& tag, bool synch){ Consumer* c = consumers[tag]; if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true))); + out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true))); } - AMQFrame* frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch)); + AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch)); if(synch){ sendAndReceive(frame, method_bodies.basic_cancel_ok); }else{ @@ -171,7 +171,7 @@ void Channel::cancelAll(){ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){ Consumer* c = i->second; if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); + out->send(new AMQFrame(version, id, new BasicAckBody(c->lastDeliveryTag, true))); } consumers.erase(i); delete c; @@ -193,7 +193,7 @@ void Channel::retrieve(Message& msg){ bool Channel::get(Message& msg, const Queue& queue, int ackMode){ string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode)); + AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode)); responses.expect(); out->send(frame); responses.waitForResponse(); @@ -219,25 +219,25 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& string e = exchange.getName(); string key = routingKey; - out->send(new AMQFrame(id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); + out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); - out->send(new AMQFrame(id, body)); + out->send(new AMQFrame(version, id, body)); u_int64_t data_length = data.length(); if(data_length > 0){ u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes if(data_length < frag_size){ - out->send(new AMQFrame(id, new AMQContentBody(data))); + out->send(new AMQFrame(version, id, new AMQContentBody(data))); }else{ u_int32_t offset = 0; u_int32_t remaining = data_length - offset; while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(data.substr(offset, length)); - out->send(new AMQFrame(id, new AMQContentBody(frag))); + out->send(new AMQFrame(version, id, new AMQContentBody(frag))); offset += length; remaining = data_length - offset; @@ -247,12 +247,12 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& } void Channel::commit(){ - AMQFrame* frame = new AMQFrame(id, new TxCommitBody(version)); + AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version)); sendAndReceive(frame, method_bodies.tx_commit_ok); } void Channel::rollback(){ - AMQFrame* frame = new AMQFrame(id, new TxRollbackBody(version)); + AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version)); sendAndReceive(frame, method_bodies.tx_rollback_ok); } @@ -377,7 +377,7 @@ void Channel::deliver(Consumer* consumer, Message& msg){ if(++(consumer->count) < prefetch) break; //else drop-through case AUTO_ACK: - out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple))); + out->send(new AMQFrame(version, id, new BasicAckBody(msg.getDeliveryTag(), multiple))); consumer->lastDeliveryTag = 0; } } |