Skip to content

Commit

Permalink
[tcp] Handle out-of-order received packets
Browse files Browse the repository at this point in the history
Maintain a queue of received packets, so that lost packets need not
result in retransmission of the entire TCP window.

Increase the TCP window to 8kB (8192 bytes), so that we can potentially
transmit enough duplicate ACKs to trigger Fast Retransmission at the
sender.

Using a 10MB HTTP download in qemu-kvm with an artificial drop rate of
1 in 64 packets, this reduces the download time from around 26s to
around 4s.

Signed-off-by: Michael Brown <mcb30@ipxe.org>
  • Loading branch information
mcb30 committed Jul 20, 2010
1 parent 9f2e76e commit 6861304
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 35 deletions.
31 changes: 30 additions & 1 deletion src/include/ipxe/tcp.h
Expand Up @@ -287,7 +287,7 @@ struct tcp_options {
* that payloads remain dword-aligned.
*/
//#define TCP_MAX_WINDOW_SIZE ( 65536 - 4 )
#define TCP_MAX_WINDOW_SIZE 4096
#define TCP_MAX_WINDOW_SIZE 8192

/**
* Path MTU
Expand All @@ -313,6 +313,35 @@ struct tcp_options {
*/
#define TCP_MSL ( 2 * 60 * TICKS_PER_SEC )

/**
 * Compare two TCP sequence numbers, allowing for wraparound
 *
 * @v seq1		First sequence number
 * @v seq2		Second sequence number
 * @ret diff		Signed distance from @c seq2 to @c seq1
 *
 * In the manner of memcmp(), the result is negative when @c seq1
 * comes before @c seq2, zero when the two are equal, and positive
 * when @c seq1 comes after @c seq2.  The comparison is performed in
 * modulo-2^32 sequence space, so it remains valid across wraparound.
 */
static inline __attribute__ (( always_inline )) int32_t
tcp_cmp ( uint32_t seq1, uint32_t seq2 ) {
	uint32_t distance;

	/* Unsigned subtraction wraps modulo 2^32; converting the
	 * result to a signed 32-bit value yields the signed distance
	 * (relies on the compiler's two's-complement conversion).
	 */
	distance = ( seq1 - seq2 );
	return ( ( int32_t ) distance );
}

/**
 * Test whether a TCP sequence number falls inside a window
 *
 * @v seq		Sequence number to test
 * @v start		First sequence number of the window
 * @v len		Window length
 * @ret in_window	Non-zero if @c seq lies in [start, start+len)
 *
 * The window is half-open and the test is performed in modulo-2^32
 * sequence space, so windows spanning the wraparound point work.
 * A zero-length window contains no sequence numbers.
 */
static inline int tcp_in_window ( uint32_t seq, uint32_t start,
				  uint32_t len ) {
	uint32_t offset;

	/* Offset of seq from the window start; values "before" start
	 * wrap round to very large offsets and so fail the test.
	 */
	offset = ( seq - start );
	return ( offset < len );
}

extern struct tcpip_protocol tcp_protocol;

#endif /* _IPXE_TCP_H */
184 changes: 150 additions & 34 deletions src/net/tcp.c
Expand Up @@ -80,7 +80,9 @@ struct tcp_connection {
uint32_t ts_recent;

/** Transmit queue */
struct list_head queue;
struct list_head tx_queue;
/** Receive queue */
struct list_head rx_queue;
/** Retransmission timer */
struct retry_timer timer;
/** Shutdown (TIME_WAIT) timer */
Expand All @@ -97,6 +99,29 @@ enum tcp_flags {
TCP_ACK_PENDING = 0x0004,
};

/** TCP internal header
 *
 * This is the header that replaces the TCP header for packets
 * enqueued on the receive queue.  tcp_rx_enqueue() pushes it onto
 * the front of the I/O buffer (into the headroom left behind once
 * the TCP header has been pulled), and tcp_process_rx_queue() pulls
 * it off again before the payload is handed on for processing.
 */
struct tcp_rx_queued_header {
/** SEQ value, in host-endian order
 *
 * This represents the SEQ value at the time the packet is
 * enqueued, and so excludes the SYN, if present.  The receive
 * queue is kept sorted by this value.
 */
uint32_t seq;
/** Flags
 *
 * Only FIN is valid within this flags byte; all other flags
 * have already been processed by the time the packet is
 * enqueued.
 */
uint8_t flags;
/** Reserved
 *
 * Pads the header to eight bytes.  NOTE(review): presumably this
 * keeps the payload's alignment unchanged after the push - confirm.
 */
uint8_t reserved[3];
};

/**
* List of registered TCP connections
*/
Expand Down Expand Up @@ -246,7 +271,8 @@ static int tcp_open ( struct interface *xfer, struct sockaddr *peer,
tcp->tcp_state = TCP_STATE_SENT ( TCP_SYN );
tcp_dump_state ( tcp );
tcp->snd_seq = random();
INIT_LIST_HEAD ( &tcp->queue );
INIT_LIST_HEAD ( &tcp->tx_queue );
INIT_LIST_HEAD ( &tcp->rx_queue );
memcpy ( &tcp->peer, st_peer, sizeof ( tcp->peer ) );

/* Bind to local port */
Expand Down Expand Up @@ -296,8 +322,14 @@ static void tcp_close ( struct tcp_connection *tcp, int rc ) {
tcp->tcp_state = TCP_CLOSED;
tcp_dump_state ( tcp );

/* Free any unprocessed I/O buffers */
list_for_each_entry_safe ( iobuf, tmp, &tcp->rx_queue, list ) {
list_del ( &iobuf->list );
free_iob ( iobuf );
}

/* Free any unsent I/O buffers */
list_for_each_entry_safe ( iobuf, tmp, &tcp->queue, list ) {
list_for_each_entry_safe ( iobuf, tmp, &tcp->tx_queue, list ) {
list_del ( &iobuf->list );
free_iob ( iobuf );
}
Expand All @@ -318,7 +350,7 @@ static void tcp_close ( struct tcp_connection *tcp, int rc ) {
tcp_rx_ack ( tcp, ( tcp->snd_seq + 1 ), 0 );

/* If we have no data remaining to send, start sending FIN */
if ( list_empty ( &tcp->queue ) ) {
if ( list_empty ( &tcp->tx_queue ) ) {
tcp->tcp_state |= TCP_STATE_SENT ( TCP_FIN );
tcp_dump_state ( tcp );
}
Expand Down Expand Up @@ -366,14 +398,14 @@ static size_t tcp_xmit_win ( struct tcp_connection *tcp ) {
* (if provided) and, if @c remove is true, removed from the transmit
* queue.
*/
static size_t tcp_process_queue ( struct tcp_connection *tcp, size_t max_len,
struct io_buffer *dest, int remove ) {
static size_t tcp_process_tx_queue ( struct tcp_connection *tcp, size_t max_len,
struct io_buffer *dest, int remove ) {
struct io_buffer *iobuf;
struct io_buffer *tmp;
size_t frag_len;
size_t len = 0;

list_for_each_entry_safe ( iobuf, tmp, &tcp->queue, list ) {
list_for_each_entry_safe ( iobuf, tmp, &tcp->tx_queue, list ) {
frag_len = iob_len ( iobuf );
if ( frag_len > max_len )
frag_len = max_len;
Expand Down Expand Up @@ -426,8 +458,8 @@ static int tcp_xmit ( struct tcp_connection *tcp ) {
* lengths that we wish to transmit.
*/
if ( TCP_CAN_SEND_DATA ( tcp->tcp_state ) ) {
len = tcp_process_queue ( tcp, tcp_xmit_win ( tcp ),
NULL, 0 );
len = tcp_process_tx_queue ( tcp, tcp_xmit_win ( tcp ),
NULL, 0 );
}
seq_len = len;
flags = TCP_FLAGS_SENDING ( tcp->tcp_state );
Expand Down Expand Up @@ -461,7 +493,7 @@ static int tcp_xmit ( struct tcp_connection *tcp ) {
iob_reserve ( iobuf, MAX_HDR_LEN );

/* Fill data payload from transmit queue */
tcp_process_queue ( tcp, len, iobuf, 0 );
tcp_process_tx_queue ( tcp, len, iobuf, 0 );

/* Expand receive window if possible */
max_rcv_win = ( ( freemem * 3 ) / 4 );
Expand Down Expand Up @@ -735,7 +767,7 @@ static int tcp_rx_syn ( struct tcp_connection *tcp, uint32_t seq,
}

/* Ignore duplicate SYN */
if ( ( tcp->rcv_ack - seq ) > 0 )
if ( seq != tcp->rcv_ack )
return 0;

/* Acknowledge SYN */
Expand Down Expand Up @@ -806,14 +838,14 @@ static int tcp_rx_ack ( struct tcp_connection *tcp, uint32_t ack,
tcp->snd_win = win;

/* Remove any acknowledged data from transmit queue */
tcp_process_queue ( tcp, len, NULL, 1 );
tcp_process_tx_queue ( tcp, len, NULL, 1 );

/* Mark SYN/FIN as acknowledged if applicable. */
if ( acked_flags )
tcp->tcp_state |= TCP_STATE_ACKED ( acked_flags );

/* Start sending FIN if we've had all possible data ACKed */
if ( list_empty ( &tcp->queue ) && ( tcp->flags & TCP_XFER_CLOSED ) )
if ( list_empty ( &tcp->tx_queue ) && ( tcp->flags & TCP_XFER_CLOSED ) )
tcp->tcp_state |= TCP_STATE_SENT ( TCP_FIN );

return 0;
Expand Down Expand Up @@ -868,7 +900,7 @@ static int tcp_rx_data ( struct tcp_connection *tcp, uint32_t seq,
static int tcp_rx_fin ( struct tcp_connection *tcp, uint32_t seq ) {

/* Ignore duplicate or out-of-order FIN */
if ( ( tcp->rcv_ack - seq ) > 0 )
if ( seq != tcp->rcv_ack )
return 0;

/* Acknowledge FIN */
Expand Down Expand Up @@ -898,7 +930,7 @@ static int tcp_rx_rst ( struct tcp_connection *tcp, uint32_t seq ) {
* ACKed.
*/
if ( tcp->tcp_state & TCP_STATE_RCVD ( TCP_SYN ) ) {
if ( ( seq - tcp->rcv_ack ) >= tcp->rcv_win )
if ( ! tcp_in_window ( seq, tcp->rcv_ack, tcp->rcv_win ) )
return 0;
} else {
if ( ! ( tcp->tcp_state & TCP_STATE_ACKED ( TCP_SYN ) ) )
Expand All @@ -914,6 +946,95 @@ static int tcp_rx_rst ( struct tcp_connection *tcp, uint32_t seq ) {
return -ECONNRESET;
}

/**
 * Enqueue received TCP packet
 *
 * @v tcp		TCP connection
 * @v seq		SEQ value (in host-endian order)
 * @v flags		TCP flags
 * @v iobuf		I/O buffer (ownership passes to this function)
 *
 * The packet is either freed immediately (if it cannot contribute
 * anything new) or has a struct tcp_rx_queued_header pushed onto it
 * and is inserted into the receive queue.  The queue is kept sorted
 * by sequence number; a packet whose sequence number equals that of
 * an already-queued packet is placed after it.
 */
static void tcp_rx_enqueue ( struct tcp_connection *tcp, uint32_t seq,
			     uint8_t flags, struct io_buffer *iobuf ) {
	struct tcp_rx_queued_header *tcpqhdr;
	struct tcp_rx_queued_header *queuedhdr;
	struct io_buffer *queued;
	size_t len;
	uint32_t seq_len;
	uint32_t nxt;

	/* Calculate remaining flags and sequence length.  Note that
	 * SYN, if present, has already been processed by this point.
	 */
	flags &= TCP_FIN;
	len = iob_len ( iobuf );
	seq_len = ( len + ( flags ? 1 : 0 ) );

	/* Sequence number immediately following this packet */
	nxt = ( seq + seq_len );

	/* Discard immediately (to save memory) if:
	 *
	 * a) we have not yet received a SYN (and so have no defined
	 *    receive window), or
	 * b) the packet starts at or beyond the end of the receive
	 *    window, or
	 * c) the packet ends at or before the start of the receive
	 *    window, i.e. it consists solely of sequence space that
	 *    has already been acknowledged, or
	 * d) there is no further content to process.
	 *
	 * Both window edges are tested as half-open [rcv_ack,
	 * rcv_ack+rcv_win): a packet whose end equals rcv_ack lies
	 * entirely below the window.
	 */
	if ( ( ! ( tcp->tcp_state & TCP_STATE_RCVD ( TCP_SYN ) ) ) ||
	     ( tcp_cmp ( seq, ( tcp->rcv_ack + tcp->rcv_win ) ) >= 0 ) ||
	     ( tcp_cmp ( nxt, tcp->rcv_ack ) <= 0 ) ||
	     ( seq_len == 0 ) ) {
		free_iob ( iobuf );
		return;
	}

	/* Add internal header (reusing the headroom freed by pulling
	 * the TCP header)
	 */
	tcpqhdr = iob_push ( iobuf, sizeof ( *tcpqhdr ) );
	tcpqhdr->seq = seq;
	tcpqhdr->flags = flags;

	/* Find the first queued packet that starts strictly after
	 * this one.  If there is none, the loop leaves @c queued
	 * referring to the list head itself, so the insertion below
	 * appends to the end of the queue.
	 */
	list_for_each_entry ( queued, &tcp->rx_queue, list ) {
		queuedhdr = queued->data;
		if ( tcp_cmp ( seq, queuedhdr->seq ) < 0 )
			break;
	}

	/* Insert immediately before that packet */
	list_add_tail ( &iobuf->list, &queued->list );
}

/**
* Process receive queue
*
* @v tcp TCP connection
*/
static void tcp_process_rx_queue ( struct tcp_connection *tcp ) {
struct io_buffer *iobuf;
struct io_buffer *tmp;
struct tcp_rx_queued_header *tcpqhdr;
uint32_t seq;
unsigned int flags;
size_t len;

/* Process all applicable received buffers */
list_for_each_entry_safe ( iobuf, tmp, &tcp->rx_queue, list ) {
tcpqhdr = iobuf->data;
if ( tcp_cmp ( tcpqhdr->seq, tcp->rcv_ack ) > 0 )
break;

/* Strip internal header and remove from RX queue */
list_del ( &iobuf->list );
seq = tcpqhdr->seq;
flags = tcpqhdr->flags;
iob_pull ( iobuf, sizeof ( *tcpqhdr ) );
len = iob_len ( iobuf );

/* Handle new data, if any */
tcp_rx_data ( tcp, seq, iob_disown ( iobuf ) );
seq += len;

/* Handle FIN, if present */
if ( flags & TCP_FIN ) {
tcp_rx_fin ( tcp, seq );
seq++;
}
}
}

/**
* Process received packet
*
Expand All @@ -935,9 +1056,9 @@ static int tcp_rx ( struct io_buffer *iobuf,
uint32_t seq;
uint32_t ack;
uint32_t win;
uint32_t ts_recent;
unsigned int flags;
size_t len;
uint32_t seq_len;
int rc;

/* Sanity check packet */
Expand Down Expand Up @@ -977,17 +1098,16 @@ static int tcp_rx ( struct io_buffer *iobuf,
flags = tcphdr->flags;
tcp_rx_opts ( tcp, ( ( ( void * ) tcphdr ) + sizeof ( *tcphdr ) ),
( hlen - sizeof ( *tcphdr ) ), &options );
ts_recent = ( options.tsopt ?
ntohl ( options.tsopt->tsval ) : tcp->ts_recent );
iob_pull ( iobuf, hlen );
len = iob_len ( iobuf );
seq_len = ( len + ( ( flags & TCP_SYN ) ? 1 : 0 ) +
( ( flags & TCP_FIN ) ? 1 : 0 ) );

/* Dump header */
DBGC2 ( tcp, "TCP %p RX %d<-%d %08x %08x..%08zx %4zd",
tcp, ntohs ( tcphdr->dest ), ntohs ( tcphdr->src ),
ntohl ( tcphdr->ack ), ntohl ( tcphdr->seq ),
( ntohl ( tcphdr->seq ) + len +
( ( tcphdr->flags & ( TCP_SYN | TCP_FIN ) ) ? 1 : 0 )), len);
( ntohl ( tcphdr->seq ) + seq_len ), len );
tcp_dump_flags ( tcp, tcphdr->flags );
DBGC2 ( tcp, "\n" );

Expand All @@ -998,6 +1118,10 @@ static int tcp_rx ( struct io_buffer *iobuf,
goto discard;
}

/* Update timestamp, if applicable */
if ( options.tsopt && tcp_in_window ( tcp->rcv_ack, seq, seq_len ) )
tcp->ts_recent = ntohl ( options.tsopt->tsval );

/* Handle ACK, if present */
if ( flags & TCP_ACK ) {
if ( ( rc = tcp_rx_ack ( tcp, ack, win ) ) != 0 ) {
Expand All @@ -1024,19 +1148,11 @@ static int tcp_rx ( struct io_buffer *iobuf,
goto discard;
}

/* Handle new data, if any */
tcp_rx_data ( tcp, seq, iob_disown ( iobuf ) );
seq += len;
/* Enqueue received data */
tcp_rx_enqueue ( tcp, seq, flags, iob_disown ( iobuf ) );

/* Handle FIN, if present */
if ( flags & TCP_FIN ) {
tcp_rx_fin ( tcp, seq );
seq++;
}

/* Update timestamp, if applicable */
if ( seq == tcp->rcv_ack )
tcp->ts_recent = ts_recent;
/* Process receive queue */
tcp_process_rx_queue ( tcp );

/* Dump out any state change as a result of the received packet */
tcp_dump_state ( tcp );
Expand Down Expand Up @@ -1101,7 +1217,7 @@ static size_t tcp_xfer_window ( struct tcp_connection *tcp ) {
* of only one unACKed packet in the TX queue at any time; we
* do this to conserve memory usage.
*/
if ( ! list_empty ( &tcp->queue ) )
if ( ! list_empty ( &tcp->tx_queue ) )
return 0;

/* Return TCP window length */
Expand All @@ -1121,7 +1237,7 @@ static int tcp_xfer_deliver ( struct tcp_connection *tcp,
struct xfer_metadata *meta __unused ) {

/* Enqueue packet */
list_add_tail ( &iobuf->list, &tcp->queue );
list_add_tail ( &iobuf->list, &tcp->tx_queue );

/* Transmit data, if possible */
tcp_xmit ( tcp );
Expand Down

0 comments on commit 6861304

Please sign in to comment.