// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/quic/quic_data_stream.h" #include "base/logging.h" #include "net/quic/quic_session.h" #include "net/quic/quic_spdy_decompressor.h" #include "net/spdy/write_blocked_list.h" using base::StringPiece; using std::min; namespace net { #define ENDPOINT (session()->is_server() ? "Server: " : " Client: ") namespace { // This is somewhat arbitrary. It's possible, but unlikely, we will either fail // to set a priority client-side, or cancel a stream before stripping the // priority from the wire server-side. In either case, start out with a // priority in the middle. QuicPriority kDefaultPriority = 3; // Appends bytes from data into partial_data_buffer. Once partial_data_buffer // reaches 4 bytes, copies the data into 'result' and clears // partial_data_buffer. // Returns the number of bytes consumed. uint32 StripUint32(const char* data, uint32 data_len, string* partial_data_buffer, uint32* result) { DCHECK_GT(4u, partial_data_buffer->length()); size_t missing_size = 4 - partial_data_buffer->length(); if (data_len < missing_size) { StringPiece(data, data_len).AppendToString(partial_data_buffer); return data_len; } StringPiece(data, missing_size).AppendToString(partial_data_buffer); DCHECK_EQ(4u, partial_data_buffer->length()); memcpy(result, partial_data_buffer->data(), 4); partial_data_buffer->clear(); return missing_size; } } // namespace QuicDataStream::QuicDataStream(QuicStreamId id, QuicSession* session) : ReliableQuicStream(id, session), visitor_(NULL), headers_decompressed_(false), priority_(kDefaultPriority), headers_id_(0), decompression_failed_(false), priority_parsed_(false) { DCHECK_NE(kCryptoStreamId, id); } QuicDataStream::~QuicDataStream() { } size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) { if (FinishedReadingHeaders()) { // If the headers have been read, simply delegate to the sequencer's // Readv method. return sequencer()->Readv(iov, iov_len); } // Otherwise, copy decompressed header data into |iov|. size_t bytes_consumed = 0; size_t iov_index = 0; while (iov_index < iov_len && decompressed_headers_.length() > bytes_consumed) { size_t bytes_to_read = min(iov[iov_index].iov_len, decompressed_headers_.length() - bytes_consumed); char* iov_ptr = static_cast(iov[iov_index].iov_base); memcpy(iov_ptr, decompressed_headers_.data() + bytes_consumed, bytes_to_read); bytes_consumed += bytes_to_read; ++iov_index; } decompressed_headers_.erase(0, bytes_consumed); return bytes_consumed; } int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) { if (FinishedReadingHeaders()) { return sequencer()->GetReadableRegions(iov, iov_len); } if (iov_len == 0) { return 0; } iov[0].iov_base = static_cast( const_cast(decompressed_headers_.data())); iov[0].iov_len = decompressed_headers_.length(); return 1; } bool QuicDataStream::IsDoneReading() const { if (!headers_decompressed_ || !decompressed_headers_.empty()) { return false; } return sequencer()->IsClosed(); } bool QuicDataStream::HasBytesToRead() const { return !decompressed_headers_.empty() || sequencer()->HasBytesToRead(); } void QuicDataStream::set_priority(QuicPriority priority) { DCHECK_EQ(0u, stream_bytes_written()); priority_ = priority; } QuicPriority QuicDataStream::EffectivePriority() const { return priority(); } uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) { DCHECK_NE(0u, data_len); uint32 total_bytes_consumed = 0; if (headers_id_ == 0u) { total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); data += total_bytes_consumed; data_len -= total_bytes_consumed; if (data_len == 0 || total_bytes_consumed == 0) { return total_bytes_consumed; } } DCHECK_NE(0u, headers_id_); // Once the headers are finished, we simply pass the data through. if (headers_decompressed_) { // Some buffered header data remains. if (!decompressed_headers_.empty()) { ProcessHeaderData(); } if (decompressed_headers_.empty()) { DVLOG(1) << "Delegating procesing to ProcessData"; total_bytes_consumed += ProcessData(data, data_len); } return total_bytes_consumed; } QuicHeaderId current_header_id = session()->decompressor()->current_header_id(); // Ensure that this header id looks sane. if (headers_id_ < current_header_id || headers_id_ > kMaxHeaderIdDelta + current_header_id) { DVLOG(1) << ENDPOINT << "Invalid headers for stream: " << id() << " header_id: " << headers_id_ << " current_header_id: " << current_header_id; session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); return total_bytes_consumed; } // If we are head-of-line blocked on decompression, then back up. if (current_header_id != headers_id_) { session()->MarkDecompressionBlocked(headers_id_, id()); DVLOG(1) << ENDPOINT << "Unable to decompress header data for stream: " << id() << " header_id: " << headers_id_; return total_bytes_consumed; } // Decompressed data will be delivered to decompressed_headers_. size_t bytes_consumed = session()->decompressor()->DecompressData( StringPiece(data, data_len), this); DCHECK_NE(0u, bytes_consumed); if (bytes_consumed > data_len) { DCHECK(false) << "DecompressData returned illegal value"; OnDecompressionError(); return total_bytes_consumed; } total_bytes_consumed += bytes_consumed; data += bytes_consumed; data_len -= bytes_consumed; if (decompression_failed_) { // The session will have been closed in OnDecompressionError. return total_bytes_consumed; } // Headers are complete if the decompressor has moved on to the // next stream. headers_decompressed_ = session()->decompressor()->current_header_id() != headers_id_; if (!headers_decompressed_) { DCHECK_EQ(0u, data_len); } ProcessHeaderData(); if (!headers_decompressed_ || !decompressed_headers_.empty()) { return total_bytes_consumed; } // We have processed all of the decompressed data but we might // have some more raw data to process. if (data_len > 0) { total_bytes_consumed += ProcessData(data, data_len); } // The sequencer will push any additional buffered frames if this data // has been completely consumed. return total_bytes_consumed; } const IPEndPoint& QuicDataStream::GetPeerAddress() { return session()->peer_address(); } QuicSpdyCompressor* QuicDataStream::compressor() { return session()->compressor(); } bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) { return session()->GetSSLInfo(ssl_info); } uint32 QuicDataStream::ProcessHeaderData() { if (decompressed_headers_.empty()) { return 0; } size_t bytes_processed = ProcessData(decompressed_headers_.data(), decompressed_headers_.length()); if (bytes_processed == decompressed_headers_.length()) { decompressed_headers_.clear(); } else { decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); } return bytes_processed; } void QuicDataStream::OnDecompressorAvailable() { DCHECK_EQ(headers_id_, session()->decompressor()->current_header_id()); DCHECK(!headers_decompressed_); DCHECK(!decompression_failed_); DCHECK_EQ(0u, decompressed_headers_.length()); while (!headers_decompressed_) { struct iovec iovec; if (sequencer()->GetReadableRegions(&iovec, 1) == 0) { return; } size_t bytes_consumed = session()->decompressor()->DecompressData( StringPiece(static_cast(iovec.iov_base), iovec.iov_len), this); DCHECK_LE(bytes_consumed, iovec.iov_len); if (decompression_failed_) { return; } sequencer()->MarkConsumed(bytes_consumed); headers_decompressed_ = session()->decompressor()->current_header_id() != headers_id_; } // Either the headers are complete, or the all data as been consumed. ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. if (IsDoneReading()) { OnFinRead(); } else if (FinishedReadingHeaders()) { sequencer()->FlushBufferedFrames(); } } bool QuicDataStream::OnDecompressedData(StringPiece data) { data.AppendToString(&decompressed_headers_); return true; } void QuicDataStream::OnDecompressionError() { DCHECK(!decompression_failed_); decompression_failed_ = true; session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); } void QuicDataStream::OnClose() { ReliableQuicStream::OnClose(); if (visitor_) { Visitor* visitor = visitor_; // Calling Visitor::OnClose() may result the destruction of the visitor, // so we need to ensure we don't call it again. visitor_ = NULL; visitor->OnClose(this); } } uint32 QuicDataStream::StripPriorityAndHeaderId( const char* data, uint32 data_len) { uint32 total_bytes_parsed = 0; if (!priority_parsed_ && session()->connection()->is_server()) { QuicPriority temporary_priority = priority_; total_bytes_parsed = StripUint32( data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) { priority_parsed_ = true; // Spdy priorities are inverted, so the highest numerical value is the // lowest legal priority. if (temporary_priority > QuicUtils::LowestPriority()) { session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); return 0; } priority_ = temporary_priority; } data += total_bytes_parsed; data_len -= total_bytes_parsed; } if (data_len > 0 && headers_id_ == 0u) { // The headers ID has not yet been read. Strip it from the beginning of // the data stream. total_bytes_parsed += StripUint32( data, data_len, &headers_id_and_priority_buffer_, &headers_id_); } return total_bytes_parsed; } bool QuicDataStream::FinishedReadingHeaders() { return headers_decompressed_ && decompressed_headers_.empty(); } } // namespace net