diff options
author | James M Snell <jasnell@gmail.com> | 2022-12-17 13:58:26 -0800 |
---|---|---|
committer | James M Snell <jasnell@gmail.com> | 2023-02-19 16:26:59 -0800 |
commit | 71fb06fd64c873e2c716e23ca1d31219cf78cbd6 (patch) | |
tree | bb762422acd056a3c42b68d7b81db5c239cb0d87 /src/dataqueue | |
parent | 950cec4c2642c15e2913f35babadda56c1d8a723 (diff) | |
download | node-new-71fb06fd64c873e2c716e23ca1d31219cf78cbd6.tar.gz |
src, lib: fixup lint and format issues for DataQueue/Blob
Co-authored-by: flakey5 <73616808+flakey5@users.noreply.github.com>
PR-URL: https://github.com/nodejs/node/pull/45258
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'src/dataqueue')
-rw-r--r-- | src/dataqueue/queue.cc | 1119 | ||||
-rw-r--r-- | src/dataqueue/queue.h | 75 |
2 files changed, 552 insertions, 642 deletions
diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc index 2ca87c0b59..a39314e23e 100644 --- a/src/dataqueue/queue.cc +++ b/src/dataqueue/queue.cc @@ -6,73 +6,53 @@ #include <node.h> #include <node_bob-inl.h> #include <node_errors.h> +#include <node_external_reference.h> #include <node_file-inl.h> #include <stream_base-inl.h> -#include <v8.h> #include <util-inl.h> #include <uv.h> +#include <v8.h> #include <algorithm> #include <deque> #include <initializer_list> #include <memory> #include <vector> -#include "base_object.h" -#include "memory_tracker.h" -#include "node_bob.h" -#include "node_external_reference.h" -#include "util.h" -#include "v8-function-callback.h" namespace node { using v8::ArrayBufferView; using v8::BackingStore; -using v8::Context; -using v8::HandleScope; -using v8::Just; using v8::Local; using v8::Object; -using v8::Maybe; -using v8::Nothing; -using v8::FunctionTemplate; -using v8::Isolate; -using v8::FunctionCallbackInfo; using v8::Value; -using v8::String; -using v8::Global; -using v8::Function; -using v8::Int32; -using v8::Uint32; namespace { // ============================================================================ class IdempotentDataQueueReader; class NonIdempotentDataQueueReader; -class EntryBase : public DataQueue::Entry { +class EntryImpl : public DataQueue::Entry { public: - virtual std::unique_ptr<DataQueue::Reader> getReader() = 0; + virtual std::shared_ptr<DataQueue::Reader> get_reader() = 0; }; class DataQueueImpl final : public DataQueue, public std::enable_shared_from_this<DataQueueImpl> { public: - // Constructor for an imdempotent, fixed sized DataQueue. - DataQueueImpl(std::vector<std::unique_ptr<Entry>> list, size_t size) + // Constructor for an idempotent, fixed sized DataQueue. + DataQueueImpl(std::vector<std::unique_ptr<Entry>>&& list, uint64_t size) : entries_(std::move(list)), idempotent_(true), - size_(Just(size)), - capped_size_(Just<size_t>(0UL)) {} + size_(size), + capped_size_(0) {} // Constructor for a non-idempotent DataQueue. This kind of queue can have // entries added to it over time. The size is set to 0 initially. The queue // can be capped immediately on creation. Depending on the entries that are // added, the size can be cleared if any of the entries are not capable of // providing a size. - DataQueueImpl(Maybe<size_t> cap = Nothing<size_t>()) - : idempotent_(false), - size_(Just<size_t>(0UL)), - capped_size_(cap) {} + DataQueueImpl(std::optional<uint64_t> cap = std::nullopt) + : idempotent_(false), size_(0), capped_size_(cap) {} // Disallow moving and copying. DataQueueImpl(const DataQueueImpl&) = delete; @@ -81,42 +61,38 @@ class DataQueueImpl final : public DataQueue, DataQueueImpl& operator=(DataQueueImpl&&) = delete; std::shared_ptr<DataQueue> slice( - size_t start, - Maybe<size_t> maybeEnd = Nothing<size_t>()) override { + uint64_t start, + std::optional<uint64_t> maybeEnd = std::nullopt) override { // If the data queue is not idempotent, or the size cannot be determined, // we cannot reasonably create a slice. Therefore, return nothing. - if (!idempotent_ || size_.IsNothing()) return nullptr; + if (!idempotent_ || !size_.has_value()) return nullptr; - size_t size = size_.FromJust(); + uint64_t size = size_.value(); // start cannot be greater than the size. start = std::min(start, size); - size_t end; - if (maybeEnd.To(&end)) { - // end cannot be less than start, or greater than the size. - end = std::max(start, std::min(end, size)); - } else { - end = size; - } + uint64_t end = std::max(start, std::min(maybeEnd.value_or(size), size)); DCHECK_LE(start, end); - size_t len = end - start; - size_t remaining = end - start; + uint64_t len = end - start; + uint64_t remaining = end - start; std::vector<std::unique_ptr<Entry>> slices; if (remaining > 0) { for (const auto& entry : entries_) { - size_t entrySize = entry->size().FromJust(); + // The size of every entry should be known since this is an + // idempotent queue. + uint64_t entrySize = entry->size().value(); if (start > entrySize) { start -= entrySize; continue; } - size_t chunkStart = start; - size_t len = std::min(remaining, entrySize - chunkStart); - slices.emplace_back(entry->slice(chunkStart, Just(chunkStart + len))); + uint64_t chunkStart = start; + uint64_t len = std::min(remaining, entrySize - chunkStart); + slices.emplace_back(entry->slice(chunkStart, chunkStart + len)); remaining -= len; start = 0; @@ -127,76 +103,75 @@ class DataQueueImpl final : public DataQueue, return std::make_shared<DataQueueImpl>(std::move(slices), len); } - Maybe<size_t> size() const override { return size_; } + std::optional<uint64_t> size() const override { return size_; } - bool isIdempotent() const override { return idempotent_; } + bool is_idempotent() const override { return idempotent_; } - bool isCapped() const override { return capped_size_.IsJust(); } + bool is_capped() const override { return capped_size_.has_value(); } - Maybe<bool> append(std::unique_ptr<Entry> entry) override { - if (idempotent_) return Nothing<bool>(); - if (!entry) return Just(false); + std::optional<bool> append(std::unique_ptr<Entry> entry) override { + if (idempotent_) return std::nullopt; + if (!entry) return false; // If this entry successfully provides a size, we can add it to our size_ - // if that has a value, otherwise, we keep size_t empty. - size_t entrySize; - size_t queueSize; - if (entry->size().To(&entrySize) && size_.To(&queueSize)) { + // if that has a value, otherwise, we keep uint64_t empty. + if (entry->size().has_value() && size_.has_value()) { + uint64_t entrySize = entry->size().value(); + uint64_t size = size_.value(); // If capped_size_ is set, size + entrySize cannot exceed capped_size_ // or the entry cannot be added. - size_t capped_size; - if (capped_size_.To(&capped_size) && queueSize + entrySize > capped_size) { - return Just(false); + if (capped_size_.has_value() && + (capped_size_.value() < entrySize + size)) { + return false; } - - size_ = Just(queueSize + entrySize); + size_ = size + entrySize; } else { // This entry cannot provide a size. We can still add it but we have to // clear the known size. - size_ = Nothing<size_t>(); + size_ = std::nullopt; } entries_.push_back(std::move(entry)); - return Just(true); + return true; } - void cap(size_t limit = 0) override { - if (isIdempotent()) return; - size_t current_cap; + void cap(uint64_t limit = 0) override { + if (is_idempotent()) return; // If the data queue is already capped, it is possible to call // cap again with a smaller size. - if (capped_size_.To(¤t_cap)) { - capped_size_ = Just(std::min(limit, current_cap)); + if (capped_size_.has_value()) { + capped_size_ = std::min(limit, capped_size_.value()); return; } // Otherwise just set the limit. - capped_size_ = Just(limit); + capped_size_ = limit; } - Maybe<size_t> maybeCapRemaining() const override { - size_t capped_size; - size_t size; - if (capped_size_.To(&capped_size) && size_.To(&size)) { - return capped_size > size ? Just(capped_size - size) : Just<size_t>(0UL); + std::optional<uint64_t> maybeCapRemaining() const override { + if (capped_size_.has_value() && size_.has_value()) { + uint64_t capped_size = capped_size_.value(); + uint64_t size = size_.value(); + return capped_size > size ? capped_size - size : 0UL; } - return Nothing<size_t>(); + return std::nullopt; } void MemoryInfo(node::MemoryTracker* tracker) const override { - tracker->TrackField("entries", entries_); + tracker->TrackField( + "entries", entries_, "std::vector<std::unique_ptr<Entry>>"); } - std::unique_ptr<Reader> getReader() override; - SET_MEMORY_INFO_NAME(DataQueue); - SET_SELF_SIZE(DataQueueImpl); + std::shared_ptr<Reader> get_reader() override; + SET_MEMORY_INFO_NAME(DataQueue) + SET_SELF_SIZE(DataQueueImpl) private: std::vector<std::unique_ptr<Entry>> entries_; bool idempotent_; - Maybe<size_t> size_; - Maybe<size_t> capped_size_; - bool lockedToReader_ = false; + std::optional<uint64_t> size_ = std::nullopt; + std::optional<uint64_t> capped_size_ = std::nullopt; + bool locked_to_reader_ = false; friend class DataQueue; friend class IdempotentDataQueueReader; @@ -207,123 +182,108 @@ class DataQueueImpl final : public DataQueue, // DataQueue with which it is associated, and always from the beginning. // Reads are non-destructive, meaning that the state of the DataQueue // will not and cannot be changed. -class IdempotentDataQueueReader final : public DataQueue::Reader { +class IdempotentDataQueueReader final + : public DataQueue::Reader, + public std::enable_shared_from_this<IdempotentDataQueueReader> { public: IdempotentDataQueueReader(std::shared_ptr<DataQueueImpl> data_queue) : data_queue_(std::move(data_queue)) { - CHECK(data_queue_->isIdempotent()); + CHECK(data_queue_->is_idempotent()); } // Disallow moving and copying. IdempotentDataQueueReader(const IdempotentDataQueueReader&) = delete; IdempotentDataQueueReader(IdempotentDataQueueReader&&) = delete; - IdempotentDataQueueReader& operator=(const IdempotentDataQueueReader&) = delete; + IdempotentDataQueueReader& operator=(const IdempotentDataQueueReader&) = + delete; IdempotentDataQueueReader& operator=(IdempotentDataQueueReader&&) = delete; - int Pull( - Next next, - int options, - DataQueue::Vec* data, - size_t count, - size_t max_count_hint = bob::kMaxCountHint) override { + int Pull(Next next, + int options, + DataQueue::Vec* data, + size_t count, + size_t max_count_hint = bob::kMaxCountHint) override { + std::shared_ptr<DataQueue::Reader> self = shared_from_this(); + // If ended is true, this reader has already reached the end and cannot // provide any more data. if (ended_) { - std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](size_t) {}); + std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {}); return bob::Status::STATUS_EOS; } // If this is the first pull from this reader, we are first going to // check to see if there is anything at all to actually do. - if (current_index_.IsNothing()) { + if (!current_index_.has_value()) { // First, let's check the number of entries. If there are no entries, // we've reached the end and have nothing to do. - bool empty = data_queue_->entries_.empty(); - - // Second, if there are entries, let's check the known size to see if - // it is zero or not. - if (!empty) { - size_t size; - if (data_queue_->size().To(&size)) { - // If the size is known to be zero, there's absolutely nothing else for - // us to do but end. - empty = (size == 0); - } - // If the size cannot be determined, we will have to try reading from - // the entry to see if it has any data or not, so fall through here. - } - - if (empty) { + // Because this is an idempotent dataqueue, we should always know the + // size... + if (data_queue_->entries_.empty()) { ended_ = true; - std::move(next)(bob::Status::STATUS_END, nullptr, 0, [](size_t) {}); - return bob::Status::STATUS_END; + std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {}); + return bob::Status::STATUS_EOS; } - current_index_ = Just(0U); + current_index_ = 0; } // We have current_index_, awesome, we are going to keep reading from // it until we receive and end. - CHECK(!pull_pending_); - pull_pending_ = true; - int status = getCurrentReader().Pull( - [this, next = std::move(next)] - (int status, const DataQueue::Vec* vecs, size_t count, Done done) { - pull_pending_ = false; - last_status_ = status; - - // In each of these cases, we do not expect that the source will - // actually have provided any actual data. - CHECK_IMPLIES(status == bob::Status::STATUS_BLOCK || - status == bob::Status::STATUS_WAIT || - status == bob::Status::STATUS_EOS, - vecs == nullptr && count == 0); - - // Technically, receiving a STATUS_EOS is really an error because - // we've read past the end of the data, but we are going to treat - // it the same as end. - if (status == bob::Status::STATUS_END || - status == bob::Status::STATUS_EOS) { - uint32_t current = current_index_.FromJust() + 1; - // We have reached the end of this entry. If this is the last entry, - // then we are done. Otherwise, we advance the current_index_, clear - // the current_reader_ and wait for the next read. - if (current == data_queue_->entries_.size()) { - // Yes, this was the final entry. We're all done. - ended_ = true; - status = bob::Status::STATUS_END; - } else { - // This was not the final entry, so we update the index and - // continue on. - current_index_ = Just(current); - status = bob::Status::STATUS_CONTINUE; - } - current_reader_ = nullptr; - } - // Now that we have updated this readers state, we can forward - // everything on to the outer next. - std::move(next)(status, vecs, count, std::move(done)); - }, options, data, count, max_count_hint); + auto current_reader = getCurrentReader(); + if (current_reader == nullptr) { + // Getting the current reader for an entry could fail for several + // reasons. For an FdEntry, for instance, getting the reader may + // fail if the file has been modified since the FdEntry was created. + // We handle the case simply by erroring. + std::move(next)(UV_EINVAL, nullptr, 0, [](uint64_t) {}); + return UV_EINVAL; + } + CHECK(!pull_pending_); + pull_pending_ = true; + int status = current_reader->Pull( + [this, next = std::move(next)]( + int status, const DataQueue::Vec* vecs, uint64_t count, Done done) { + pull_pending_ = false; + // In each of these cases, we do not expect that the source will + // actually have provided any actual data. + CHECK_IMPLIES(status == bob::Status::STATUS_BLOCK || + status == bob::Status::STATUS_WAIT || + status == bob::Status::STATUS_EOS, + vecs == nullptr && count == 0); + if (status == bob::Status::STATUS_EOS) { + uint32_t current = current_index_.value() + 1; + current_reader_ = nullptr; + // We have reached the end of this entry. If this is the last entry, + // then we are done. Otherwise, we advance the current_index_, clear + // the current_reader_ and wait for the next read. + + if (current == data_queue_->entries_.size()) { + // Yes, this was the final entry. We're all done. + ended_ = true; + } else { + // This was not the final entry, so we update the index and + // continue on by performing another read. + current_index_ = current; + status = bob::STATUS_CONTINUE; + } + std::move(next)(status, nullptr, 0, [](uint64_t) {}); + return; + } + + std::move(next)(status, vecs, count, std::move(done)); + }, + options, + data, + count, + max_count_hint); + + // The pull was handled synchronously. If we're not ended, we want to + // make sure status returned is CONTINUE. if (!pull_pending_) { - // The callback was resolved synchronously. Let's check our status. - - // Just as a double check, when next is called synchronous, the status - // provided there should match the status returned. - CHECK(status == last_status_); - - if (ended_) { - // Awesome, we read everything. Return status end here and we're done. - return bob::Status::STATUS_END; - } - - if (status == bob::Status::STATUS_END || - status == bob::Status::STATUS_EOS) { - // If we got here and ended_ is not true, there's more to read. - return bob::Status::STATUS_CONTINUE; - } - + if (!ended_) return bob::Status::STATUS_CONTINUE; // For all other status, we just fall through and return it straightaway. } @@ -345,60 +305,63 @@ class IdempotentDataQueueReader final : public DataQueue::Reader { return status; } - DataQueue::Reader& getCurrentReader() { + DataQueue::Reader* getCurrentReader() { CHECK(!ended_); - CHECK(current_index_.IsJust()); + CHECK(current_index_.has_value()); if (current_reader_ == nullptr) { - auto& entry = data_queue_->entries_[current_index_.FromJust()]; + auto& entry = data_queue_->entries_[current_index_.value()]; // Because this is an idempotent reader, let's just be sure to // doublecheck that the entry itself is actually idempotent - DCHECK(entry->isIdempotent()); - current_reader_ = static_cast<EntryBase&>(*entry).getReader(); + DCHECK(entry->is_idempotent()); + current_reader_ = static_cast<EntryImpl&>(*entry).get_reader(); } - CHECK_NOT_NULL(current_reader_); - return *current_reader_; + return current_reader_.get(); } SET_NO_MEMORY_INFO() - SET_MEMORY_INFO_NAME(IdempotentDataQueueReader); - SET_SELF_SIZE(IdempotentDataQueueReader); + SET_MEMORY_INFO_NAME(IdempotentDataQueueReader) + SET_SELF_SIZE(IdempotentDataQueueReader) private: std::shared_ptr<DataQueueImpl> data_queue_; - Maybe<uint32_t> current_index_ = Nothing<uint32_t>(); - std::unique_ptr<DataQueue::Reader> current_reader_ = nullptr; + std::optional<uint32_t> current_index_ = std::nullopt; + std::shared_ptr<DataQueue::Reader> current_reader_ = nullptr; bool ended_ = false; bool pull_pending_ = false; - int last_status_ = 0; }; // A NonIdempotentDataQueueReader reads entries from the DataEnqueue // and removes those entries from the queue as they are fully consumed. // This means that reads are destructive and the state of the DataQueue // is mutated as the read proceeds. -class NonIdempotentDataQueueReader final : public DataQueue::Reader { +class NonIdempotentDataQueueReader final + : public DataQueue::Reader, + public std::enable_shared_from_this<NonIdempotentDataQueueReader> { public: NonIdempotentDataQueueReader(std::shared_ptr<DataQueueImpl> data_queue) : data_queue_(std::move(data_queue)) { - CHECK(!data_queue_->isIdempotent()); + CHECK(!data_queue_->is_idempotent()); } // Disallow moving and copying. NonIdempotentDataQueueReader(const NonIdempotentDataQueueReader&) = delete; NonIdempotentDataQueueReader(NonIdempotentDataQueueReader&&) = delete; - NonIdempotentDataQueueReader& operator=(const NonIdempotentDataQueueReader&) = delete; - NonIdempotentDataQueueReader& operator=(NonIdempotentDataQueueReader&&) = delete; - - int Pull( - Next next, - int options, - DataQueue::Vec* data, - size_t count, - size_t max_count_hint = bob::kMaxCountHint) override { + NonIdempotentDataQueueReader& operator=(const NonIdempotentDataQueueReader&) = + delete; + NonIdempotentDataQueueReader& operator=(NonIdempotentDataQueueReader&&) = + delete; + + int Pull(Next next, + int options, + DataQueue::Vec* data, + size_t count, + size_t max_count_hint = bob::kMaxCountHint) override { + std::shared_ptr<DataQueue::Reader> self = shared_from_this(); + // If ended is true, this reader has already reached the end and cannot // provide any more data. if (ended_) { - std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](size_t) {}); + std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {}); return bob::Status::STATUS_EOS; } @@ -410,21 +373,21 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader { // expect more data to be provided later, but we don't know exactly when // that'll happe, so the proper response here is to return a blocked // status. - if (!data_queue_->isCapped()) { - std::move(next)(bob::Status::STATUS_BLOCK, nullptr, 0, [](size_t) {}); + if (!data_queue_->is_capped()) { + std::move(next)(bob::Status::STATUS_BLOCK, nullptr, 0, [](uint64_t) {}); return bob::STATUS_BLOCK; } // However, if we are capped, the status will depend on whether the size // of the data_queue_ is known or not. - size_t size; - if (data_queue_->size().To(&size)) { - // If the size is known, and it is still less than the cap, then we still - // might get more data. We just don't know exactly when that'll come, so - // let's return a blocked status. - if (size < data_queue_->capped_size_.FromJust()) { - std::move(next)(bob::Status::STATUS_BLOCK, nullptr, 0, [](size_t) {}); + if (data_queue_->size().has_value()) { + // If the size is known, and it is still less than the cap, then we + // still might get more data. We just don't know exactly when that'll + // come, so let's return a blocked status. + if (data_queue_->size().value() < data_queue_->capped_size_.value()) { + std::move(next)( + bob::Status::STATUS_BLOCK, nullptr, 0, [](uint64_t) {}); return bob::STATUS_BLOCK; } @@ -437,75 +400,51 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader { // entries, we're done. There's nothing left to read. current_reader_ = nullptr; ended_ = true; - std::move(next)(bob::Status::STATUS_END, nullptr, 0, [](size_t) {}); - return bob::STATUS_END; + std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {}); + return bob::STATUS_EOS; + } + + auto current_reader = getCurrentReader(); + if (current_reader == nullptr) { + std::move(next)(UV_EINVAL, nullptr, 0, [](uint64_t) {}); + return UV_EINVAL; } // If we got here, we have an entry to read from. CHECK(!pull_pending_); pull_pending_ = true; - int status = getCurrentReader().Pull( - [this, next = std::move(next)] - (int status, const DataQueue::Vec* vecs, size_t count, Done done) { - pull_pending_ = false; - last_status_ = status; - - // In each of these cases, we do not expect that the source will - // actually have provided any actual data. - CHECK_IMPLIES(status == bob::Status::STATUS_BLOCK || - status == bob::Status::STATUS_WAIT || - status == bob::Status::STATUS_EOS, - vecs == nullptr && count == 0); - - // Technically, receiving a STATUS_EOS is really an error because - // we've read past the end of the data, but we are going to treat - // it the same as end. - if (status == bob::Status::STATUS_END || - status == bob::Status::STATUS_EOS) { - data_queue_->entries_.erase(data_queue_->entries_.begin()); - - // We have reached the end of this entry. If this is the last entry, - // then we are done. Otherwise, we advance the current_index_, clear - // the current_reader_ and wait for the next read. - if (data_queue_->entries_.empty()) { - // Yes, this was the final entry. We're all done. - ended_ = true; - status = bob::Status::STATUS_END; - } else { - // This was not the final entry, so we update the index and - // continue on. - status = bob::Status::STATUS_CONTINUE; - } - current_reader_ = nullptr; - } - - // Now that we have updated this readers state, we can forward - // everything on to the outer next. - std::move(next)(status, vecs, count, std::move(done)); - }, options, data, count, max_count_hint); + int status = current_reader->Pull( + [this, next = std::move(next)]( + int status, const DataQueue::Vec* vecs, uint64_t count, Done done) { + pull_pending_ = false; + + // In each of these cases, we do not expect that the source will + // actually have provided any actual data. + CHECK_IMPLIES(status == bob::Status::STATUS_BLOCK || + status == bob::Status::STATUS_WAIT || + status == bob::Status::STATUS_EOS, + vecs == nullptr && count == 0); + if (status == bob::Status::STATUS_EOS) { + data_queue_->entries_.erase(data_queue_->entries_.begin()); + ended_ = data_queue_->entries_.empty(); + current_reader_ = nullptr; + if (!ended_) status = bob::Status::STATUS_CONTINUE; + std::move(next)(status, nullptr, 0, [](uint64_t) {}); + return; + } + + // Now that we have updated this readers state, we can forward + // everything on to the outer next. + std::move(next)(status, vecs, count, std::move(done)); + }, + options, + data, + count, + max_count_hint); if (!pull_pending_) { // The callback was resolved synchronously. Let's check our status. - - // Just as a double check, when next is called synchronous, the status - // provided there should match the status returned. - CHECK(status == last_status_); - - if (ended_) { - // Awesome, we read everything. Return status end here and we're done. - - // Let's just make sure we've removed all of the entries. - CHECK(data_queue_->entries_.empty()); - - return bob::Status::STATUS_END; - } - - if (status == bob::Status::STATUS_END || - status == bob::Status::STATUS_EOS) { - // If we got here and ended_ is not true, there's more to read. - return bob::Status::STATUS_CONTINUE; - } - + if (!ended_) return bob::Status::STATUS_CONTINUE; // For all other status, we just fall through and return it straightaway. } @@ -527,68 +466,68 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader { return status; } - DataQueue::Reader& getCurrentReader() { + DataQueue::Reader* getCurrentReader() { CHECK(!ended_); CHECK(!data_queue_->entries_.empty()); if (current_reader_ == nullptr) { auto& entry = data_queue_->entries_.front(); - current_reader_ = static_cast<EntryBase&>(*entry).getReader(); + current_reader_ = static_cast<EntryImpl&>(*entry).get_reader(); } - return *current_reader_; + return current_reader_.get(); } SET_NO_MEMORY_INFO() - SET_MEMORY_INFO_NAME(NonIdempotentDataQueueReader); - SET_SELF_SIZE(NonIdempotentDataQueueReader); + SET_MEMORY_INFO_NAME(NonIdempotentDataQueueReader) + SET_SELF_SIZE(NonIdempotentDataQueueReader) private: std::shared_ptr<DataQueueImpl> data_queue_; - std::unique_ptr<DataQueue::Reader> current_reader_ = nullptr; + std::shared_ptr<DataQueue::Reader> current_reader_ = nullptr; bool ended_ = false; bool pull_pending_ = false; - int last_status_ = 0; }; -std::unique_ptr<DataQueue::Reader> DataQueueImpl::getReader() { - if (isIdempotent()) { - return std::make_unique<IdempotentDataQueueReader>(shared_from_this()); +std::shared_ptr<DataQueue::Reader> DataQueueImpl::get_reader() { + if (is_idempotent()) { + return std::make_shared<IdempotentDataQueueReader>(shared_from_this()); } - if (lockedToReader_) return nullptr; - lockedToReader_ = true; + if (locked_to_reader_) return nullptr; + locked_to_reader_ = true; - return std::make_unique<NonIdempotentDataQueueReader>(shared_from_this()); + return std::make_shared<NonIdempotentDataQueueReader>(shared_from_this()); } // ============================================================================ // An empty, always idempotent entry. -class EmptyEntry final : public EntryBase { +class EmptyEntry final : public EntryImpl { public: - class EmptyReader final : public DataQueue::Reader { - public: - - int Pull( - Next next, - int options, - DataQueue::Vec* data, - size_t count, - size_t max_count_hint = bob::kMaxCountHint) override { + class EmptyReader final : public DataQueue::Reader, + public std::enable_shared_from_this<EmptyReader> { + public: + int Pull(Next next, + int options, + DataQueue::Vec* data, + size_t count, + size_t max_count_hint = bob::kMaxCountHint) override { + auto self = shared_from_this(); if (ended_) { - std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](size_t) {}); + std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {}); return bob::Status::STATUS_EOS; } ended_ = true; - std::move(next)(bob::Status::STATUS_END, nullptr, 0, [](size_t) {}); - return bob::Status::STATUS_END; + std::move(next)( + bob::Status::STATUS_CONTINUE, nullptr, 0, [](uint64_t) {}); + return bob::Status::STATUS_CONTINUE; } SET_NO_MEMORY_INFO() - SET_MEMORY_INFO_NAME(EmptyReader); - SET_SELF_SIZE(EmptyReader); + SET_MEMORY_INFO_NAME(EmptyReader) + SET_SELF_SIZE(EmptyReader) - private: + private: bool ended_ = false; }; @@ -600,73 +539,71 @@ class EmptyEntry final : public EntryBase { EmptyEntry& operator=(const EmptyEntry&) = delete; EmptyEntry& operator=(EmptyEntry&&) = delete; - std::unique_ptr<DataQueue::Reader> getReader() override { - return std::make_unique<EmptyReader>(); + std::shared_ptr<DataQueue::Reader> get_reader() override { + return std::make_shared<EmptyReader>(); } std::unique_ptr<Entry> slice( - size_t start, - Maybe<size_t> maybeEnd = Nothing<size_t>()) override { + uint64_t start, + std::optional<uint64_t> maybeEnd = std::nullopt) override { if (start != 0) return nullptr; - size_t end; - if (maybeEnd.To(&end)) { - if (end != 0) return nullptr; - } return std::make_unique<EmptyEntry>(); } - Maybe<size_t> size() const override { return Just<size_t>(0UL); } + std::optional<uint64_t> size() const override { return 0; } - bool isIdempotent() const override { return true; } + bool is_idempotent() const override { return true; } SET_NO_MEMORY_INFO() - SET_MEMORY_INFO_NAME(EmptyEntry); - SET_SELF_SIZE(EmptyEntry); + SET_MEMORY_INFO_NAME(EmptyEntry) + SET_SELF_SIZE(EmptyEntry) }; // ============================================================================ // An entry that consists of a single memory resident v8::BackingStore. // These are always idempotent and always a fixed, known size. -class InMemoryEntry final : public EntryBase { +class InMemoryEntry final : public EntryImpl { public: struct InMemoryFunctor final { std::shared_ptr<BackingStore> backing_store; - void operator()(size_t) { - backing_store = nullptr; - }; + void operator()(uint64_t) { backing_store = nullptr; } }; - class InMemoryReader final : public DataQueue::Reader { + class InMemoryReader final + : public DataQueue::Reader, + public std::enable_shared_from_this<InMemoryReader> { public: - InMemoryReader(InMemoryEntry& entry) - : entry_(entry) {} - - int Pull( - Next next, - int options, - DataQueue::Vec* data, - size_t count, - size_t max_count_hint = bob::kMaxCountHint) override { + InMemoryReader(InMemoryEntry& entry) : entry_(entry) {} + + int Pull(Next next, + int options, + DataQueue::Vec* data, + size_t count, + size_t max_count_hint = bob::kMaxCountHint) override { + auto self = shared_from_this(); if (ended_) { - std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](size_t) {}); + std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {}); return bob::Status::STATUS_EOS; } ended_ = true; - DataQueue::Vec vec { - reinterpret_cast<uint8_t*>(entry_.backing_store_->Data()) + entry_.offset_, - entry_.byte_length_, + DataQueue::Vec vec{ + reinterpret_cast<uint8_t*>(entry_.backing_store_->Data()) + + entry_.offset_, + entry_.byte_length_, }; - std::move(next)(bob::Status::STATUS_END, &vec, 1, InMemoryFunctor({ - entry_.backing_store_ - })); - return bob::Status::STATUS_END; + + std::move(next)(bob::Status::STATUS_CONTINUE, + &vec, + 1, + InMemoryFunctor({entry_.backing_store_})); + return bob::Status::STATUS_CONTINUE; } SET_NO_MEMORY_INFO() - SET_MEMORY_INFO_NAME(InMemoryReader); - SET_SELF_SIZE(InMemoryReader); + SET_MEMORY_INFO_NAME(InMemoryReader) + SET_SELF_SIZE(InMemoryReader) private: InMemoryEntry& entry_; @@ -674,8 +611,8 @@ class InMemoryEntry final : public EntryBase { }; InMemoryEntry(std::shared_ptr<BackingStore> backing_store, - size_t offset, - size_t byte_length) + uint64_t offset, + uint64_t byte_length) : backing_store_(std::move(backing_store)), offset_(offset), byte_length_(byte_length) { @@ -690,14 +627,15 @@ class InMemoryEntry final : public EntryBase { InMemoryEntry& operator=(const InMemoryEntry&) = delete; InMemoryEntry& operator=(InMemoryEntry&&) = delete; - std::unique_ptr<DataQueue::Reader> getReader() override { - return std::make_unique<InMemoryReader>(*this); + std::shared_ptr<DataQueue::Reader> get_reader() override { + return std::make_shared<InMemoryReader>(*this); } std::unique_ptr<Entry> slice( - size_t start, - Maybe<size_t> maybeEnd = Nothing<size_t>()) override { - const auto makeEntry = [&](size_t start, size_t len) -> std::unique_ptr<Entry> { + uint64_t start, + std::optional<uint64_t> maybeEnd = std::nullopt) override { + const auto makeEntry = [&](uint64_t start, + uint64_t len) -> std::unique_ptr<Entry> { if (len == 0) { return std::make_unique<EmptyEntry>(); } @@ -710,8 +648,8 @@ class InMemoryEntry final : public EntryBase { // The start cannot extend beyond the maximum end point of this entry. start = std::min(start, offset_ + byte_length_); - size_t end; - if (maybeEnd.To(&end)) { + if (maybeEnd.has_value()) { + uint64_t end = maybeEnd.value(); // The end cannot extend beyond the maximum end point of this entry, // and the end must be equal to or greater than the start. end = std::max(start, std::min(offset_ + end, offset_ + byte_length_)); @@ -724,20 +662,21 @@ class InMemoryEntry final : public EntryBase { return makeEntry(start, byte_length_ - start); } - Maybe<size_t> size() const override { return Just(byte_length_); } + std::optional<uint64_t> size() const override { return byte_length_; } - bool isIdempotent() const override { return true; } + bool is_idempotent() const override { return true; } void MemoryInfo(node::MemoryTracker* tracker) const override { - tracker->TrackField("store", backing_store_); + tracker->TrackField( + "store", backing_store_, "std::shared_ptr<v8::BackingStore>"); } - SET_MEMORY_INFO_NAME(InMemoryEntry); - SET_SELF_SIZE(InMemoryEntry); + SET_MEMORY_INFO_NAME(InMemoryEntry) + SET_SELF_SIZE(InMemoryEntry) private: std::shared_ptr<BackingStore> backing_store_; - size_t offset_; - size_t byte_length_; + uint64_t offset_; + uint64_t byte_length_; friend class InMemoryReader; }; @@ -746,9 +685,9 @@ class InMemoryEntry final : public EntryBase { // An entry that wraps a DataQueue. The entry takes on the characteristics // of the wrapped dataqueue. -class DataQueueEntry : public EntryBase { +class DataQueueEntry : public EntryImpl { public: - DataQueueEntry(std::shared_ptr<DataQueue> data_queue) + explicit DataQueueEntry(std::shared_ptr<DataQueue> data_queue) : data_queue_(std::move(data_queue)) { CHECK(data_queue_); } @@ -759,13 +698,12 @@ class DataQueueEntry : public EntryBase { DataQueueEntry& operator=(const DataQueueEntry&) = delete; DataQueueEntry& operator=(DataQueueEntry&&) = delete; - std::unique_ptr<DataQueue::Reader> getReader() override { - return data_queue_->getReader(); + std::shared_ptr<DataQueue::Reader> get_reader() override { + return std::make_shared<ReaderImpl>(data_queue_->get_reader()); } std::unique_ptr<Entry> slice( - size_t start, - Maybe<size_t> end = Nothing<size_t>()) override { + uint64_t start, std::optional<uint64_t> end = std::nullopt) override { std::shared_ptr<DataQueue> sliced = data_queue_->slice(start, end); if (!sliced) return nullptr; @@ -775,10 +713,10 @@ class DataQueueEntry : public EntryBase { // Returns the number of bytes represented by this Entry if it is // known. Certain types of entries, such as those backed by streams // might not know the size in advance and therefore cannot provide - // a value. In such cases, size() must return v8::Nothing<size_t>. + // a value. In such cases, size() must return std::nullopt. // // If the entry is idempotent, a size should always be available. - Maybe<size_t> size() const override { return data_queue_->size(); } + std::optional<uint64_t> size() const override { return data_queue_->size(); } // When true, multiple reads on the object must produce the exact // same data or the reads will fail. Some sources of entry data, @@ -786,19 +724,44 @@ class DataQueueEntry : public EntryBase { // and therefore must not claim to be. If an entry claims to be // idempotent and cannot preserve that quality, subsequent reads // must fail with an error when a variance is detected. - bool isIdempotent() const override { return data_queue_->isIdempotent(); } + bool is_idempotent() const override { return data_queue_->is_idempotent(); } void MemoryInfo(node::MemoryTracker* tracker) const override { - tracker->TrackField("data_queue", data_queue_); + tracker->TrackField( + "data_queue", data_queue_, "std::shared_ptr<DataQueue>"); } DataQueue& getDataQueue() { return *data_queue_; } - SET_MEMORY_INFO_NAME(DataQueueEntry); - SET_SELF_SIZE(DataQueueEntry); + SET_MEMORY_INFO_NAME(DataQueueEntry) + SET_SELF_SIZE(DataQueueEntry) private: std::shared_ptr<DataQueue> data_queue_; + + class ReaderImpl : public DataQueue::Reader, + public std::enable_shared_from_this<ReaderImpl> { + public: + explicit ReaderImpl(std::shared_ptr<DataQueue::Reader> inner) + : inner_(std::move(inner)) {} + + int Pull(DataQueue::Reader::Next next, + int options, + DataQueue::Vec* data, + size_t count, + size_t max_count_hint) override { + auto self = shared_from_this(); + return inner_->Pull( + std::move(next), options, data, count, max_count_hint); + } + + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(ReaderImpl) + SET_SELF_SIZE(ReaderImpl) + + private: + std::shared_ptr<DataQueue::Reader> inner_; + }; }; // ============================================================================ @@ -811,77 +774,61 @@ class DataQueueEntry : public EntryBase { // a tolerable risk here. While FdEntry is considered idempotent, this race // means that it is indeed possible for multiple reads to return different // results if the file just happens to get modified. -class FdEntry final : public EntryBase { +class FdEntry final : public EntryImpl { // TODO(@jasnell, @flakey5): - // * This should only allow reading from regular files. No directories, no pipes, etc. - // * The reader should support accepting the buffer(s) from the pull, if any. It should + // * This should only allow reading from regular files. No directories, no + // pipes, etc. + // * The reader should support accepting the buffer(s) from the pull, if any. + // It should // only allocate a managed buffer if the pull doesn't provide any. - // * We might want to consider making the stat on each read sync to eliminate the race + // * We might want to consider making the stat on each read sync to eliminate + // the race // condition described in the comment above. public: - FdEntry(Environment* env, - int fd, - size_t start, - v8::Maybe<size_t> end, - BaseObjectPtr<fs::FileHandle> maybe_file_handle = - BaseObjectPtr<fs::FileHandle>()) - : env_(env), - fd_(fd), - start_(0), - maybe_file_handle_(maybe_file_handle) { - CHECK(fd); - if (GetStat(stat_) == 0) { - if (end.IsNothing()) { - end_ = stat_.st_size; - } else { - end_ = std::min(stat_.st_size, end.FromJust()); - } - } - } + static std::unique_ptr<FdEntry> Create(Environment* env, Local<Value> path) { + // We're only going to create the FdEntry if the file exists. + uv_fs_t req; + auto cleanup = OnScopeLeave([&] { uv_fs_req_cleanup(&req); }); - FdEntry(Environment* env, BaseObjectPtr<fs::FileHandle> handle) - : FdEntry(env, handle->GetFD(), 0, Nothing<size_t>(), handle) {} + auto buf = std::make_shared<BufferValue>(env->isolate(), path); + if (uv_fs_stat(nullptr, &req, buf->out(), nullptr) < 0) return nullptr; + + return std::make_unique<FdEntry>( + env, std::move(buf), req.statbuf, 0, req.statbuf.st_size); + } FdEntry(Environment* env, - int fd, + std::shared_ptr<BufferValue> path_, uv_stat_t stat, - size_t start, - size_t end, - BaseObjectPtr<fs::FileHandle> maybe_file_handle = - BaseObjectPtr<fs::FileHandle>()) + uint64_t start, + uint64_t end) : env_(env), - fd_(end), - start_(start), - end_(end), + path_(std::move(path_)), stat_(stat), - maybe_file_handle_(maybe_file_handle){} + start_(start), + end_(end) {} - std::unique_ptr<DataQueue::Reader> getReader() override { - return std::make_unique<Reader>(this); + std::shared_ptr<DataQueue::Reader> get_reader() override { + return ReaderImpl::Create(this); } std::unique_ptr<Entry> slice( - size_t start, - Maybe<size_t> end = Nothing<size_t>()) override { - size_t new_start = start_ + start; - size_t new_end = end_; - if (end.IsJust()) { - new_end = std::min(end.FromJust() + start, new_end); + uint64_t start, std::optional<uint64_t> end = std::nullopt) override { + uint64_t new_start = start_ + start; + uint64_t new_end = end_; + if (end.has_value()) { + new_end = std::min(end.value() + start, new_end); } CHECK(new_start >= start_); CHECK(new_end <= end_); - return std::make_unique<FdEntry>(env_, fd_, stat_, new_start, new_end, maybe_file_handle_); + return std::make_unique<FdEntry>(env_, path_, stat_, new_start, new_end); } - Maybe<size_t> size() const override { - return Just(end_ - start_); - } + std::optional<uint64_t> size() const override { return end_ - start_; } - bool isIdempotent() const override { - return true; - } + bool is_idempotent() const override { return true; } Environment* env() const { return env_; } @@ -889,229 +836,193 @@ class FdEntry final : public EntryBase { SET_MEMORY_INFO_NAME(FdEntry) SET_SELF_SIZE(FdEntry) - class Wrap : public BaseObject { - public: - static void New(const FunctionCallbackInfo<Value>& args) { - CHECK(args.IsConstructCall()); - Environment* env = Environment::GetCurrent(args); - - CHECK(args[0]->IsInt32()); - CHECK(args[1]->IsUint32()); - CHECK(args[2]->IsUint32()); - - int fd = args[0].As<Int32>()->Value(); - size_t start = args[1].As<Uint32>()->Value(); - size_t end = args[1].As<Uint32>()->Value(); - - new Wrap(env, args.This(), fd, start, Just(end)); - } + private: + Environment* env_; + std::shared_ptr<BufferValue> path_; + uv_stat_t stat_; + uint64_t start_ = 0; + uint64_t end_ = 0; - static Local<FunctionTemplate> GetConstructorTemplate(Environment* env) { - Local<FunctionTemplate> tmpl = env->fdentry_constructor_template(); - if (tmpl.IsEmpty()) { - Isolate* isolate = env->isolate(); - tmpl = NewFunctionTemplate(isolate, New); + bool is_modified(const uv_stat_t& other) { + return other.st_size != stat_.st_size || + other.st_mtim.tv_nsec != stat_.st_mtim.tv_nsec; + } - tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "FdEntry")); - tmpl->Inherit(BaseObject::GetConstructorTemplate(env)); + static bool CheckModified(FdEntry* entry, int fd) { + uv_fs_t req; + auto cleanup = OnScopeLeave([&] { uv_fs_req_cleanup(&req); }); + // TODO(jasnell): Note the use of a sync fs call here is a bit unfortunate. + // Doing this asynchronously creates a bit of a race condition tho, a file + // could be unmodified when we call the operation but then by the time the + // async callback is triggered to give us that answer the file is modified. + // While such silliness is still possible here, the sync call at least makes + // it less likely to hit the race. + if (uv_fs_fstat(nullptr, &req, fd, nullptr) < 0) return true; + return entry->is_modified(req.statbuf); + } - env->set_fdentry_constructor_template(tmpl); + class ReaderImpl final : public DataQueue::Reader, + public StreamListener, + public std::enable_shared_from_this<ReaderImpl> { + public: + static std::shared_ptr<ReaderImpl> Create(FdEntry* entry) { + uv_fs_t req; + auto cleanup = OnScopeLeave([&] { uv_fs_req_cleanup(&req); }); + int file = + uv_fs_open(nullptr, &req, entry->path_->out(), O_RDONLY, 0, nullptr); + if (file < 0 || FdEntry::CheckModified(entry, file)) { + uv_fs_close(nullptr, &req, file, nullptr); + return nullptr; } + return std::make_shared<ReaderImpl>( + BaseObjectPtr<fs::FileHandle>( + fs::FileHandle::New(entry->env()->GetBindingData<fs::BindingData>( + entry->env()->context()), + file, + Local<Object>(), + entry->start_, + entry->end_)), + entry); + } - return tmpl; + explicit ReaderImpl(BaseObjectPtr<fs::FileHandle> handle, FdEntry* entry) + : env_(handle->env()), handle_(std::move(handle)), entry_(entry) { + handle_->PushStreamListener(this); + handle_->env()->AddCleanupHook(cleanup, this); } - static void Initialize(Environment* env, Local<Object> target) { - SetConstructorFunction( - env->context(), target, "FdEntry", GetConstructorTemplate(env)); + ~ReaderImpl() override { + handle_->env()->RemoveCleanupHook(cleanup, this); + DrainAndClose(); + handle_->RemoveStreamListener(this); } - static void RegisterExternalReferences(ExternalReferenceRegistry* registry) { - registry->Register(New); + uv_buf_t OnStreamAlloc(size_t suggested_size) override { + return env_->allocate_managed_buffer(suggested_size); } - static BaseObjectPtr<Wrap> Create( - Environment* env, - int fd, - size_t start = 0, - Maybe<size_t> end = Nothing<size_t>()) { - Local<Object> obj; - if (!GetConstructorTemplate(env) - ->InstanceTemplate() - ->NewInstance(env->context()) - .ToLocal(&obj)) { - return BaseObjectPtr<Wrap>(); + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override { + std::shared_ptr<v8::BackingStore> store = + env_->release_managed_buffer(buf); + + if (ended_) { + // If we got here and ended_ is true, it means we ended and drained + // while the read was pending. We're just going to do nothing. + CHECK(pending_pulls_.empty()); + return; } - return MakeBaseObject<Wrap>(env, obj, fd, start, end); - } + CHECK(reading_); + auto pending = DequeuePendingPull(); - Wrap(Environment* env, Local<Object> obj, int fd, size_t start, v8::Maybe<size_t> end) - : BaseObject(env, obj), - inner_(std::make_unique<FdEntry>(env, fd, start, end)) { - MakeWeak(); - } + if (CheckModified(entry_, handle_->GetFD())) { + DrainAndClose(); + // The file was modified while the read was pending. We need to error. + std::move(pending.next)(UV_EINVAL, nullptr, 0, [](uint64_t) {}); + return; + } - std::unique_ptr<DataQueue::Entry> detach() { - return std::move(inner_); - } + if (nread < 0) { + if (nread == UV_EOF) { + std::move(pending.next)(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); + } else { + std::move(pending.next)(nread, nullptr, 0, [](uint64_t) {}); + } - bool isDetached() const { return inner_ == nullptr; } + return DrainAndClose(); + } - void MemoryInfo(MemoryTracker* tracker) const override { - tracker->TrackField("entry", inner_); - } - SET_MEMORY_INFO_NAME(FdEntry::Wrap) - SET_SELF_SIZE(Wrap) + DataQueue::Vec vec; + vec.base = static_cast<uint8_t*>(store->Data()); + vec.len = static_cast<uint64_t>(nread); + std::move(pending.next)( + bob::STATUS_CONTINUE, &vec, 1, [store](uint64_t) {}); - private: - std::unique_ptr<FdEntry> inner_; - }; + if (pending_pulls_.empty()) { + reading_ = false; + if (handle_->IsAlive()) handle_->ReadStop(); + } + } - private: - Environment* env_; - int fd_; - size_t start_ = 0; - size_t end_ = 0; - uv_stat_t stat_; - uv_fs_t req; - BaseObjectPtr<fs::FileHandle> maybe_file_handle_; + int Pull(Next next, + int options, + DataQueue::Vec* data, + size_t count, + size_t max_count_hint = bob::kMaxCountHint) override { + if (ended_ || !handle_->IsAlive()) { + std::move(next)(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); + return bob::STATUS_EOS; + } - int GetStat(uv_stat_t& stat) { - int err = uv_fs_fstat(env_->event_loop(), &req, fd_, nullptr); - stat = req.statbuf; - return err; - } + if (FdEntry::CheckModified(entry_, handle_->GetFD())) { + DrainAndClose(); + std::move(next)(UV_EINVAL, nullptr, 0, [](uint64_t) {}); + return UV_EINVAL; + } - class Reader : public DataQueue::Reader { - public: - Reader(FdEntry* entry) - : entry_(entry), - offset_(entry->start_), - end_(entry_->end_) {} - - int Pull( - Next next, - int options, - DataQueue::Vec* data, - size_t count, - size_t max_count_hint = bob::kMaxCountHint) override { - // TODO(@jasnell): For now, we're going to ignore data and count. - // Later, we can support these to allow the caller to allocate the - // buffers we read into. To keep things easier for now, we're going - // to read into a pre-allocated buffer. - if (ended_ || offset_ == end_) { - std::move(next)(bob::STATUS_EOS, nullptr, 0, [](size_t) {}); - return bob::STATUS_EOS; + pending_pulls_.emplace_back(std::move(next), shared_from_this()); + if (!reading_) { + reading_ = true; + handle_->ReadStart(); } - // offset_ should always be less than end_ here - CHECK_LT(offset_, end_); - new PendingRead(this, std::move(next)); return bob::STATUS_WAIT; } SET_NO_MEMORY_INFO() SET_MEMORY_INFO_NAME(FdEntry::Reader) - SET_SELF_SIZE(Reader) + SET_SELF_SIZE(ReaderImpl) private: - FdEntry* entry_; - bool ended_ = false; - size_t offset_; - size_t end_; - - struct PendingRead { - static constexpr size_t DEFAULT_BUFFER_SIZE = 4096; - Reader* reader; + struct PendingPull { Next next; - uv_fs_t req_; - uv_buf_t uvbuf; - - PendingRead(Reader* reader, Next next) - : reader(reader), - next(std::move(next)), - uvbuf(reader->entry_->env()->allocate_managed_buffer( - std::min(DEFAULT_BUFFER_SIZE, reader->end_ - reader->offset_) - )) { - req_.data = this; - uv_fs_fstat(reader->entry_->env()->event_loop(), &req_, - reader->entry_->fd_, &PendingRead::OnStat); - } - - void Done() { - delete this; - } - - bool checkEnded() { - if (reader->ended_) { - // A previous read ended this readable. Let's stop here. - std::move(next)(bob::STATUS_EOS, nullptr, 0, [](size_t) {}); - return true; - } - if (req_.result < 0) { - std::move(next)(req_.result, nullptr, 0, [](size_t) {}); - return true; - } - return false; - } - - void OnStat() { - if (checkEnded()) return Done(); - uv_stat_t current_stat = req_.statbuf; - uv_stat_t& orig = reader->entry_->stat_; - if (current_stat.st_size != orig.st_size || - current_stat.st_ctim.tv_nsec != orig.st_ctim.tv_nsec || - current_stat.st_mtim.tv_nsec != orig.st_mtim.tv_nsec) { - // The fd was modified. Fail the read. - std::move(next)(UV_EINVAL, nullptr, 0, [](size_t) {}); - return; - } + std::shared_ptr<ReaderImpl> self; + PendingPull(Next next, std::shared_ptr<ReaderImpl> self) + : next(std::move(next)), self(std::move(self)) {} + }; - // Now we read from the file. - uv_fs_read(reader->entry_->env()->event_loop(), &req_, - reader->entry_->fd_, - &uvbuf, 1, - reader->offset_, - OnRead); - } + Environment* env_; + BaseObjectPtr<fs::FileHandle> handle_; + FdEntry* entry_; + std::deque<PendingPull> pending_pulls_; + bool reading_ = false; + bool ended_ = false; - void OnRead() { - auto on_exit = OnScopeLeave([this] { Done(); }); - if (checkEnded()) return; - std::shared_ptr<BackingStore> store = - reader->entry_->env()->release_managed_buffer(uvbuf); - size_t amountRead = req_.result; - // We should never read past end_ - CHECK_LE(amountRead + reader->offset_, reader->end_); - reader->offset_ += amountRead; - if (reader->offset_ == reader->end_) - reader->ended_ = true; - DataQueue::Vec vec = { - reinterpret_cast<uint8_t*>(store->Data()), - amountRead - }; - std::move(next)( - reader->ended_ ? bob::STATUS_END : bob::STATUS_CONTINUE, - &vec, 1, [store](size_t) mutable {}); - } + static void cleanup(void* self) { + auto ptr = static_cast<ReaderImpl*>(self); + ptr->DrainAndClose(); + } - static void OnStat(uv_fs_t* req) { - PendingRead* read = ContainerOf(&PendingRead::req_, req); - read->OnStat(); + void DrainAndClose() { + if (ended_) return; + ended_ = true; + while (!pending_pulls_.empty()) { + auto pending = DequeuePendingPull(); + std::move(pending.next)(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); } + handle_->ReadStop(); + + // We fallback to a sync close on the raw fd here because it is the + // easiest, simplest thing to do. All of FileHandle's close mechanisms + // assume async close and cleanup, while DrainAndClose might be running + // in the destructor during GC, for instance. As a todo, FileHandle could + // provide a sync mechanism for closing the FD but, for now, this + // approach works. + int fd = handle_->Release(); + uv_fs_t req; + uv_fs_close(nullptr, &req, fd, nullptr); + uv_fs_req_cleanup(&req); + } - static void OnRead(uv_fs_t* req) { - PendingRead* read = ContainerOf(&PendingRead::req_, req); - read->OnRead(); - } - }; + PendingPull DequeuePendingPull() { + CHECK(!pending_pulls_.empty()); + auto pop = OnScopeLeave([this] { pending_pulls_.pop_front(); }); + return std::move(pending_pulls_.front()); + } - friend struct PendingRead; friend class FdEntry; }; - friend class Reader; - friend struct Reader::PendingRead; + friend class ReaderImpl; }; // ============================================================================ @@ -1122,9 +1033,9 @@ std::shared_ptr<DataQueue> DataQueue::CreateIdempotent( std::vector<std::unique_ptr<Entry>> list) { // Any entry is invalid for an idempotent DataQueue if any of the entries // are nullptr or is not idempotent. - size_t size = 0; + uint64_t size = 0; const auto isInvalid = [&size](auto& item) { - if (item == nullptr || !item->isIdempotent()) { + if (item == nullptr || !item->is_idempotent()) { return true; // true means the entry is not valid here. } @@ -1133,9 +1044,11 @@ std::shared_ptr<DataQueue> DataQueue::CreateIdempotent( // of the entries are unable to provide a size, then // we assume we cannot safely treat this entry as // idempotent even if it claims to be. - size_t itemSize; - if (item->size().To(&itemSize)) { size += itemSize; } - else return true; // true means the entry is not valid here. + if (item->size().has_value()) { + size += item->size().value(); + } else { + return true; // true means the entry is not valid here. + } return false; }; @@ -1147,7 +1060,7 @@ std::shared_ptr<DataQueue> DataQueue::CreateIdempotent( return std::make_shared<DataQueueImpl>(std::move(list), size); } -std::shared_ptr<DataQueue> DataQueue::Create(Maybe<size_t> capped) { +std::shared_ptr<DataQueue> DataQueue::Create(std::optional<uint64_t> capped) { return std::make_shared<DataQueueImpl>(capped); } @@ -1170,9 +1083,7 @@ std::unique_ptr<DataQueue::Entry> DataQueue::CreateInMemoryEntryFromView( std::unique_ptr<DataQueue::Entry> DataQueue::CreateInMemoryEntryFromBackingStore( - std::shared_ptr<BackingStore> store, - size_t offset, - size_t length) { + std::shared_ptr<BackingStore> store, uint64_t offset, uint64_t length) { CHECK(store); if (offset + length > store->ByteLength()) { return nullptr; @@ -1185,18 +1096,18 @@ std::unique_ptr<DataQueue::Entry> DataQueue::CreateDataQueueEntry( return std::make_unique<DataQueueEntry>(std::move(data_queue)); } -std::unique_ptr<DataQueue::Entry> DataQueue::CreateFdEntry( - BaseObjectPtr<fs::FileHandle> handle) { - return std::make_unique<FdEntry>(handle->env(), handle); +std::unique_ptr<DataQueue::Entry> DataQueue::CreateFdEntry(Environment* env, + Local<Value> path) { + return FdEntry::Create(env, path); } void DataQueue::Initialize(Environment* env, v8::Local<v8::Object> target) { - FdEntry::Wrap::Initialize(env, target); + // Nothing to do here currently. } void DataQueue::RegisterExternalReferences( ExternalReferenceRegistry* registry) { - FdEntry::Wrap::RegisterExternalReferences(registry); + // Nothing to do here currently. } } // namespace node diff --git a/src/dataqueue/queue.h b/src/dataqueue/queue.h index 955f9ec589..a1a297a8fc 100644 --- a/src/dataqueue/queue.h +++ b/src/dataqueue/queue.h @@ -3,15 +3,16 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #include <base_object.h> +#include <memory_tracker.h> #include <node.h> #include <node_bob.h> #include <node_file.h> -#include <memory_tracker.h> #include <stream_base.h> -#include <v8.h> #include <uv.h> +#include <v8.h> #include <memory> +#include <optional> #include <vector> namespace node { @@ -101,17 +102,18 @@ namespace node { // To read from a DataQueue, we use the node::bob::Source API // (see src/node_bob.h). // -// std::unique_ptr<DataQueue::Reader> reader = data_queue->getReader(); +// std::shared_ptr<DataQueue::Reader> reader = data_queue->get_reader(); // // reader->Pull( -// [](int status, const DataQueue::Vec* vecs, size_t count, Done done) { +// [](int status, const DataQueue::Vec* vecs, +// uint64_t count, Done done) { // // status is one of node::bob::Status // // vecs is zero or more data buffers containing the read data // // count is the number of vecs // // done is a callback to be invoked when done processing the data // }, options, nullptr, 0, 16); // -// Keep calling Pull() until status is equal to node::bob::Status::STATUS_END. +// Keep calling Pull() until status is equal to node::bob::Status::STATUS_EOS. // // For idempotent DataQueues, any number of readers can be created and // pull concurrently from the same DataQueue. The DataQueue can be read @@ -126,15 +128,14 @@ class DataQueue : public MemoryRetainer { public: struct Vec { uint8_t* base; - size_t len; + uint64_t len; }; // A DataQueue::Reader consumes the DataQueue. If the data queue is // idempotent, multiple Readers can be attached to the DataQueue at // any given time, all guaranteed to yield the same result when the // data is read. Otherwise, only a single Reader can be attached. - class Reader : public MemoryRetainer, - public bob::Source<Vec> { + class Reader : public MemoryRetainer, public bob::Source<Vec> { public: using Next = bob::Next<Vec>; using Done = bob::Done; @@ -150,26 +151,25 @@ class DataQueue : public MemoryRetainer { // offset is omitted, the slice extends to the end of the // data. // - // Creating a slice is only possible if isIdempotent() returns + // Creating a slice is only possible if is_idempotent() returns // true. This is because consuming either the original entry or // the new entry would change the state of the other in non- - // deterministic ways. When isIdempotent() returns false, slice() + // deterministic ways. When is_idempotent() returns false, slice() // must return a nulled unique_ptr. // // Creating a slice is also only possible if the size of the - // entry is known. If size() returns v8::Nothing<size_t>, slice() + // entry is known. If size() returns std::nullopt, slice() // must return a nulled unique_ptr. virtual std::unique_ptr<Entry> slice( - size_t start, - v8::Maybe<size_t> end = v8::Nothing<size_t>()) = 0; + uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0; // Returns the number of bytes represented by this Entry if it is // known. Certain types of entries, such as those backed by streams // might not know the size in advance and therefore cannot provide - // a value. In such cases, size() must return v8::Nothing<size_t>. + // a value. In such cases, size() must return v8::Nothing<uint64_t>. // // If the entry is idempotent, a size should always be available. - virtual v8::Maybe<size_t> size() const = 0; + virtual std::optional<uint64_t> size() const = 0; // When true, multiple reads on the object must produce the exact // same data or the reads will fail. Some sources of entry data, @@ -177,7 +177,7 @@ class DataQueue : public MemoryRetainer { // and therefore must not claim to be. If an entry claims to be // idempotent and cannot preserve that quality, subsequent reads // must fail with an error when a variance is detected. - virtual bool isIdempotent() const = 0; + virtual bool is_idempotent() const = 0; }; // Creates an idempotent DataQueue with a pre-established collection @@ -190,7 +190,7 @@ class DataQueue : public MemoryRetainer { // mutated and updated such that multiple reads are not guaranteed // to produce the same result. The entries added can be of any type. static std::shared_ptr<DataQueue> Create( - v8::Maybe<size_t> capped = v8::Nothing<size_t>()); + std::optional<uint64_t> capped = std::nullopt); // Creates an idempotent Entry from a v8::ArrayBufferView. To help // ensure idempotency, the underlying ArrayBuffer is detached from @@ -207,26 +207,26 @@ class DataQueue : public MemoryRetainer { // is not detachable, nullptr will be returned. static std::unique_ptr<Entry> CreateInMemoryEntryFromBackingStore( std::shared_ptr<v8::BackingStore> store, - size_t offset, - size_t length); + uint64_t offset, + uint64_t length); static std::unique_ptr<Entry> CreateDataQueueEntry( std::shared_ptr<DataQueue> data_queue); - static std::unique_ptr<Entry> CreateFdEntry( - BaseObjectPtr<fs::FileHandle> handle); + static std::unique_ptr<Entry> CreateFdEntry(Environment* env, + v8::Local<v8::Value> path); // Creates a Reader for the given queue. If the queue is idempotent, // any number of readers can be created, all of which are guaranteed // to provide the same data. Otherwise, only a single reader is // permitted. - virtual std::unique_ptr<Reader> getReader() = 0; + virtual std::shared_ptr<Reader> get_reader() = 0; // Append a single new entry to the queue. Appending is only allowed - // when isIdempotent() is false. v8::Nothing<bool>() will be returned - // if isIdempotent() is true. v8::Just(false) will be returned if the + // when is_idempotent() is false. std::nullopt will be returned + // if is_idempotent() is true. std::optional(false) will be returned if the // data queue is not idempotent but the entry otherwise cannot be added. - virtual v8::Maybe<bool> append(std::unique_ptr<Entry> entry) = 0; + virtual std::optional<bool> append(std::unique_ptr<Entry> entry) = 0; // Caps the size of this DataQueue preventing additional entries to // be added if those cause the size to extend beyond the specified @@ -239,12 +239,12 @@ class DataQueue : public MemoryRetainer { // If the size of the data queue is not known, the limit will be // ignored and no additional entries will be allowed at all. // - // If isIdempotent is true capping is unnecessary because the data + // If is_idempotent is true capping is unnecessary because the data // queue cannot be appended to. In that case, cap() is a non-op. // // If the data queue has already been capped, cap can be called // again with a smaller size. - virtual void cap(size_t limit = 0) = 0; + virtual void cap(uint64_t limit = 0) = 0; // Returns a new DataQueue that is a view over this queues data // from the start offset to the ending offset. If the end offset @@ -252,39 +252,38 @@ class DataQueue : public MemoryRetainer { // // The slice will coverage a range from start up to, but excluding, end. // - // Creating a slice is only possible if isIdempotent() returns + // Creating a slice is only possible if is_idempotent() returns // true. This is because consuming either the original DataQueue or // the new queue would change the state of the other in non- - // deterministic ways. When isIdempotent() returns false, slice() + // deterministic ways. When is_idempotent() returns false, slice() // must return a nulled unique_ptr. // // Creating a slice is also only possible if the size of the - // DataQueue is known. If size() returns v8::Nothing<size_t>, slice() + // DataQueue is known. If size() returns std::nullopt, slice() // must return a null unique_ptr. virtual std::shared_ptr<DataQueue> slice( - size_t start, - v8::Maybe<size_t> end = v8::Nothing<size_t>()) = 0; + uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0; // The size of DataQueue is the total size of all of its member entries. // If any of the entries is not able to specify a size, the DataQueue // will also be incapable of doing so, in which case size() must return - // v8::Nothing<size_t>. - virtual v8::Maybe<size_t> size() const = 0; + // std::nullopt. + virtual std::optional<uint64_t> size() const = 0; // A DataQueue is idempotent only if all of its member entries are // idempotent. - virtual bool isIdempotent() const = 0; + virtual bool is_idempotent() const = 0; // True only if cap is called or the data queue is a limited to a // fixed size. - virtual bool isCapped() const = 0; + virtual bool is_capped() const = 0; // If the data queue has been capped, and the size of the data queue // is known, maybeCapRemaining will return the number of additional // bytes the data queue can receive before reaching the cap limit. // If the size of the queue cannot be known, or the cap has not - // been set, maybeCapRemaining() will return v8::Nothing<size_t>. - virtual v8::Maybe<size_t> maybeCapRemaining() const = 0; + // been set, maybeCapRemaining() will return std::nullopt. + virtual std::optional<uint64_t> maybeCapRemaining() const = 0; static void Initialize(Environment* env, v8::Local<v8::Object> target); static void RegisterExternalReferences(ExternalReferenceRegistry* registry); |