summaryrefslogtreecommitdiff
path: root/network.cpp
diff options
context:
space:
mode:
authorweidai <weidai@57ff6487-cd31-0410-9ec3-f628ee90f5f0>2006-04-06 21:20:25 +0000
committerweidai <weidai@57ff6487-cd31-0410-9ec3-f628ee90f5f0>2006-04-06 21:20:25 +0000
commit82c4eb38804e011cfd855bc9e292f7533bfe4c2f (patch)
tree34dba126863f587607debfecab883867f5f2d3dd /network.cpp
parent4c4b05b70d06e818417f6b4f879183a2f233c91b (diff)
downloadcryptopp-82c4eb38804e011cfd855bc9e292f7533bfe4c2f.tar.gz
merge in changes by denis bider and fix compile on gcc 3.4.4 and MSVC 6
git-svn-id: svn://svn.code.sf.net/p/cryptopp/code/trunk/c5@219 57ff6487-cd31-0410-9ec3-f628ee90f5f0
Diffstat (limited to 'network.cpp')
-rw-r--r--network.cpp343
1 files changed, 303 insertions, 40 deletions
diff --git a/network.cpp b/network.cpp
index 49800fd..24f5013 100644
--- a/network.cpp
+++ b/network.cpp
@@ -8,6 +8,133 @@
NAMESPACE_BEGIN(CryptoPP)
+lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
+{
+ if (!m_maxBytesPerSecond)
+ return ULONG_MAX;
+
+ double curTime = GetCurTimeAndCleanUp();
+ lword total = 0;
+ for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
+ total += m_ops[i].second;
+ return SaturatingSubtract(m_maxBytesPerSecond, total);
+}
+
+double LimitedBandwidth::TimeToNextTransceive()
+{
+ if (!m_maxBytesPerSecond)
+ return 0;
+
+ if (!m_nextTransceiveTime)
+ ComputeNextTransceiveTime();
+
+ return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
+}
+
+void LimitedBandwidth::NoteTransceive(lword size)
+{
+ if (m_maxBytesPerSecond)
+ {
+ double curTime = GetCurTimeAndCleanUp();
+ m_ops.push_back(std::make_pair(curTime, size));
+ m_nextTransceiveTime = 0;
+ }
+}
+
+void LimitedBandwidth::ComputeNextTransceiveTime()
+{
+ double curTime = GetCurTimeAndCleanUp();
+ lword total = 0;
+ for (unsigned int i=0; i!=m_ops.size(); ++i)
+ total += m_ops[i].second;
+ m_nextTransceiveTime =
+ (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
+}
+
+double LimitedBandwidth::GetCurTimeAndCleanUp()
+{
+ if (!m_maxBytesPerSecond)
+ return 0;
+
+ double curTime = m_timer.ElapsedTimeAsDouble();
+ while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
+ m_ops.pop_front();
+ return curTime;
+}
+
+void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
+{
+ double nextTransceiveTime = TimeToNextTransceive();
+ if (nextTransceiveTime)
+ container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
+}
+
+// *************************************************************
+
+size_t NonblockingSource::GeneralPump2(
+ lword& byteCount, bool blockingOutput,
+ unsigned long maxTime, bool checkDelimiter, byte delimiter)
+{
+ m_blockedBySpeedLimit = false;
+
+ if (!GetMaxBytesPerSecond())
+ {
+ size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
+ m_doPumpBlocked = (ret != 0);
+ return ret;
+ }
+
+ bool forever = (maxTime == INFINITE_TIME);
+ unsigned long timeToGo = maxTime;
+ Timer timer(Timer::MILLISECONDS, forever);
+ lword maxSize = byteCount;
+ byteCount = 0;
+
+ timer.StartTimer();
+
+ while (true)
+ {
+ lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
+
+ if (curMaxSize || m_doPumpBlocked)
+ {
+ if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
+ size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
+ m_doPumpBlocked = (ret != 0);
+ if (curMaxSize)
+ {
+ NoteTransceive(curMaxSize);
+ byteCount += curMaxSize;
+ }
+ if (ret)
+ return ret;
+ }
+
+ if (maxSize != ULONG_MAX && byteCount >= maxSize)
+ break;
+
+ if (!forever)
+ {
+ timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
+ if (!timeToGo)
+ break;
+ }
+
+ double waitTime = TimeToNextTransceive();
+ if (!forever && waitTime > timeToGo)
+ {
+ m_blockedBySpeedLimit = true;
+ break;
+ }
+
+ WaitObjectContainer container;
+ LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
+ container.Wait((unsigned long)waitTime);
+ }
+
+ return 0;
+}
+
size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
{
if (messageCount == 0)
@@ -30,10 +157,68 @@ size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blockin
return 0;
}
+lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
+{
+ m_blockedBySpeedLimit = false;
+
+ size_t curBufSize = GetCurrentBufferSize();
+ if (curBufSize <= targetSize && (targetSize || !EofPending()))
+ return 0;
+
+ if (!GetMaxBytesPerSecond())
+ return DoFlush(maxTime, targetSize);
+
+ bool forever = (maxTime == INFINITE_TIME);
+ unsigned long timeToGo = maxTime;
+ Timer timer(Timer::MILLISECONDS, forever);
+ lword totalFlushed = 0;
+
+ timer.StartTimer();
+
+ while (true)
+ {
+ size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
+ if (flushSize || EofPending())
+ {
+ if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
+ size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
+ if (ret)
+ {
+ NoteTransceive(ret);
+ curBufSize -= ret;
+ totalFlushed += ret;
+ }
+ }
+
+ if (curBufSize <= targetSize && (targetSize || !EofPending()))
+ break;
+
+ if (!forever)
+ {
+ timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
+ if (!timeToGo)
+ break;
+ }
+
+ double waitTime = TimeToNextTransceive();
+ if (!forever && waitTime > timeToGo)
+ {
+ m_blockedBySpeedLimit = true;
+ break;
+ }
+
+ WaitObjectContainer container;
+ LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
+ container.Wait((unsigned long)waitTime);
+ }
+
+ return totalFlushed;
+}
+
bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
{
TimedFlush(blocking ? INFINITE_TIME : 0);
- return hardFlush && !!GetCurrentBufferSize();
+ return hardFlush && (!!GetCurrentBufferSize() || EofPending());
}
// *************************************************************
@@ -47,19 +232,29 @@ NetworkSource::NetworkSource(BufferedTransformation *attachment)
{
}
-void NetworkSource::GetWaitObjects(WaitObjectContainer &container)
+unsigned int NetworkSource::GetMaxWaitObjectCount() const
+{
+ return LimitedBandwidth::GetMaxWaitObjectCount()
+ + GetReceiver().GetMaxWaitObjectCount()
+ + AttachedTransformation()->GetMaxWaitObjectCount();
+}
+
+void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
{
- if (!m_outputBlocked)
+ if (BlockedBySpeedLimit())
+ LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
+ else if (!m_outputBlocked)
{
if (m_dataBegin == m_dataEnd)
- AccessReceiver().GetWaitObjects(container);
+ AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack));
else
- container.SetNoWait();
+ container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
}
- AttachedTransformation()->GetWaitObjects(container);
+
+ AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
}
-size_t NetworkSource::GeneralPump2(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
+size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
{
NetworkReceiver &receiver = AccessReceiver();
@@ -81,7 +276,9 @@ size_t NetworkSource::GeneralPump2(lword &byteCount, bool blockingOutput, unsign
if (m_waitingForResult)
{
- if (receiver.MustWaitForResult() && !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
+ if (receiver.MustWaitForResult() &&
+ !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
+ CallStack("NetworkSource::DoPump() - wait receive result", 0)))
break;
unsigned int recvResult = receiver.GetReceiveResult();
@@ -100,7 +297,8 @@ size_t NetworkSource::GeneralPump2(lword &byteCount, bool blockingOutput, unsign
if (receiver.MustWaitToReceive())
{
- if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
+ if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
+ CallStack("NetworkSource::DoPump() - wait receive", 0)))
break;
receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
@@ -133,7 +331,8 @@ ReceiveNoWait:
}
else
{
- m_putSize = (size_t)STDMIN((lword)m_dataEnd-m_dataBegin, maxSize-byteCount);
+ m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
+
if (checkDelimiter)
m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
@@ -141,7 +340,8 @@ DoOutput:
size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
if (result)
{
- if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
+ if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
+ CallStack("NetworkSource::DoPump() - wait attachment", 0)))
goto DoOutput;
else
{
@@ -155,7 +355,7 @@ DoOutput:
m_dataBegin += m_putSize;
if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
break;
- if (byteCount == maxSize)
+ if (maxSize != ULONG_MAX && byteCount == maxSize)
break;
// once time limit is reached, return even if there is more data waiting
// but make 0 a special case so caller can request a large amount of data to be
@@ -172,7 +372,7 @@ DoOutput:
NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
: m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
- , m_needSendResult(false), m_wasBlocked(false)
+ , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
, m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
, m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
, m_currentSpeed(0), m_maxObservedSpeed(0)
@@ -192,44 +392,86 @@ float NetworkSink::ComputeCurrentSpeed()
return m_currentSpeed;
}
+float NetworkSink::GetMaxObservedSpeed() const
+{
+ lword m = GetMaxBytesPerSecond();
+ return m ? STDMIN(m_maxObservedSpeed, float(m)) : m_maxObservedSpeed;
+}
+
+unsigned int NetworkSink::GetMaxWaitObjectCount() const
+{
+ return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount();
+}
+
+void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
+{
+ if (BlockedBySpeedLimit())
+ LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack));
+ else if (m_wasBlocked)
+ AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack));
+ else if (!m_buffer.IsEmpty())
+ AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
+ else if (EofPending())
+ AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack));
+}
+
size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
{
- if (m_skipBytes)
+ if (m_eofState == EOF_DONE)
{
- assert(length >= m_skipBytes);
- inString += m_skipBytes;
- length -= m_skipBytes;
- }
- m_buffer.LazyPut(inString, length);
+ if (length || messageEnd)
+ throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent");
- if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
- TimedFlush(0, 0);
+ return 0;
+ }
- size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
- if (blocking)
- TimedFlush(INFINITE_TIME, targetSize);
+ if (m_eofState > EOF_NONE)
+ goto EofSite;
- if (m_buffer.CurrentSize() > targetSize)
{
- assert(!blocking);
- size_t blockedBytes = (size_t)STDMIN(m_buffer.CurrentSize() - targetSize, (lword)length);
- m_buffer.UndoLazyPut(blockedBytes);
- m_buffer.FinalizeLazyPut();
- m_wasBlocked = true;
- m_skipBytes += length - blockedBytes;
- return UnsignedMin(1, blockedBytes);
- }
+ if (m_skipBytes)
+ {
+ assert(length >= m_skipBytes);
+ inString += m_skipBytes;
+ length -= m_skipBytes;
+ }
+
+ m_buffer.Put(inString, length);
+
+ if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
+ TimedFlush(0, 0);
+
+ size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
+ if (blocking)
+ TimedFlush(INFINITE_TIME, targetSize);
+
+ if (m_buffer.CurrentSize() > targetSize)
+ {
+ assert(!blocking);
+ m_wasBlocked = true;
+ m_skipBytes += length;
+ size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
+ return STDMAX<size_t>(blockedBytes, 1);
+ }
- m_buffer.FinalizeLazyPut();
- m_wasBlocked = false;
- m_skipBytes = 0;
+ m_wasBlocked = false;
+ m_skipBytes = 0;
+ }
if (messageEnd)
- AccessSender().SendEof();
+ {
+ m_eofState = EOF_PENDING_SEND;
+
+ EofSite:
+ TimedFlush(blocking ? INFINITE_TIME : 0, 0);
+ if (m_eofState != EOF_DONE)
+ return 1;
+ }
+
return 0;
}
-lword NetworkSink::TimedFlush(unsigned long maxTime, size_t targetSize)
+lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize)
{
NetworkSender &sender = AccessSender();
@@ -244,7 +486,9 @@ lword NetworkSink::TimedFlush(unsigned long maxTime, size_t targetSize)
if (m_needSendResult)
{
- if (sender.MustWaitForResult() && !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
+ if (sender.MustWaitForResult() &&
+ !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
+ CallStack("NetworkSink::DoFlush() - wait send result", 0)))
break;
unsigned int sendResult = sender.GetSendResult();
@@ -260,7 +504,7 @@ lword NetworkSink::TimedFlush(unsigned long maxTime, size_t targetSize)
}
unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
- if (sender.MustWaitToSend() && !sender.Wait(timeOut))
+ if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0)))
break;
size_t contiguousSize = 0;
@@ -279,6 +523,25 @@ lword NetworkSink::TimedFlush(unsigned long maxTime, size_t targetSize)
m_byteCountSinceLastTimerReset += totalFlushSize;
ComputeCurrentSpeed();
+ if (m_buffer.IsEmpty() && !m_needSendResult)
+ {
+ if (m_eofState == EOF_PENDING_SEND)
+ {
+ sender.SendEof();
+ m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
+ }
+
+ while (m_eofState == EOF_PENDING_DELIVERY)
+ {
+ unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
+ if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0)))
+ break;
+
+ if (sender.EofSent())
+ m_eofState = EOF_DONE;
+ }
+ }
+
return totalFlushSize;
}