summaryrefslogtreecommitdiff
path: root/agent/pseudotcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'agent/pseudotcp.c')
-rw-r--r--agent/pseudotcp.c575
1 files changed, 458 insertions, 117 deletions
diff --git a/agent/pseudotcp.c b/agent/pseudotcp.c
index 54a714c..2e8ecc4 100644
--- a/agent/pseudotcp.c
+++ b/agent/pseudotcp.c
@@ -156,6 +156,15 @@ const guint16 PACKET_MAXIMUMS[] = {
#define DEFAULT_ACK_DELAY 100 /* 100 milliseconds */
#define DEFAULT_NO_DELAY FALSE
+#define DEFAULT_RCV_BUF_SIZE (60 * 1024)
+#define DEFAULT_SND_BUF_SIZE (90 * 1024)
+
+#define TCP_OPT_EOL 0 // End of list.
+#define TCP_OPT_NOOP 1 // No-op.
+#define TCP_OPT_MSS 2 // Maximum segment size.
+#define TCP_OPT_WND_SCALE 3 // Window scale factor.
+
+
/*
#define FLAG_FIN 0x01
#define FLAG_SYN 0x02
@@ -229,6 +238,150 @@ time_diff(guint32 later, guint32 earlier)
}
}
+////////////////////////////////////////////////////////
+// PseudoTcpFifo works exactly like FifoBuffer in libjingle
+////////////////////////////////////////////////////////
+
+
+typedef struct { /* Circular byte buffer; indices wrap modulo buffer_length. */
+ guint8 *buffer; /* Backing storage obtained from g_slice_alloc(). */
+ gsize buffer_length; /* Capacity of |buffer| in bytes. */
+ gsize data_length; /* Number of bytes currently stored. */
+ gsize read_position; /* Index in |buffer| of the oldest stored byte. */
+} PseudoTcpFifo;
+
+
+/* Allocates |size| bytes of storage for |b| and puts it in the empty state.
+ *
+ * All fields are set explicitly so that the fifo is valid even when the
+ * caller did not zero the structure beforehand. */
+static void
+pseudo_tcp_fifo_init (PseudoTcpFifo *b, gsize size)
+{
+  b->buffer = g_slice_alloc (size);
+  b->buffer_length = size;
+  b->data_length = 0;
+  b->read_position = 0;
+}
+
+/* Releases the storage owned by |b| and leaves it as an empty, zero-capacity
+ * fifo.  Safe to call on a fifo whose storage was already released. */
+static void
+pseudo_tcp_fifo_clear (PseudoTcpFifo *b)
+{
+  if (b->buffer)
+    g_slice_free1 (b->buffer_length, b->buffer);
+  b->buffer = NULL;
+  b->buffer_length = 0;
+  /* Reset the bookkeeping too, otherwise the fifo would still claim to hold
+   * data and buffer_length - data_length would wrap around. */
+  b->data_length = 0;
+  b->read_position = 0;
+}
+
+/* Returns the number of bytes currently stored in |b|. */
+static gsize
+pseudo_tcp_fifo_get_buffered (PseudoTcpFifo *b)
+{
+  const gsize buffered = b->data_length;
+
+  return buffered;
+}
+
+/* Changes the capacity of |b| to |size| bytes, preserving buffered data.
+ *
+ * Returns FALSE, leaving the fifo untouched, when the buffered data would
+ * not fit in |size| bytes; returns TRUE otherwise. */
+static gboolean
+pseudo_tcp_fifo_set_capacity (PseudoTcpFifo *b, gsize size)
+{
+  if (b->data_length > size)
+    return FALSE;
+
+  /* Only reallocate when the capacity actually changes.  The comparison must
+   * be against the current capacity, not against the amount of buffered
+   * data. */
+  if (size != b->buffer_length) {
+    guint8 *buffer = g_slice_alloc (size);
+    gsize copy = b->data_length;
+    /* Part of the data lying between the read position and the end of the
+     * old storage; the rest has wrapped to its beginning. */
+    gsize tail_copy = min (copy, b->buffer_length - b->read_position);
+
+    memcpy (buffer, &b->buffer[b->read_position], tail_copy);
+    memcpy (buffer + tail_copy, &b->buffer[0], copy - tail_copy);
+    g_slice_free1 (b->buffer_length, b->buffer);
+    b->buffer = buffer;
+    b->buffer_length = size;
+    /* The data was linearised at the start of the new storage. */
+    b->read_position = 0;
+  }
+
+  return TRUE;
+}
+
+/* Discards |size| bytes from the front of |b|.  |size| must not exceed the
+ * number of buffered bytes. */
+static void
+pseudo_tcp_fifo_consume_read_data (PseudoTcpFifo *b, gsize size)
+{
+  gsize new_read_position;
+
+  g_assert (size <= b->data_length);
+
+  /* Advance the read index, wrapping at the end of the storage. */
+  new_read_position = b->read_position + size;
+  new_read_position %= b->buffer_length;
+
+  b->data_length -= size;
+  b->read_position = new_read_position;
+}
+
+/* Marks |size| bytes following the buffered data as now being part of it.
+ * |size| must not exceed the free space of |b|. */
+static void
+pseudo_tcp_fifo_consume_write_buffer (PseudoTcpFifo *b, gsize size)
+{
+  g_assert (size <= b->buffer_length - b->data_length);
+
+  b->data_length = b->data_length + size;
+}
+
+/* Returns the number of bytes that can still be stored in |b|. */
+static gsize
+pseudo_tcp_fifo_get_write_remaining (PseudoTcpFifo *b)
+{
+  const gsize capacity = b->buffer_length;
+  const gsize used = b->data_length;
+
+  return capacity - used;
+}
+
+/* Copies up to |bytes| bytes of buffered data into |buffer|, starting
+ * |offset| bytes past the current read position.  Nothing is consumed.
+ *
+ * Returns the number of bytes copied, or 0 when |offset| is at or beyond the
+ * end of the buffered data. */
+static gsize
+pseudo_tcp_fifo_read_offset (PseudoTcpFifo *b, guint8 *buffer, gsize bytes,
+    gsize offset)
+{
+  gsize available;
+  gsize read_position;
+  gsize copy;
+  gsize tail_copy;
+
+  /* EOS.  Checked before any arithmetic so that |available| cannot wrap
+   * around and a zero-capacity fifo never reaches the modulo below. */
+  if (offset >= b->data_length)
+    return 0;
+
+  available = b->data_length - offset;
+  read_position = (b->read_position + offset) % b->buffer_length;
+  copy = min (bytes, available);
+  /* Bytes lying between |read_position| and the end of the storage. */
+  tail_copy = min (copy, b->buffer_length - read_position);
+
+  memcpy (buffer, &b->buffer[read_position], tail_copy);
+  /* Whatever is left has wrapped to the start of the storage. */
+  memcpy (buffer + tail_copy, &b->buffer[0], copy - tail_copy);
+
+  return copy;
+}
+
+/* Copies up to |bytes| bytes from |buffer| into the free space of |b|,
+ * starting |offset| bytes past the end of the buffered data.  The amount of
+ * buffered data is not changed by this function.
+ *
+ * Returns the number of bytes copied, or 0 when |offset| is at or beyond the
+ * end of the free space. */
+static gsize
+pseudo_tcp_fifo_write_offset (PseudoTcpFifo *b, const guint8 *buffer,
+    gsize bytes, gsize offset)
+{
+  gsize free_space = b->buffer_length - b->data_length;
+  gsize available;
+  gsize write_position;
+  gsize copy;
+  gsize tail_copy;
+
+  /* Checked before any further arithmetic so that |available| cannot wrap
+   * around and a zero-capacity fifo never reaches the modulo below.  Written
+   * as a comparison against the free space so the test cannot overflow. */
+  if (offset >= free_space) {
+    return 0;
+  }
+
+  available = free_space - offset;
+  write_position = (b->read_position + b->data_length + offset)
+      % b->buffer_length;
+  copy = min (bytes, available);
+  /* Bytes that fit between |write_position| and the end of the storage. */
+  tail_copy = min (copy, b->buffer_length - write_position);
+
+  memcpy (&b->buffer[write_position], buffer, tail_copy);
+  /* Whatever is left wraps to the start of the storage. */
+  memcpy (&b->buffer[0], buffer + tail_copy, copy - tail_copy);
+
+  return copy;
+}
+
+/* Copies up to |bytes| bytes from the front of |b| into |buffer| and removes
+ * them from the fifo.  Returns the number of bytes copied. */
+static gsize
+pseudo_tcp_fifo_read (PseudoTcpFifo *b, guint8 *buffer, gsize bytes)
+{
+  gsize consumed = pseudo_tcp_fifo_read_offset (b, buffer, bytes, 0);
+
+  /* Drop what was just copied out, wrapping the read index if needed. */
+  b->data_length -= consumed;
+  b->read_position += consumed;
+  b->read_position %= b->buffer_length;
+
+  return consumed;
+}
+
+/* Appends up to |bytes| bytes from |buffer| to the data buffered in |b|.
+ * Returns the number of bytes actually stored. */
+static gsize
+pseudo_tcp_fifo_write (PseudoTcpFifo *b, const guint8 *buffer, gsize bytes)
+{
+  const gsize stored = pseudo_tcp_fifo_write_offset (b, buffer, bytes, 0);
+
+  b->data_length = b->data_length + stored;
+
+  return stored;
+}
+
+
//////////////////////////////////////////////////////////////////////
// PseudoTcp
//////////////////////////////////////////////////////////////////////
@@ -245,14 +398,6 @@ typedef enum {
sfImmediateAck
} SendFlags;
-enum {
- // Note: can't go as high as 1024 * 64, because of uint16 precision
- kRcvBufSize = 1024 * 60,
- // Note: send buffer should be larger to make sure we can always fill the
- // receiver window
- kSndBufSize = 1024 * 90
-};
-
typedef struct {
guint32 conv, seq, ack;
guint8 flags;
@@ -287,13 +432,16 @@ struct _PseudoTcpSocketPrivate {
// Incoming data
GList *rlist;
- gchar rbuf[kRcvBufSize];
- guint32 rcv_nxt, rcv_wnd, rpos, rlen, lastrecv;
+ guint32 rbuf_len, rcv_nxt, rcv_wnd, lastrecv;
+ guint8 rwnd_scale; // Window scale factor
+ PseudoTcpFifo rbuf;
// Outgoing data
GList *slist;
- gchar sbuf[kSndBufSize];
- guint32 snd_nxt, snd_wnd, slen, lastsend, snd_una;
+ guint32 sbuf_len, snd_nxt, snd_wnd, lastsend, snd_una;
+ guint8 swnd_scale; // Window scale factor
+ PseudoTcpFifo sbuf;
+
// Maximum segment size, estimated protocol level, largest segment sent
guint32 mss, msslevel, largest, mtu_advise;
// Retransmit timer
@@ -313,6 +461,10 @@ struct _PseudoTcpSocketPrivate {
gboolean use_nagling;
guint32 ack_delay;
+
+ // This is used by unit tests to test backward compatibility of
+ // PseudoTcp implementations that don't support window scaling.
+ gboolean support_wnd_scale;
};
@@ -324,6 +476,8 @@ enum
PROP_STATE,
PROP_ACK_DELAY,
PROP_NO_DELAY,
+ PROP_RCV_BUF,
+ PROP_SND_BUF,
LAST_PROPERTY
};
@@ -335,10 +489,11 @@ static void pseudo_tcp_socket_set_property (GObject *object, guint property_id,
static void pseudo_tcp_socket_finalize (GObject *object);
+static void queue_connect_message (PseudoTcpSocket *self);
static guint32 queue(PseudoTcpSocket *self, const gchar * data,
guint32 len, gboolean bCtrl);
static PseudoTcpWriteResult packet(PseudoTcpSocket *self, guint32 seq,
- guint8 flags, const gchar * data, guint32 len);
+ guint8 flags, guint32 offset, guint32 len);
static gboolean parse(PseudoTcpSocket *self,
const guint8 * buffer, guint32 size);
static gboolean process(PseudoTcpSocket *self, Segment *seg);
@@ -346,6 +501,10 @@ static gboolean transmit(PseudoTcpSocket *self, const GList *seg, guint32 now);
static void attempt_send(PseudoTcpSocket *self, SendFlags sflags);
static void closedown(PseudoTcpSocket *self, guint32 err);
static void adjustMTU(PseudoTcpSocket *self);
+static void parse_options (PseudoTcpSocket *self, const guint8 *data,
+ guint32 len);
+static void resize_send_buffer (PseudoTcpSocket *self, guint32 new_size);
+static void resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size);
// The following logging is for detailed (packet-level) pseudotcp analysis only.
@@ -409,6 +568,18 @@ pseudo_tcp_socket_class_init (PseudoTcpSocketClass *cls)
"Disable the Nagle algorithm (like the TCP_NODELAY option)",
DEFAULT_NO_DELAY,
G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (object_class, PROP_RCV_BUF,
+ g_param_spec_uint ("rcv-buf", "Receive Buffer",
+ "Receive Buffer size",
+ 1, G_MAXUINT, DEFAULT_RCV_BUF_SIZE,
+ G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (object_class, PROP_SND_BUF,
+ g_param_spec_uint ("snd-buf", "Send Buffer",
+ "Send Buffer size",
+ 1, G_MAXUINT, DEFAULT_SND_BUF_SIZE,
+ G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
}
@@ -436,6 +607,12 @@ pseudo_tcp_socket_get_property (GObject *object,
case PROP_NO_DELAY:
g_value_set_boolean (value, !self->priv->use_nagling);
break;
+ case PROP_RCV_BUF:
+ g_value_set_uint (value, self->priv->rbuf_len);
+ break;
+ case PROP_SND_BUF:
+ g_value_set_uint (value, self->priv->sbuf_len);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@@ -466,6 +643,14 @@ pseudo_tcp_socket_set_property (GObject *object,
case PROP_NO_DELAY:
self->priv->use_nagling = !g_value_get_boolean (value);
break;
+ case PROP_RCV_BUF:
+ g_return_if_fail (self->priv->state == TCP_LISTEN);
+ resize_receive_buffer (self, g_value_get_uint (value));
+ break;
+ case PROP_SND_BUF:
+ g_return_if_fail (self->priv->state == TCP_LISTEN);
+ resize_send_buffer (self, g_value_get_uint (value));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@@ -495,6 +680,9 @@ pseudo_tcp_socket_finalize (GObject *object)
g_list_free (priv->rlist);
priv->rlist = NULL;
+ pseudo_tcp_fifo_clear (&priv->rbuf);
+ pseudo_tcp_fifo_clear (&priv->sbuf);
+
g_free (priv);
self->priv = NULL;
@@ -517,12 +705,18 @@ pseudo_tcp_socket_init (PseudoTcpSocket *obj)
priv->shutdown = SD_NONE;
priv->error = 0;
+ priv->rbuf_len = DEFAULT_RCV_BUF_SIZE;
+ pseudo_tcp_fifo_init (&priv->rbuf, priv->rbuf_len);
+ priv->sbuf_len = DEFAULT_SND_BUF_SIZE;
+ pseudo_tcp_fifo_init (&priv->sbuf, priv->sbuf_len);
+
priv->state = TCP_LISTEN;
priv->conv = 0;
- priv->rcv_wnd = sizeof(priv->rbuf);
- priv->snd_nxt = priv->slen = 0;
+ priv->rcv_wnd = priv->rbuf_len;
+ priv->rwnd_scale = priv->swnd_scale = 0;
+ priv->snd_nxt = 0;
priv->snd_wnd = 1;
- priv->snd_una = priv->rcv_nxt = priv->rlen = priv->rpos = 0;
+ priv->snd_una = priv->rcv_nxt = 0;
priv->bReadEnable = TRUE;
priv->bWriteEnable = FALSE;
priv->t_ack = 0;
@@ -535,7 +729,7 @@ pseudo_tcp_socket_init (PseudoTcpSocket *obj)
priv->rto_base = 0;
priv->cwnd = 2 * priv->mss;
- priv->ssthresh = sizeof(priv->rbuf);
+ priv->ssthresh = priv->rbuf_len;
priv->lastrecv = priv->lastsend = priv->last_traffic = now;
priv->bOutgoing = FALSE;
@@ -549,6 +743,8 @@ pseudo_tcp_socket_init (PseudoTcpSocket *obj)
priv->ack_delay = DEFAULT_ACK_DELAY;
priv->use_nagling = !DEFAULT_NO_DELAY;
+
+ priv->support_wnd_scale = TRUE;
}
PseudoTcpSocket *pseudo_tcp_socket_new (guint32 conversation,
@@ -561,11 +757,30 @@ PseudoTcpSocket *pseudo_tcp_socket_new (guint32 conversation,
NULL);
}
+/* Builds the connect control message and queues it for sending.
+ *
+ * The message is the CTL_CONNECT byte, followed -- when window scaling
+ * support is enabled -- by a window scale option: kind, length (1) and our
+ * receive window scale factor. */
+static void
+queue_connect_message (PseudoTcpSocket *self)
+{
+  PseudoTcpSocketPrivate *priv = self->priv;
+  guint8 msg[4];
+  gsize msg_len = 0;
+
+  msg[msg_len++] = CTL_CONNECT;
+
+  if (priv->support_wnd_scale) {
+    msg[msg_len++] = TCP_OPT_WND_SCALE;
+    msg[msg_len++] = 1;
+    msg[msg_len++] = priv->rwnd_scale;
+  }
+
+  /* The send window is set to exactly the message length -- presumably so
+   * that the whole message can go out before the peer advertised a window. */
+  priv->snd_wnd = msg_len;
+
+  queue(self, (char*) msg, msg_len, TRUE);
+}
+
gboolean
pseudo_tcp_socket_connect(PseudoTcpSocket *self)
{
PseudoTcpSocketPrivate *priv = self->priv;
- gchar buffer[1];
if (priv->state != TCP_LISTEN) {
priv->error = EINVAL;
@@ -575,8 +790,7 @@ pseudo_tcp_socket_connect(PseudoTcpSocket *self)
priv->state = TCP_SYN_SENT;
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_SYN_SENT");
- buffer[0] = CTL_CONNECT;
- queue(self, buffer, 1, TRUE);
+ queue_connect_message (self);
attempt_send(self, sfNone);
return TRUE;
@@ -672,13 +886,15 @@ pseudo_tcp_socket_get_next_clock(PseudoTcpSocket *self, long *timeout)
{
PseudoTcpSocketPrivate *priv = self->priv;
guint32 now = get_current_time ();
+ gsize snd_buffered;
if (priv->shutdown == SD_FORCEFUL)
return FALSE;
+ snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
if ((priv->shutdown == SD_GRACEFUL)
&& ((priv->state != TCP_ESTABLISHED)
- || ((priv->slen == 0) && (priv->t_ack == 0)))) {
+ || ((snd_buffered == 0) && (priv->t_ack == 0)))) {
return FALSE;
}
@@ -702,62 +918,39 @@ pseudo_tcp_socket_get_next_clock(PseudoTcpSocket *self, long *timeout)
return TRUE;
}
-static guint32
-get_receive_buffer_space (PseudoTcpSocket *self)
-{
- PseudoTcpSocketPrivate *priv = self->priv;
-
- return sizeof(priv->rbuf) - priv->rlen + priv->rpos;
-}
-
-static guint32
-get_receive_buffer_consecutive_space (PseudoTcpSocket *self)
-{
- PseudoTcpSocketPrivate *priv = self->priv;
-
- return sizeof(priv->rbuf) - priv->rlen;
-}
-
-static void
-consolidate_receiver_buffer_space (PseudoTcpSocket *self)
-{
- PseudoTcpSocketPrivate *priv = self->priv;
-
- memmove(priv->rbuf, priv->rbuf + priv->rpos, sizeof(priv->rbuf) - priv->rpos);
- priv->rlen -= priv->rpos;
- priv->rpos = 0;
-}
gint
pseudo_tcp_socket_recv(PseudoTcpSocket *self, char * buffer, size_t len)
{
PseudoTcpSocketPrivate *priv = self->priv;
- guint32 read;
+ gsize read;
+ gsize available_space;
if (priv->state != TCP_ESTABLISHED) {
priv->error = ENOTCONN;
return -1;
}
- // Make sure read position is correct.
- g_assert (priv->rpos <= priv->rlen);
- if (priv->rlen == priv->rpos) {
+ if (len == 0)
+ return 0;
+
+ read = pseudo_tcp_fifo_read (&priv->rbuf, (guint8 *) buffer, len);
+
+ // If there's no data in |m_rbuf|.
+ if (read == 0) {
priv->bReadEnable = TRUE;
priv->error = EWOULDBLOCK;
return -1;
}
- read = min((guint32) len, priv->rlen - priv->rpos);
- memcpy(buffer, priv->rbuf + priv->rpos, read);
- priv->rpos += read;
-
+ available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf);
- if (get_receive_buffer_space (self) - priv->rcv_wnd >=
- min(sizeof(priv->rbuf) / 2, priv->mss)) {
+ if (available_space - priv->rcv_wnd >=
+ min (priv->rbuf_len / 2, priv->mss)) {
// !?! Not sure about this was closed business
gboolean bWasClosed = (priv->rcv_wnd == 0);
- priv->rcv_wnd = get_receive_buffer_space (self);
+ priv->rcv_wnd = available_space;
if (bWasClosed) {
attempt_send(self, sfImmediateAck);
@@ -772,13 +965,16 @@ pseudo_tcp_socket_send(PseudoTcpSocket *self, const char * buffer, guint32 len)
{
PseudoTcpSocketPrivate *priv = self->priv;
gint written;
+ gsize available_space;
if (priv->state != TCP_ESTABLISHED) {
priv->error = ENOTCONN;
return -1;
}
- if (priv->slen == sizeof(priv->sbuf)) {
+ available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
+
+ if (!available_space) {
priv->bWriteEnable = TRUE;
priv->error = EWOULDBLOCK;
return -1;
@@ -798,7 +994,8 @@ void
pseudo_tcp_socket_close(PseudoTcpSocket *self, gboolean force)
{
PseudoTcpSocketPrivate *priv = self->priv;
- //nice_agent ("Closing socket %p : %d", sock, force?"true":"false");
+ DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "Closing socket %p : %s", self,
+ force ? "forcefully" : "gracefully");
priv->shutdown = force ? SD_FORCEFUL : SD_GRACEFUL;
}
@@ -817,10 +1014,12 @@ static guint32
queue(PseudoTcpSocket *self, const gchar * data, guint32 len, gboolean bCtrl)
{
PseudoTcpSocketPrivate *priv = self->priv;
+ gsize available_space;
- if (len > sizeof(priv->sbuf) - priv->slen) {
+ available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
+ if (len > available_space) {
g_assert(!bCtrl);
- len = sizeof(priv->sbuf) - priv->slen;
+ len = available_space;
}
// We can concatenate data if the last segment is the same type
@@ -831,21 +1030,30 @@ queue(PseudoTcpSocket *self, const gchar * data, guint32 len, gboolean bCtrl)
((SSegment *)g_list_last (priv->slist)->data)->len += len;
} else {
SSegment *sseg = g_slice_new0 (SSegment);
- sseg->seq = priv->snd_una + priv->slen;
+ gsize snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
+
+ sseg->seq = priv->snd_una + snd_buffered;
sseg->len = len;
sseg->bCtrl = bCtrl;
priv->slist = g_list_append (priv->slist, sseg);
}
- memcpy(priv->sbuf + priv->slen, data, len);
- priv->slen += len;
//LOG(LS_INFO) << "PseudoTcp::queue - priv->slen = " << priv->slen;
- return len;
+ return pseudo_tcp_fifo_write (&priv->sbuf, (guint8*) data, len);;
}
+// Creates a packet and submits it to the network. This method can either
+// send payload or just an ACK packet.
+//
+// |seq| is the sequence number of this packet.
+// |flags| is the flags for sending this packet.
+// |offset| is the offset to read from |priv->sbuf|.
+// |len| is the number of bytes to read from |priv->sbuf| as payload. If this
+// value is 0 then this is an ACK packet, otherwise this packet has payload.
+
static PseudoTcpWriteResult
packet(PseudoTcpSocket *self, guint32 seq, guint8 flags,
- const gchar * data, guint32 len)
+ guint32 offset, guint32 len)
{
PseudoTcpSocketPrivate *priv = self->priv;
guint32 now = get_current_time();
@@ -863,15 +1071,20 @@ packet(PseudoTcpSocket *self, guint32 seq, guint8 flags,
*(buffer.u32 + 2) = htonl(priv->rcv_nxt);
buffer.u8[12] = 0;
buffer.u8[13] = flags;
- *(buffer.u16 + 7) = htons((guint16)priv->rcv_wnd);
+ *(buffer.u16 + 7) = htons((guint16)(priv->rcv_wnd >> priv->rwnd_scale));
// Timestamp computations
*(buffer.u32 + 4) = htonl(now);
*(buffer.u32 + 5) = htonl(priv->ts_recent);
priv->ts_lastack = priv->rcv_nxt;
- if (data != NULL)
- memcpy(buffer.u8 + HEADER_SIZE, data, len);
+ if (len) {
+ gsize bytes_read;
+
+ bytes_read = pseudo_tcp_fifo_read_offset (&priv->sbuf, buffer.u8 + HEADER_SIZE,
+ len, offset);
+ g_assert (bytes_read == len);
+ }
DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "<-- <CONV=%d><FLG=%d><SEQ=%d:%d><ACK=%d>"
"<WND=%d><TS=%d><TSR=%d><LEN=%d>",
@@ -880,11 +1093,11 @@ packet(PseudoTcpSocket *self, guint32 seq, guint8 flags,
wres = priv->callbacks.WritePacket(self, (gchar *) buffer.u8, len + HEADER_SIZE,
priv->callbacks.user_data);
- /* Note: When data is NULL, this is an ACK packet. We don't read the
+ /* Note: When len is 0, this is an ACK packet. We don't read the
return value for those, and thus we won't retry. So go ahead and treat
the packet as a success (basically simulate as if it were dropped),
which will prevent our timers from being messed up. */
- if ((wres != WR_SUCCESS) && (NULL != data))
+ if ((wres != WR_SUCCESS) && (0 != len))
return wres;
priv->t_ack = 0;
@@ -943,6 +1156,9 @@ process(PseudoTcpSocket *self, Segment *seg)
gboolean bIgnoreData;
gboolean bNewData;
gboolean bConnect = FALSE;
+ gsize snd_buffered;
+ gsize available_space;
+ guint32 kIdealRefillSize;
/* If this is the wrong conversation, send a reset!?!
(with the correct conversation?) */
@@ -978,11 +1194,12 @@ process(PseudoTcpSocket *self, Segment *seg)
return FALSE;
} else if (seg->data[0] == CTL_CONNECT) {
bConnect = TRUE;
+
+ parse_options (self, (guint8 *) &seg->data[1], seg->len - 1);
+
if (priv->state == TCP_LISTEN) {
- char buffer[1];
priv->state = TCP_SYN_RECEIVED;
- buffer[0] = CTL_CONNECT;
- queue(self, buffer, 1, TRUE);
+ queue_connect_message (self);
} else if (priv->state == TCP_SYN_SENT) {
priv->state = TCP_ESTABLISHED;
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED");
@@ -1007,7 +1224,6 @@ process(PseudoTcpSocket *self, Segment *seg)
if ((seg->ack > priv->snd_una) && (seg->ack <= priv->snd_nxt)) {
guint32 nAcked;
guint32 nFree;
- guint32 kIdealRefillSize;
// Calculate round-trip time
if (seg->tsecr) {
@@ -1031,16 +1247,14 @@ process(PseudoTcpSocket *self, Segment *seg)
}
}
- priv->snd_wnd = seg->wnd;
+ priv->snd_wnd = seg->wnd << priv->swnd_scale;
nAcked = seg->ack - priv->snd_una;
priv->snd_una = seg->ack;
priv->rto_base = (priv->snd_una == priv->snd_nxt) ? 0 : now;
- priv->slen -= nAcked;
- memmove(priv->sbuf, priv->sbuf + nAcked, priv->slen);
- //LOG(LS_INFO) << "PseudoTcp::process - priv->slen = " << priv->slen;
+ pseudo_tcp_fifo_consume_read_data (&priv->sbuf, nAcked);
for (nFree = nAcked; nFree > 0; ) {
SSegment *data;
@@ -1085,29 +1299,10 @@ process(PseudoTcpSocket *self, Segment *seg)
priv->cwnd += max(1LU, priv->mss * priv->mss / priv->cwnd);
}
}
-
- // !?! A bit hacky
- if ((priv->state == TCP_SYN_RECEIVED) && !bConnect) {
- priv->state = TCP_ESTABLISHED;
- DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED");
- adjustMTU(self);
- if (priv->callbacks.PseudoTcpOpened)
- priv->callbacks.PseudoTcpOpened(self, priv->callbacks.user_data);
- }
-
- // If we make room in the send queue, notify the user
- // The goal it to make sure we always have at least enough data to fill the
- // window. We'd like to notify the app when we are halfway to that point.
- kIdealRefillSize = (sizeof(priv->sbuf) + sizeof(priv->rbuf)) / 2;
- if (priv->bWriteEnable && (priv->slen < kIdealRefillSize)) {
- priv->bWriteEnable = FALSE;
- if (priv->callbacks.PseudoTcpWritable)
- priv->callbacks.PseudoTcpWritable(self, priv->callbacks.user_data);
- }
} else if (seg->ack == priv->snd_una) {
/* !?! Note, tcp says don't do this... but otherwise how does a
closed window become open? */
- priv->snd_wnd = seg->wnd;
+ priv->snd_wnd = seg->wnd << priv->swnd_scale;
// Check duplicate acks
if (seg->len > 0) {
@@ -1136,6 +1331,27 @@ process(PseudoTcpSocket *self, Segment *seg)
}
}
+ // !?! A bit hacky
+ if ((priv->state == TCP_SYN_RECEIVED) && !bConnect) {
+ priv->state = TCP_ESTABLISHED;
+ DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED");
+ adjustMTU(self);
+ if (priv->callbacks.PseudoTcpOpened)
+ priv->callbacks.PseudoTcpOpened(self, priv->callbacks.user_data);
+ }
+
+ // If we make room in the send queue, notify the user
+ // The goal is to make sure we always have at least enough data to fill the
+ // window. We'd like to notify the app when we are halfway to that point.
+ kIdealRefillSize = (priv->sbuf_len + priv->rbuf_len) / 2;
+
+ snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
+ if (priv->bWriteEnable && snd_buffered < kIdealRefillSize) {
+ priv->bWriteEnable = FALSE;
+ if (priv->callbacks.PseudoTcpWritable)
+ priv->callbacks.PseudoTcpWritable(self, priv->callbacks.user_data);
+ }
+
/* Conditions where acks must be sent:
* 1) Segment is too old (they missed an ACK) (immediately)
* 2) Segment is too new (we missed a segment) (immediately)
@@ -1172,9 +1388,11 @@ process(PseudoTcpSocket *self, Segment *seg)
seg->len = 0;
}
}
- if ((seg->seq + seg->len - priv->rcv_nxt) > get_receive_buffer_space (self)) {
- guint32 nAdjust = seg->seq + seg->len - priv->rcv_nxt -
- get_receive_buffer_space (self);
+
+ available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf);
+
+ if ((seg->seq + seg->len - priv->rcv_nxt) > available_space) {
+ guint32 nAdjust = seg->seq + seg->len - priv->rcv_nxt - available_space;
if (nAdjust < seg->len) {
seg->len -= nAdjust;
} else {
@@ -1192,18 +1410,16 @@ process(PseudoTcpSocket *self, Segment *seg)
}
} else {
guint32 nOffset = seg->seq - priv->rcv_nxt;
+ gsize res;
- if (get_receive_buffer_consecutive_space (self) < seg->len + nOffset) {
- consolidate_receiver_buffer_space (self);
- g_assert (get_receive_buffer_consecutive_space (self) >=
- seg->len + nOffset);
- }
+ res = pseudo_tcp_fifo_write_offset (&priv->rbuf, (guint8 *) seg->data,
+ seg->len, nOffset);
+ g_assert (res == seg->len);
- memcpy(priv->rbuf + priv->rlen + nOffset, seg->data, seg->len);
if (seg->seq == priv->rcv_nxt) {
GList *iter = NULL;
- priv->rlen += seg->len;
+ pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, seg->len);
priv->rcv_nxt += seg->len;
priv->rcv_wnd -= seg->len;
bNewData = TRUE;
@@ -1216,7 +1432,7 @@ process(PseudoTcpSocket *self, Segment *seg)
sflags = sfImmediateAck; // (Fast Recovery)
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Recovered %d bytes (%d -> %d)",
nAdjust, priv->rcv_nxt, priv->rcv_nxt + nAdjust);
- priv->rlen += nAdjust;
+ pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, nAdjust);
priv->rcv_nxt += nAdjust;
priv->rcv_wnd -= nAdjust;
}
@@ -1268,8 +1484,8 @@ transmit(PseudoTcpSocket *self, const GList *seg, guint32 now)
while (TRUE) {
guint32 seq = segment->seq;
guint8 flags = (segment->bCtrl ? FLAG_CTL : 0);
- const gchar * buffer = priv->sbuf + (segment->seq - priv->snd_una);
- PseudoTcpWriteResult wres = packet(self, seq, flags, buffer, nTransmit);
+ PseudoTcpWriteResult wres = packet(self, seq, flags,
+ segment->seq - priv->snd_una, nTransmit);
if (wres == WR_SUCCESS)
break;
@@ -1344,6 +1560,7 @@ attempt_send(PseudoTcpSocket *self, SendFlags sflags)
guint32 nInFlight;
guint32 nUseable;
guint32 nAvailable;
+ gsize snd_buffered;
GList *iter;
cwnd = priv->cwnd;
@@ -1353,7 +1570,8 @@ attempt_send(PseudoTcpSocket *self, SendFlags sflags)
nWindow = min(priv->snd_wnd, cwnd);
nInFlight = priv->snd_nxt - priv->snd_una;
nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;
- nAvailable = min(priv->slen - nInFlight, priv->mss);
+ snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
+ nAvailable = min(snd_buffered - nInFlight, priv->mss);
if (nAvailable > nUseable) {
if (nUseable * 4 < nWindow) {
@@ -1365,12 +1583,13 @@ attempt_send(PseudoTcpSocket *self, SendFlags sflags)
}
if (bFirst) {
+ gsize available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
bFirst = FALSE;
DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "[cwnd: %d nWindow: %d nInFlight: %d "
- "nAvailable: %d nQueued: %d nEmpty: %" G_GSIZE_FORMAT
+ "nAvailable: %d nQueued: %" G_GSIZE_FORMAT " nEmpty: %" G_GSIZE_FORMAT
" ssthresh: %d]",
- priv->cwnd, nWindow, nInFlight, nAvailable, priv->slen - nInFlight,
- sizeof(priv->sbuf) - priv->slen, priv->ssthresh);
+ priv->cwnd, nWindow, nInFlight, nAvailable, snd_buffered,
+ available_space, priv->ssthresh);
}
if (nAvailable == 0) {
@@ -1427,7 +1646,6 @@ static void
closedown(PseudoTcpSocket *self, guint32 err)
{
PseudoTcpSocketPrivate *priv = self->priv;
- priv->slen = 0;
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_CLOSED");
priv->state = TCP_CLOSED;
@@ -1455,3 +1673,126 @@ adjustMTU(PseudoTcpSocket *self)
priv->ssthresh = max(priv->ssthresh, 2 * priv->mss);
priv->cwnd = max(priv->cwnd, priv->mss);
}
+
+/* Records the window scale factor announced by the peer; it is the shift
+ * applied to the window field of incoming segments.
+ *
+ * The value comes from the network.  RFC 1323 limits the shift count to 14
+ * and says a larger value should be logged and replaced by 14; without the
+ * clamp, shifting the 16-bit window by the peer's value could overflow or be
+ * undefined. */
+static void
+apply_window_scale_option (PseudoTcpSocket *self, guint8 scale_factor)
+{
+  PseudoTcpSocketPrivate *priv = self->priv;
+  const guint8 max_scale_factor = 14;
+
+  if (scale_factor > max_scale_factor) {
+    DEBUG (PSEUDO_TCP_DEBUG_NORMAL,
+        "Window scale factor %u too large, using %u instead.",
+        scale_factor, max_scale_factor);
+    scale_factor = max_scale_factor;
+  }
+
+  priv->swnd_scale = scale_factor;
+}
+
+/* Applies one option received in a connect message.
+ *
+ * |kind| is the option kind, |data| points at the option payload and |len|
+ * is the payload length.  Option kinds other than MSS and window scale are
+ * ignored. */
+static void
+apply_option(PseudoTcpSocket *self, char kind, const guint8* data, guint32 len)
+{
+  switch (kind) {
+    case TCP_OPT_MSS:
+      DEBUG (PSEUDO_TCP_DEBUG_NORMAL,
+          "Peer specified MSS option which is not supported.");
+      // TODO: Implement.
+      break;
+
+    case TCP_OPT_WND_SCALE:
+      // Window scale factor.
+      // http://www.ietf.org/rfc/rfc1323.txt
+      if (len != 1) {
+        DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid window scale option received.");
+        return;
+      }
+      apply_window_scale_option(self, data[0]);
+      break;
+
+    default:
+      break;
+  }
+}
+
+
+/* Parses the option list carried by a connect message and applies each
+ * option found.
+ *
+ * |data| points at the first option byte and |len| is the number of option
+ * bytes.  Parsing stops at an end-of-list option, at the end of the data, or
+ * at the first malformed option.  When the list was parsed to its end and
+ * contained no window scale option, the peer is assumed not to support
+ * window scaling; if we had selected a non-zero receive scale factor, the
+ * receive buffer is then reverted to its default size. */
+static void
+parse_options (PseudoTcpSocket *self, const guint8 *data, guint32 len)
+{
+  PseudoTcpSocketPrivate *priv = self->priv;
+  gboolean has_window_scaling_option = FALSE;
+  guint32 pos = 0;
+
+  // See http://www.freesoft.org/CIE/Course/Section4/8.htm for
+  // parsing the options list.
+  while (pos < len) {
+    guint8 kind;
+    guint8 opt_len;
+
+    kind = data[pos];
+    pos++;
+
+    if (kind == TCP_OPT_EOL) {
+      // End of option list.
+      break;
+    } else if (kind == TCP_OPT_NOOP) {
+      // No op.
+      continue;
+    }
+
+    // Length of this option.  The data comes from the network, so make sure
+    // the length byte is really there before reading it.
+    if (pos >= len) {
+      DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid option length received.");
+      return;
+    }
+    opt_len = data[pos];
+    pos++;
+
+    // Content of this option.
+    if (opt_len <= len - pos) {
+      apply_option (self, kind, data + pos, opt_len);
+      pos += opt_len;
+    } else {
+      DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid option length received.");
+      return;
+    }
+
+    if (kind == TCP_OPT_WND_SCALE)
+      has_window_scaling_option = TRUE;
+  }
+
+  if (!has_window_scaling_option) {
+    DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Peer doesn't support window scaling");
+    if (priv->rwnd_scale > 0) {
+      // Peer doesn't support TCP options and window scaling.
+      // Revert receive buffer size to default value.
+      resize_receive_buffer (self, DEFAULT_RCV_BUF_SIZE);
+      priv->swnd_scale = 0;
+    }
+  }
+}
+
+/* Resizes the send buffer to |new_size| bytes.
+ *
+ * The recorded size is only updated when the fifo accepted the new capacity,
+ * so that sbuf_len never disagrees with the real buffer.  The fifo refuses a
+ * capacity smaller than the amount of data it currently holds. */
+static void
+resize_send_buffer (PseudoTcpSocket *self, guint32 new_size)
+{
+  PseudoTcpSocketPrivate *priv = self->priv;
+
+  if (!pseudo_tcp_fifo_set_capacity (&priv->sbuf, new_size)) {
+    DEBUG (PSEUDO_TCP_DEBUG_NORMAL,
+        "Cannot resize send buffer to %u bytes: more data is buffered.",
+        new_size);
+    return;
+  }
+
+  priv->sbuf_len = new_size;
+}
+
+
+/* Resizes the receive buffer and derives the matching window scale factor.
+ *
+ * |new_size| is rounded down so that, once shifted right by the scale
+ * factor, it fits in 16 bits.  The receive buffer length, scale factor, slow
+ * start threshold and receive window are all updated. */
+static void
+resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size)
+{
+  PseudoTcpSocketPrivate *priv = self->priv;
+  guint8 shift;
+  gboolean result;
+
+  // Find the smallest shift that makes the size fit in a 16-bit unsigned
+  // integer.
+  for (shift = 0; new_size > 0xFFFF; ++shift)
+    new_size >>= 1;
+
+  // Shifting back yields the size actually used for the buffer.
+  new_size <<= shift;
+
+  // The new buffer has to be able to hold whatever the old one contains.
+  // That should always be the case because this method is called either
+  // before the connection is established or while peers are exchanging
+  // connect messages.
+  result = pseudo_tcp_fifo_set_capacity (&priv->rbuf, new_size);
+  g_assert(result);
+
+  priv->rwnd_scale = shift;
+  priv->rbuf_len = new_size;
+  priv->ssthresh = new_size;
+
+  // Advertise all the free space of the resized buffer.
+  priv->rcv_wnd = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf);
+}
+