// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/system/data_pipe.h" #include #include #include #include "base/compiler_specific.h" #include "base/logging.h" #include "mojo/system/constants.h" #include "mojo/system/memory.h" #include "mojo/system/options_validation.h" #include "mojo/system/waiter_list.h" namespace mojo { namespace system { // static const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = { static_cast(sizeof(MojoCreateDataPipeOptions)), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1u, static_cast(kDefaultDataPipeCapacityBytes) }; // static MojoResult DataPipe::ValidateCreateOptions( const MojoCreateDataPipeOptions* in_options, MojoCreateDataPipeOptions* out_options) { const MojoCreateDataPipeOptionsFlags kKnownFlags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD; *out_options = kDefaultCreateOptions; if (!in_options) return MOJO_RESULT_OK; MojoResult result = ValidateOptionsStructPointerSizeAndFlags( in_options, kKnownFlags, out_options); if (result != MOJO_RESULT_OK) return result; // Checks for fields beyond |flags|: if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions, element_num_bytes, in_options)) return MOJO_RESULT_OK; if (in_options->element_num_bytes == 0) return MOJO_RESULT_INVALID_ARGUMENT; out_options->element_num_bytes = in_options->element_num_bytes; if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions, capacity_num_bytes, in_options) || in_options->capacity_num_bytes == 0) { // Round the default capacity down to a multiple of the element size (but at // least one element). out_options->capacity_num_bytes = std::max( static_cast(kDefaultDataPipeCapacityBytes - (kDefaultDataPipeCapacityBytes % out_options->element_num_bytes)), out_options->element_num_bytes); return MOJO_RESULT_OK; } if (in_options->capacity_num_bytes % out_options->element_num_bytes != 0) return MOJO_RESULT_INVALID_ARGUMENT; if (in_options->capacity_num_bytes > kMaxDataPipeCapacityBytes) return MOJO_RESULT_RESOURCE_EXHAUSTED; out_options->capacity_num_bytes = in_options->capacity_num_bytes; return MOJO_RESULT_OK; } void DataPipe::ProducerCancelAllWaiters() { base::AutoLock locker(lock_); DCHECK(has_local_producer_no_lock()); producer_waiter_list_->CancelAllWaiters(); } void DataPipe::ProducerClose() { base::AutoLock locker(lock_); DCHECK(producer_open_); producer_open_ = false; DCHECK(has_local_producer_no_lock()); producer_waiter_list_.reset(); // Not a bug, except possibly in "user" code. DVLOG_IF(2, producer_in_two_phase_write_no_lock()) << "Producer closed with active two-phase write"; producer_two_phase_max_num_bytes_written_ = 0; ProducerCloseImplNoLock(); AwakeConsumerWaitersForStateChangeNoLock( ConsumerGetHandleSignalsStateNoLock()); } MojoResult DataPipe::ProducerWriteData(const void* elements, uint32_t* num_bytes, bool all_or_none) { base::AutoLock locker(lock_); DCHECK(has_local_producer_no_lock()); if (producer_in_two_phase_write_no_lock()) return MOJO_RESULT_BUSY; if (!consumer_open_no_lock()) return MOJO_RESULT_FAILED_PRECONDITION; // Returning "busy" takes priority over "invalid argument". if (*num_bytes % element_num_bytes_ != 0) return MOJO_RESULT_INVALID_ARGUMENT; if (*num_bytes == 0) return MOJO_RESULT_OK; // Nothing to do. HandleSignalsState old_consumer_state = ConsumerGetHandleSignalsStateNoLock(); MojoResult rv = ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none); HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock(); if (!new_consumer_state.equals(old_consumer_state)) AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state); return rv; } MojoResult DataPipe::ProducerBeginWriteData(void** buffer, uint32_t* buffer_num_bytes, bool all_or_none) { base::AutoLock locker(lock_); DCHECK(has_local_producer_no_lock()); if (producer_in_two_phase_write_no_lock()) return MOJO_RESULT_BUSY; if (!consumer_open_no_lock()) return MOJO_RESULT_FAILED_PRECONDITION; if (all_or_none && *buffer_num_bytes % element_num_bytes_ != 0) return MOJO_RESULT_INVALID_ARGUMENT; MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes, all_or_none); if (rv != MOJO_RESULT_OK) return rv; // Note: No need to awake producer waiters, even though we're going from // writable to non-writable (since you can't wait on non-writability). // Similarly, though this may have discarded data (in "may discard" mode), // making it non-readable, there's still no need to awake consumer waiters. DCHECK(producer_in_two_phase_write_no_lock()); return MOJO_RESULT_OK; } MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { base::AutoLock locker(lock_); DCHECK(has_local_producer_no_lock()); if (!producer_in_two_phase_write_no_lock()) return MOJO_RESULT_FAILED_PRECONDITION; // Note: Allow successful completion of the two-phase write even if the // consumer has been closed. HandleSignalsState old_consumer_state = ConsumerGetHandleSignalsStateNoLock(); MojoResult rv; if (num_bytes_written > producer_two_phase_max_num_bytes_written_ || num_bytes_written % element_num_bytes_ != 0) { rv = MOJO_RESULT_INVALID_ARGUMENT; producer_two_phase_max_num_bytes_written_ = 0; } else { rv = ProducerEndWriteDataImplNoLock(num_bytes_written); } // Two-phase write ended even on failure. DCHECK(!producer_in_two_phase_write_no_lock()); // If we're now writable, we *became* writable (since we weren't writable // during the two-phase write), so awake producer waiters. HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock(); if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) AwakeProducerWaitersForStateChangeNoLock(new_producer_state); HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock(); if (!new_consumer_state.equals(old_consumer_state)) AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state); return rv; } MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter, MojoHandleSignals signals, uint32_t context) { base::AutoLock locker(lock_); DCHECK(has_local_producer_no_lock()); HandleSignalsState producer_state = ProducerGetHandleSignalsStateNoLock(); if (producer_state.satisfies(signals)) return MOJO_RESULT_ALREADY_EXISTS; if (!producer_state.can_satisfy(signals)) return MOJO_RESULT_FAILED_PRECONDITION; producer_waiter_list_->AddWaiter(waiter, signals, context); return MOJO_RESULT_OK; } void DataPipe::ProducerRemoveWaiter(Waiter* waiter) { base::AutoLock locker(lock_); DCHECK(has_local_producer_no_lock()); producer_waiter_list_->RemoveWaiter(waiter); } bool DataPipe::ProducerIsBusy() const { base::AutoLock locker(lock_); return producer_in_two_phase_write_no_lock(); } void DataPipe::ConsumerCancelAllWaiters() { base::AutoLock locker(lock_); DCHECK(has_local_consumer_no_lock()); consumer_waiter_list_->CancelAllWaiters(); } void DataPipe::ConsumerClose() { base::AutoLock locker(lock_); DCHECK(consumer_open_); consumer_open_ = false; DCHECK(has_local_consumer_no_lock()); consumer_waiter_list_.reset(); // Not a bug, except possibly in "user" code. DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) << "Consumer closed with active two-phase read"; consumer_two_phase_max_num_bytes_read_ = 0; ConsumerCloseImplNoLock(); AwakeProducerWaitersForStateChangeNoLock( ProducerGetHandleSignalsStateNoLock()); } MojoResult DataPipe::ConsumerReadData(void* elements, uint32_t* num_bytes, bool all_or_none) { base::AutoLock locker(lock_); DCHECK(has_local_consumer_no_lock()); if (consumer_in_two_phase_read_no_lock()) return MOJO_RESULT_BUSY; if (*num_bytes % element_num_bytes_ != 0) return MOJO_RESULT_INVALID_ARGUMENT; if (*num_bytes == 0) return MOJO_RESULT_OK; // Nothing to do. HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock(); MojoResult rv = ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none); HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock(); if (!new_producer_state.equals(old_producer_state)) AwakeProducerWaitersForStateChangeNoLock(new_producer_state); return rv; } MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes, bool all_or_none) { base::AutoLock locker(lock_); DCHECK(has_local_consumer_no_lock()); if (consumer_in_two_phase_read_no_lock()) return MOJO_RESULT_BUSY; if (*num_bytes % element_num_bytes_ != 0) return MOJO_RESULT_INVALID_ARGUMENT; if (*num_bytes == 0) return MOJO_RESULT_OK; // Nothing to do. HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock(); MojoResult rv = ConsumerDiscardDataImplNoLock(num_bytes, all_or_none); HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock(); if (!new_producer_state.equals(old_producer_state)) AwakeProducerWaitersForStateChangeNoLock(new_producer_state); return rv; } MojoResult DataPipe::ConsumerQueryData(uint32_t* num_bytes) { base::AutoLock locker(lock_); DCHECK(has_local_consumer_no_lock()); if (consumer_in_two_phase_read_no_lock()) return MOJO_RESULT_BUSY; // Note: Don't need to validate |*num_bytes| for query. return ConsumerQueryDataImplNoLock(num_bytes); } MojoResult DataPipe::ConsumerBeginReadData(const void** buffer, uint32_t* buffer_num_bytes, bool all_or_none) { base::AutoLock locker(lock_); DCHECK(has_local_consumer_no_lock()); if (consumer_in_two_phase_read_no_lock()) return MOJO_RESULT_BUSY; if (all_or_none && *buffer_num_bytes % element_num_bytes_ != 0) return MOJO_RESULT_INVALID_ARGUMENT; MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes, all_or_none); if (rv != MOJO_RESULT_OK) return rv; DCHECK(consumer_in_two_phase_read_no_lock()); return MOJO_RESULT_OK; } MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { base::AutoLock locker(lock_); DCHECK(has_local_consumer_no_lock()); if (!consumer_in_two_phase_read_no_lock()) return MOJO_RESULT_FAILED_PRECONDITION; HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock(); MojoResult rv; if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ || num_bytes_read % element_num_bytes_ != 0) { rv = MOJO_RESULT_INVALID_ARGUMENT; consumer_two_phase_max_num_bytes_read_ = 0; } else { rv = ConsumerEndReadDataImplNoLock(num_bytes_read); } // Two-phase read ended even on failure. DCHECK(!consumer_in_two_phase_read_no_lock()); // If we're now readable, we *became* readable (since we weren't readable // during the two-phase read), so awake consumer waiters. HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock(); if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE)) AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state); HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock(); if (!new_producer_state.equals(old_producer_state)) AwakeProducerWaitersForStateChangeNoLock(new_producer_state); return rv; } MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter, MojoHandleSignals signals, uint32_t context) { base::AutoLock locker(lock_); DCHECK(has_local_consumer_no_lock()); HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateNoLock(); if (consumer_state.satisfies(signals)) return MOJO_RESULT_ALREADY_EXISTS; if (!consumer_state.can_satisfy(signals)) return MOJO_RESULT_FAILED_PRECONDITION; consumer_waiter_list_->AddWaiter(waiter, signals, context); return MOJO_RESULT_OK; } void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) { base::AutoLock locker(lock_); DCHECK(has_local_consumer_no_lock()); consumer_waiter_list_->RemoveWaiter(waiter); } bool DataPipe::ConsumerIsBusy() const { base::AutoLock locker(lock_); return consumer_in_two_phase_read_no_lock(); } DataPipe::DataPipe(bool has_local_producer, bool has_local_consumer, const MojoCreateDataPipeOptions& validated_options) : may_discard_((validated_options.flags & MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)), element_num_bytes_(validated_options.element_num_bytes), capacity_num_bytes_(validated_options.capacity_num_bytes), producer_open_(true), consumer_open_(true), producer_waiter_list_(has_local_producer ? new WaiterList() : NULL), consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL), producer_two_phase_max_num_bytes_written_(0), consumer_two_phase_max_num_bytes_read_(0) { // Check that the passed in options actually are validated. MojoCreateDataPipeOptions unused ALLOW_UNUSED = { 0 }; DCHECK_EQ(ValidateCreateOptions(&validated_options, &unused), MOJO_RESULT_OK); } DataPipe::~DataPipe() { DCHECK(!producer_open_); DCHECK(!consumer_open_); DCHECK(!producer_waiter_list_); DCHECK(!consumer_waiter_list_); } void DataPipe::AwakeProducerWaitersForStateChangeNoLock( const HandleSignalsState& new_producer_state) { lock_.AssertAcquired(); if (!has_local_producer_no_lock()) return; producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state); } void DataPipe::AwakeConsumerWaitersForStateChangeNoLock( const HandleSignalsState& new_consumer_state) { lock_.AssertAcquired(); if (!has_local_consumer_no_lock()) return; consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state); } } // namespace system } // namespace mojo