summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp65
1 files changed, 30 insertions, 35 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index cc2b7aedc8..1a0fd25bc3 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -56,7 +56,7 @@ class ScopedSync
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
prefetch(_prefetch), transactional(_transactional), running(false),
- uniqueId(true)/*could eventually be the session id*/, nameCounter(0)
+ uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false)
{
}
@@ -65,26 +65,25 @@ Channel::~Channel()
join();
}
-void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s)
+void Channel::open(const Session& s)
{
+ Mutex::ScopedLock l(lock);
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
-
- connection = c;
- sessionCore = s;
- session = auto_ptr<Session>(new Session(c, s));
+ active = true;
+ session = s;
}
bool Channel::isOpen() const {
Mutex::ScopedLock l(lock);
- return connection;
+ return active;
}
void Channel::setQos() {
- session->basicQos(0, getPrefetch(), false);
+ session.basicQos(0, getPrefetch(), false);
if(isTransactional()) {
//I think this is wrong! should only send TxSelect once...
- session->txSelect();
+ session.txSelect();
}
}
@@ -95,13 +94,13 @@ void Channel::setPrefetch(uint16_t _prefetch){
void Channel::declareExchange(Exchange& exchange, bool synch){
FieldTable args;
- ScopedSync s(*session, synch);
- session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
+ ScopedSync s(session, synch);
+ session.exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
- ScopedSync s(*session, synch);
- session->exchangeDelete(0, exchange.getName(), false);
+ ScopedSync s(session, synch);
+ session.exchangeDelete(0, exchange.getName(), false);
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -112,30 +111,30 @@ void Channel::declareQueue(Queue& queue, bool synch){
}
FieldTable args;
- ScopedSync s(*session, synch);
- session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
+ ScopedSync s(session, synch);
+ session.queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), args);
}
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
- ScopedSync s(*session, synch);
- session->queueDelete(0, queue.getName(), ifunused, ifempty);
+ ScopedSync s(session, synch);
+ session.queueDelete(0, queue.getName(), ifunused, ifempty);
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- ScopedSync s(*session, synch);
- session->queueBind(0, q, e, key, args);
+ ScopedSync s(session, synch);
+ session.queueBind(0, q, e, key, args);
}
void Channel::commit(){
- session->txCommit();
+ session.txCommit();
}
void Channel::rollback(){
- session->txRollback();
+ session.txRollback();
}
void Channel::consume(
@@ -155,8 +154,8 @@ void Channel::consume(
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
- ScopedSync s(*session, synch);
- session->basicConsume(0, queue.getName(), tag, noLocal,
+ ScopedSync s(session, synch);
+ session.basicConsume(0, queue.getName(), tag, noLocal,
ackMode == NO_ACK, false, !synch,
fields ? *fields : FieldTable());
}
@@ -171,13 +170,13 @@ void Channel::cancel(const std::string& tag, bool synch) {
c = i->second;
consumers.erase(i);
}
- ScopedSync s(*session, synch);
- session->basicCancel(tag);
+ ScopedSync s(session, synch);
+ session.basicCancel(tag);
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK);
- sessionCore->flush();//TODO: need to expose the ability to request completion info through session
+ Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK);
+ session.execution().sendFlushRequest();
if (response.isA<BasicGetEmptyBody>()) {
return false;
} else {
@@ -194,19 +193,15 @@ void Channel::publish(const Message& msg, const Exchange& exchange,
const string e = exchange.getName();
string key = routingKey;
- session->basicPublish(0, e, key, mandatory, immediate, msg);
+ session.basicPublish(0, e, key, mandatory, immediate, msg);
}
void Channel::close()
{
- session->close();
+ session.close();
{
Mutex::ScopedLock l(lock);
- if (connection);
- {
- sessionCore.reset();
- connection.reset();
- }
+ active = false;
}
stop();
}
@@ -232,7 +227,7 @@ void Channel::join() {
void Channel::run() {
try {
while (true) {
- FrameSet::shared_ptr content = session->get();
+ FrameSet::shared_ptr content = session.get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());