diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2013-01-30 19:47:54 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2013-01-30 19:47:54 +0000 |
| commit | 686d2cd2454c6f8c718731824613e43fd690bb0d (patch) | |
| tree | 81b7b051c608c673d33728fd92b45b8ffcda08a0 /cpp/src/qpid/sys | |
| parent | 9c6950b9bce6ef88bc5591cb0e3c9810a1c68f82 (diff) | |
| download | qpid-python-686d2cd2454c6f8c718731824613e43fd690bb0d.tar.gz | |
QPID-4514: Remove IO readCredit throttling only used by removed cluster code
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1440616 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 38 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/OutputControl.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 5 |
7 files changed, 1 insertions, 51 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp index ff9c740926..ebc5689ce5 100644 --- a/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/cpp/src/qpid/sys/AggregateOutput.cpp @@ -32,8 +32,6 @@ void AggregateOutput::abort() { control.abort(); } void AggregateOutput::activateOutput() { control.activateOutput(); } -void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } - namespace { // Clear the busy flag and notify waiting threads in destructor. struct ScopedBusy { diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h index 802722ad26..e9dbd5a4cc 100644 --- a/cpp/src/qpid/sys/AggregateOutput.h +++ b/cpp/src/qpid/sys/AggregateOutput.h @@ -59,7 +59,6 @@ class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public Outpu // These may be called concurrently with any function. QPID_COMMON_EXTERN void abort(); QPID_COMMON_EXTERN void activateOutput(); - QPID_COMMON_EXTERN void giveReadCredit(int32_t); QPID_COMMON_EXTERN void addOutputTask(OutputTask* t); // These functions must not be called concurrently with each other. diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 0225b11d27..13c71e301b 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -59,8 +59,7 @@ AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory reads(0), readError(false), isClient(isClient0), - nodict(nodict0), - readCredit(InfiniteCredit) + nodict(nodict0) {} AsynchIOHandler::~AsynchIOHandler() { @@ -104,21 +103,6 @@ void AsynchIOHandler::activateOutput() { aio->notifyPendingWrite(); } -// Input side -void AsynchIOHandler::giveReadCredit(int32_t credit) { - // Check whether we started in the don't about credit state - if (readCredit.boolCompareAndSwap(InfiniteCredit, credit)) - return; - // TODO In theory should be able to use an atomic operation before taking the lock - // but in practice there seems to be an unexplained race in that case - ScopedLock<Mutex> l(creditLock); - if (readCredit.fetchAndAdd(credit) != 0) - return; - assert(readCredit.get() >= 0); - if (readCredit.get() != 0) - aio->startReading(); -} - namespace { SecuritySettings getSecuritySettings(AsynchIO* aio, bool nodict) { @@ -133,26 +117,6 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { return; } - // Check here for read credit - if (readCredit.get() != InfiniteCredit) { - if (readCredit.get() == 0) { - // FIXME aconway 2009-10-01: Workaround to avoid "false wakeups". - // readbuff is sometimes called with no credit. - // This should be fixed somewhere else to avoid such calls. - aio->unread(buff); - return; - } - // TODO In theory should be able to use an atomic operation before taking the lock - // but in practice there seems to be an unexplained race in that case - ScopedLock<Mutex> l(creditLock); - if (--readCredit == 0) { - assert(readCredit.get() >= 0); - if (readCredit.get() == 0) { - aio->stopReading(); - } - } - } - ++reads; size_t decoded = 0; if (codec) { // Already initiated diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index 91ddb022af..d93e24fd4c 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -52,9 +52,6 @@ class AsynchIOHandler : public OutputControl { bool readError; bool isClient; bool nodict; - AtomicValue<int32_t> readCredit; - static const int32_t InfiniteCredit = -1; - Mutex creditLock; boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask; void write(const framing::ProtocolInitiation&); @@ -67,7 +64,6 @@ class AsynchIOHandler : public OutputControl { // Output side QPID_COMMON_EXTERN void abort(); QPID_COMMON_EXTERN void activateOutput(); - QPID_COMMON_EXTERN void giveReadCredit(int32_t credit); // Input side QPID_COMMON_EXTERN void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff); diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h index 95a08d15ae..53d56ad716 100644 --- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h +++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h @@ -45,7 +45,6 @@ class ConnectionOutputHandlerPtr : public ConnectionOutputHandler size_t getBuffered() const { return next->getBuffered(); } void abort() { next->abort(); } void activateOutput() { next->activateOutput(); } - void giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } void send(framing::AMQFrame& f) { next->send(f); } private: diff --git a/cpp/src/qpid/sys/OutputControl.h b/cpp/src/qpid/sys/OutputControl.h index ad5caa3168..0d801e9d16 100644 --- a/cpp/src/qpid/sys/OutputControl.h +++ b/cpp/src/qpid/sys/OutputControl.h @@ -33,7 +33,6 @@ namespace sys { virtual ~OutputControl() {} virtual void abort() = 0; virtual void activateOutput() = 0; - virtual void giveReadCredit(int32_t credit) = 0; }; } diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index e1f4362d64..51cc0ed109 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -68,7 +68,6 @@ class RdmaIOHandler : public OutputControl { void close(); void abort(); void activateOutput(); - void giveReadCredit(int32_t credit); void initProtocolOut(); // Input side @@ -200,10 +199,6 @@ void RdmaIOHandler::full(Rdma::AsynchIO&) { QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]"); } -// TODO: Dummy implementation of read throttling -void RdmaIOHandler::giveReadCredit(int32_t) { -} - // The logic here is subtly different from TCP as RDMA is message oriented // so we define that an RDMA message is a frame - in this case there is no putting back // of any message remainder - there shouldn't be any. And what we read here can't be |
