diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/Channel.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/Channel.cpp | 268 |
1 files changed, 0 insertions, 268 deletions
diff --git a/qpid/cpp/src/qpid/client/Channel.cpp b/qpid/cpp/src/qpid/client/Channel.cpp deleted file mode 100644 index 4af69c8552..0000000000 --- a/qpid/cpp/src/qpid/client/Channel.cpp +++ /dev/null @@ -1,268 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/log/Statement.h" -#include <iostream> -#include <sstream> -#include "Channel.h" -#include "qpid/sys/Monitor.h" -#include "Message.h" -#include "Connection.h" -#include "Demux.h" -#include "FutureResponse.h" -#include "MessageListener.h" -#include "MessageQueue.h" -#include <boost/format.hpp> -#include <boost/bind.hpp> -#include "qpid/framing/all_method_bodies.h" - -using namespace std; -using namespace boost; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace qpid{ -namespace client{ -using namespace arg; - -const std::string empty; - -class ScopedSync -{ - Session& session; - public: - ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } - ~ScopedSync() { session.setSynchronous(false); } -}; - -Channel::Channel(bool _transactional, u_int16_t _prefetch) : - prefetch(_prefetch), transactional(_transactional), running(false), - uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false) -{ -} - -Channel::~Channel() -{ - join(); -} - -void Channel::open(const Session& s) -{ - Mutex::ScopedLock l(stopLock); - if (isOpen()) - throw ChannelBusyException(); - active = true; - session = s; - if(isTransactional()) { - session.txSelect(); - } -} - -bool Channel::isOpen() const { - Mutex::ScopedLock l(stopLock); - return active; -} - -void Channel::setPrefetch(uint32_t _prefetch){ - prefetch = _prefetch; -} - -void Channel::declareExchange(Exchange& _exchange, bool synch){ - ScopedSync s(session, synch); - session.exchangeDeclare(exchange=_exchange.getName(), type=_exchange.getType()); -} - -void Channel::deleteExchange(Exchange& _exchange, bool synch){ - ScopedSync s(session, synch); - session.exchangeDelete(exchange=_exchange.getName(), ifUnused=false); -} - -void Channel::declareQueue(Queue& _queue, bool synch){ - if (_queue.getName().empty()) { - stringstream uniqueName; - uniqueName << uniqueId << "-queue-" << ++nameCounter; - _queue.setName(uniqueName.str()); - } - - ScopedSync s(session, synch); - session.queueDeclare(queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(), - exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete()); - -} - -void Channel::deleteQueue(Queue& _queue, bool ifunused, bool ifempty, bool synch){ - ScopedSync s(session, synch); - session.queueDelete(queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty); -} - -void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ - string e = exchange.getName(); - string q = queue.getName(); - ScopedSync s(session, synch); - session.queueBind(0, q, e, key, args); -} - -void Channel::commit(){ - session.txCommit(); -} - -void Channel::rollback(){ - session.txRollback(); -} - -void Channel::consume( - Queue& _queue, const std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { - - if (tag.empty()) { - throw Exception("A tag must be specified for a consumer."); - } - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - throw NotAllowedException(QPID_MSG("Consumer already exists with tag " << tag )); - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.count = 0; - } - uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; - ScopedSync s(session, synch); - session.messageSubscribe(0, _queue.getName(), tag, noLocal, - confirmMode, 0/*pre-acquire*/, - false, fields ? *fields : FieldTable()); - if (!prefetch) { - session.messageFlowMode(tag, 0/*credit based*/); - } - - //allocate some credit: - session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); - session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF); -} - -void Channel::cancel(const std::string& tag, bool synch) { - Consumer c; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i == consumers.end()) - return; - c = i->second; - consumers.erase(i); - } - ScopedSync s(session, synch); - session.messageCancel(tag); -} - -bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { - string tag = "get-handler"; - ScopedDivert handler(tag, session.getExecution().getDemux()); - Demux::QueuePtr incoming = handler.getQueue(); - - session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); - session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); - session.messageFlow(tag, 0/*MESSAGES*/, 1); - Completion status = session.messageFlush(tag); - status.sync(); - session.messageCancel(tag); - - FrameSet::shared_ptr p; - if (incoming->tryPop(p)) { - msg.populate(*p); - if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true); - return true; - } - else - return false; -} - -void Channel::publish(Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory, bool /*?TODO-restore immediate?*/) { - - msg.getDeliveryProperties().setRoutingKey(routingKey); - msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); - session.messageTransfer(destination=exchange.getName(), content=msg); -} - -void Channel::close() -{ - session.close(); - { - Mutex::ScopedLock l(stopLock); - active = false; - } - stop(); -} - -void Channel::start(){ - running = true; - dispatcher = Thread(*this); -} - -void Channel::stop() { - gets.close(); - join(); -} - -void Channel::join() { - Mutex::ScopedLock l(stopLock); - if(running && dispatcher.id()) { - dispatcher.join(); - running = false; - } -} - -void Channel::dispatch(FrameSet& content, const std::string& destination) -{ - ConsumerMap::iterator i = consumers.find(destination); - if (i != consumers.end()) { - Message msg; - msg.populate(content); - MessageListener* listener = i->second.listener; - listener->received(msg); - if (isOpen() && i->second.ackMode != CLIENT_ACK) { - bool send = i->second.ackMode == AUTO_ACK - || (prefetch && ++(i->second.count) > (prefetch / 2)); - if (send) i->second.count = 0; - session.getExecution().completed(content.getId(), true, send); - } - } else { - QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); - } -} - -void Channel::run() { - try { - while (true) { - FrameSet::shared_ptr content = session.get(); - //need to dispatch this to the relevant listener: - if (content->isA<MessageTransferBody>()) { - dispatch(*content, content->as<MessageTransferBody>()->getDestination()); - } else { - QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod()); - } - } - } catch (const ClosedException&) {} -} - -}} - |