summaryrefslogtreecommitdiff
path: root/network.cpp
diff options
context:
space:
mode:
authorweidai <weidai@57ff6487-cd31-0410-9ec3-f628ee90f5f0>2003-06-06 02:34:03 +0000
committerweidai <weidai@57ff6487-cd31-0410-9ec3-f628ee90f5f0>2003-06-06 02:34:03 +0000
commitaec493328f4cd1a6d717743c984a153dc120013a (patch)
tree22ac172cd2a9920b7d33711a25947c5c98e8cef2 /network.cpp
parente3e0c6c7d2d37b902060ffefc27270675fbf246b (diff)
downloadcryptopp-aec493328f4cd1a6d717743c984a153dc120013a.tar.gz
sync with private branch
git-svn-id: svn://svn.code.sf.net/p/cryptopp/code/trunk/c5@76 57ff6487-cd31-0410-9ec3-f628ee90f5f0
Diffstat (limited to 'network.cpp')
-rw-r--r--network.cpp159
1 files changed, 92 insertions, 67 deletions
diff --git a/network.cpp b/network.cpp
index 72001d4..f7705a7 100644
--- a/network.cpp
+++ b/network.cpp
@@ -2,6 +2,7 @@
#include "pch.h"
#include "network.h"
+#include "wait.h"
NAMESPACE_BEGIN(CryptoPP)
@@ -33,10 +34,24 @@ bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
#ifdef HIGHRES_TIMER_AVAILABLE
NetworkSource::NetworkSource(BufferedTransformation *attachment)
- : NonblockingSource(attachment), m_buf(1024*4), m_bufSize(0), m_state(NORMAL)
+ : NonblockingSource(attachment), m_buf(1024*16)
+ , m_waitingForResult(false), m_outputBlocked(false)
+ , m_dataBegin(0), m_dataEnd(0)
{
}
+void NetworkSource::GetWaitObjects(WaitObjectContainer &container)
+{
+ if (!m_outputBlocked)
+ {
+ if (m_dataBegin == m_dataEnd)
+ AccessReceiver().GetWaitObjects(container);
+ else
+ container.SetNoWait();
+ }
+ AttachedTransformation()->GetWaitObjects(container);
+}
+
unsigned int NetworkSource::GeneralPump2(unsigned long &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
{
NetworkReceiver &receiver = AccessReceiver();
@@ -45,80 +60,93 @@ unsigned int NetworkSource::GeneralPump2(unsigned long &byteCount, bool blocking
byteCount = 0;
bool forever = maxTime == INFINITE_TIME;
Timer timer(Timer::MILLISECONDS, forever);
- unsigned long timeout;
BufferedTransformation *t = AttachedTransformation();
- if (m_state == OUTPUT_BLOCKED)
+ if (m_outputBlocked)
goto DoOutput;
while (true)
{
- if (m_state == WAITING_FOR_RESULT)
+ if (m_dataBegin == m_dataEnd)
{
- if (receiver.MustWaitForResult())
+ if (receiver.EofReceived())
+ break;
+
+ if (m_waitingForResult)
{
- timeout = SaturatingSubtract(maxTime, timer.ElapsedTime());
- if (!receiver.Wait(timeout))
+ if (receiver.MustWaitForResult() && !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
break;
+
+ unsigned int recvResult = receiver.GetReceiveResult();
+ m_dataEnd += recvResult;
+ m_waitingForResult = false;
+
+ if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
+ goto ReceiveNoWait;
}
+ else
+ {
+ m_dataEnd = m_dataBegin = 0;
- unsigned int recvResult = receiver.GetReceiveResult();
-// assert(recvResult > 0 || receiver.EofReceived());
- m_bufSize += recvResult;
- m_state = NORMAL;
- }
+ if (receiver.MustWaitToReceive())
+ {
+ if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
+ break;
- if (m_bufSize == 0)
- {
- if (receiver.EofReceived())
- break;
+ receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
+ m_waitingForResult = true;
+ }
+ else
+ {
+ReceiveNoWait:
+ m_waitingForResult = true;
+ // call Receive repeatedly as long as data is immediately available,
+ // because some receivers tend to return data in small pieces
+ while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
+ {
+ unsigned int recvResult = receiver.GetReceiveResult();
+ m_dataEnd += recvResult;
+ if (receiver.EofReceived() || m_dataEnd == m_buf.size())
+ {
+ m_waitingForResult = false;
+ break;
+ }
+ }
+ }
+ }
}
else
{
- m_putSize = STDMIN((unsigned long)m_bufSize, maxSize - byteCount);
+ m_putSize = STDMIN((unsigned long)m_dataEnd-m_dataBegin, maxSize-byteCount);
if (checkDelimiter)
- m_putSize = std::find(m_buf.begin(), m_buf+m_putSize, delimiter) - m_buf;
+ m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
DoOutput:
- unsigned int result = t->PutModifiable2(m_buf, m_putSize, 0, forever || blockingOutput);
+ unsigned int result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
if (result)
{
- timeout = SaturatingSubtract(maxTime, timer.ElapsedTime());
- if (t->Wait(timeout))
+ if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
goto DoOutput;
else
{
- m_state = OUTPUT_BLOCKED;
+ m_outputBlocked = true;
return result;
}
}
- m_state = NORMAL;
+ m_outputBlocked = false;
byteCount += m_putSize;
- m_bufSize -= m_putSize;
- if (m_bufSize > 0)
- {
- memmove(m_buf, m_buf+m_putSize, m_bufSize);
- if (checkDelimiter && m_buf[0] == delimiter)
- break;
- }
- }
-
- if (byteCount == maxSize)
- break;
-
- unsigned long elapsed = timer.ElapsedTime();
- if (elapsed > maxTime)
- break; // once time limit is reached, return even if there is more data waiting
-
- if (receiver.MustWaitToReceive())
- {
- if (!receiver.Wait(maxTime - elapsed))
+ m_dataBegin += m_putSize;
+ if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
+ break;
+ if (byteCount == maxSize)
+ break;
+ // once time limit is reached, return even if there is more data waiting
+ // but make 0 a special case so caller can request a large amount of data to be
+ // pumped as long as it is immediately available
+ if (maxTime > 0 && timer.ElapsedTime() > maxTime)
break;
}
-
- receiver.Receive(m_buf+m_bufSize, m_buf.size()-m_bufSize);
- m_state = WAITING_FOR_RESULT;
}
return 0;
@@ -126,6 +154,12 @@ DoOutput:
// *************************************************************
+NetworkSink::NetworkSink(unsigned int maxBufferSize, bool autoFlush)
+ : m_maxBufferSize(maxBufferSize), m_autoFlush(autoFlush)
+ , m_needSendResult(false), m_buffer(STDMIN(16U*1024U, maxBufferSize)), m_blockedBytes(0)
+{
+}
+
unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int messageEnd, bool blocking)
{
if (m_blockedBytes)
@@ -134,7 +168,7 @@ unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int me
inString += length - m_blockedBytes;
length = m_blockedBytes;
}
- m_buffer.LazyPut(inString, length);
+ LazyPutter lp(m_buffer, inString, length);
unsigned int targetSize = messageEnd ? 0 : m_maxBufferSize;
TimedFlush(blocking ? INFINITE_TIME : 0, m_autoFlush ? 0 : targetSize);
@@ -144,7 +178,6 @@ unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int me
assert(!blocking);
m_blockedBytes = STDMIN(m_buffer.CurrentSize() - targetSize, (unsigned long)length);
m_buffer.UndoLazyPut(m_blockedBytes);
- m_buffer.FinalizeLazyPut();
return STDMAX(m_blockedBytes, 1U);
}
m_blockedBytes = 0;
@@ -156,51 +189,43 @@ unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int me
unsigned int NetworkSink::TimedFlush(unsigned long maxTime, unsigned int targetSize)
{
- if (m_buffer.IsEmpty())
- return 0;
-
NetworkSender &sender = AccessSender();
bool forever = maxTime == INFINITE_TIME;
Timer timer(Timer::MILLISECONDS, forever);
- unsigned long timeout;
unsigned int totalFlushSize = 0;
while (true)
{
+ if (m_buffer.CurrentSize() <= targetSize)
+ break;
+
if (m_needSendResult)
{
- if (sender.MustWaitForResult())
- {
- timeout = SaturatingSubtract(maxTime, timer.ElapsedTime());
- if (!sender.Wait(timeout))
- break;
- }
+ if (sender.MustWaitForResult() && !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
+ break;
unsigned int sendResult = sender.GetSendResult();
m_buffer.Skip(sendResult);
totalFlushSize += sendResult;
m_needSendResult = false;
- if (m_buffer.CurrentSize() <= targetSize)
+ if (!m_buffer.AnyRetrievable())
break;
}
- unsigned long elapsed = timer.ElapsedTime();
- if (elapsed > maxTime)
- break; // once time limit is reached, return even if there is more data waiting
-
- if (sender.MustWaitToSend())
- {
- if (!sender.Wait(maxTime - elapsed))
- break;
- }
+ unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
+ if (sender.MustWaitToSend() && !sender.Wait(timeOut))
+ break;
unsigned int contiguousSize = 0;
const byte *block = m_buffer.Spy(contiguousSize);
sender.Send(block, contiguousSize);
m_needSendResult = true;
+
+ if (maxTime > 0 && timeOut == 0)
+ break; // once time limit is reached, return even if there is more data waiting
}
return totalFlushSize;