diff options
author | Gordon Sim <gsim@apache.org> | 2007-08-15 17:26:43 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-08-15 17:26:43 +0000 |
commit | 3f2ac50fdb042c2c48ebbdc1e70e442f0bf1ab86 (patch) | |
tree | 11a8838ed371c23f57391b4a212c57ddb44051ff /cpp/src | |
parent | fb26cfb87668cd7b87cf7cdea2ca1f8c367de1a2 (diff) | |
download | qpid-python-3f2ac50fdb042c2c48ebbdc1e70e442f0bf1ab86.tar.gz |
Altered old client channel to use new generated session interface (primarily to reduce the number of places where method bodies are constructed).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566274 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/ClientChannel.cpp | 109 | ||||
-rw-r--r-- | cpp/src/qpid/client/ClientChannel.h | 36 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/Response.h | 8 |
4 files changed, 46 insertions, 112 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 424ff97ea1..3cf0373b7f 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -46,6 +46,14 @@ namespace client{ const std::string empty; +class ScopedSync +{ + Session& session; +public: + ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } + ~ScopedSync() { session.setSynchronous(false); } +}; + }} Channel::Channel(bool _transactional, u_int16_t _prefetch) : @@ -64,7 +72,8 @@ void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); connection = c; - session = s; + sessionCore = s; + session = auto_ptr<Session>(new Session(c, s)); } bool Channel::isOpen() const { @@ -73,10 +82,10 @@ bool Channel::isOpen() const { } void Channel::setQos() { - sendSync(false, make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false))); + session->basicQos(0, getPrefetch(), false); if(isTransactional()) { //I think this is wrong! should only send TxSelect once... - sendSync(false, make_shared_ptr(new TxSelectBody(version))); + session->txSelect(); } } @@ -86,52 +95,46 @@ void Channel::setPrefetch(uint16_t _prefetch){ } void Channel::declareExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - string type = exchange.getType(); FieldTable args; - sendSync(synch, make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args))); + ScopedSync s(*session, synch); + session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args); } void Channel::deleteExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - sendSync(synch, make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false))); + ScopedSync s(*session, synch); + session->exchangeDelete(0, exchange.getName(), false); } void Channel::declareQueue(Queue& queue, bool synch){ - string name = queue.getName(); FieldTable args; - QueueDeclareOkBody::shared_ptr response = - sendAndReceiveSync<QueueDeclareOkBody>( - synch, - make_shared_ptr(new QueueDeclareBody( - version, 0, name, empty, false/*passive*/, queue.isDurable(), - queue.isExclusive(), queue.isAutoDelete(), !synch, args))); + ScopedSync s(*session, synch); + Response r = session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(), + queue.isExclusive(), queue.isAutoDelete(), !synch, args); + if(synch) { if(queue.getName().length() == 0) - queue.setName(response->getQueue()); + queue.setName(r.as<QueueDeclareOkBody>().getQueue()); } } void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ - //ticket, queue, ifunused, ifempty, nowait - string name = queue.getName(); - sendAndReceiveSync<QueueDeleteOkBody>( - synch, - make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch))); + ScopedSync s(*session, synch); + session->queueDelete(0, queue.getName(), ifunused, ifempty, !synch); } void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - sendSync(synch, make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args))); + ScopedSync s(*session, synch); + session->queueBind(0, q, e, key, args); } void Channel::commit(){ - sendSync(false, make_shared_ptr(new TxCommitBody(version))); + session->txCommit(); } void Channel::rollback(){ - sendSync(false, make_shared_ptr(new TxRollbackBody(version))); + session->txRollback(); } void Channel::close() @@ -141,43 +144,14 @@ void Channel::close() Mutex::ScopedLock l(lock); if (connection); { - connection->released(session); + session.reset(); + sessionCore.reset(); connection.reset(); } } stop(); } -AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/) -{ - session->setSync(true); - Response r = session->send(toSend, true); - session->setSync(false); - return r.getPtr(); -} - -void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command) -{ - if(sync) { - session->setSync(true); - session->send(command, false); - session->setSync(false); - } else { - session->send(command); - } -} - -AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( - bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m) -{ - if(sync) - return sendAndReceive(body, c, m); - else { - session->send(body); - return AMQMethodBody::shared_ptr(); - } -} - void Channel::consume( Queue& queue, const std::string& tag, MessageListener* listener, AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { @@ -195,12 +169,10 @@ void Channel::consume( c.ackMode = ackMode; c.lastDeliveryTag = 0; } - sendAndReceiveSync<BasicConsumeOkBody>( - synch, - make_shared_ptr(new BasicConsumeBody( - version, 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable()))); + ScopedSync s(*session, synch); + session->basicConsume(0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, + fields ? *fields : FieldTable()); } void Channel::cancel(const std::string& tag, bool synch) { @@ -213,16 +185,13 @@ void Channel::cancel(const std::string& tag, bool synch) { c = i->second; consumers.erase(i); } - sendAndReceiveSync<BasicCancelOkBody>( - synch, make_shared_ptr(new BasicCancelBody(version, tag, !synch))); + ScopedSync s(*session, synch); + session->basicCancel(tag, !synch); } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { - - AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode)); - - Response response = session->send(request, true); - session->flush(); + Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK); + sessionCore->flush();//TODO: need to expose the ability to request completion info through session if (response.isA<BasicGetEmptyBody>()) { return false; } else { @@ -239,7 +208,7 @@ void Channel::publish(const Message& msg, const Exchange& exchange, const string e = exchange.getName(); string key = routingKey; - session->send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), msg, false); + session->basicPublish(0, e, key, mandatory, immediate, msg); } void Channel::start(){ @@ -248,7 +217,7 @@ void Channel::start(){ } void Channel::stop() { - session->stop(); + //session->stop(); gets.close(); join(); } diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index bf5e2aa0d9..98e04db109 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -21,13 +21,14 @@ * under the License. * */ +#include <memory> #include <boost/scoped_ptr.hpp> #include "qpid/framing/amqp_framing.h" #include "ClientExchange.h" #include "ClientMessage.h" #include "ClientQueue.h" #include "ConnectionImpl.h" -#include "SessionCore.h" +#include "Session.h" #include "qpid/Exception.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Runnable.h" @@ -58,9 +59,6 @@ class ReturnedMessageHandler; class Channel : private sys::Runnable { private: - struct UnknownMethod {}; - typedef shared_ptr<framing::AMQMethodBody> MethodPtr; - struct Consumer{ MessageListener* listener; AckMode ackMode; @@ -81,40 +79,14 @@ class Channel : private sys::Runnable ConsumerMap consumers; ConnectionImpl::shared_ptr connection; - SessionCore::shared_ptr session; + std::auto_ptr<Session> session; + SessionCore::shared_ptr sessionCore; framing::ChannelId channelId; BlockingQueue<ReceivedContent::shared_ptr> gets; void stop(); void setQos(); - - framing::AMQMethodBody::shared_ptr sendAndReceive( - framing::AMQMethodBody::shared_ptr, - framing::ClassId = 0, framing::MethodId = 0); - - framing::AMQMethodBody::shared_ptr sendAndReceiveSync( - bool sync, - framing::AMQMethodBody::shared_ptr, - framing::ClassId, framing::MethodId); - - void sendSync(bool sync, framing::AMQMethodBody::shared_ptr body); - - - template <class BodyType> - boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) { - return boost::shared_polymorphic_downcast<BodyType>( - sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID)); - } - - template <class BodyType> - boost::shared_ptr<BodyType> sendAndReceiveSync( - bool sync, framing::AMQMethodBody::shared_ptr body) { - return boost::shared_polymorphic_downcast<BodyType>( - sendAndReceiveSync( - sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID)); - } - void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr); void closeInternal(); void join(); diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 47c01f2d67..e63ac69da6 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -47,10 +47,9 @@ void ConnectionImpl::allocated(SessionCore::shared_ptr session) void ConnectionImpl::released(SessionCore::shared_ptr session) { SessionMap::iterator i = sessions.find(session->getId()); - if (i == sessions.end()) { - throw Exception("Id not in use."); + if (i != sessions.end()) { + sessions.erase(i); } - sessions.erase(i); } void ConnectionImpl::handle(framing::AMQFrame& frame) diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h index 425d78e7cd..745d4648ad 100644 --- a/cpp/src/qpid/client/Response.h +++ b/cpp/src/qpid/client/Response.h @@ -39,7 +39,7 @@ public: template <class T> T& as() { framing::AMQMethodBody::shared_ptr response(future->getResponse()); - return boost::shared_polymorphic_cast<T>(*response); + return dynamic_cast<T&>(*response); } template <class T> bool isA() { @@ -51,12 +51,6 @@ public: { return future->waitForCompletion(); } - - //TODO: only exposed for old channel class, may want to hide this eventually - framing::AMQMethodBody::shared_ptr getPtr() - { - return future->getResponse(); - } }; }} |