path: root/src/dataqueue
author    James M Snell <jasnell@gmail.com>  2022-12-17 13:58:26 -0800
committer James M Snell <jasnell@gmail.com>  2023-02-19 16:26:59 -0800
commit    71fb06fd64c873e2c716e23ca1d31219cf78cbd6 (patch)
tree      bb762422acd056a3c42b68d7b81db5c239cb0d87 /src/dataqueue
parent    950cec4c2642c15e2913f35babadda56c1d8a723 (diff)
download  node-new-71fb06fd64c873e2c716e23ca1d31219cf78cbd6.tar.gz
src, lib: fixup lint and format issues for DataQueue/Blob
Co-authored-by: flakey5 <73616808+flakey5@users.noreply.github.com>
PR-URL: https://github.com/nodejs/node/pull/45258
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'src/dataqueue')
-rw-r--r--  src/dataqueue/queue.cc | 1119
-rw-r--r--  src/dataqueue/queue.h  |   75
2 files changed, 552 insertions, 642 deletions
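
Beneath the lint and format cleanup, the substantive change in this patch is a migration of the DataQueue API from v8::Maybe<size_t> to std::optional<uint64_t>. A minimal sketch of the before/after calling pattern, using only the standard library (illustrative only, not part of the patch):

#include <cstdint>
#include <optional>

// Before: v8::Maybe<T> forces a two-step To()/FromJust() dance:
//   size_t cap;
//   size_t size;
//   if (capped_size_.To(&cap) && size_.To(&size)) { /* use cap, size */ }
//
// After: std::optional<T> reads directly via has_value()/value().
std::optional<uint64_t> size_ = 0;
std::optional<uint64_t> capped_size_ = std::nullopt;

// Mirrors the patched maybeCapRemaining() in queue.cc below: the result
// is only computable when both the cap and the current size are known.
std::optional<uint64_t> MaybeCapRemaining() {
  if (capped_size_.has_value() && size_.has_value()) {
    uint64_t cap = capped_size_.value();
    uint64_t size = size_.value();
    return cap > size ? cap - size : 0;
  }
  return std::nullopt;
}
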
diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc
index 2ca87c0b59..a39314e23e 100644
--- a/src/dataqueue/queue.cc
+++ b/src/dataqueue/queue.cc
@@ -6,73 +6,53 @@
#include <node.h>
#include <node_bob-inl.h>
#include <node_errors.h>
+#include <node_external_reference.h>
#include <node_file-inl.h>
#include <stream_base-inl.h>
-#include <v8.h>
#include <util-inl.h>
#include <uv.h>
+#include <v8.h>
#include <algorithm>
#include <deque>
#include <initializer_list>
#include <memory>
#include <vector>
-#include "base_object.h"
-#include "memory_tracker.h"
-#include "node_bob.h"
-#include "node_external_reference.h"
-#include "util.h"
-#include "v8-function-callback.h"
namespace node {
using v8::ArrayBufferView;
using v8::BackingStore;
-using v8::Context;
-using v8::HandleScope;
-using v8::Just;
using v8::Local;
using v8::Object;
-using v8::Maybe;
-using v8::Nothing;
-using v8::FunctionTemplate;
-using v8::Isolate;
-using v8::FunctionCallbackInfo;
using v8::Value;
-using v8::String;
-using v8::Global;
-using v8::Function;
-using v8::Int32;
-using v8::Uint32;
namespace {
// ============================================================================
class IdempotentDataQueueReader;
class NonIdempotentDataQueueReader;
-class EntryBase : public DataQueue::Entry {
+class EntryImpl : public DataQueue::Entry {
public:
- virtual std::unique_ptr<DataQueue::Reader> getReader() = 0;
+ virtual std::shared_ptr<DataQueue::Reader> get_reader() = 0;
};
class DataQueueImpl final : public DataQueue,
public std::enable_shared_from_this<DataQueueImpl> {
public:
- // Constructor for an imdempotent, fixed sized DataQueue.
- DataQueueImpl(std::vector<std::unique_ptr<Entry>> list, size_t size)
+ // Constructor for an idempotent, fixed sized DataQueue.
+ DataQueueImpl(std::vector<std::unique_ptr<Entry>>&& list, uint64_t size)
: entries_(std::move(list)),
idempotent_(true),
- size_(Just(size)),
- capped_size_(Just<size_t>(0UL)) {}
+ size_(size),
+ capped_size_(0) {}
// Constructor for a non-idempotent DataQueue. This kind of queue can have
// entries added to it over time. The size is set to 0 initially. The queue
// can be capped immediately on creation. Depending on the entries that are
// added, the size can be cleared if any of the entries are not capable of
// providing a size.
- DataQueueImpl(Maybe<size_t> cap = Nothing<size_t>())
- : idempotent_(false),
- size_(Just<size_t>(0UL)),
- capped_size_(cap) {}
+ DataQueueImpl(std::optional<uint64_t> cap = std::nullopt)
+ : idempotent_(false), size_(0), capped_size_(cap) {}
// Disallow moving and copying.
DataQueueImpl(const DataQueueImpl&) = delete;
@@ -81,42 +61,38 @@ class DataQueueImpl final : public DataQueue,
DataQueueImpl& operator=(DataQueueImpl&&) = delete;
std::shared_ptr<DataQueue> slice(
- size_t start,
- Maybe<size_t> maybeEnd = Nothing<size_t>()) override {
+ uint64_t start,
+ std::optional<uint64_t> maybeEnd = std::nullopt) override {
// If the data queue is not idempotent, or the size cannot be determined,
// we cannot reasonably create a slice. Therefore, return nothing.
- if (!idempotent_ || size_.IsNothing()) return nullptr;
+ if (!idempotent_ || !size_.has_value()) return nullptr;
- size_t size = size_.FromJust();
+ uint64_t size = size_.value();
// start cannot be greater than the size.
start = std::min(start, size);
- size_t end;
- if (maybeEnd.To(&end)) {
- // end cannot be less than start, or greater than the size.
- end = std::max(start, std::min(end, size));
- } else {
- end = size;
- }
+ uint64_t end = std::max(start, std::min(maybeEnd.value_or(size), size));
DCHECK_LE(start, end);
- size_t len = end - start;
- size_t remaining = end - start;
+ uint64_t len = end - start;
+ uint64_t remaining = end - start;
std::vector<std::unique_ptr<Entry>> slices;
if (remaining > 0) {
for (const auto& entry : entries_) {
- size_t entrySize = entry->size().FromJust();
+ // The size of every entry should be known since this is an
+ // idempotent queue.
+ uint64_t entrySize = entry->size().value();
if (start > entrySize) {
start -= entrySize;
continue;
}
- size_t chunkStart = start;
- size_t len = std::min(remaining, entrySize - chunkStart);
- slices.emplace_back(entry->slice(chunkStart, Just(chunkStart + len)));
+ uint64_t chunkStart = start;
+ uint64_t len = std::min(remaining, entrySize - chunkStart);
+ slices.emplace_back(entry->slice(chunkStart, chunkStart + len));
remaining -= len;
start = 0;
@@ -127,76 +103,75 @@ class DataQueueImpl final : public DataQueue,
return std::make_shared<DataQueueImpl>(std::move(slices), len);
}
- Maybe<size_t> size() const override { return size_; }
+ std::optional<uint64_t> size() const override { return size_; }
- bool isIdempotent() const override { return idempotent_; }
+ bool is_idempotent() const override { return idempotent_; }
- bool isCapped() const override { return capped_size_.IsJust(); }
+ bool is_capped() const override { return capped_size_.has_value(); }
- Maybe<bool> append(std::unique_ptr<Entry> entry) override {
- if (idempotent_) return Nothing<bool>();
- if (!entry) return Just(false);
+ std::optional<bool> append(std::unique_ptr<Entry> entry) override {
+ if (idempotent_) return std::nullopt;
+ if (!entry) return false;
// If this entry successfully provides a size, we can add it to our size_
- // if that has a value, otherwise, we keep size_t empty.
- size_t entrySize;
- size_t queueSize;
- if (entry->size().To(&entrySize) && size_.To(&queueSize)) {
+ // if that has a value; otherwise, we keep size_ empty.
+ if (entry->size().has_value() && size_.has_value()) {
+ uint64_t entrySize = entry->size().value();
+ uint64_t size = size_.value();
// If capped_size_ is set, size + entrySize cannot exceed capped_size_
// or the entry cannot be added.
- size_t capped_size;
- if (capped_size_.To(&capped_size) && queueSize + entrySize > capped_size) {
- return Just(false);
+ if (capped_size_.has_value() &&
+ (capped_size_.value() < entrySize + size)) {
+ return false;
}
-
- size_ = Just(queueSize + entrySize);
+ size_ = size + entrySize;
} else {
// This entry cannot provide a size. We can still add it but we have to
// clear the known size.
- size_ = Nothing<size_t>();
+ size_ = std::nullopt;
}
entries_.push_back(std::move(entry));
- return Just(true);
+ return true;
}
- void cap(size_t limit = 0) override {
- if (isIdempotent()) return;
- size_t current_cap;
+ void cap(uint64_t limit = 0) override {
+ if (is_idempotent()) return;
// If the data queue is already capped, it is possible to call
// cap again with a smaller size.
- if (capped_size_.To(&current_cap)) {
- capped_size_ = Just(std::min(limit, current_cap));
+ if (capped_size_.has_value()) {
+ capped_size_ = std::min(limit, capped_size_.value());
return;
}
// Otherwise just set the limit.
- capped_size_ = Just(limit);
+ capped_size_ = limit;
}
- Maybe<size_t> maybeCapRemaining() const override {
- size_t capped_size;
- size_t size;
- if (capped_size_.To(&capped_size) && size_.To(&size)) {
- return capped_size > size ? Just(capped_size - size) : Just<size_t>(0UL);
+ std::optional<uint64_t> maybeCapRemaining() const override {
+ if (capped_size_.has_value() && size_.has_value()) {
+ uint64_t capped_size = capped_size_.value();
+ uint64_t size = size_.value();
+ return capped_size > size ? capped_size - size : 0UL;
}
- return Nothing<size_t>();
+ return std::nullopt;
}
void MemoryInfo(node::MemoryTracker* tracker) const override {
- tracker->TrackField("entries", entries_);
+ tracker->TrackField(
+ "entries", entries_, "std::vector<std::unique_ptr<Entry>>");
}
- std::unique_ptr<Reader> getReader() override;
- SET_MEMORY_INFO_NAME(DataQueue);
- SET_SELF_SIZE(DataQueueImpl);
+ std::shared_ptr<Reader> get_reader() override;
+ SET_MEMORY_INFO_NAME(DataQueue)
+ SET_SELF_SIZE(DataQueueImpl)
private:
std::vector<std::unique_ptr<Entry>> entries_;
bool idempotent_;
- Maybe<size_t> size_;
- Maybe<size_t> capped_size_;
- bool lockedToReader_ = false;
+ std::optional<uint64_t> size_ = std::nullopt;
+ std::optional<uint64_t> capped_size_ = std::nullopt;
+ bool locked_to_reader_ = false;
friend class DataQueue;
friend class IdempotentDataQueueReader;
@@ -207,123 +182,108 @@ class DataQueueImpl final : public DataQueue,
// DataQueue with which it is associated, and always from the beginning.
// Reads are non-destructive, meaning that the state of the DataQueue
// will not and cannot be changed.
-class IdempotentDataQueueReader final : public DataQueue::Reader {
+class IdempotentDataQueueReader final
+ : public DataQueue::Reader,
+ public std::enable_shared_from_this<IdempotentDataQueueReader> {
public:
IdempotentDataQueueReader(std::shared_ptr<DataQueueImpl> data_queue)
: data_queue_(std::move(data_queue)) {
- CHECK(data_queue_->isIdempotent());
+ CHECK(data_queue_->is_idempotent());
}
// Disallow moving and copying.
IdempotentDataQueueReader(const IdempotentDataQueueReader&) = delete;
IdempotentDataQueueReader(IdempotentDataQueueReader&&) = delete;
- IdempotentDataQueueReader& operator=(const IdempotentDataQueueReader&) = delete;
+ IdempotentDataQueueReader& operator=(const IdempotentDataQueueReader&) =
+ delete;
IdempotentDataQueueReader& operator=(IdempotentDataQueueReader&&) = delete;
- int Pull(
- Next next,
- int options,
- DataQueue::Vec* data,
- size_t count,
- size_t max_count_hint = bob::kMaxCountHint) override {
+ int Pull(Next next,
+ int options,
+ DataQueue::Vec* data,
+ size_t count,
+ size_t max_count_hint = bob::kMaxCountHint) override {
+ std::shared_ptr<DataQueue::Reader> self = shared_from_this();
+
// If ended is true, this reader has already reached the end and cannot
// provide any more data.
if (ended_) {
- std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](size_t) {});
+ std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {});
return bob::Status::STATUS_EOS;
}
// If this is the first pull from this reader, we are first going to
// check to see if there is anything at all to actually do.
- if (current_index_.IsNothing()) {
+ if (!current_index_.has_value()) {
// First, let's check the number of entries. If there are no entries,
// we've reached the end and have nothing to do.
- bool empty = data_queue_->entries_.empty();
-
- // Second, if there are entries, let's check the known size to see if
- // it is zero or not.
- if (!empty) {
- size_t size;
- if (data_queue_->size().To(&size)) {
- // If the size is known to be zero, there's absolutely nothing else for
- // us to do but end.
- empty = (size == 0);
- }
- // If the size cannot be determined, we will have to try reading from
- // the entry to see if it has any data or not, so fall through here.
- }
-
- if (empty) {
+ // Because this is an idempotent dataqueue, we should always know the
+ // size...
+ if (data_queue_->entries_.empty()) {
ended_ = true;
- std::move(next)(bob::Status::STATUS_END, nullptr, 0, [](size_t) {});
- return bob::Status::STATUS_END;
+ std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {});
+ return bob::Status::STATUS_EOS;
}
- current_index_ = Just(0U);
+ current_index_ = 0;
}
// We have current_index_, awesome, we are going to keep reading from
// it until we receive an end.
- CHECK(!pull_pending_);
- pull_pending_ = true;
- int status = getCurrentReader().Pull(
- [this, next = std::move(next)]
- (int status, const DataQueue::Vec* vecs, size_t count, Done done) {
- pull_pending_ = false;
- last_status_ = status;
-
- // In each of these cases, we do not expect that the source will
- // actually have provided any actual data.
- CHECK_IMPLIES(status == bob::Status::STATUS_BLOCK ||
- status == bob::Status::STATUS_WAIT ||
- status == bob::Status::STATUS_EOS,
- vecs == nullptr && count == 0);
-
- // Technically, receiving a STATUS_EOS is really an error because
- // we've read past the end of the data, but we are going to treat
- // it the same as end.
- if (status == bob::Status::STATUS_END ||
- status == bob::Status::STATUS_EOS) {
- uint32_t current = current_index_.FromJust() + 1;
- // We have reached the end of this entry. If this is the last entry,
- // then we are done. Otherwise, we advance the current_index_, clear
- // the current_reader_ and wait for the next read.
- if (current == data_queue_->entries_.size()) {
- // Yes, this was the final entry. We're all done.
- ended_ = true;
- status = bob::Status::STATUS_END;
- } else {
- // This was not the final entry, so we update the index and
- // continue on.
- current_index_ = Just(current);
- status = bob::Status::STATUS_CONTINUE;
- }
- current_reader_ = nullptr;
- }
- // Now that we have updated this readers state, we can forward
- // everything on to the outer next.
- std::move(next)(status, vecs, count, std::move(done));
- }, options, data, count, max_count_hint);
+ auto current_reader = getCurrentReader();
+ if (current_reader == nullptr) {
+ // Getting the current reader for an entry could fail for several
+ // reasons. For an FdEntry, for instance, getting the reader may
+ // fail if the file has been modified since the FdEntry was created.
+ // We handle the case simply by erroring.
+ std::move(next)(UV_EINVAL, nullptr, 0, [](uint64_t) {});
+ return UV_EINVAL;
+ }
+ CHECK(!pull_pending_);
+ pull_pending_ = true;
+ int status = current_reader->Pull(
+ [this, next = std::move(next)](
+ int status, const DataQueue::Vec* vecs, uint64_t count, Done done) {
+ pull_pending_ = false;
+ // In each of these cases, we do not expect that the source will
+ // actually have provided any actual data.
+ CHECK_IMPLIES(status == bob::Status::STATUS_BLOCK ||
+ status == bob::Status::STATUS_WAIT ||
+ status == bob::Status::STATUS_EOS,
+ vecs == nullptr && count == 0);
+ if (status == bob::Status::STATUS_EOS) {
+ uint32_t current = current_index_.value() + 1;
+ current_reader_ = nullptr;
+ // We have reached the end of this entry. If this is the last entry,
+ // then we are done. Otherwise, we advance the current_index_, clear
+ // the current_reader_ and wait for the next read.
+
+ if (current == data_queue_->entries_.size()) {
+ // Yes, this was the final entry. We're all done.
+ ended_ = true;
+ } else {
+ // This was not the final entry, so we update the index and
+ // continue on by performing another read.
+ current_index_ = current;
+ status = bob::STATUS_CONTINUE;
+ }
+ std::move(next)(status, nullptr, 0, [](uint64_t) {});
+ return;
+ }
+
+ std::move(next)(status, vecs, count, std::move(done));
+ },
+ options,
+ data,
+ count,
+ max_count_hint);
+
+ // The pull was handled synchronously. If we're not ended, we want to
+ // make sure status returned is CONTINUE.
if (!pull_pending_) {
- // The callback was resolved synchronously. Let's check our status.
-
- // Just as a double check, when next is called synchronous, the status
- // provided there should match the status returned.
- CHECK(status == last_status_);
-
- if (ended_) {
- // Awesome, we read everything. Return status end here and we're done.
- return bob::Status::STATUS_END;
- }
-
- if (status == bob::Status::STATUS_END ||
- status == bob::Status::STATUS_EOS) {
- // If we got here and ended_ is not true, there's more to read.
- return bob::Status::STATUS_CONTINUE;
- }
-
+ if (!ended_) return bob::Status::STATUS_CONTINUE;
// For any other status, we just fall through and return it straightaway.
}
@@ -345,60 +305,63 @@ class IdempotentDataQueueReader final : public DataQueue::Reader {
return status;
}
- DataQueue::Reader& getCurrentReader() {
+ DataQueue::Reader* getCurrentReader() {
CHECK(!ended_);
- CHECK(current_index_.IsJust());
+ CHECK(current_index_.has_value());
if (current_reader_ == nullptr) {
- auto& entry = data_queue_->entries_[current_index_.FromJust()];
+ auto& entry = data_queue_->entries_[current_index_.value()];
// Because this is an idempotent reader, let's just be sure to
// doublecheck that the entry itself is actually idempotent
- DCHECK(entry->isIdempotent());
- current_reader_ = static_cast<EntryBase&>(*entry).getReader();
+ DCHECK(entry->is_idempotent());
+ current_reader_ = static_cast<EntryImpl&>(*entry).get_reader();
}
- CHECK_NOT_NULL(current_reader_);
- return *current_reader_;
+ return current_reader_.get();
}
SET_NO_MEMORY_INFO()
- SET_MEMORY_INFO_NAME(IdempotentDataQueueReader);
- SET_SELF_SIZE(IdempotentDataQueueReader);
+ SET_MEMORY_INFO_NAME(IdempotentDataQueueReader)
+ SET_SELF_SIZE(IdempotentDataQueueReader)
private:
std::shared_ptr<DataQueueImpl> data_queue_;
- Maybe<uint32_t> current_index_ = Nothing<uint32_t>();
- std::unique_ptr<DataQueue::Reader> current_reader_ = nullptr;
+ std::optional<uint32_t> current_index_ = std::nullopt;
+ std::shared_ptr<DataQueue::Reader> current_reader_ = nullptr;
bool ended_ = false;
bool pull_pending_ = false;
- int last_status_ = 0;
};
// A NonIdempotentDataQueueReader reads entries from the DataQueue
// and removes those entries from the queue as they are fully consumed.
// This means that reads are destructive and the state of the DataQueue
// is mutated as the read proceeds.
-class NonIdempotentDataQueueReader final : public DataQueue::Reader {
+class NonIdempotentDataQueueReader final
+ : public DataQueue::Reader,
+ public std::enable_shared_from_this<NonIdempotentDataQueueReader> {
public:
NonIdempotentDataQueueReader(std::shared_ptr<DataQueueImpl> data_queue)
: data_queue_(std::move(data_queue)) {
- CHECK(!data_queue_->isIdempotent());
+ CHECK(!data_queue_->is_idempotent());
}
// Disallow moving and copying.
NonIdempotentDataQueueReader(const NonIdempotentDataQueueReader&) = delete;
NonIdempotentDataQueueReader(NonIdempotentDataQueueReader&&) = delete;
- NonIdempotentDataQueueReader& operator=(const NonIdempotentDataQueueReader&) = delete;
- NonIdempotentDataQueueReader& operator=(NonIdempotentDataQueueReader&&) = delete;
-
- int Pull(
- Next next,
- int options,
- DataQueue::Vec* data,
- size_t count,
- size_t max_count_hint = bob::kMaxCountHint) override {
+ NonIdempotentDataQueueReader& operator=(const NonIdempotentDataQueueReader&) =
+ delete;
+ NonIdempotentDataQueueReader& operator=(NonIdempotentDataQueueReader&&) =
+ delete;
+
+ int Pull(Next next,
+ int options,
+ DataQueue::Vec* data,
+ size_t count,
+ size_t max_count_hint = bob::kMaxCountHint) override {
+ std::shared_ptr<DataQueue::Reader> self = shared_from_this();
+
// If ended is true, this reader has already reached the end and cannot
// provide any more data.
if (ended_) {
- std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](size_t) {});
+ std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {});
return bob::Status::STATUS_EOS;
}
@@ -410,21 +373,21 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader {
// expect more data to be provided later, but we don't know exactly when
// that'll happen, so the proper response here is to return a blocked
// status.
- if (!data_queue_->isCapped()) {
- std::move(next)(bob::Status::STATUS_BLOCK, nullptr, 0, [](size_t) {});
+ if (!data_queue_->is_capped()) {
+ std::move(next)(bob::Status::STATUS_BLOCK, nullptr, 0, [](uint64_t) {});
return bob::STATUS_BLOCK;
}
// However, if we are capped, the status will depend on whether the size
// of the data_queue_ is known or not.
- size_t size;
- if (data_queue_->size().To(&size)) {
- // If the size is known, and it is still less than the cap, then we still
- // might get more data. We just don't know exactly when that'll come, so
- // let's return a blocked status.
- if (size < data_queue_->capped_size_.FromJust()) {
- std::move(next)(bob::Status::STATUS_BLOCK, nullptr, 0, [](size_t) {});
+ if (data_queue_->size().has_value()) {
+ // If the size is known, and it is still less than the cap, then we
+ // still might get more data. We just don't know exactly when that'll
+ // come, so let's return a blocked status.
+ if (data_queue_->size().value() < data_queue_->capped_size_.value()) {
+ std::move(next)(
+ bob::Status::STATUS_BLOCK, nullptr, 0, [](uint64_t) {});
return bob::STATUS_BLOCK;
}
@@ -437,75 +400,51 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader {
// entries, we're done. There's nothing left to read.
current_reader_ = nullptr;
ended_ = true;
- std::move(next)(bob::Status::STATUS_END, nullptr, 0, [](size_t) {});
- return bob::STATUS_END;
+ std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {});
+ return bob::STATUS_EOS;
+ }
+
+ auto current_reader = getCurrentReader();
+ if (current_reader == nullptr) {
+ std::move(next)(UV_EINVAL, nullptr, 0, [](uint64_t) {});
+ return UV_EINVAL;
}
// If we got here, we have an entry to read from.
CHECK(!pull_pending_);
pull_pending_ = true;
- int status = getCurrentReader().Pull(
- [this, next = std::move(next)]
- (int status, const DataQueue::Vec* vecs, size_t count, Done done) {
- pull_pending_ = false;
- last_status_ = status;
-
- // In each of these cases, we do not expect that the source will
- // actually have provided any actual data.
- CHECK_IMPLIES(status == bob::Status::STATUS_BLOCK ||
- status == bob::Status::STATUS_WAIT ||
- status == bob::Status::STATUS_EOS,
- vecs == nullptr && count == 0);
-
- // Technically, receiving a STATUS_EOS is really an error because
- // we've read past the end of the data, but we are going to treat
- // it the same as end.
- if (status == bob::Status::STATUS_END ||
- status == bob::Status::STATUS_EOS) {
- data_queue_->entries_.erase(data_queue_->entries_.begin());
-
- // We have reached the end of this entry. If this is the last entry,
- // then we are done. Otherwise, we advance the current_index_, clear
- // the current_reader_ and wait for the next read.
- if (data_queue_->entries_.empty()) {
- // Yes, this was the final entry. We're all done.
- ended_ = true;
- status = bob::Status::STATUS_END;
- } else {
- // This was not the final entry, so we update the index and
- // continue on.
- status = bob::Status::STATUS_CONTINUE;
- }
- current_reader_ = nullptr;
- }
-
- // Now that we have updated this readers state, we can forward
- // everything on to the outer next.
- std::move(next)(status, vecs, count, std::move(done));
- }, options, data, count, max_count_hint);
+ int status = current_reader->Pull(
+ [this, next = std::move(next)](
+ int status, const DataQueue::Vec* vecs, uint64_t count, Done done) {
+ pull_pending_ = false;
+
+ // In each of these cases, we do not expect that the source will
+ // actually have provided any actual data.
+ CHECK_IMPLIES(status == bob::Status::STATUS_BLOCK ||
+ status == bob::Status::STATUS_WAIT ||
+ status == bob::Status::STATUS_EOS,
+ vecs == nullptr && count == 0);
+ if (status == bob::Status::STATUS_EOS) {
+ data_queue_->entries_.erase(data_queue_->entries_.begin());
+ ended_ = data_queue_->entries_.empty();
+ current_reader_ = nullptr;
+ if (!ended_) status = bob::Status::STATUS_CONTINUE;
+ std::move(next)(status, nullptr, 0, [](uint64_t) {});
+ return;
+ }
+
+ // Now that we have updated this reader's state, we can forward
+ // everything on to the outer next.
+ std::move(next)(status, vecs, count, std::move(done));
+ },
+ options,
+ data,
+ count,
+ max_count_hint);
if (!pull_pending_) {
// The callback was resolved synchronously. Let's check our status.
-
- // Just as a double check, when next is called synchronous, the status
- // provided there should match the status returned.
- CHECK(status == last_status_);
-
- if (ended_) {
- // Awesome, we read everything. Return status end here and we're done.
-
- // Let's just make sure we've removed all of the entries.
- CHECK(data_queue_->entries_.empty());
-
- return bob::Status::STATUS_END;
- }
-
- if (status == bob::Status::STATUS_END ||
- status == bob::Status::STATUS_EOS) {
- // If we got here and ended_ is not true, there's more to read.
- return bob::Status::STATUS_CONTINUE;
- }
-
+ if (!ended_) return bob::Status::STATUS_CONTINUE;
// For any other status, we just fall through and return it straightaway.
}
@@ -527,68 +466,68 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader {
return status;
}
- DataQueue::Reader& getCurrentReader() {
+ DataQueue::Reader* getCurrentReader() {
CHECK(!ended_);
CHECK(!data_queue_->entries_.empty());
if (current_reader_ == nullptr) {
auto& entry = data_queue_->entries_.front();
- current_reader_ = static_cast<EntryBase&>(*entry).getReader();
+ current_reader_ = static_cast<EntryImpl&>(*entry).get_reader();
}
- return *current_reader_;
+ return current_reader_.get();
}
SET_NO_MEMORY_INFO()
- SET_MEMORY_INFO_NAME(NonIdempotentDataQueueReader);
- SET_SELF_SIZE(NonIdempotentDataQueueReader);
+ SET_MEMORY_INFO_NAME(NonIdempotentDataQueueReader)
+ SET_SELF_SIZE(NonIdempotentDataQueueReader)
private:
std::shared_ptr<DataQueueImpl> data_queue_;
- std::unique_ptr<DataQueue::Reader> current_reader_ = nullptr;
+ std::shared_ptr<DataQueue::Reader> current_reader_ = nullptr;
bool ended_ = false;
bool pull_pending_ = false;
- int last_status_ = 0;
};
-std::unique_ptr<DataQueue::Reader> DataQueueImpl::getReader() {
- if (isIdempotent()) {
- return std::make_unique<IdempotentDataQueueReader>(shared_from_this());
+std::shared_ptr<DataQueue::Reader> DataQueueImpl::get_reader() {
+ if (is_idempotent()) {
+ return std::make_shared<IdempotentDataQueueReader>(shared_from_this());
}
- if (lockedToReader_) return nullptr;
- lockedToReader_ = true;
+ if (locked_to_reader_) return nullptr;
+ locked_to_reader_ = true;
- return std::make_unique<NonIdempotentDataQueueReader>(shared_from_this());
+ return std::make_shared<NonIdempotentDataQueueReader>(shared_from_this());
}
// ============================================================================
// An empty, always idempotent entry.
-class EmptyEntry final : public EntryBase {
+class EmptyEntry final : public EntryImpl {
public:
- class EmptyReader final : public DataQueue::Reader {
- public:
-
- int Pull(
- Next next,
- int options,
- DataQueue::Vec* data,
- size_t count,
- size_t max_count_hint = bob::kMaxCountHint) override {
+ class EmptyReader final : public DataQueue::Reader,
+ public std::enable_shared_from_this<EmptyReader> {
+ public:
+ int Pull(Next next,
+ int options,
+ DataQueue::Vec* data,
+ size_t count,
+ size_t max_count_hint = bob::kMaxCountHint) override {
+ auto self = shared_from_this();
if (ended_) {
- std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](size_t) {});
+ std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {});
return bob::Status::STATUS_EOS;
}
ended_ = true;
- std::move(next)(bob::Status::STATUS_END, nullptr, 0, [](size_t) {});
- return bob::Status::STATUS_END;
+ std::move(next)(
+ bob::Status::STATUS_CONTINUE, nullptr, 0, [](uint64_t) {});
+ return bob::Status::STATUS_CONTINUE;
}
SET_NO_MEMORY_INFO()
- SET_MEMORY_INFO_NAME(EmptyReader);
- SET_SELF_SIZE(EmptyReader);
+ SET_MEMORY_INFO_NAME(EmptyReader)
+ SET_SELF_SIZE(EmptyReader)
- private:
+ private:
bool ended_ = false;
};
@@ -600,73 +539,71 @@ class EmptyEntry final : public EntryBase {
EmptyEntry& operator=(const EmptyEntry&) = delete;
EmptyEntry& operator=(EmptyEntry&&) = delete;
- std::unique_ptr<DataQueue::Reader> getReader() override {
- return std::make_unique<EmptyReader>();
+ std::shared_ptr<DataQueue::Reader> get_reader() override {
+ return std::make_shared<EmptyReader>();
}
std::unique_ptr<Entry> slice(
- size_t start,
- Maybe<size_t> maybeEnd = Nothing<size_t>()) override {
+ uint64_t start,
+ std::optional<uint64_t> maybeEnd = std::nullopt) override {
if (start != 0) return nullptr;
- size_t end;
- if (maybeEnd.To(&end)) {
- if (end != 0) return nullptr;
- }
return std::make_unique<EmptyEntry>();
}
- Maybe<size_t> size() const override { return Just<size_t>(0UL); }
+ std::optional<uint64_t> size() const override { return 0; }
- bool isIdempotent() const override { return true; }
+ bool is_idempotent() const override { return true; }
SET_NO_MEMORY_INFO()
- SET_MEMORY_INFO_NAME(EmptyEntry);
- SET_SELF_SIZE(EmptyEntry);
+ SET_MEMORY_INFO_NAME(EmptyEntry)
+ SET_SELF_SIZE(EmptyEntry)
};
// ============================================================================
// An entry that consists of a single memory resident v8::BackingStore.
// These are always idempotent and always a fixed, known size.
-class InMemoryEntry final : public EntryBase {
+class InMemoryEntry final : public EntryImpl {
public:
struct InMemoryFunctor final {
std::shared_ptr<BackingStore> backing_store;
- void operator()(size_t) {
- backing_store = nullptr;
- };
+ void operator()(uint64_t) { backing_store = nullptr; }
};
- class InMemoryReader final : public DataQueue::Reader {
+ class InMemoryReader final
+ : public DataQueue::Reader,
+ public std::enable_shared_from_this<InMemoryReader> {
public:
- InMemoryReader(InMemoryEntry& entry)
- : entry_(entry) {}
-
- int Pull(
- Next next,
- int options,
- DataQueue::Vec* data,
- size_t count,
- size_t max_count_hint = bob::kMaxCountHint) override {
+ InMemoryReader(InMemoryEntry& entry) : entry_(entry) {}
+
+ int Pull(Next next,
+ int options,
+ DataQueue::Vec* data,
+ size_t count,
+ size_t max_count_hint = bob::kMaxCountHint) override {
+ auto self = shared_from_this();
if (ended_) {
- std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](size_t) {});
+ std::move(next)(bob::Status::STATUS_EOS, nullptr, 0, [](uint64_t) {});
return bob::Status::STATUS_EOS;
}
ended_ = true;
- DataQueue::Vec vec {
- reinterpret_cast<uint8_t*>(entry_.backing_store_->Data()) + entry_.offset_,
- entry_.byte_length_,
+ DataQueue::Vec vec{
+ reinterpret_cast<uint8_t*>(entry_.backing_store_->Data()) +
+ entry_.offset_,
+ entry_.byte_length_,
};
- std::move(next)(bob::Status::STATUS_END, &vec, 1, InMemoryFunctor({
- entry_.backing_store_
- }));
- return bob::Status::STATUS_END;
+
+ std::move(next)(bob::Status::STATUS_CONTINUE,
+ &vec,
+ 1,
+ InMemoryFunctor({entry_.backing_store_}));
+ return bob::Status::STATUS_CONTINUE;
}
SET_NO_MEMORY_INFO()
- SET_MEMORY_INFO_NAME(InMemoryReader);
- SET_SELF_SIZE(InMemoryReader);
+ SET_MEMORY_INFO_NAME(InMemoryReader)
+ SET_SELF_SIZE(InMemoryReader)
private:
InMemoryEntry& entry_;
@@ -674,8 +611,8 @@ class InMemoryEntry final : public EntryBase {
};
InMemoryEntry(std::shared_ptr<BackingStore> backing_store,
- size_t offset,
- size_t byte_length)
+ uint64_t offset,
+ uint64_t byte_length)
: backing_store_(std::move(backing_store)),
offset_(offset),
byte_length_(byte_length) {
@@ -690,14 +627,15 @@ class InMemoryEntry final : public EntryBase {
InMemoryEntry& operator=(const InMemoryEntry&) = delete;
InMemoryEntry& operator=(InMemoryEntry&&) = delete;
- std::unique_ptr<DataQueue::Reader> getReader() override {
- return std::make_unique<InMemoryReader>(*this);
+ std::shared_ptr<DataQueue::Reader> get_reader() override {
+ return std::make_shared<InMemoryReader>(*this);
}
std::unique_ptr<Entry> slice(
- size_t start,
- Maybe<size_t> maybeEnd = Nothing<size_t>()) override {
- const auto makeEntry = [&](size_t start, size_t len) -> std::unique_ptr<Entry> {
+ uint64_t start,
+ std::optional<uint64_t> maybeEnd = std::nullopt) override {
+ const auto makeEntry = [&](uint64_t start,
+ uint64_t len) -> std::unique_ptr<Entry> {
if (len == 0) {
return std::make_unique<EmptyEntry>();
}
@@ -710,8 +648,8 @@ class InMemoryEntry final : public EntryBase {
// The start cannot extend beyond the maximum end point of this entry.
start = std::min(start, offset_ + byte_length_);
- size_t end;
- if (maybeEnd.To(&end)) {
+ if (maybeEnd.has_value()) {
+ uint64_t end = maybeEnd.value();
// The end cannot extend beyond the maximum end point of this entry,
// and the end must be equal to or greater than the start.
end = std::max(start, std::min(offset_ + end, offset_ + byte_length_));
@@ -724,20 +662,21 @@ class InMemoryEntry final : public EntryBase {
return makeEntry(start, byte_length_ - start);
}
- Maybe<size_t> size() const override { return Just(byte_length_); }
+ std::optional<uint64_t> size() const override { return byte_length_; }
- bool isIdempotent() const override { return true; }
+ bool is_idempotent() const override { return true; }
void MemoryInfo(node::MemoryTracker* tracker) const override {
- tracker->TrackField("store", backing_store_);
+ tracker->TrackField(
+ "store", backing_store_, "std::shared_ptr<v8::BackingStore>");
}
- SET_MEMORY_INFO_NAME(InMemoryEntry);
- SET_SELF_SIZE(InMemoryEntry);
+ SET_MEMORY_INFO_NAME(InMemoryEntry)
+ SET_SELF_SIZE(InMemoryEntry)
private:
std::shared_ptr<BackingStore> backing_store_;
- size_t offset_;
- size_t byte_length_;
+ uint64_t offset_;
+ uint64_t byte_length_;
friend class InMemoryReader;
};
@@ -746,9 +685,9 @@ class InMemoryEntry final : public EntryBase {
// An entry that wraps a DataQueue. The entry takes on the characteristics
// of the wrapped dataqueue.
-class DataQueueEntry : public EntryBase {
+class DataQueueEntry : public EntryImpl {
public:
- DataQueueEntry(std::shared_ptr<DataQueue> data_queue)
+ explicit DataQueueEntry(std::shared_ptr<DataQueue> data_queue)
: data_queue_(std::move(data_queue)) {
CHECK(data_queue_);
}
@@ -759,13 +698,12 @@ class DataQueueEntry : public EntryBase {
DataQueueEntry& operator=(const DataQueueEntry&) = delete;
DataQueueEntry& operator=(DataQueueEntry&&) = delete;
- std::unique_ptr<DataQueue::Reader> getReader() override {
- return data_queue_->getReader();
+ std::shared_ptr<DataQueue::Reader> get_reader() override {
+ return std::make_shared<ReaderImpl>(data_queue_->get_reader());
}
std::unique_ptr<Entry> slice(
- size_t start,
- Maybe<size_t> end = Nothing<size_t>()) override {
+ uint64_t start, std::optional<uint64_t> end = std::nullopt) override {
std::shared_ptr<DataQueue> sliced = data_queue_->slice(start, end);
if (!sliced) return nullptr;
@@ -775,10 +713,10 @@ class DataQueueEntry : public EntryBase {
// Returns the number of bytes represented by this Entry if it is
// known. Certain types of entries, such as those backed by streams
// might not know the size in advance and therefore cannot provide
- // a value. In such cases, size() must return v8::Nothing<size_t>.
+ // a value. In such cases, size() must return std::nullopt.
//
// If the entry is idempotent, a size should always be available.
- Maybe<size_t> size() const override { return data_queue_->size(); }
+ std::optional<uint64_t> size() const override { return data_queue_->size(); }
// When true, multiple reads on the object must produce the exact
// same data or the reads will fail. Some sources of entry data,
@@ -786,19 +724,44 @@ class DataQueueEntry : public EntryBase {
// and therefore must not claim to be. If an entry claims to be
// idempotent and cannot preserve that quality, subsequent reads
// must fail with an error when a variance is detected.
- bool isIdempotent() const override { return data_queue_->isIdempotent(); }
+ bool is_idempotent() const override { return data_queue_->is_idempotent(); }
void MemoryInfo(node::MemoryTracker* tracker) const override {
- tracker->TrackField("data_queue", data_queue_);
+ tracker->TrackField(
+ "data_queue", data_queue_, "std::shared_ptr<DataQueue>");
}
DataQueue& getDataQueue() { return *data_queue_; }
- SET_MEMORY_INFO_NAME(DataQueueEntry);
- SET_SELF_SIZE(DataQueueEntry);
+ SET_MEMORY_INFO_NAME(DataQueueEntry)
+ SET_SELF_SIZE(DataQueueEntry)
private:
std::shared_ptr<DataQueue> data_queue_;
+
+ class ReaderImpl : public DataQueue::Reader,
+ public std::enable_shared_from_this<ReaderImpl> {
+ public:
+ explicit ReaderImpl(std::shared_ptr<DataQueue::Reader> inner)
+ : inner_(std::move(inner)) {}
+
+ int Pull(DataQueue::Reader::Next next,
+ int options,
+ DataQueue::Vec* data,
+ size_t count,
+ size_t max_count_hint) override {
+ auto self = shared_from_this();
+ return inner_->Pull(
+ std::move(next), options, data, count, max_count_hint);
+ }
+
+ SET_NO_MEMORY_INFO()
+ SET_MEMORY_INFO_NAME(ReaderImpl)
+ SET_SELF_SIZE(ReaderImpl)
+
+ private:
+ std::shared_ptr<DataQueue::Reader> inner_;
+ };
};
// ============================================================================
@@ -811,77 +774,61 @@ class DataQueueEntry : public EntryBase {
// a tolerable risk here. While FdEntry is considered idempotent, this race
// means that it is indeed possible for multiple reads to return different
// results if the file just happens to get modified.
-class FdEntry final : public EntryBase {
+class FdEntry final : public EntryImpl {
// TODO(@jasnell, @flakey5):
- // * This should only allow reading from regular files. No directories, no pipes, etc.
- // * The reader should support accepting the buffer(s) from the pull, if any. It should
+ // * This should only allow reading from regular files. No directories, no
+ // pipes, etc.
+ // * The reader should support accepting the buffer(s) from the pull, if any.
+ // It should
// only allocate a managed buffer if the pull doesn't provide any.
- // * We might want to consider making the stat on each read sync to eliminate the race
+ // * We might want to consider making the stat on each read sync to eliminate
+ // the race
// condition described in the comment above.
public:
- FdEntry(Environment* env,
- int fd,
- size_t start,
- v8::Maybe<size_t> end,
- BaseObjectPtr<fs::FileHandle> maybe_file_handle =
- BaseObjectPtr<fs::FileHandle>())
- : env_(env),
- fd_(fd),
- start_(0),
- maybe_file_handle_(maybe_file_handle) {
- CHECK(fd);
- if (GetStat(stat_) == 0) {
- if (end.IsNothing()) {
- end_ = stat_.st_size;
- } else {
- end_ = std::min(stat_.st_size, end.FromJust());
- }
- }
- }
+ static std::unique_ptr<FdEntry> Create(Environment* env, Local<Value> path) {
+ // We're only going to create the FdEntry if the file exists.
+ uv_fs_t req;
+ auto cleanup = OnScopeLeave([&] { uv_fs_req_cleanup(&req); });
- FdEntry(Environment* env, BaseObjectPtr<fs::FileHandle> handle)
- : FdEntry(env, handle->GetFD(), 0, Nothing<size_t>(), handle) {}
+ auto buf = std::make_shared<BufferValue>(env->isolate(), path);
+ if (uv_fs_stat(nullptr, &req, buf->out(), nullptr) < 0) return nullptr;
+
+ return std::make_unique<FdEntry>(
+ env, std::move(buf), req.statbuf, 0, req.statbuf.st_size);
+ }
FdEntry(Environment* env,
- int fd,
+ std::shared_ptr<BufferValue> path_,
uv_stat_t stat,
- size_t start,
- size_t end,
- BaseObjectPtr<fs::FileHandle> maybe_file_handle =
- BaseObjectPtr<fs::FileHandle>())
+ uint64_t start,
+ uint64_t end)
: env_(env),
- fd_(end),
- start_(start),
- end_(end),
+ path_(std::move(path_)),
stat_(stat),
- maybe_file_handle_(maybe_file_handle){}
+ start_(start),
+ end_(end) {}
- std::unique_ptr<DataQueue::Reader> getReader() override {
- return std::make_unique<Reader>(this);
+ std::shared_ptr<DataQueue::Reader> get_reader() override {
+ return ReaderImpl::Create(this);
}
std::unique_ptr<Entry> slice(
- size_t start,
- Maybe<size_t> end = Nothing<size_t>()) override {
- size_t new_start = start_ + start;
- size_t new_end = end_;
- if (end.IsJust()) {
- new_end = std::min(end.FromJust() + start, new_end);
+ uint64_t start, std::optional<uint64_t> end = std::nullopt) override {
+ uint64_t new_start = start_ + start;
+ uint64_t new_end = end_;
+ if (end.has_value()) {
+ new_end = std::min(end.value() + start, new_end);
}
CHECK(new_start >= start_);
CHECK(new_end <= end_);
- return std::make_unique<FdEntry>(env_, fd_, stat_, new_start, new_end, maybe_file_handle_);
+ return std::make_unique<FdEntry>(env_, path_, stat_, new_start, new_end);
}
- Maybe<size_t> size() const override {
- return Just(end_ - start_);
- }
+ std::optional<uint64_t> size() const override { return end_ - start_; }
- bool isIdempotent() const override {
- return true;
- }
+ bool is_idempotent() const override { return true; }
Environment* env() const { return env_; }
@@ -889,229 +836,193 @@ class FdEntry final : public EntryBase {
SET_MEMORY_INFO_NAME(FdEntry)
SET_SELF_SIZE(FdEntry)
- class Wrap : public BaseObject {
- public:
- static void New(const FunctionCallbackInfo<Value>& args) {
- CHECK(args.IsConstructCall());
- Environment* env = Environment::GetCurrent(args);
-
- CHECK(args[0]->IsInt32());
- CHECK(args[1]->IsUint32());
- CHECK(args[2]->IsUint32());
-
- int fd = args[0].As<Int32>()->Value();
- size_t start = args[1].As<Uint32>()->Value();
- size_t end = args[1].As<Uint32>()->Value();
-
- new Wrap(env, args.This(), fd, start, Just(end));
- }
+ private:
+ Environment* env_;
+ std::shared_ptr<BufferValue> path_;
+ uv_stat_t stat_;
+ uint64_t start_ = 0;
+ uint64_t end_ = 0;
- static Local<FunctionTemplate> GetConstructorTemplate(Environment* env) {
- Local<FunctionTemplate> tmpl = env->fdentry_constructor_template();
- if (tmpl.IsEmpty()) {
- Isolate* isolate = env->isolate();
- tmpl = NewFunctionTemplate(isolate, New);
+ bool is_modified(const uv_stat_t& other) {
+ return other.st_size != stat_.st_size ||
+ other.st_mtim.tv_nsec != stat_.st_mtim.tv_nsec;
+ }
- tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "FdEntry"));
- tmpl->Inherit(BaseObject::GetConstructorTemplate(env));
+ static bool CheckModified(FdEntry* entry, int fd) {
+ uv_fs_t req;
+ auto cleanup = OnScopeLeave([&] { uv_fs_req_cleanup(&req); });
+ // TODO(jasnell): Note the use of a sync fs call here is a bit unfortunate.
+ // Doing this asynchronously creates a bit of a race condition though: a file
+ // could be unmodified when we call the operation but then by the time the
+ // async callback is triggered to give us that answer the file is modified.
+ // While such silliness is still possible here, the sync call at least makes
+ // it less likely to hit the race.
+ if (uv_fs_fstat(nullptr, &req, fd, nullptr) < 0) return true;
+ return entry->is_modified(req.statbuf);
+ }
- env->set_fdentry_constructor_template(tmpl);
+ class ReaderImpl final : public DataQueue::Reader,
+ public StreamListener,
+ public std::enable_shared_from_this<ReaderImpl> {
+ public:
+ static std::shared_ptr<ReaderImpl> Create(FdEntry* entry) {
+ uv_fs_t req;
+ auto cleanup = OnScopeLeave([&] { uv_fs_req_cleanup(&req); });
+ int file =
+ uv_fs_open(nullptr, &req, entry->path_->out(), O_RDONLY, 0, nullptr);
+ if (file < 0 || FdEntry::CheckModified(entry, file)) {
+ uv_fs_close(nullptr, &req, file, nullptr);
+ return nullptr;
}
+ return std::make_shared<ReaderImpl>(
+ BaseObjectPtr<fs::FileHandle>(
+ fs::FileHandle::New(entry->env()->GetBindingData<fs::BindingData>(
+ entry->env()->context()),
+ file,
+ Local<Object>(),
+ entry->start_,
+ entry->end_)),
+ entry);
+ }
- return tmpl;
+ explicit ReaderImpl(BaseObjectPtr<fs::FileHandle> handle, FdEntry* entry)
+ : env_(handle->env()), handle_(std::move(handle)), entry_(entry) {
+ handle_->PushStreamListener(this);
+ handle_->env()->AddCleanupHook(cleanup, this);
}
- static void Initialize(Environment* env, Local<Object> target) {
- SetConstructorFunction(
- env->context(), target, "FdEntry", GetConstructorTemplate(env));
+ ~ReaderImpl() override {
+ handle_->env()->RemoveCleanupHook(cleanup, this);
+ DrainAndClose();
+ handle_->RemoveStreamListener(this);
}
- static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
- registry->Register(New);
+ uv_buf_t OnStreamAlloc(size_t suggested_size) override {
+ return env_->allocate_managed_buffer(suggested_size);
}
- static BaseObjectPtr<Wrap> Create(
- Environment* env,
- int fd,
- size_t start = 0,
- Maybe<size_t> end = Nothing<size_t>()) {
- Local<Object> obj;
- if (!GetConstructorTemplate(env)
- ->InstanceTemplate()
- ->NewInstance(env->context())
- .ToLocal(&obj)) {
- return BaseObjectPtr<Wrap>();
+ void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override {
+ std::shared_ptr<v8::BackingStore> store =
+ env_->release_managed_buffer(buf);
+
+ if (ended_) {
+ // If we got here and ended_ is true, it means we ended and drained
+ // while the read was pending. We're just going to do nothing.
+ CHECK(pending_pulls_.empty());
+ return;
}
- return MakeBaseObject<Wrap>(env, obj, fd, start, end);
- }
+ CHECK(reading_);
+ auto pending = DequeuePendingPull();
- Wrap(Environment* env, Local<Object> obj, int fd, size_t start, v8::Maybe<size_t> end)
- : BaseObject(env, obj),
- inner_(std::make_unique<FdEntry>(env, fd, start, end)) {
- MakeWeak();
- }
+ if (CheckModified(entry_, handle_->GetFD())) {
+ DrainAndClose();
+ // The file was modified while the read was pending. We need to error.
+ std::move(pending.next)(UV_EINVAL, nullptr, 0, [](uint64_t) {});
+ return;
+ }
- std::unique_ptr<DataQueue::Entry> detach() {
- return std::move(inner_);
- }
+ if (nread < 0) {
+ if (nread == UV_EOF) {
+ std::move(pending.next)(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {});
+ } else {
+ std::move(pending.next)(nread, nullptr, 0, [](uint64_t) {});
+ }
- bool isDetached() const { return inner_ == nullptr; }
+ return DrainAndClose();
+ }
- void MemoryInfo(MemoryTracker* tracker) const override {
- tracker->TrackField("entry", inner_);
- }
- SET_MEMORY_INFO_NAME(FdEntry::Wrap)
- SET_SELF_SIZE(Wrap)
+ DataQueue::Vec vec;
+ vec.base = static_cast<uint8_t*>(store->Data());
+ vec.len = static_cast<uint64_t>(nread);
+ std::move(pending.next)(
+ bob::STATUS_CONTINUE, &vec, 1, [store](uint64_t) {});
- private:
- std::unique_ptr<FdEntry> inner_;
- };
+ if (pending_pulls_.empty()) {
+ reading_ = false;
+ if (handle_->IsAlive()) handle_->ReadStop();
+ }
+ }
- private:
- Environment* env_;
- int fd_;
- size_t start_ = 0;
- size_t end_ = 0;
- uv_stat_t stat_;
- uv_fs_t req;
- BaseObjectPtr<fs::FileHandle> maybe_file_handle_;
+ int Pull(Next next,
+ int options,
+ DataQueue::Vec* data,
+ size_t count,
+ size_t max_count_hint = bob::kMaxCountHint) override {
+ if (ended_ || !handle_->IsAlive()) {
+ std::move(next)(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {});
+ return bob::STATUS_EOS;
+ }
- int GetStat(uv_stat_t& stat) {
- int err = uv_fs_fstat(env_->event_loop(), &req, fd_, nullptr);
- stat = req.statbuf;
- return err;
- }
+ if (FdEntry::CheckModified(entry_, handle_->GetFD())) {
+ DrainAndClose();
+ std::move(next)(UV_EINVAL, nullptr, 0, [](uint64_t) {});
+ return UV_EINVAL;
+ }
- class Reader : public DataQueue::Reader {
- public:
- Reader(FdEntry* entry)
- : entry_(entry),
- offset_(entry->start_),
- end_(entry_->end_) {}
-
- int Pull(
- Next next,
- int options,
- DataQueue::Vec* data,
- size_t count,
- size_t max_count_hint = bob::kMaxCountHint) override {
- // TODO(@jasnell): For now, we're going to ignore data and count.
- // Later, we can support these to allow the caller to allocate the
- // buffers we read into. To keep things easier for now, we're going
- // to read into a pre-allocated buffer.
- if (ended_ || offset_ == end_) {
- std::move(next)(bob::STATUS_EOS, nullptr, 0, [](size_t) {});
- return bob::STATUS_EOS;
+ pending_pulls_.emplace_back(std::move(next), shared_from_this());
+ if (!reading_) {
+ reading_ = true;
+ handle_->ReadStart();
}
- // offset_ should always be less than end_ here
- CHECK_LT(offset_, end_);
- new PendingRead(this, std::move(next));
return bob::STATUS_WAIT;
}
SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(FdEntry::Reader)
- SET_SELF_SIZE(Reader)
+ SET_SELF_SIZE(ReaderImpl)
private:
- FdEntry* entry_;
- bool ended_ = false;
- size_t offset_;
- size_t end_;
-
- struct PendingRead {
- static constexpr size_t DEFAULT_BUFFER_SIZE = 4096;
- Reader* reader;
+ struct PendingPull {
Next next;
- uv_fs_t req_;
- uv_buf_t uvbuf;
-
- PendingRead(Reader* reader, Next next)
- : reader(reader),
- next(std::move(next)),
- uvbuf(reader->entry_->env()->allocate_managed_buffer(
- std::min(DEFAULT_BUFFER_SIZE, reader->end_ - reader->offset_)
- )) {
- req_.data = this;
- uv_fs_fstat(reader->entry_->env()->event_loop(), &req_,
- reader->entry_->fd_, &PendingRead::OnStat);
- }
-
- void Done() {
- delete this;
- }
-
- bool checkEnded() {
- if (reader->ended_) {
- // A previous read ended this readable. Let's stop here.
- std::move(next)(bob::STATUS_EOS, nullptr, 0, [](size_t) {});
- return true;
- }
- if (req_.result < 0) {
- std::move(next)(req_.result, nullptr, 0, [](size_t) {});
- return true;
- }
- return false;
- }
-
- void OnStat() {
- if (checkEnded()) return Done();
- uv_stat_t current_stat = req_.statbuf;
- uv_stat_t& orig = reader->entry_->stat_;
- if (current_stat.st_size != orig.st_size ||
- current_stat.st_ctim.tv_nsec != orig.st_ctim.tv_nsec ||
- current_stat.st_mtim.tv_nsec != orig.st_mtim.tv_nsec) {
- // The fd was modified. Fail the read.
- std::move(next)(UV_EINVAL, nullptr, 0, [](size_t) {});
- return;
- }
+ std::shared_ptr<ReaderImpl> self;
+ PendingPull(Next next, std::shared_ptr<ReaderImpl> self)
+ : next(std::move(next)), self(std::move(self)) {}
+ };
- // Now we read from the file.
- uv_fs_read(reader->entry_->env()->event_loop(), &req_,
- reader->entry_->fd_,
- &uvbuf, 1,
- reader->offset_,
- OnRead);
- }
+ Environment* env_;
+ BaseObjectPtr<fs::FileHandle> handle_;
+ FdEntry* entry_;
+ std::deque<PendingPull> pending_pulls_;
+ bool reading_ = false;
+ bool ended_ = false;
- void OnRead() {
- auto on_exit = OnScopeLeave([this] { Done(); });
- if (checkEnded()) return;
- std::shared_ptr<BackingStore> store =
- reader->entry_->env()->release_managed_buffer(uvbuf);
- size_t amountRead = req_.result;
- // We should never read past end_
- CHECK_LE(amountRead + reader->offset_, reader->end_);
- reader->offset_ += amountRead;
- if (reader->offset_ == reader->end_)
- reader->ended_ = true;
- DataQueue::Vec vec = {
- reinterpret_cast<uint8_t*>(store->Data()),
- amountRead
- };
- std::move(next)(
- reader->ended_ ? bob::STATUS_END : bob::STATUS_CONTINUE,
- &vec, 1, [store](size_t) mutable {});
- }
+ static void cleanup(void* self) {
+ auto ptr = static_cast<ReaderImpl*>(self);
+ ptr->DrainAndClose();
+ }
- static void OnStat(uv_fs_t* req) {
- PendingRead* read = ContainerOf(&PendingRead::req_, req);
- read->OnStat();
+ void DrainAndClose() {
+ if (ended_) return;
+ ended_ = true;
+ while (!pending_pulls_.empty()) {
+ auto pending = DequeuePendingPull();
+ std::move(pending.next)(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {});
}
+ handle_->ReadStop();
+
+ // We fall back to a sync close on the raw fd here because it is the
+ // easiest, simplest thing to do. All of FileHandle's close mechanisms
+ // assume async close and cleanup, while DrainAndClose might be running
+ // in the destructor during GC, for instance. As a todo, FileHandle could
+ // provide a sync mechanism for closing the FD but, for now, this
+ // approach works.
+ int fd = handle_->Release();
+ uv_fs_t req;
+ uv_fs_close(nullptr, &req, fd, nullptr);
+ uv_fs_req_cleanup(&req);
+ }
- static void OnRead(uv_fs_t* req) {
- PendingRead* read = ContainerOf(&PendingRead::req_, req);
- read->OnRead();
- }
- };
+ PendingPull DequeuePendingPull() {
+ CHECK(!pending_pulls_.empty());
+ auto pop = OnScopeLeave([this] { pending_pulls_.pop_front(); });
+ return std::move(pending_pulls_.front());
+ }
- friend struct PendingRead;
friend class FdEntry;
};
- friend class Reader;
- friend struct Reader::PendingRead;
+ friend class ReaderImpl;
};
// ============================================================================
@@ -1122,9 +1033,9 @@ std::shared_ptr<DataQueue> DataQueue::CreateIdempotent(
std::vector<std::unique_ptr<Entry>> list) {
// Any entry is invalid for an idempotent DataQueue if any of the entries
// are nullptr or is not idempotent.
- size_t size = 0;
+ uint64_t size = 0;
const auto isInvalid = [&size](auto& item) {
- if (item == nullptr || !item->isIdempotent()) {
+ if (item == nullptr || !item->is_idempotent()) {
return true; // true means the entry is not valid here.
}
@@ -1133,9 +1044,11 @@ std::shared_ptr<DataQueue> DataQueue::CreateIdempotent(
// of the entries are unable to provide a size, then
// we assume we cannot safely treat this entry as
// idempotent even if it claims to be.
- size_t itemSize;
- if (item->size().To(&itemSize)) { size += itemSize; }
- else return true; // true means the entry is not valid here.
+ if (item->size().has_value()) {
+ size += item->size().value();
+ } else {
+ return true; // true means the entry is not valid here.
+ }
return false;
};
@@ -1147,7 +1060,7 @@ std::shared_ptr<DataQueue> DataQueue::CreateIdempotent(
return std::make_shared<DataQueueImpl>(std::move(list), size);
}
-std::shared_ptr<DataQueue> DataQueue::Create(Maybe<size_t> capped) {
+std::shared_ptr<DataQueue> DataQueue::Create(std::optional<uint64_t> capped) {
return std::make_shared<DataQueueImpl>(capped);
}
@@ -1170,9 +1083,7 @@ std::unique_ptr<DataQueue::Entry> DataQueue::CreateInMemoryEntryFromView(
std::unique_ptr<DataQueue::Entry>
DataQueue::CreateInMemoryEntryFromBackingStore(
- std::shared_ptr<BackingStore> store,
- size_t offset,
- size_t length) {
+ std::shared_ptr<BackingStore> store, uint64_t offset, uint64_t length) {
CHECK(store);
if (offset + length > store->ByteLength()) {
return nullptr;
@@ -1185,18 +1096,18 @@ std::unique_ptr<DataQueue::Entry> DataQueue::CreateDataQueueEntry(
return std::make_unique<DataQueueEntry>(std::move(data_queue));
}
-std::unique_ptr<DataQueue::Entry> DataQueue::CreateFdEntry(
- BaseObjectPtr<fs::FileHandle> handle) {
- return std::make_unique<FdEntry>(handle->env(), handle);
+std::unique_ptr<DataQueue::Entry> DataQueue::CreateFdEntry(Environment* env,
+ Local<Value> path) {
+ return FdEntry::Create(env, path);
}
void DataQueue::Initialize(Environment* env, v8::Local<v8::Object> target) {
- FdEntry::Wrap::Initialize(env, target);
+ // Nothing to do here currently.
}
void DataQueue::RegisterExternalReferences(
ExternalReferenceRegistry* registry) {
- FdEntry::Wrap::RegisterExternalReferences(registry);
+ // Nothing to do here currently.
}
} // namespace node
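
That ends the queue.cc changes. Note one behavioral shift threaded through them: the rewritten readers report end-of-stream with STATUS_EOS where the old code distinguished STATUS_END (the queue.h comment below is updated to match). A consumer-side sketch of the resulting Pull() loop; the DrainSync helper and the literal 0 passed for options are assumptions for illustration, not APIs from the patch:

// Illustrative only: pull from a reader until STATUS_EOS, per the revised
// contract. Assumes the declarations in queue.h and node_bob.h.
void DrainSync(std::shared_ptr<node::DataQueue> dq) {
  std::shared_ptr<node::DataQueue::Reader> reader = dq->get_reader();
  if (!reader) return;  // a non-idempotent queue may already be locked
  int status = node::bob::STATUS_CONTINUE;
  while (status != node::bob::STATUS_EOS && status >= 0) {
    status = reader->Pull(
        [](int s, const node::DataQueue::Vec* vecs, uint64_t count,
           node::DataQueue::Reader::Done done) {
          for (uint64_t i = 0; i < count; i++) {
            // consume vecs[i].base .. vecs[i].base + vecs[i].len here
          }
          std::move(done)(0);  // tell the source how many bytes were used
        },
        /* options = */ 0, nullptr, 0, node::bob::kMaxCountHint);
    if (status == node::bob::STATUS_WAIT || status == node::bob::STATUS_BLOCK)
      break;  // data arrives asynchronously; a real consumer re-enters later
  }
}
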
diff --git a/src/dataqueue/queue.h b/src/dataqueue/queue.h
index 955f9ec589..a1a297a8fc 100644
--- a/src/dataqueue/queue.h
+++ b/src/dataqueue/queue.h
@@ -3,15 +3,16 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include <base_object.h>
+#include <memory_tracker.h>
#include <node.h>
#include <node_bob.h>
#include <node_file.h>
-#include <memory_tracker.h>
#include <stream_base.h>
-#include <v8.h>
#include <uv.h>
+#include <v8.h>
#include <memory>
+#include <optional>
#include <vector>
namespace node {
@@ -101,17 +102,18 @@ namespace node {
// To read from a DataQueue, we use the node::bob::Source API
// (see src/node_bob.h).
//
-// std::unique_ptr<DataQueue::Reader> reader = data_queue->getReader();
+// std::shared_ptr<DataQueue::Reader> reader = data_queue->get_reader();
//
// reader->Pull(
-// [](int status, const DataQueue::Vec* vecs, size_t count, Done done) {
+// [](int status, const DataQueue::Vec* vecs,
+// uint64_t count, Done done) {
// // status is one of node::bob::Status
// // vecs is zero or more data buffers containing the read data
// // count is the number of vecs
// // done is a callback to be invoked when done processing the data
// }, options, nullptr, 0, 16);
//
-// Keep calling Pull() until status is equal to node::bob::Status::STATUS_END.
+// Keep calling Pull() until status is equal to node::bob::Status::STATUS_EOS.
//
// For idempotent DataQueues, any number of readers can be created and
// pull concurrently from the same DataQueue. The DataQueue can be read
@@ -126,15 +128,14 @@ class DataQueue : public MemoryRetainer {
public:
struct Vec {
uint8_t* base;
- size_t len;
+ uint64_t len;
};
// A DataQueue::Reader consumes the DataQueue. If the data queue is
// idempotent, multiple Readers can be attached to the DataQueue at
// any given time, all guaranteed to yield the same result when the
// data is read. Otherwise, only a single Reader can be attached.
- class Reader : public MemoryRetainer,
- public bob::Source<Vec> {
+ class Reader : public MemoryRetainer, public bob::Source<Vec> {
public:
using Next = bob::Next<Vec>;
using Done = bob::Done;
@@ -150,26 +151,25 @@ class DataQueue : public MemoryRetainer {
// offset is omitted, the slice extends to the end of the
// data.
//
- // Creating a slice is only possible if isIdempotent() returns
+ // Creating a slice is only possible if is_idempotent() returns
// true. This is because consuming either the original entry or
// the new entry would change the state of the other in non-
- // deterministic ways. When isIdempotent() returns false, slice()
+ // deterministic ways. When is_idempotent() returns false, slice()
// must return a nulled unique_ptr.
//
// Creating a slice is also only possible if the size of the
- // entry is known. If size() returns v8::Nothing<size_t>, slice()
+ // entry is known. If size() returns std::nullopt, slice()
// must return a nulled unique_ptr.
virtual std::unique_ptr<Entry> slice(
- size_t start,
- v8::Maybe<size_t> end = v8::Nothing<size_t>()) = 0;
+ uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;
// Returns the number of bytes represented by this Entry if it is
// known. Certain types of entries, such as those backed by streams
// might not know the size in advance and therefore cannot provide
- // a value. In such cases, size() must return v8::Nothing<size_t>.
+ // a value. In such cases, size() must return std::nullopt.
//
// If the entry is idempotent, a size should always be available.
- virtual v8::Maybe<size_t> size() const = 0;
+ virtual std::optional<uint64_t> size() const = 0;
// When true, multiple reads on the object must produce the exact
// same data or the reads will fail. Some sources of entry data,
@@ -177,7 +177,7 @@ class DataQueue : public MemoryRetainer {
// and therefore must not claim to be. If an entry claims to be
// idempotent and cannot preserve that quality, subsequent reads
// must fail with an error when a variance is detected.
- virtual bool isIdempotent() const = 0;
+ virtual bool is_idempotent() const = 0;
};
// Creates an idempotent DataQueue with a pre-established collection
@@ -190,7 +190,7 @@ class DataQueue : public MemoryRetainer {
// mutated and updated such that multiple reads are not guaranteed
// to produce the same result. The entries added can be of any type.
static std::shared_ptr<DataQueue> Create(
- v8::Maybe<size_t> capped = v8::Nothing<size_t>());
+ std::optional<uint64_t> capped = std::nullopt);
// Creates an idempotent Entry from a v8::ArrayBufferView. To help
// ensure idempotency, the underlying ArrayBuffer is detached from
@@ -207,26 +207,26 @@ class DataQueue : public MemoryRetainer {
// is not detachable, nullptr will be returned.
static std::unique_ptr<Entry> CreateInMemoryEntryFromBackingStore(
std::shared_ptr<v8::BackingStore> store,
- size_t offset,
- size_t length);
+ uint64_t offset,
+ uint64_t length);
static std::unique_ptr<Entry> CreateDataQueueEntry(
std::shared_ptr<DataQueue> data_queue);
- static std::unique_ptr<Entry> CreateFdEntry(
- BaseObjectPtr<fs::FileHandle> handle);
+ static std::unique_ptr<Entry> CreateFdEntry(Environment* env,
+ v8::Local<v8::Value> path);
// Creates a Reader for the given queue. If the queue is idempotent,
// any number of readers can be created, all of which are guaranteed
// to provide the same data. Otherwise, only a single reader is
// permitted.
- virtual std::unique_ptr<Reader> getReader() = 0;
+ virtual std::shared_ptr<Reader> get_reader() = 0;
// Append a single new entry to the queue. Appending is only allowed
- // when isIdempotent() is false. v8::Nothing<bool>() will be returned
- // if isIdempotent() is true. v8::Just(false) will be returned if the
+ // when is_idempotent() is false. std::nullopt will be returned
+ // if is_idempotent() is true. std::optional(false) will be returned if the
// data queue is not idempotent but the entry otherwise cannot be added.
- virtual v8::Maybe<bool> append(std::unique_ptr<Entry> entry) = 0;
+ virtual std::optional<bool> append(std::unique_ptr<Entry> entry) = 0;
// Caps the size of this DataQueue preventing additional entries to
// be added if those cause the size to extend beyond the specified
@@ -239,12 +239,12 @@ class DataQueue : public MemoryRetainer {
// If the size of the data queue is not known, the limit will be
// ignored and no additional entries will be allowed at all.
//
- // If isIdempotent is true capping is unnecessary because the data
+ // If is_idempotent is true, capping is unnecessary because the data
// queue cannot be appended to. In that case, cap() is a non-op.
//
// If the data queue has already been capped, cap can be called
// again with a smaller size.
- virtual void cap(size_t limit = 0) = 0;
+ virtual void cap(uint64_t limit = 0) = 0;
// Returns a new DataQueue that is a view over this queue's data
// from the start offset to the ending offset. If the end offset
@@ -252,39 +252,38 @@ class DataQueue : public MemoryRetainer {
//
// The slice will cover a range from start up to, but excluding, end.
//
- // Creating a slice is only possible if isIdempotent() returns
+ // Creating a slice is only possible if is_idempotent() returns
// true. This is because consuming either the original DataQueue or
// the new queue would change the state of the other in non-
- // deterministic ways. When isIdempotent() returns false, slice()
+ // deterministic ways. When is_idempotent() returns false, slice()
// must return a nulled unique_ptr.
//
// Creating a slice is also only possible if the size of the
- // DataQueue is known. If size() returns v8::Nothing<size_t>, slice()
+ // DataQueue is known. If size() returns std::nullopt, slice()
// must return a null unique_ptr.
virtual std::shared_ptr<DataQueue> slice(
- size_t start,
- v8::Maybe<size_t> end = v8::Nothing<size_t>()) = 0;
+ uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;
// The size of DataQueue is the total size of all of its member entries.
// If any of the entries is not able to specify a size, the DataQueue
// will also be incapable of doing so, in which case size() must return
- // v8::Nothing<size_t>.
- virtual v8::Maybe<size_t> size() const = 0;
+ // std::nullopt.
+ virtual std::optional<uint64_t> size() const = 0;
// A DataQueue is idempotent only if all of its member entries are
// idempotent.
- virtual bool isIdempotent() const = 0;
+ virtual bool is_idempotent() const = 0;
// True only if cap is called or the data queue is limited to a
// fixed size.
- virtual bool isCapped() const = 0;
+ virtual bool is_capped() const = 0;
// If the data queue has been capped, and the size of the data queue
// is known, maybeCapRemaining will return the number of additional
// bytes the data queue can receive before reaching the cap limit.
// If the size of the queue cannot be known, or the cap has not
- // been set, maybeCapRemaining() will return v8::Nothing<size_t>.
- virtual v8::Maybe<size_t> maybeCapRemaining() const = 0;
+ // been set, maybeCapRemaining() will return std::nullopt.
+ virtual std::optional<uint64_t> maybeCapRemaining() const = 0;
static void Initialize(Environment* env, v8::Local<v8::Object> target);
static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
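
For completeness, the reworked append()/cap() contract described in the comments above looks roughly like this from the caller's side (a sketch under the same assumptions; FillUnderCap is a hypothetical helper):

// Illustrative only: exercising the std::optional<bool> result of append().
void FillUnderCap(node::DataQueue* dq,
                  std::unique_ptr<node::DataQueue::Entry> entry) {
  dq->cap(1024);  // a non-op on idempotent queues, per cap()'s contract
  std::optional<bool> appended = dq->append(std::move(entry));
  if (!appended.has_value()) {
    // std::nullopt: the queue is idempotent; appending is never allowed.
  } else if (!*appended) {
    // false: the entry was rejected, e.g. adding it would exceed the cap.
  }
}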