diff options
author | weidai <weidai@57ff6487-cd31-0410-9ec3-f628ee90f5f0> | 2006-04-06 21:20:25 +0000 |
---|---|---|
committer | weidai <weidai@57ff6487-cd31-0410-9ec3-f628ee90f5f0> | 2006-04-06 21:20:25 +0000 |
commit | 82c4eb38804e011cfd855bc9e292f7533bfe4c2f (patch) | |
tree | 34dba126863f587607debfecab883867f5f2d3dd /network.cpp | |
parent | 4c4b05b70d06e818417f6b4f879183a2f233c91b (diff) | |
download | cryptopp-82c4eb38804e011cfd855bc9e292f7533bfe4c2f.tar.gz |
merge in changes by denis bider and fix compile on gcc 3.4.4 and MSVC 6
git-svn-id: svn://svn.code.sf.net/p/cryptopp/code/trunk/c5@219 57ff6487-cd31-0410-9ec3-f628ee90f5f0
Diffstat (limited to 'network.cpp')
-rw-r--r-- | network.cpp | 343 |
1 files changed, 303 insertions, 40 deletions
diff --git a/network.cpp b/network.cpp index 49800fd..24f5013 100644 --- a/network.cpp +++ b/network.cpp @@ -8,6 +8,133 @@ NAMESPACE_BEGIN(CryptoPP) +lword LimitedBandwidth::ComputeCurrentTransceiveLimit() +{ + if (!m_maxBytesPerSecond) + return ULONG_MAX; + + double curTime = GetCurTimeAndCleanUp(); + lword total = 0; + for (OpQueue::size_type i=0; i!=m_ops.size(); ++i) + total += m_ops[i].second; + return SaturatingSubtract(m_maxBytesPerSecond, total); +} + +double LimitedBandwidth::TimeToNextTransceive() +{ + if (!m_maxBytesPerSecond) + return 0; + + if (!m_nextTransceiveTime) + ComputeNextTransceiveTime(); + + return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble()); +} + +void LimitedBandwidth::NoteTransceive(lword size) +{ + if (m_maxBytesPerSecond) + { + double curTime = GetCurTimeAndCleanUp(); + m_ops.push_back(std::make_pair(curTime, size)); + m_nextTransceiveTime = 0; + } +} + +void LimitedBandwidth::ComputeNextTransceiveTime() +{ + double curTime = GetCurTimeAndCleanUp(); + lword total = 0; + for (unsigned int i=0; i!=m_ops.size(); ++i) + total += m_ops[i].second; + m_nextTransceiveTime = + (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000; +} + +double LimitedBandwidth::GetCurTimeAndCleanUp() +{ + if (!m_maxBytesPerSecond) + return 0; + + double curTime = m_timer.ElapsedTimeAsDouble(); + while (m_ops.size() && (m_ops.front().first + 1000 < curTime)) + m_ops.pop_front(); + return curTime; +} + +void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack) +{ + double nextTransceiveTime = TimeToNextTransceive(); + if (nextTransceiveTime) + container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack)); +} + +// ************************************************************* + +size_t NonblockingSource::GeneralPump2( + lword& byteCount, bool blockingOutput, + unsigned long maxTime, bool checkDelimiter, byte delimiter) +{ + m_blockedBySpeedLimit = false; + + if (!GetMaxBytesPerSecond()) + { + size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter); + m_doPumpBlocked = (ret != 0); + return ret; + } + + bool forever = (maxTime == INFINITE_TIME); + unsigned long timeToGo = maxTime; + Timer timer(Timer::MILLISECONDS, forever); + lword maxSize = byteCount; + byteCount = 0; + + timer.StartTimer(); + + while (true) + { + lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount); + + if (curMaxSize || m_doPumpBlocked) + { + if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); + size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter); + m_doPumpBlocked = (ret != 0); + if (curMaxSize) + { + NoteTransceive(curMaxSize); + byteCount += curMaxSize; + } + if (ret) + return ret; + } + + if (maxSize != ULONG_MAX && byteCount >= maxSize) + break; + + if (!forever) + { + timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); + if (!timeToGo) + break; + } + + double waitTime = TimeToNextTransceive(); + if (!forever && waitTime > timeToGo) + { + m_blockedBySpeedLimit = true; + break; + } + + WaitObjectContainer container; + LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0)); + container.Wait((unsigned long)waitTime); + } + + return 0; +} + size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking) { if (messageCount == 0) @@ -30,10 +157,68 @@ size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blockin return 0; } +lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize) +{ + m_blockedBySpeedLimit = false; + + size_t curBufSize = GetCurrentBufferSize(); + if (curBufSize <= targetSize && (targetSize || !EofPending())) + return 0; + + if (!GetMaxBytesPerSecond()) + return DoFlush(maxTime, targetSize); + + bool forever = (maxTime == INFINITE_TIME); + unsigned long timeToGo = maxTime; + Timer timer(Timer::MILLISECONDS, forever); + lword totalFlushed = 0; + + timer.StartTimer(); + + while (true) + { + size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit()); + if (flushSize || EofPending()) + { + if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); + size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize); + if (ret) + { + NoteTransceive(ret); + curBufSize -= ret; + totalFlushed += ret; + } + } + + if (curBufSize <= targetSize && (targetSize || !EofPending())) + break; + + if (!forever) + { + timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); + if (!timeToGo) + break; + } + + double waitTime = TimeToNextTransceive(); + if (!forever && waitTime > timeToGo) + { + m_blockedBySpeedLimit = true; + break; + } + + WaitObjectContainer container; + LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0)); + container.Wait((unsigned long)waitTime); + } + + return totalFlushed; +} + bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking) { TimedFlush(blocking ? INFINITE_TIME : 0); - return hardFlush && !!GetCurrentBufferSize(); + return hardFlush && (!!GetCurrentBufferSize() || EofPending()); } // ************************************************************* @@ -47,19 +232,29 @@ NetworkSource::NetworkSource(BufferedTransformation *attachment) { } -void NetworkSource::GetWaitObjects(WaitObjectContainer &container) +unsigned int NetworkSource::GetMaxWaitObjectCount() const +{ + return LimitedBandwidth::GetMaxWaitObjectCount() + + GetReceiver().GetMaxWaitObjectCount() + + AttachedTransformation()->GetMaxWaitObjectCount(); +} + +void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack) { - if (!m_outputBlocked) + if (BlockedBySpeedLimit()) + LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack)); + else if (!m_outputBlocked) { if (m_dataBegin == m_dataEnd) - AccessReceiver().GetWaitObjects(container); + AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack)); else - container.SetNoWait(); + container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack)); } - AttachedTransformation()->GetWaitObjects(container); + + AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack)); } -size_t NetworkSource::GeneralPump2(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter) +size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter) { NetworkReceiver &receiver = AccessReceiver(); @@ -81,7 +276,9 @@ size_t NetworkSource::GeneralPump2(lword &byteCount, bool blockingOutput, unsign if (m_waitingForResult) { - if (receiver.MustWaitForResult() && !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()))) + if (receiver.MustWaitForResult() && + !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), + CallStack("NetworkSource::DoPump() - wait receive result", 0))) break; unsigned int recvResult = receiver.GetReceiveResult(); @@ -100,7 +297,8 @@ size_t NetworkSource::GeneralPump2(lword &byteCount, bool blockingOutput, unsign if (receiver.MustWaitToReceive()) { - if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()))) + if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), + CallStack("NetworkSource::DoPump() - wait receive", 0))) break; receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd); @@ -133,7 +331,8 @@ ReceiveNoWait: } else { - m_putSize = (size_t)STDMIN((lword)m_dataEnd-m_dataBegin, maxSize-byteCount); + m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount); + if (checkDelimiter) m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin); @@ -141,7 +340,8 @@ DoOutput: size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput); if (result) { - if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()))) + if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), + CallStack("NetworkSource::DoPump() - wait attachment", 0))) goto DoOutput; else { @@ -155,7 +355,7 @@ DoOutput: m_dataBegin += m_putSize; if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter) break; - if (byteCount == maxSize) + if (maxSize != ULONG_MAX && byteCount == maxSize) break; // once time limit is reached, return even if there is more data waiting // but make 0 a special case so caller can request a large amount of data to be @@ -172,7 +372,7 @@ DoOutput: NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound) : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound) - , m_needSendResult(false), m_wasBlocked(false) + , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE) , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0) , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0) , m_currentSpeed(0), m_maxObservedSpeed(0) @@ -192,44 +392,86 @@ float NetworkSink::ComputeCurrentSpeed() return m_currentSpeed; } +float NetworkSink::GetMaxObservedSpeed() const +{ + lword m = GetMaxBytesPerSecond(); + return m ? STDMIN(m_maxObservedSpeed, float(m)) : m_maxObservedSpeed; +} + +unsigned int NetworkSink::GetMaxWaitObjectCount() const +{ + return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount(); +} + +void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack) +{ + if (BlockedBySpeedLimit()) + LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack)); + else if (m_wasBlocked) + AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack)); + else if (!m_buffer.IsEmpty()) + AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack)); + else if (EofPending()) + AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack)); +} + size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking) { - if (m_skipBytes) + if (m_eofState == EOF_DONE) { - assert(length >= m_skipBytes); - inString += m_skipBytes; - length -= m_skipBytes; - } - m_buffer.LazyPut(inString, length); + if (length || messageEnd) + throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent"); - if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound) - TimedFlush(0, 0); + return 0; + } - size_t targetSize = messageEnd ? 0 : m_maxBufferSize; - if (blocking) - TimedFlush(INFINITE_TIME, targetSize); + if (m_eofState > EOF_NONE) + goto EofSite; - if (m_buffer.CurrentSize() > targetSize) { - assert(!blocking); - size_t blockedBytes = (size_t)STDMIN(m_buffer.CurrentSize() - targetSize, (lword)length); - m_buffer.UndoLazyPut(blockedBytes); - m_buffer.FinalizeLazyPut(); - m_wasBlocked = true; - m_skipBytes += length - blockedBytes; - return UnsignedMin(1, blockedBytes); - } + if (m_skipBytes) + { + assert(length >= m_skipBytes); + inString += m_skipBytes; + length -= m_skipBytes; + } + + m_buffer.Put(inString, length); + + if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound) + TimedFlush(0, 0); + + size_t targetSize = messageEnd ? 0 : m_maxBufferSize; + if (blocking) + TimedFlush(INFINITE_TIME, targetSize); + + if (m_buffer.CurrentSize() > targetSize) + { + assert(!blocking); + m_wasBlocked = true; + m_skipBytes += length; + size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize); + return STDMAX<size_t>(blockedBytes, 1); + } - m_buffer.FinalizeLazyPut(); - m_wasBlocked = false; - m_skipBytes = 0; + m_wasBlocked = false; + m_skipBytes = 0; + } if (messageEnd) - AccessSender().SendEof(); + { + m_eofState = EOF_PENDING_SEND; + + EofSite: + TimedFlush(blocking ? INFINITE_TIME : 0, 0); + if (m_eofState != EOF_DONE) + return 1; + } + return 0; } -lword NetworkSink::TimedFlush(unsigned long maxTime, size_t targetSize) +lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize) { NetworkSender &sender = AccessSender(); @@ -244,7 +486,9 @@ lword NetworkSink::TimedFlush(unsigned long maxTime, size_t targetSize) if (m_needSendResult) { - if (sender.MustWaitForResult() && !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()))) + if (sender.MustWaitForResult() && + !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), + CallStack("NetworkSink::DoFlush() - wait send result", 0))) break; unsigned int sendResult = sender.GetSendResult(); @@ -260,7 +504,7 @@ lword NetworkSink::TimedFlush(unsigned long maxTime, size_t targetSize) } unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0; - if (sender.MustWaitToSend() && !sender.Wait(timeOut)) + if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0))) break; size_t contiguousSize = 0; @@ -279,6 +523,25 @@ lword NetworkSink::TimedFlush(unsigned long maxTime, size_t targetSize) m_byteCountSinceLastTimerReset += totalFlushSize; ComputeCurrentSpeed(); + if (m_buffer.IsEmpty() && !m_needSendResult) + { + if (m_eofState == EOF_PENDING_SEND) + { + sender.SendEof(); + m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE; + } + + while (m_eofState == EOF_PENDING_DELIVERY) + { + unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0; + if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0))) + break; + + if (sender.EofSent()) + m_eofState = EOF_DONE; + } + } + return totalFlushSize; } |