diff options
author | Gordon Sim <gsim@apache.org> | 2007-09-06 20:27:33 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-09-06 20:27:33 +0000 |
commit | b33a63b36c659a894143382d0a61efe6a598fcc6 (patch) | |
tree | 0efc848ae9cc6064d615c6968b1d127e92b231d3 /cpp/src/qpid/client/ClientChannel.cpp | |
parent | 748698e4b8d5bd0c3ccec4ca898d334c13fc0795 (diff) | |
download | qpid-python-b33a63b36c659a894143382d0a61efe6a598fcc6.tar.gz |
Implementation of execution.result on the client side
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@573359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r-- | cpp/src/qpid/client/ClientChannel.cpp | 65 |
1 files changed, 30 insertions, 35 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index cc2b7aedc8..1a0fd25bc3 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -56,7 +56,7 @@ class ScopedSync Channel::Channel(bool _transactional, u_int16_t _prefetch) : prefetch(_prefetch), transactional(_transactional), running(false), - uniqueId(true)/*could eventually be the session id*/, nameCounter(0) + uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false) { } @@ -65,26 +65,25 @@ Channel::~Channel() join(); } -void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s) +void Channel::open(const Session& s) { + Mutex::ScopedLock l(lock); if (isOpen()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); - - connection = c; - sessionCore = s; - session = auto_ptr<Session>(new Session(c, s)); + active = true; + session = s; } bool Channel::isOpen() const { Mutex::ScopedLock l(lock); - return connection; + return active; } void Channel::setQos() { - session->basicQos(0, getPrefetch(), false); + session.basicQos(0, getPrefetch(), false); if(isTransactional()) { //I think this is wrong! should only send TxSelect once... - session->txSelect(); + session.txSelect(); } } @@ -95,13 +94,13 @@ void Channel::setPrefetch(uint16_t _prefetch){ void Channel::declareExchange(Exchange& exchange, bool synch){ FieldTable args; - ScopedSync s(*session, synch); - session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args); + ScopedSync s(session, synch); + session.exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args); } void Channel::deleteExchange(Exchange& exchange, bool synch){ - ScopedSync s(*session, synch); - session->exchangeDelete(0, exchange.getName(), false); + ScopedSync s(session, synch); + session.exchangeDelete(0, exchange.getName(), false); } void Channel::declareQueue(Queue& queue, bool synch){ @@ -112,30 +111,30 @@ void Channel::declareQueue(Queue& queue, bool synch){ } FieldTable args; - ScopedSync s(*session, synch); - session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(), + ScopedSync s(session, synch); + session.queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), args); } void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ - ScopedSync s(*session, synch); - session->queueDelete(0, queue.getName(), ifunused, ifempty); + ScopedSync s(session, synch); + session.queueDelete(0, queue.getName(), ifunused, ifempty); } void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - ScopedSync s(*session, synch); - session->queueBind(0, q, e, key, args); + ScopedSync s(session, synch); + session.queueBind(0, q, e, key, args); } void Channel::commit(){ - session->txCommit(); + session.txCommit(); } void Channel::rollback(){ - session->txRollback(); + session.txRollback(); } void Channel::consume( @@ -155,8 +154,8 @@ void Channel::consume( c.ackMode = ackMode; c.lastDeliveryTag = 0; } - ScopedSync s(*session, synch); - session->basicConsume(0, queue.getName(), tag, noLocal, + ScopedSync s(session, synch); + session.basicConsume(0, queue.getName(), tag, noLocal, ackMode == NO_ACK, false, !synch, fields ? *fields : FieldTable()); } @@ -171,13 +170,13 @@ void Channel::cancel(const std::string& tag, bool synch) { c = i->second; consumers.erase(i); } - ScopedSync s(*session, synch); - session->basicCancel(tag); + ScopedSync s(session, synch); + session.basicCancel(tag); } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { - Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK); - sessionCore->flush();//TODO: need to expose the ability to request completion info through session + Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK); + session.execution().sendFlushRequest(); if (response.isA<BasicGetEmptyBody>()) { return false; } else { @@ -194,19 +193,15 @@ void Channel::publish(const Message& msg, const Exchange& exchange, const string e = exchange.getName(); string key = routingKey; - session->basicPublish(0, e, key, mandatory, immediate, msg); + session.basicPublish(0, e, key, mandatory, immediate, msg); } void Channel::close() { - session->close(); + session.close(); { Mutex::ScopedLock l(lock); - if (connection); - { - sessionCore.reset(); - connection.reset(); - } + active = false; } stop(); } @@ -232,7 +227,7 @@ void Channel::join() { void Channel::run() { try { while (true) { - FrameSet::shared_ptr content = session->get(); + FrameSet::shared_ptr content = session.get(); //need to dispatch this to the relevant listener: if (content->isA<BasicDeliverBody>()) { ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag()); |