summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-15 17:26:43 +0000
committerGordon Sim <gsim@apache.org>2007-08-15 17:26:43 +0000
commit3f2ac50fdb042c2c48ebbdc1e70e442f0bf1ab86 (patch)
tree11a8838ed371c23f57391b4a212c57ddb44051ff /cpp/src/qpid/client/ClientChannel.cpp
parentfb26cfb87668cd7b87cf7cdea2ca1f8c367de1a2 (diff)
downloadqpid-python-3f2ac50fdb042c2c48ebbdc1e70e442f0bf1ab86.tar.gz
Altered old client channel to use new generated session interface (primarily to reduce the number of places where method bodies are constructed).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566274 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp109
1 files changed, 39 insertions, 70 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 424ff97ea1..3cf0373b7f 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -46,6 +46,14 @@ namespace client{
const std::string empty;
+class ScopedSync
+{
+ Session& session;
+public:
+ ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
+ ~ScopedSync() { session.setSynchronous(false); }
+};
+
}}
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
@@ -64,7 +72,8 @@ void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s)
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
connection = c;
- session = s;
+ sessionCore = s;
+ session = auto_ptr<Session>(new Session(c, s));
}
bool Channel::isOpen() const {
@@ -73,10 +82,10 @@ bool Channel::isOpen() const {
}
void Channel::setQos() {
- sendSync(false, make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false)));
+ session->basicQos(0, getPrefetch(), false);
if(isTransactional()) {
//I think this is wrong! should only send TxSelect once...
- sendSync(false, make_shared_ptr(new TxSelectBody(version)));
+ session->txSelect();
}
}
@@ -86,52 +95,46 @@ void Channel::setPrefetch(uint16_t _prefetch){
}
void Channel::declareExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- string type = exchange.getType();
FieldTable args;
- sendSync(synch, make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
+ ScopedSync s(*session, synch);
+ session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- sendSync(synch, make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
+ ScopedSync s(*session, synch);
+ session->exchangeDelete(0, exchange.getName(), false);
}
void Channel::declareQueue(Queue& queue, bool synch){
- string name = queue.getName();
FieldTable args;
- QueueDeclareOkBody::shared_ptr response =
- sendAndReceiveSync<QueueDeclareOkBody>(
- synch,
- make_shared_ptr(new QueueDeclareBody(
- version, 0, name, empty, false/*passive*/, queue.isDurable(),
- queue.isExclusive(), queue.isAutoDelete(), !synch, args)));
+ ScopedSync s(*session, synch);
+ Response r = session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args);
+
if(synch) {
if(queue.getName().length() == 0)
- queue.setName(response->getQueue());
+ queue.setName(r.as<QueueDeclareOkBody>().getQueue());
}
}
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
- //ticket, queue, ifunused, ifempty, nowait
- string name = queue.getName();
- sendAndReceiveSync<QueueDeleteOkBody>(
- synch,
- make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)));
+ ScopedSync s(*session, synch);
+ session->queueDelete(0, queue.getName(), ifunused, ifempty, !synch);
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- sendSync(synch, make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
+ ScopedSync s(*session, synch);
+ session->queueBind(0, q, e, key, args);
}
void Channel::commit(){
- sendSync(false, make_shared_ptr(new TxCommitBody(version)));
+ session->txCommit();
}
void Channel::rollback(){
- sendSync(false, make_shared_ptr(new TxRollbackBody(version)));
+ session->txRollback();
}
void Channel::close()
@@ -141,43 +144,14 @@ void Channel::close()
Mutex::ScopedLock l(lock);
if (connection);
{
- connection->released(session);
+ session.reset();
+ sessionCore.reset();
connection.reset();
}
}
stop();
}
-AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/)
-{
- session->setSync(true);
- Response r = session->send(toSend, true);
- session->setSync(false);
- return r.getPtr();
-}
-
-void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command)
-{
- if(sync) {
- session->setSync(true);
- session->send(command, false);
- session->setSync(false);
- } else {
- session->send(command);
- }
-}
-
-AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
- bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m)
-{
- if(sync)
- return sendAndReceive(body, c, m);
- else {
- session->send(body);
- return AMQMethodBody::shared_ptr();
- }
-}
-
void Channel::consume(
Queue& queue, const std::string& tag, MessageListener* listener,
AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
@@ -195,12 +169,10 @@ void Channel::consume(
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
- sendAndReceiveSync<BasicConsumeOkBody>(
- synch,
- make_shared_ptr(new BasicConsumeBody(
- version, 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable())));
+ ScopedSync s(*session, synch);
+ session->basicConsume(0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable());
}
void Channel::cancel(const std::string& tag, bool synch) {
@@ -213,16 +185,13 @@ void Channel::cancel(const std::string& tag, bool synch) {
c = i->second;
consumers.erase(i);
}
- sendAndReceiveSync<BasicCancelOkBody>(
- synch, make_shared_ptr(new BasicCancelBody(version, tag, !synch)));
+ ScopedSync s(*session, synch);
+ session->basicCancel(tag, !synch);
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
-
- AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode));
-
- Response response = session->send(request, true);
- session->flush();
+ Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK);
+ sessionCore->flush();//TODO: need to expose the ability to request completion info through session
if (response.isA<BasicGetEmptyBody>()) {
return false;
} else {
@@ -239,7 +208,7 @@ void Channel::publish(const Message& msg, const Exchange& exchange,
const string e = exchange.getName();
string key = routingKey;
- session->send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), msg, false);
+ session->basicPublish(0, e, key, mandatory, immediate, msg);
}
void Channel::start(){
@@ -248,7 +217,7 @@ void Channel::start(){
}
void Channel::stop() {
- session->stop();
+ //session->stop();
gets.close();
join();
}