summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp109
-rw-r--r--cpp/src/qpid/client/ClientChannel.h36
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp5
-rw-r--r--cpp/src/qpid/client/Response.h8
4 files changed, 46 insertions, 112 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 424ff97ea1..3cf0373b7f 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -46,6 +46,14 @@ namespace client{
const std::string empty;
+class ScopedSync
+{
+ Session& session;
+public:
+ ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
+ ~ScopedSync() { session.setSynchronous(false); }
+};
+
}}
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
@@ -64,7 +72,8 @@ void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s)
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
connection = c;
- session = s;
+ sessionCore = s;
+ session = auto_ptr<Session>(new Session(c, s));
}
bool Channel::isOpen() const {
@@ -73,10 +82,10 @@ bool Channel::isOpen() const {
}
void Channel::setQos() {
- sendSync(false, make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false)));
+ session->basicQos(0, getPrefetch(), false);
if(isTransactional()) {
//I think this is wrong! should only send TxSelect once...
- sendSync(false, make_shared_ptr(new TxSelectBody(version)));
+ session->txSelect();
}
}
@@ -86,52 +95,46 @@ void Channel::setPrefetch(uint16_t _prefetch){
}
void Channel::declareExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- string type = exchange.getType();
FieldTable args;
- sendSync(synch, make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
+ ScopedSync s(*session, synch);
+ session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- sendSync(synch, make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
+ ScopedSync s(*session, synch);
+ session->exchangeDelete(0, exchange.getName(), false);
}
void Channel::declareQueue(Queue& queue, bool synch){
- string name = queue.getName();
FieldTable args;
- QueueDeclareOkBody::shared_ptr response =
- sendAndReceiveSync<QueueDeclareOkBody>(
- synch,
- make_shared_ptr(new QueueDeclareBody(
- version, 0, name, empty, false/*passive*/, queue.isDurable(),
- queue.isExclusive(), queue.isAutoDelete(), !synch, args)));
+ ScopedSync s(*session, synch);
+ Response r = session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args);
+
if(synch) {
if(queue.getName().length() == 0)
- queue.setName(response->getQueue());
+ queue.setName(r.as<QueueDeclareOkBody>().getQueue());
}
}
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
- //ticket, queue, ifunused, ifempty, nowait
- string name = queue.getName();
- sendAndReceiveSync<QueueDeleteOkBody>(
- synch,
- make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)));
+ ScopedSync s(*session, synch);
+ session->queueDelete(0, queue.getName(), ifunused, ifempty, !synch);
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- sendSync(synch, make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
+ ScopedSync s(*session, synch);
+ session->queueBind(0, q, e, key, args);
}
void Channel::commit(){
- sendSync(false, make_shared_ptr(new TxCommitBody(version)));
+ session->txCommit();
}
void Channel::rollback(){
- sendSync(false, make_shared_ptr(new TxRollbackBody(version)));
+ session->txRollback();
}
void Channel::close()
@@ -141,43 +144,14 @@ void Channel::close()
Mutex::ScopedLock l(lock);
if (connection);
{
- connection->released(session);
+ session.reset();
+ sessionCore.reset();
connection.reset();
}
}
stop();
}
-AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/)
-{
- session->setSync(true);
- Response r = session->send(toSend, true);
- session->setSync(false);
- return r.getPtr();
-}
-
-void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command)
-{
- if(sync) {
- session->setSync(true);
- session->send(command, false);
- session->setSync(false);
- } else {
- session->send(command);
- }
-}
-
-AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
- bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m)
-{
- if(sync)
- return sendAndReceive(body, c, m);
- else {
- session->send(body);
- return AMQMethodBody::shared_ptr();
- }
-}
-
void Channel::consume(
Queue& queue, const std::string& tag, MessageListener* listener,
AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
@@ -195,12 +169,10 @@ void Channel::consume(
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
- sendAndReceiveSync<BasicConsumeOkBody>(
- synch,
- make_shared_ptr(new BasicConsumeBody(
- version, 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable())));
+ ScopedSync s(*session, synch);
+ session->basicConsume(0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable());
}
void Channel::cancel(const std::string& tag, bool synch) {
@@ -213,16 +185,13 @@ void Channel::cancel(const std::string& tag, bool synch) {
c = i->second;
consumers.erase(i);
}
- sendAndReceiveSync<BasicCancelOkBody>(
- synch, make_shared_ptr(new BasicCancelBody(version, tag, !synch)));
+ ScopedSync s(*session, synch);
+ session->basicCancel(tag, !synch);
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
-
- AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode));
-
- Response response = session->send(request, true);
- session->flush();
+ Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK);
+ sessionCore->flush();//TODO: need to expose the ability to request completion info through session
if (response.isA<BasicGetEmptyBody>()) {
return false;
} else {
@@ -239,7 +208,7 @@ void Channel::publish(const Message& msg, const Exchange& exchange,
const string e = exchange.getName();
string key = routingKey;
- session->send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), msg, false);
+ session->basicPublish(0, e, key, mandatory, immediate, msg);
}
void Channel::start(){
@@ -248,7 +217,7 @@ void Channel::start(){
}
void Channel::stop() {
- session->stop();
+ //session->stop();
gets.close();
join();
}
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index bf5e2aa0d9..98e04db109 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -21,13 +21,14 @@
* under the License.
*
*/
+#include <memory>
#include <boost/scoped_ptr.hpp>
#include "qpid/framing/amqp_framing.h"
#include "ClientExchange.h"
#include "ClientMessage.h"
#include "ClientQueue.h"
#include "ConnectionImpl.h"
-#include "SessionCore.h"
+#include "Session.h"
#include "qpid/Exception.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
@@ -58,9 +59,6 @@ class ReturnedMessageHandler;
class Channel : private sys::Runnable
{
private:
- struct UnknownMethod {};
- typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
-
struct Consumer{
MessageListener* listener;
AckMode ackMode;
@@ -81,40 +79,14 @@ class Channel : private sys::Runnable
ConsumerMap consumers;
ConnectionImpl::shared_ptr connection;
- SessionCore::shared_ptr session;
+ std::auto_ptr<Session> session;
+ SessionCore::shared_ptr sessionCore;
framing::ChannelId channelId;
BlockingQueue<ReceivedContent::shared_ptr> gets;
void stop();
void setQos();
-
- framing::AMQMethodBody::shared_ptr sendAndReceive(
- framing::AMQMethodBody::shared_ptr,
- framing::ClassId = 0, framing::MethodId = 0);
-
- framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
- bool sync,
- framing::AMQMethodBody::shared_ptr,
- framing::ClassId, framing::MethodId);
-
- void sendSync(bool sync, framing::AMQMethodBody::shared_ptr body);
-
-
- template <class BodyType>
- boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
- return boost::shared_polymorphic_downcast<BodyType>(
- sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID));
- }
-
- template <class BodyType>
- boost::shared_ptr<BodyType> sendAndReceiveSync(
- bool sync, framing::AMQMethodBody::shared_ptr body) {
- return boost::shared_polymorphic_downcast<BodyType>(
- sendAndReceiveSync(
- sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
- }
-
void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
void closeInternal();
void join();
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 47c01f2d67..e63ac69da6 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -47,10 +47,9 @@ void ConnectionImpl::allocated(SessionCore::shared_ptr session)
void ConnectionImpl::released(SessionCore::shared_ptr session)
{
SessionMap::iterator i = sessions.find(session->getId());
- if (i == sessions.end()) {
- throw Exception("Id not in use.");
+ if (i != sessions.end()) {
+ sessions.erase(i);
}
- sessions.erase(i);
}
void ConnectionImpl::handle(framing::AMQFrame& frame)
diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h
index 425d78e7cd..745d4648ad 100644
--- a/cpp/src/qpid/client/Response.h
+++ b/cpp/src/qpid/client/Response.h
@@ -39,7 +39,7 @@ public:
template <class T> T& as()
{
framing::AMQMethodBody::shared_ptr response(future->getResponse());
- return boost::shared_polymorphic_cast<T>(*response);
+ return dynamic_cast<T&>(*response);
}
template <class T> bool isA()
{
@@ -51,12 +51,6 @@ public:
{
return future->waitForCompletion();
}
-
- //TODO: only exposed for old channel class, may want to hide this eventually
- framing::AMQMethodBody::shared_ptr getPtr()
- {
- return future->getResponse();
- }
};
}}