diff options
Diffstat (limited to 'agent/pseudotcp.c')
-rw-r--r-- | agent/pseudotcp.c | 575 |
1 files changed, 458 insertions, 117 deletions
diff --git a/agent/pseudotcp.c b/agent/pseudotcp.c index 54a714c..2e8ecc4 100644 --- a/agent/pseudotcp.c +++ b/agent/pseudotcp.c @@ -156,6 +156,15 @@ const guint16 PACKET_MAXIMUMS[] = { #define DEFAULT_ACK_DELAY 100 /* 100 milliseconds */ #define DEFAULT_NO_DELAY FALSE +#define DEFAULT_RCV_BUF_SIZE (60 * 1024) +#define DEFAULT_SND_BUF_SIZE (90 * 1024) + +#define TCP_OPT_EOL 0 // End of list. +#define TCP_OPT_NOOP 1 // No-op. +#define TCP_OPT_MSS 2 // Maximum segment size. +#define TCP_OPT_WND_SCALE 3 // Window scale factor. + + /* #define FLAG_FIN 0x01 #define FLAG_SYN 0x02 @@ -229,6 +238,150 @@ time_diff(guint32 later, guint32 earlier) } } +//////////////////////////////////////////////////////// +// PseudoTcpFifo works exactly like FifoBuffer in libjingle +//////////////////////////////////////////////////////// + + +typedef struct { + guint8 *buffer; + gsize buffer_length; + gsize data_length; + gsize read_position; +} PseudoTcpFifo; + + +static void +pseudo_tcp_fifo_init (PseudoTcpFifo *b, gsize size) +{ + b->buffer = g_slice_alloc (size); + b->buffer_length = size; +} + +static void +pseudo_tcp_fifo_clear (PseudoTcpFifo *b) +{ + if (b->buffer) + g_slice_free1 (b->buffer_length, b->buffer); + b->buffer = NULL; + b->buffer_length = 0; +} + +static gsize +pseudo_tcp_fifo_get_buffered (PseudoTcpFifo *b) +{ + return b->data_length; +} + +static gboolean +pseudo_tcp_fifo_set_capacity (PseudoTcpFifo *b, gsize size) +{ + if (b->data_length > size) + return FALSE; + + if (size != b->data_length) { + guint8 *buffer = g_slice_alloc (size); + gsize copy = b->data_length; + gsize tail_copy = min (copy, b->buffer_length - b->read_position); + + memcpy (buffer, &b->buffer[b->read_position], tail_copy); + memcpy (buffer + tail_copy, &b->buffer[0], copy - tail_copy); + g_slice_free1 (b->buffer_length, b->buffer); + b->buffer = buffer; + b->buffer_length = size; + b->read_position = 0; + } + + return TRUE; +} + +static void +pseudo_tcp_fifo_consume_read_data (PseudoTcpFifo *b, gsize size) +{ + g_assert (size <= b->data_length); + + b->read_position = (b->read_position + size) % b->buffer_length; + b->data_length -= size; +} + +static void +pseudo_tcp_fifo_consume_write_buffer (PseudoTcpFifo *b, gsize size) +{ + g_assert (size <= b->buffer_length - b->data_length); + + b->data_length += size; +} + +static gsize +pseudo_tcp_fifo_get_write_remaining (PseudoTcpFifo *b) +{ + return b->buffer_length - b->data_length; +} + +static gsize +pseudo_tcp_fifo_read_offset (PseudoTcpFifo *b, guint8 *buffer, gsize bytes, + gsize offset) +{ + gsize available = b->data_length - offset; + gsize read_position = (b->read_position + offset) % b->buffer_length; + gsize copy = min (bytes, available); + gsize tail_copy = min(copy, b->buffer_length - read_position); + + /* EOS */ + if (offset >= b->data_length) + return 0; + + memcpy(buffer, &b->buffer[read_position], tail_copy); + memcpy(buffer + tail_copy, &b->buffer[0], copy - tail_copy); + + return copy; +} + +static gsize +pseudo_tcp_fifo_write_offset (PseudoTcpFifo *b, const guint8 *buffer, + gsize bytes, gsize offset) +{ + gsize available = b->buffer_length - b->data_length - offset; + gsize write_position = (b->read_position + b->data_length + offset) + % b->buffer_length; + gsize copy = min (bytes, available); + gsize tail_copy = min(copy, b->buffer_length - write_position); + + if (b->data_length + offset >= b->buffer_length) { + return 0; + } + + memcpy(&b->buffer[write_position], buffer, tail_copy); + memcpy(&b->buffer[0], buffer + tail_copy, copy - tail_copy); + + return copy; +} + +static gsize +pseudo_tcp_fifo_read (PseudoTcpFifo *b, guint8 *buffer, gsize bytes) +{ + gsize copy; + + copy = pseudo_tcp_fifo_read_offset (b, buffer, bytes, 0); + + b->read_position = (b->read_position + copy) % b->buffer_length; + b->data_length -= copy; + + return copy; +} + +static gsize +pseudo_tcp_fifo_write (PseudoTcpFifo *b, const guint8 *buffer, gsize bytes) +{ + gsize copy; + + copy = pseudo_tcp_fifo_write_offset (b, buffer, bytes, 0); + b->data_length += copy; + + return copy; +} + + ////////////////////////////////////////////////////////////////////// // PseudoTcp ////////////////////////////////////////////////////////////////////// @@ -245,14 +398,6 @@ typedef enum { sfImmediateAck } SendFlags; -enum { - // Note: can't go as high as 1024 * 64, because of uint16 precision - kRcvBufSize = 1024 * 60, - // Note: send buffer should be larger to make sure we can always fill the - // receiver window - kSndBufSize = 1024 * 90 -}; - typedef struct { guint32 conv, seq, ack; guint8 flags; @@ -287,13 +432,16 @@ struct _PseudoTcpSocketPrivate { // Incoming data GList *rlist; - gchar rbuf[kRcvBufSize]; - guint32 rcv_nxt, rcv_wnd, rpos, rlen, lastrecv; + guint32 rbuf_len, rcv_nxt, rcv_wnd, lastrecv; + guint8 rwnd_scale; // Window scale factor + PseudoTcpFifo rbuf; // Outgoing data GList *slist; - gchar sbuf[kSndBufSize]; - guint32 snd_nxt, snd_wnd, slen, lastsend, snd_una; + guint32 sbuf_len, snd_nxt, snd_wnd, lastsend, snd_una; + guint8 swnd_scale; // Window scale factor + PseudoTcpFifo sbuf; + // Maximum segment size, estimated protocol level, largest segment sent guint32 mss, msslevel, largest, mtu_advise; // Retransmit timer @@ -313,6 +461,10 @@ struct _PseudoTcpSocketPrivate { gboolean use_nagling; guint32 ack_delay; + + // This is used by unit tests to test backward compatibility of + // PseudoTcp implementations that don't support window scaling. + gboolean support_wnd_scale; }; @@ -324,6 +476,8 @@ enum PROP_STATE, PROP_ACK_DELAY, PROP_NO_DELAY, + PROP_RCV_BUF, + PROP_SND_BUF, LAST_PROPERTY }; @@ -335,10 +489,11 @@ static void pseudo_tcp_socket_set_property (GObject *object, guint property_id, static void pseudo_tcp_socket_finalize (GObject *object); +static void queue_connect_message (PseudoTcpSocket *self); static guint32 queue(PseudoTcpSocket *self, const gchar * data, guint32 len, gboolean bCtrl); static PseudoTcpWriteResult packet(PseudoTcpSocket *self, guint32 seq, - guint8 flags, const gchar * data, guint32 len); + guint8 flags, guint32 offset, guint32 len); static gboolean parse(PseudoTcpSocket *self, const guint8 * buffer, guint32 size); static gboolean process(PseudoTcpSocket *self, Segment *seg); @@ -346,6 +501,10 @@ static gboolean transmit(PseudoTcpSocket *self, const GList *seg, guint32 now); static void attempt_send(PseudoTcpSocket *self, SendFlags sflags); static void closedown(PseudoTcpSocket *self, guint32 err); static void adjustMTU(PseudoTcpSocket *self); +static void parse_options (PseudoTcpSocket *self, const guint8 *data, + guint32 len); +static void resize_send_buffer (PseudoTcpSocket *self, guint32 new_size); +static void resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size); // The following logging is for detailed (packet-level) pseudotcp analysis only. @@ -409,6 +568,18 @@ pseudo_tcp_socket_class_init (PseudoTcpSocketClass *cls) "Disable the Nagle algorithm (like the TCP_NODELAY option)", DEFAULT_NO_DELAY, G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_RCV_BUF, + g_param_spec_uint ("rcv-buf", "Receive Buffer", + "Receive Buffer size", + 1, G_MAXUINT, DEFAULT_RCV_BUF_SIZE, + G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_SND_BUF, + g_param_spec_uint ("snd-buf", "Send Buffer", + "Send Buffer size", + 1, G_MAXUINT, DEFAULT_SND_BUF_SIZE, + G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS)); } @@ -436,6 +607,12 @@ pseudo_tcp_socket_get_property (GObject *object, case PROP_NO_DELAY: g_value_set_boolean (value, !self->priv->use_nagling); break; + case PROP_RCV_BUF: + g_value_set_uint (value, self->priv->rbuf_len); + break; + case PROP_SND_BUF: + g_value_set_uint (value, self->priv->sbuf_len); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -466,6 +643,14 @@ pseudo_tcp_socket_set_property (GObject *object, case PROP_NO_DELAY: self->priv->use_nagling = !g_value_get_boolean (value); break; + case PROP_RCV_BUF: + g_return_if_fail (self->priv->state == TCP_LISTEN); + resize_receive_buffer (self, g_value_get_uint (value)); + break; + case PROP_SND_BUF: + g_return_if_fail (self->priv->state == TCP_LISTEN); + resize_send_buffer (self, g_value_get_uint (value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -495,6 +680,9 @@ pseudo_tcp_socket_finalize (GObject *object) g_list_free (priv->rlist); priv->rlist = NULL; + pseudo_tcp_fifo_clear (&priv->rbuf); + pseudo_tcp_fifo_clear (&priv->sbuf); + g_free (priv); self->priv = NULL; @@ -517,12 +705,18 @@ pseudo_tcp_socket_init (PseudoTcpSocket *obj) priv->shutdown = SD_NONE; priv->error = 0; + priv->rbuf_len = DEFAULT_RCV_BUF_SIZE; + pseudo_tcp_fifo_init (&priv->rbuf, priv->rbuf_len); + priv->sbuf_len = DEFAULT_SND_BUF_SIZE; + pseudo_tcp_fifo_init (&priv->sbuf, priv->sbuf_len); + priv->state = TCP_LISTEN; priv->conv = 0; - priv->rcv_wnd = sizeof(priv->rbuf); - priv->snd_nxt = priv->slen = 0; + priv->rcv_wnd = priv->rbuf_len; + priv->rwnd_scale = priv->swnd_scale = 0; + priv->snd_nxt = 0; priv->snd_wnd = 1; - priv->snd_una = priv->rcv_nxt = priv->rlen = priv->rpos = 0; + priv->snd_una = priv->rcv_nxt = 0; priv->bReadEnable = TRUE; priv->bWriteEnable = FALSE; priv->t_ack = 0; @@ -535,7 +729,7 @@ pseudo_tcp_socket_init (PseudoTcpSocket *obj) priv->rto_base = 0; priv->cwnd = 2 * priv->mss; - priv->ssthresh = sizeof(priv->rbuf); + priv->ssthresh = priv->rbuf_len; priv->lastrecv = priv->lastsend = priv->last_traffic = now; priv->bOutgoing = FALSE; @@ -549,6 +743,8 @@ pseudo_tcp_socket_init (PseudoTcpSocket *obj) priv->ack_delay = DEFAULT_ACK_DELAY; priv->use_nagling = !DEFAULT_NO_DELAY; + + priv->support_wnd_scale = TRUE; } PseudoTcpSocket *pseudo_tcp_socket_new (guint32 conversation, @@ -561,11 +757,30 @@ PseudoTcpSocket *pseudo_tcp_socket_new (guint32 conversation, NULL); } +static void +queue_connect_message (PseudoTcpSocket *self) +{ + PseudoTcpSocketPrivate *priv = self->priv; + guint8 buf[4]; + gsize size = 1; + + buf[0] = CTL_CONNECT; + if (priv->support_wnd_scale) { + buf[1] = TCP_OPT_WND_SCALE; + buf[2] = 1; + buf[3] = priv->rwnd_scale; + size = 4; + } + + priv->snd_wnd = size; + + queue(self, (char*) buf, size, TRUE); +} + gboolean pseudo_tcp_socket_connect(PseudoTcpSocket *self) { PseudoTcpSocketPrivate *priv = self->priv; - gchar buffer[1]; if (priv->state != TCP_LISTEN) { priv->error = EINVAL; @@ -575,8 +790,7 @@ pseudo_tcp_socket_connect(PseudoTcpSocket *self) priv->state = TCP_SYN_SENT; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_SYN_SENT"); - buffer[0] = CTL_CONNECT; - queue(self, buffer, 1, TRUE); + queue_connect_message (self); attempt_send(self, sfNone); return TRUE; @@ -672,13 +886,15 @@ pseudo_tcp_socket_get_next_clock(PseudoTcpSocket *self, long *timeout) { PseudoTcpSocketPrivate *priv = self->priv; guint32 now = get_current_time (); + gsize snd_buffered; if (priv->shutdown == SD_FORCEFUL) return FALSE; + snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf); if ((priv->shutdown == SD_GRACEFUL) && ((priv->state != TCP_ESTABLISHED) - || ((priv->slen == 0) && (priv->t_ack == 0)))) { + || ((snd_buffered == 0) && (priv->t_ack == 0)))) { return FALSE; } @@ -702,62 +918,39 @@ pseudo_tcp_socket_get_next_clock(PseudoTcpSocket *self, long *timeout) return TRUE; } -static guint32 -get_receive_buffer_space (PseudoTcpSocket *self) -{ - PseudoTcpSocketPrivate *priv = self->priv; - - return sizeof(priv->rbuf) - priv->rlen + priv->rpos; -} - -static guint32 -get_receive_buffer_consecutive_space (PseudoTcpSocket *self) -{ - PseudoTcpSocketPrivate *priv = self->priv; - - return sizeof(priv->rbuf) - priv->rlen; -} - -static void -consolidate_receiver_buffer_space (PseudoTcpSocket *self) -{ - PseudoTcpSocketPrivate *priv = self->priv; - - memmove(priv->rbuf, priv->rbuf + priv->rpos, sizeof(priv->rbuf) - priv->rpos); - priv->rlen -= priv->rpos; - priv->rpos = 0; -} gint pseudo_tcp_socket_recv(PseudoTcpSocket *self, char * buffer, size_t len) { PseudoTcpSocketPrivate *priv = self->priv; - guint32 read; + gsize read; + gsize available_space; if (priv->state != TCP_ESTABLISHED) { priv->error = ENOTCONN; return -1; } - // Make sure read position is correct. - g_assert (priv->rpos <= priv->rlen); - if (priv->rlen == priv->rpos) { + if (len == 0) + return 0; + + read = pseudo_tcp_fifo_read (&priv->rbuf, (guint8 *) buffer, len); + + // If there's no data in |m_rbuf|. + if (read == 0) { priv->bReadEnable = TRUE; priv->error = EWOULDBLOCK; return -1; } - read = min((guint32) len, priv->rlen - priv->rpos); - memcpy(buffer, priv->rbuf + priv->rpos, read); - priv->rpos += read; - + available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf); - if (get_receive_buffer_space (self) - priv->rcv_wnd >= - min(sizeof(priv->rbuf) / 2, priv->mss)) { + if (available_space - priv->rcv_wnd >= + min (priv->rbuf_len / 2, priv->mss)) { // !?! Not sure about this was closed business gboolean bWasClosed = (priv->rcv_wnd == 0); - priv->rcv_wnd = get_receive_buffer_space (self); + priv->rcv_wnd = available_space; if (bWasClosed) { attempt_send(self, sfImmediateAck); @@ -772,13 +965,16 @@ pseudo_tcp_socket_send(PseudoTcpSocket *self, const char * buffer, guint32 len) { PseudoTcpSocketPrivate *priv = self->priv; gint written; + gsize available_space; if (priv->state != TCP_ESTABLISHED) { priv->error = ENOTCONN; return -1; } - if (priv->slen == sizeof(priv->sbuf)) { + available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf); + + if (!available_space) { priv->bWriteEnable = TRUE; priv->error = EWOULDBLOCK; return -1; @@ -798,7 +994,8 @@ void pseudo_tcp_socket_close(PseudoTcpSocket *self, gboolean force) { PseudoTcpSocketPrivate *priv = self->priv; - //nice_agent ("Closing socket %p : %d", sock, force?"true":"false"); + DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "Closing socket %p : %s", self, + force ? "forcefully" : "gracefully"); priv->shutdown = force ? SD_FORCEFUL : SD_GRACEFUL; } @@ -817,10 +1014,12 @@ static guint32 queue(PseudoTcpSocket *self, const gchar * data, guint32 len, gboolean bCtrl) { PseudoTcpSocketPrivate *priv = self->priv; + gsize available_space; - if (len > sizeof(priv->sbuf) - priv->slen) { + available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf); + if (len > available_space) { g_assert(!bCtrl); - len = sizeof(priv->sbuf) - priv->slen; + len = available_space; } // We can concatenate data if the last segment is the same type @@ -831,21 +1030,30 @@ queue(PseudoTcpSocket *self, const gchar * data, guint32 len, gboolean bCtrl) ((SSegment *)g_list_last (priv->slist)->data)->len += len; } else { SSegment *sseg = g_slice_new0 (SSegment); - sseg->seq = priv->snd_una + priv->slen; + gsize snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf); + + sseg->seq = priv->snd_una + snd_buffered; sseg->len = len; sseg->bCtrl = bCtrl; priv->slist = g_list_append (priv->slist, sseg); } - memcpy(priv->sbuf + priv->slen, data, len); - priv->slen += len; //LOG(LS_INFO) << "PseudoTcp::queue - priv->slen = " << priv->slen; - return len; + return pseudo_tcp_fifo_write (&priv->sbuf, (guint8*) data, len);; } +// Creates a packet and submits it to the network. This method can either +// send payload or just an ACK packet. +// +// |seq| is the sequence number of this packet. +// |flags| is the flags for sending this packet. +// |offset| is the offset to read from |m_sbuf|. +// |len| is the number of bytes to read from |m_sbuf| as payload. If this +// value is 0 then this is an ACK packet, otherwise this packet has payload. + static PseudoTcpWriteResult packet(PseudoTcpSocket *self, guint32 seq, guint8 flags, - const gchar * data, guint32 len) + guint32 offset, guint32 len) { PseudoTcpSocketPrivate *priv = self->priv; guint32 now = get_current_time(); @@ -863,15 +1071,20 @@ packet(PseudoTcpSocket *self, guint32 seq, guint8 flags, *(buffer.u32 + 2) = htonl(priv->rcv_nxt); buffer.u8[12] = 0; buffer.u8[13] = flags; - *(buffer.u16 + 7) = htons((guint16)priv->rcv_wnd); + *(buffer.u16 + 7) = htons((guint16)(priv->rcv_wnd >> priv->rwnd_scale)); // Timestamp computations *(buffer.u32 + 4) = htonl(now); *(buffer.u32 + 5) = htonl(priv->ts_recent); priv->ts_lastack = priv->rcv_nxt; - if (data != NULL) - memcpy(buffer.u8 + HEADER_SIZE, data, len); + if (len) { + gsize bytes_read; + + bytes_read = pseudo_tcp_fifo_read_offset (&priv->sbuf, buffer.u8 + HEADER_SIZE, + len, offset); + g_assert (bytes_read == len); + } DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "<-- <CONV=%d><FLG=%d><SEQ=%d:%d><ACK=%d>" "<WND=%d><TS=%d><TSR=%d><LEN=%d>", @@ -880,11 +1093,11 @@ packet(PseudoTcpSocket *self, guint32 seq, guint8 flags, wres = priv->callbacks.WritePacket(self, (gchar *) buffer.u8, len + HEADER_SIZE, priv->callbacks.user_data); - /* Note: When data is NULL, this is an ACK packet. We don't read the + /* Note: When len is 0, this is an ACK packet. We don't read the return value for those, and thus we won't retry. So go ahead and treat the packet as a success (basically simulate as if it were dropped), which will prevent our timers from being messed up. */ - if ((wres != WR_SUCCESS) && (NULL != data)) + if ((wres != WR_SUCCESS) && (0 != len)) return wres; priv->t_ack = 0; @@ -943,6 +1156,9 @@ process(PseudoTcpSocket *self, Segment *seg) gboolean bIgnoreData; gboolean bNewData; gboolean bConnect = FALSE; + gsize snd_buffered; + gsize available_space; + guint32 kIdealRefillSize; /* If this is the wrong conversation, send a reset!?! (with the correct conversation?) */ @@ -978,11 +1194,12 @@ process(PseudoTcpSocket *self, Segment *seg) return FALSE; } else if (seg->data[0] == CTL_CONNECT) { bConnect = TRUE; + + parse_options (self, (guint8 *) &seg->data[1], seg->len - 1); + if (priv->state == TCP_LISTEN) { - char buffer[1]; priv->state = TCP_SYN_RECEIVED; - buffer[0] = CTL_CONNECT; - queue(self, buffer, 1, TRUE); + queue_connect_message (self); } else if (priv->state == TCP_SYN_SENT) { priv->state = TCP_ESTABLISHED; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED"); @@ -1007,7 +1224,6 @@ process(PseudoTcpSocket *self, Segment *seg) if ((seg->ack > priv->snd_una) && (seg->ack <= priv->snd_nxt)) { guint32 nAcked; guint32 nFree; - guint32 kIdealRefillSize; // Calculate round-trip time if (seg->tsecr) { @@ -1031,16 +1247,14 @@ process(PseudoTcpSocket *self, Segment *seg) } } - priv->snd_wnd = seg->wnd; + priv->snd_wnd = seg->wnd << priv->swnd_scale; nAcked = seg->ack - priv->snd_una; priv->snd_una = seg->ack; priv->rto_base = (priv->snd_una == priv->snd_nxt) ? 0 : now; - priv->slen -= nAcked; - memmove(priv->sbuf, priv->sbuf + nAcked, priv->slen); - //LOG(LS_INFO) << "PseudoTcp::process - priv->slen = " << priv->slen; + pseudo_tcp_fifo_consume_read_data (&priv->sbuf, nAcked); for (nFree = nAcked; nFree > 0; ) { SSegment *data; @@ -1085,29 +1299,10 @@ process(PseudoTcpSocket *self, Segment *seg) priv->cwnd += max(1LU, priv->mss * priv->mss / priv->cwnd); } } - - // !?! A bit hacky - if ((priv->state == TCP_SYN_RECEIVED) && !bConnect) { - priv->state = TCP_ESTABLISHED; - DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED"); - adjustMTU(self); - if (priv->callbacks.PseudoTcpOpened) - priv->callbacks.PseudoTcpOpened(self, priv->callbacks.user_data); - } - - // If we make room in the send queue, notify the user - // The goal it to make sure we always have at least enough data to fill the - // window. We'd like to notify the app when we are halfway to that point. - kIdealRefillSize = (sizeof(priv->sbuf) + sizeof(priv->rbuf)) / 2; - if (priv->bWriteEnable && (priv->slen < kIdealRefillSize)) { - priv->bWriteEnable = FALSE; - if (priv->callbacks.PseudoTcpWritable) - priv->callbacks.PseudoTcpWritable(self, priv->callbacks.user_data); - } } else if (seg->ack == priv->snd_una) { /* !?! Note, tcp says don't do this... but otherwise how does a closed window become open? */ - priv->snd_wnd = seg->wnd; + priv->snd_wnd = seg->wnd << priv->swnd_scale; // Check duplicate acks if (seg->len > 0) { @@ -1136,6 +1331,27 @@ process(PseudoTcpSocket *self, Segment *seg) } } + // !?! A bit hacky + if ((priv->state == TCP_SYN_RECEIVED) && !bConnect) { + priv->state = TCP_ESTABLISHED; + DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED"); + adjustMTU(self); + if (priv->callbacks.PseudoTcpOpened) + priv->callbacks.PseudoTcpOpened(self, priv->callbacks.user_data); + } + + // If we make room in the send queue, notify the user + // The goal it to make sure we always have at least enough data to fill the + // window. We'd like to notify the app when we are halfway to that point. + kIdealRefillSize = (priv->sbuf_len + priv->rbuf_len) / 2; + + snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf); + if (priv->bWriteEnable && snd_buffered < kIdealRefillSize) { + priv->bWriteEnable = FALSE; + if (priv->callbacks.PseudoTcpWritable) + priv->callbacks.PseudoTcpWritable(self, priv->callbacks.user_data); + } + /* Conditions where acks must be sent: * 1) Segment is too old (they missed an ACK) (immediately) * 2) Segment is too new (we missed a segment) (immediately) @@ -1172,9 +1388,11 @@ process(PseudoTcpSocket *self, Segment *seg) seg->len = 0; } } - if ((seg->seq + seg->len - priv->rcv_nxt) > get_receive_buffer_space (self)) { - guint32 nAdjust = seg->seq + seg->len - priv->rcv_nxt - - get_receive_buffer_space (self); + + available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf); + + if ((seg->seq + seg->len - priv->rcv_nxt) > available_space) { + guint32 nAdjust = seg->seq + seg->len - priv->rcv_nxt - available_space; if (nAdjust < seg->len) { seg->len -= nAdjust; } else { @@ -1192,18 +1410,16 @@ process(PseudoTcpSocket *self, Segment *seg) } } else { guint32 nOffset = seg->seq - priv->rcv_nxt; + gsize res; - if (get_receive_buffer_consecutive_space (self) < seg->len + nOffset) { - consolidate_receiver_buffer_space (self); - g_assert (get_receive_buffer_consecutive_space (self) >= - seg->len + nOffset); - } + res = pseudo_tcp_fifo_write_offset (&priv->rbuf, (guint8 *) seg->data, + seg->len, nOffset); + g_assert (res == seg->len); - memcpy(priv->rbuf + priv->rlen + nOffset, seg->data, seg->len); if (seg->seq == priv->rcv_nxt) { GList *iter = NULL; - priv->rlen += seg->len; + pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, seg->len); priv->rcv_nxt += seg->len; priv->rcv_wnd -= seg->len; bNewData = TRUE; @@ -1216,7 +1432,7 @@ process(PseudoTcpSocket *self, Segment *seg) sflags = sfImmediateAck; // (Fast Recovery) DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Recovered %d bytes (%d -> %d)", nAdjust, priv->rcv_nxt, priv->rcv_nxt + nAdjust); - priv->rlen += nAdjust; + pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, nAdjust); priv->rcv_nxt += nAdjust; priv->rcv_wnd -= nAdjust; } @@ -1268,8 +1484,8 @@ transmit(PseudoTcpSocket *self, const GList *seg, guint32 now) while (TRUE) { guint32 seq = segment->seq; guint8 flags = (segment->bCtrl ? FLAG_CTL : 0); - const gchar * buffer = priv->sbuf + (segment->seq - priv->snd_una); - PseudoTcpWriteResult wres = packet(self, seq, flags, buffer, nTransmit); + PseudoTcpWriteResult wres = packet(self, seq, flags, + segment->seq - priv->snd_una, nTransmit); if (wres == WR_SUCCESS) break; @@ -1344,6 +1560,7 @@ attempt_send(PseudoTcpSocket *self, SendFlags sflags) guint32 nInFlight; guint32 nUseable; guint32 nAvailable; + gsize snd_buffered; GList *iter; cwnd = priv->cwnd; @@ -1353,7 +1570,8 @@ attempt_send(PseudoTcpSocket *self, SendFlags sflags) nWindow = min(priv->snd_wnd, cwnd); nInFlight = priv->snd_nxt - priv->snd_una; nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0; - nAvailable = min(priv->slen - nInFlight, priv->mss); + snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf); + nAvailable = min(snd_buffered - nInFlight, priv->mss); if (nAvailable > nUseable) { if (nUseable * 4 < nWindow) { @@ -1365,12 +1583,13 @@ attempt_send(PseudoTcpSocket *self, SendFlags sflags) } if (bFirst) { + gsize available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf); bFirst = FALSE; DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "[cwnd: %d nWindow: %d nInFlight: %d " - "nAvailable: %d nQueued: %d nEmpty: %" G_GSIZE_FORMAT + "nAvailable: %d nQueued: %" G_GSIZE_FORMAT " nEmpty: %" G_GSIZE_FORMAT " ssthresh: %d]", - priv->cwnd, nWindow, nInFlight, nAvailable, priv->slen - nInFlight, - sizeof(priv->sbuf) - priv->slen, priv->ssthresh); + priv->cwnd, nWindow, nInFlight, nAvailable, snd_buffered, + available_space, priv->ssthresh); } if (nAvailable == 0) { @@ -1427,7 +1646,6 @@ static void closedown(PseudoTcpSocket *self, guint32 err) { PseudoTcpSocketPrivate *priv = self->priv; - priv->slen = 0; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_CLOSED"); priv->state = TCP_CLOSED; @@ -1455,3 +1673,126 @@ adjustMTU(PseudoTcpSocket *self) priv->ssthresh = max(priv->ssthresh, 2 * priv->mss); priv->cwnd = max(priv->cwnd, priv->mss); } + +static void +apply_window_scale_option (PseudoTcpSocket *self, guint8 scale_factor) +{ + PseudoTcpSocketPrivate *priv = self->priv; + + priv->swnd_scale = scale_factor; +} + +static void +apply_option(PseudoTcpSocket *self, char kind, const guint8* data, guint32 len) +{ + if (kind == TCP_OPT_MSS) { + DEBUG (PSEUDO_TCP_DEBUG_NORMAL, + "Peer specified MSS option which is not supported."); + // TODO: Implement. + } else if (kind == TCP_OPT_WND_SCALE) { + // Window scale factor. + // http://www.ietf.org/rfc/rfc1323.txt + if (len != 1) { + DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid window scale option received."); + return; + } + apply_window_scale_option(self, data[0]); + } +} + + +static void +parse_options (PseudoTcpSocket *self, const guint8 *data, guint32 len) +{ + PseudoTcpSocketPrivate *priv = self->priv; + gboolean has_window_scaling_option = FALSE; + guint32 pos = 0; + + // See http://www.freesoft.org/CIE/Course/Section4/8.htm for + // parsing the options list. + while (pos < len) { + guint8 kind = TCP_OPT_EOL; + guint8 opt_len; + + kind = data[pos]; + pos++; + + if (kind == TCP_OPT_EOL) { + // End of option list. + break; + } else if (kind == TCP_OPT_NOOP) { + // No op. + continue; + } + + // Length of this option. + g_assert(len); + opt_len = data[pos]; + pos++; + + // Content of this option. + if (opt_len <= len - pos) { + apply_option (self, kind, data + pos, opt_len); + pos += opt_len; + } else { + DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid option length received."); + return; + } + + if (kind == TCP_OPT_WND_SCALE) + has_window_scaling_option = TRUE; + } + + if (!has_window_scaling_option) { + DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Peer doesn't support window scaling"); + if (priv->rwnd_scale > 0) { + // Peer doesn't support TCP options and window scaling. + // Revert receive buffer size to default value. + resize_receive_buffer (self, DEFAULT_RCV_BUF_SIZE); + priv->swnd_scale = 0; + } + } +} + +static void +resize_send_buffer (PseudoTcpSocket *self, guint32 new_size) +{ + PseudoTcpSocketPrivate *priv = self->priv; + + priv->sbuf_len = new_size; + pseudo_tcp_fifo_set_capacity (&priv->sbuf, new_size); +} + + +static void +resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size) +{ + PseudoTcpSocketPrivate *priv = self->priv; + guint8 scale_factor = 0; + gboolean result; + gsize available_space; + + // Determine the scale factor such that the scaled window size can fit + // in a 16-bit unsigned integer. + while (new_size > 0xFFFF) { + ++scale_factor; + new_size >>= 1; + } + + // Determine the proper size of the buffer. + new_size <<= scale_factor; + result = pseudo_tcp_fifo_set_capacity (&priv->rbuf, new_size); + + // Make sure the new buffer is large enough to contain data in the old + // buffer. This should always be true because this method is called either + // before connection is established or when peers are exchanging connect + // messages. + g_assert(result); + priv->rbuf_len = new_size; + priv->rwnd_scale = scale_factor; + priv->ssthresh = new_size; + + available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf); + priv->rcv_wnd = available_space; +} + |