diff options
Diffstat (limited to 'network.cpp')
-rw-r--r-- | network.cpp | 69 |
1 files changed, 56 insertions, 13 deletions
diff --git a/network.cpp b/network.cpp index f7705a7..ab495da 100644 --- a/network.cpp +++ b/network.cpp @@ -4,6 +4,8 @@ #include "network.h" #include "wait.h" +#define CRYPTOPP_TRACE_NETWORK 0 + NAMESPACE_BEGIN(CryptoPP) unsigned int NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking) @@ -78,6 +80,9 @@ unsigned int NetworkSource::GeneralPump2(unsigned long &byteCount, bool blocking break; unsigned int recvResult = receiver.GetReceiveResult(); +#if CRYPTOPP_TRACE_NETWORK + OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str()); +#endif m_dataEnd += recvResult; m_waitingForResult = false; @@ -102,11 +107,17 @@ ReceiveNoWait: m_waitingForResult = true; // call Receive repeatedly as long as data is immediately available, // because some receivers tend to return data in small pieces +#if CRYPTOPP_TRACE_NETWORK + OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str()); +#endif while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd)) { unsigned int recvResult = receiver.GetReceiveResult(); +#if CRYPTOPP_TRACE_NETWORK + OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str()); +#endif m_dataEnd += recvResult; - if (receiver.EofReceived() || m_dataEnd == m_buf.size()) + if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2) { m_waitingForResult = false; break; @@ -154,33 +165,56 @@ DoOutput: // ************************************************************* -NetworkSink::NetworkSink(unsigned int maxBufferSize, bool autoFlush) - : m_maxBufferSize(maxBufferSize), m_autoFlush(autoFlush) - , m_needSendResult(false), m_buffer(STDMIN(16U*1024U, maxBufferSize)), m_blockedBytes(0) +NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound) + : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound) + , m_needSendResult(false), m_wasBlocked(false) + , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0) + , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0) + , m_currentSpeed(0), m_maxObservedSpeed(0) { } +float NetworkSink::ComputeCurrentSpeed() +{ + if (m_speedTimer.ElapsedTime() > 1000) + { + m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime(); + m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f); + m_byteCountSinceLastTimerReset = 0; + m_speedTimer.StartTimer(); +// OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str()); + } + return m_currentSpeed; +} + unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int messageEnd, bool blocking) { - if (m_blockedBytes) + if (m_skipBytes) { - assert(length >= m_blockedBytes); - inString += length - m_blockedBytes; - length = m_blockedBytes; + assert(length >= m_skipBytes); + inString += m_skipBytes; + length -= m_skipBytes; } LazyPutter lp(m_buffer, inString, length); + if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound) + TimedFlush(0, 0); + unsigned int targetSize = messageEnd ? 0 : m_maxBufferSize; - TimedFlush(blocking ? INFINITE_TIME : 0, m_autoFlush ? 0 : targetSize); + if (blocking) + TimedFlush(INFINITE_TIME, targetSize); if (m_buffer.CurrentSize() > targetSize) { assert(!blocking); - m_blockedBytes = STDMIN(m_buffer.CurrentSize() - targetSize, (unsigned long)length); - m_buffer.UndoLazyPut(m_blockedBytes); - return STDMAX(m_blockedBytes, 1U); + unsigned int blockedBytes = STDMIN(m_buffer.CurrentSize() - targetSize, (unsigned long)length); + m_buffer.UndoLazyPut(blockedBytes); + m_wasBlocked = true; + m_skipBytes += length - blockedBytes; + return STDMAX(blockedBytes, 1U); } - m_blockedBytes = 0; + m_wasBlocked = false; + m_skipBytes = 0; if (messageEnd) AccessSender().SendEof(); @@ -206,6 +240,9 @@ unsigned int NetworkSink::TimedFlush(unsigned long maxTime, unsigned int targetS break; unsigned int sendResult = sender.GetSendResult(); +#if CRYPTOPP_TRACE_NETWORK + OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str()); +#endif m_buffer.Skip(sendResult); totalFlushSize += sendResult; m_needSendResult = false; @@ -221,6 +258,9 @@ unsigned int NetworkSink::TimedFlush(unsigned long maxTime, unsigned int targetS unsigned int contiguousSize = 0; const byte *block = m_buffer.Spy(contiguousSize); +#if CRYPTOPP_TRACE_NETWORK + OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str()); +#endif sender.Send(block, contiguousSize); m_needSendResult = true; @@ -228,6 +268,9 @@ unsigned int NetworkSink::TimedFlush(unsigned long maxTime, unsigned int targetS break; // once time limit is reached, return even if there is more data waiting } + m_byteCountSinceLastTimerReset += totalFlushSize; + ComputeCurrentSpeed(); + return totalFlushSize; } |