diff options
author | Gordon Sim <gsim@apache.org> | 2007-08-15 17:26:43 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-08-15 17:26:43 +0000 |
commit | 3f2ac50fdb042c2c48ebbdc1e70e442f0bf1ab86 (patch) | |
tree | 11a8838ed371c23f57391b4a212c57ddb44051ff /cpp/src/qpid/client/ClientChannel.cpp | |
parent | fb26cfb87668cd7b87cf7cdea2ca1f8c367de1a2 (diff) | |
download | qpid-python-3f2ac50fdb042c2c48ebbdc1e70e442f0bf1ab86.tar.gz |
Altered old client channel to use new generated session interface (primarily to reduce the number of places where method bodies are constructed).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566274 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r-- | cpp/src/qpid/client/ClientChannel.cpp | 109 |
1 files changed, 39 insertions, 70 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 424ff97ea1..3cf0373b7f 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -46,6 +46,14 @@ namespace client{ const std::string empty; +class ScopedSync +{ + Session& session; +public: + ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } + ~ScopedSync() { session.setSynchronous(false); } +}; + }} Channel::Channel(bool _transactional, u_int16_t _prefetch) : @@ -64,7 +72,8 @@ void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); connection = c; - session = s; + sessionCore = s; + session = auto_ptr<Session>(new Session(c, s)); } bool Channel::isOpen() const { @@ -73,10 +82,10 @@ bool Channel::isOpen() const { } void Channel::setQos() { - sendSync(false, make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false))); + session->basicQos(0, getPrefetch(), false); if(isTransactional()) { //I think this is wrong! should only send TxSelect once... - sendSync(false, make_shared_ptr(new TxSelectBody(version))); + session->txSelect(); } } @@ -86,52 +95,46 @@ void Channel::setPrefetch(uint16_t _prefetch){ } void Channel::declareExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - string type = exchange.getType(); FieldTable args; - sendSync(synch, make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args))); + ScopedSync s(*session, synch); + session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args); } void Channel::deleteExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - sendSync(synch, make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false))); + ScopedSync s(*session, synch); + session->exchangeDelete(0, exchange.getName(), false); } void Channel::declareQueue(Queue& queue, bool synch){ - string name = queue.getName(); FieldTable args; - QueueDeclareOkBody::shared_ptr response = - sendAndReceiveSync<QueueDeclareOkBody>( - synch, - make_shared_ptr(new QueueDeclareBody( - version, 0, name, empty, false/*passive*/, queue.isDurable(), - queue.isExclusive(), queue.isAutoDelete(), !synch, args))); + ScopedSync s(*session, synch); + Response r = session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(), + queue.isExclusive(), queue.isAutoDelete(), !synch, args); + if(synch) { if(queue.getName().length() == 0) - queue.setName(response->getQueue()); + queue.setName(r.as<QueueDeclareOkBody>().getQueue()); } } void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ - //ticket, queue, ifunused, ifempty, nowait - string name = queue.getName(); - sendAndReceiveSync<QueueDeleteOkBody>( - synch, - make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch))); + ScopedSync s(*session, synch); + session->queueDelete(0, queue.getName(), ifunused, ifempty, !synch); } void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - sendSync(synch, make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args))); + ScopedSync s(*session, synch); + session->queueBind(0, q, e, key, args); } void Channel::commit(){ - sendSync(false, make_shared_ptr(new TxCommitBody(version))); + session->txCommit(); } void Channel::rollback(){ - sendSync(false, make_shared_ptr(new TxRollbackBody(version))); + session->txRollback(); } void Channel::close() @@ -141,43 +144,14 @@ void Channel::close() Mutex::ScopedLock l(lock); if (connection); { - connection->released(session); + session.reset(); + sessionCore.reset(); connection.reset(); } } stop(); } -AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/) -{ - session->setSync(true); - Response r = session->send(toSend, true); - session->setSync(false); - return r.getPtr(); -} - -void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command) -{ - if(sync) { - session->setSync(true); - session->send(command, false); - session->setSync(false); - } else { - session->send(command); - } -} - -AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( - bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m) -{ - if(sync) - return sendAndReceive(body, c, m); - else { - session->send(body); - return AMQMethodBody::shared_ptr(); - } -} - void Channel::consume( Queue& queue, const std::string& tag, MessageListener* listener, AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { @@ -195,12 +169,10 @@ void Channel::consume( c.ackMode = ackMode; c.lastDeliveryTag = 0; } - sendAndReceiveSync<BasicConsumeOkBody>( - synch, - make_shared_ptr(new BasicConsumeBody( - version, 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable()))); + ScopedSync s(*session, synch); + session->basicConsume(0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, + fields ? *fields : FieldTable()); } void Channel::cancel(const std::string& tag, bool synch) { @@ -213,16 +185,13 @@ void Channel::cancel(const std::string& tag, bool synch) { c = i->second; consumers.erase(i); } - sendAndReceiveSync<BasicCancelOkBody>( - synch, make_shared_ptr(new BasicCancelBody(version, tag, !synch))); + ScopedSync s(*session, synch); + session->basicCancel(tag, !synch); } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { - - AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode)); - - Response response = session->send(request, true); - session->flush(); + Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK); + sessionCore->flush();//TODO: need to expose the ability to request completion info through session if (response.isA<BasicGetEmptyBody>()) { return false; } else { @@ -239,7 +208,7 @@ void Channel::publish(const Message& msg, const Exchange& exchange, const string e = exchange.getName(); string key = routingKey; - session->send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), msg, false); + session->basicPublish(0, e, key, mandatory, immediate, msg); } void Channel::start(){ @@ -248,7 +217,7 @@ void Channel::start(){ } void Channel::stop() { - session->stop(); + //session->stop(); gets.close(); join(); } |