diff options
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 74 |
1 files changed, 1 insertions, 73 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 7cf4ef648e..9ac5323a53 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -62,7 +62,6 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm proxy(ioHandler), nextIn(0), nextOut(0), - sendMsgCredit(0), doClearDeliveryPropertiesExchange(true), autoDetach(true) { @@ -83,7 +82,6 @@ SessionImpl::~SessionImpl() { handleClosed(); state.waitWaiters(); } - delete sendMsgCredit; } connection->erase(channel); } @@ -373,10 +371,6 @@ void SessionImpl::sendRawFrame(AMQFrame& frame) { Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content) { - // Only message transfers have content - if (content && sendMsgCredit) { - sendMsgCredit->acquire(); - } Acquire a(sendLock); SequenceNumber id = nextOut++; { @@ -480,9 +474,7 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread //make sure the command id sequence and completion //tracking takes account of execution commands Lock l(state); - completedIn.add(nextIn++); - } else if (invoke(static_cast<MessageHandler&>(*this), *frame.getBody())) { - ; + completedIn.add(nextIn++); } else { //if not handled by this class, its for the application: deliver(frame); @@ -692,70 +684,6 @@ void SessionImpl::exception(uint16_t errorCode, setTimeout(0); } -// Message methods: -void SessionImpl::accept(const qpid::framing::SequenceSet&) -{ -} - -void SessionImpl::reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&) -{ -} - -void SessionImpl::release(const qpid::framing::SequenceSet&, bool) -{ -} - -MessageResumeResult SessionImpl::resume(const std::string&, const std::string&) -{ - throw NotImplementedException("resuming transfers not yet supported"); -} - -namespace { - const std::string QPID_SESSION_DEST = ""; - const uint8_t FLOW_MODE_CREDIT = 0; - const uint8_t CREDIT_MODE_MSG = 0; -} - -void SessionImpl::setFlowMode(const std::string& dest, uint8_t flowMode) -{ - if ( dest != QPID_SESSION_DEST ) { - QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); - return; - } - - if ( flowMode != FLOW_MODE_CREDIT ) { - throw NotImplementedException("window flow control mode not supported by producer"); - } - Lock l(state); - sendMsgCredit = new sys::Semaphore(0); -} - -void SessionImpl::flow(const std::string& dest, uint8_t mode, uint32_t credit) -{ - if ( dest != QPID_SESSION_DEST ) { - QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); - return; - } - - if ( mode != CREDIT_MODE_MSG ) { - return; - } - if (sendMsgCredit) { - sendMsgCredit->release(credit); - } -} - -void SessionImpl::stop(const std::string& dest) -{ - if ( dest != QPID_SESSION_DEST ) { - QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); - return; - } - if (sendMsgCredit) { - sendMsgCredit->forceLock(); - } -} - //private utility methods: inline void SessionImpl::setState(State s) //call with lock held |