summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp')
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp256
1 files changed, 167 insertions, 89 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
index f61da49a1b..a4ed6cbf19 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
@@ -26,20 +26,24 @@
#include <cstdlib>
#include <cstring>
#include "qpid/linearstore/jrnl/utils/file_hdr.h"
+#include "qpid/linearstore/jrnl/jcfg.h"
#include "qpid/linearstore/jrnl/jcntl.h"
#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/JournalFile.h"
#include <sstream>
#include <stdint.h>
-//#include <iostream> // DEBUG
+#include <iostream> // DEBUG
namespace qpid
{
namespace qls_jrnl
{
-wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc):
+wmgr::wmgr(jcntl* jc,
+ enq_map& emap,
+ txn_map& tmap,
+ LinearFileController& lfc):
pmgr(jc, emap, tmap),
_lfc(lfc),
_max_dtokpp(0),
@@ -52,8 +56,13 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc):
_txn_pending_set()
{}
-wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us):
- pmgr(jc, emap, tmap /* , dtoklp */),
+wmgr::wmgr(jcntl* jc,
+ enq_map& emap,
+ txn_map& tmap,
+ LinearFileController& lfc,
+ const uint32_t max_dtokpp,
+ const uint32_t max_iowait_us):
+ pmgr(jc, emap, tmap),
_lfc(lfc),
_max_dtokpp(max_dtokpp),
_max_io_wait_us(max_iowait_us),
@@ -71,9 +80,12 @@ wmgr::~wmgr()
}
void
-wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks,
- const uint16_t wcache_num_pages, const uint32_t max_dtokpp, const uint32_t max_iowait_us,
- std::size_t eo)
+wmgr::initialize(aio_callback* const cbp,
+ const uint32_t wcache_pgsize_sblks,
+ const uint16_t wcache_num_pages,
+ const uint32_t max_dtokpp,
+ const uint32_t max_iowait_us,
+ std::size_t eo)
{
_enq_busy = false;
_deq_busy = false;
@@ -86,26 +98,38 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks,
if (eo)
{
- const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS;
- uint32_t data_dblks = (eo / JRNL_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr
+ const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS;
+ uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr
_pg_cntr = data_dblks / wr_pg_size_dblks;
_pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
}
}
iores
-wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
- const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
- const std::size_t xid_len, const bool transient, const bool external)
+wmgr::enqueue(const void* const data_buff,
+ const std::size_t tot_data_len,
+ const std::size_t this_data_len,
+ data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len,
+ const bool transient,
+ const bool external)
{
if (xid_len)
assert(xid_ptr != 0);
- if (_deq_busy || _abort_busy || _commit_busy)
- return RHM_IORES_BUSY;
+ if (_deq_busy || _abort_busy || _commit_busy) {
+ std::ostringstream oss;
+ oss << "RHM_IORES_BUSY: enqueue while part way through another op:";
+ oss << " _deq_busy=" << (_deq_busy?"T":"F");
+ oss << " _abort_busy=" << (_abort_busy?"T":"F");
+ oss << " _commit_busy=" << (_commit_busy?"T":"F");
+ throw jexception(oss.str()); // TODO: complete exception
+ }
- if (this_data_len != tot_data_len && !external)
- return RHM_IORES_NOTIMPL;
+ if (this_data_len != tot_data_len && !external) {
+ throw jexception("RHM_IORES_NOTIMPL: partial enqueues not implemented"); // TODO: complete exception;
+ }
iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external);
if (res != RHM_IORES_SUCCESS)
@@ -124,9 +148,8 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
}
}
- uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();/*_wrfc.get_incr_rid()*/
- _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len/*, _wrfc.owi()*/, transient,
- external);
+ uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();
+ _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external);
if (!cont)
{
dtokp->set_rid(rid);
@@ -137,14 +160,16 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
dtokp->clear_xid();
_enq_busy = true;
}
+//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " " << std::dec << std::flush; // DEBUG
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
- void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
+//std::cout << "*" << std::flush; // DEBUG
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0) {
@@ -159,6 +184,7 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
// Is the encoding of this record complete?
if (dtokp->dblocks_written() >= _enq_rec.rec_size_dblks())
{
+//std::cout << "!" << std::flush; // DEBUG
// TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
dtokp->set_wstate(data_tok::ENQ_SUBM);
dtokp->set_dsize(tot_data_len);
@@ -166,7 +192,8 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
// long multi-page messages have their token on the page containing the END of the
// message. AIO callbacks will then only process this token when entire message is
// enqueued.
- _lfc.incrEnqueuedRecordCount();
+ _lfc.incrEnqueuedRecordCount(dtokp->fid());
+//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount() << std::dec << std::flush; // DEBUG
if (xid_len) // If part of transaction, add to transaction map
{
@@ -185,26 +212,37 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
}
done = true;
- }
- else
+ } else {
+//std::cout << "$" << std::endl << std::flush; // DEBUG
dtokp->set_wstate(data_tok::ENQ_PART);
+ }
file_header_check(rid, cont, _enq_rec.rec_size_dblks() - data_offs_dblks);
- flush_check(res, cont, done);
+ flush_check(res, cont, done, rid);
}
if (dtokp->wstate() >= data_tok::ENQ_SUBM)
_enq_busy = false;
+//std::cout << " res=" << iores_str(res) << " _enq_busy=" << (_enq_busy?"T":"F") << std::endl << std::flush; // DEBUG
return res;
}
iores
-wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, const bool txn_coml_commit)
+wmgr::dequeue(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len,
+ const bool txn_coml_commit)
{
if (xid_len)
assert(xid_ptr != 0);
- if (_enq_busy || _abort_busy || _commit_busy)
- return RHM_IORES_BUSY;
+ if (_enq_busy || _abort_busy || _commit_busy) {
+ std::ostringstream oss;
+ oss << "RHM_IORES_BUSY: dequeue while part way through another op:";
+ oss << " _enq_busy=" << (_enq_busy?"T":"F");
+ oss << " _abort_busy=" << (_abort_busy?"T":"F");
+ oss << " _commit_busy=" << (_commit_busy?"T":"F");
+ throw jexception(oss.str()); // TODO: complete exception
+ }
iores res = pre_write_check(WMGR_DEQUEUE, dtokp);
if (res != RHM_IORES_SUCCESS)
@@ -224,7 +262,7 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_
}
const bool ext_rid = dtokp->external_rid();
- uint64_t rid = (ext_rid | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc
+ uint64_t rid = (ext_rid | cont) ? dtokp->rid() : _lfc.getNextRecordId();
uint64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid();
_deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len/*, _wrfc.owi()*/, txn_coml_commit);
if (!cont)
@@ -242,14 +280,16 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_
dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
_deq_busy = true;
}
+//std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
- void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
+//std::cout << "*" << std::flush; // DEBUG
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0) {
@@ -264,6 +304,7 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_
// Is the encoding of this record complete?
if (dtokp->dblocks_written() >= _deq_rec.rec_size_dblks())
{
+//std::cout << "!" << std::flush; // DEBUG
// TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
dtokp->set_wstate(data_tok::DEQ_SUBM);
@@ -276,7 +317,7 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_
}
else
{
- int16_t fid;
+ uint64_t fid;
short eres = _emap.get_remove_pfid(dtokp->dequeue_rid(), fid);
if (eres < enq_map::EMAP_OK) // fail
{
@@ -293,30 +334,43 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
}
}
- _lfc.decrEnqueuedRecordCount();
+//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG
+//try {
+ _lfc.decrEnqueuedRecordCount(fid);
+//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; }
}
done = true;
- }
- else
+ } else {
+//std::cout << "$" << std::flush; // DEBUG
dtokp->set_wstate(data_tok::DEQ_PART);
+ }
file_header_check(rid, cont, _deq_rec.rec_size_dblks() - data_offs_dblks);
- flush_check(res, cont, done);
+ flush_check(res, cont, done, rid);
}
if (dtokp->wstate() >= data_tok::DEQ_SUBM)
_deq_busy = false;
+//std::cout << " res=" << iores_str(res) << " _deq_busy=" << (_deq_busy?"T":"F") << std::endl << std::flush; // DEBUG
return res;
}
iores
-wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len)
+wmgr::abort(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len)
{
// commit and abort MUST have a valid xid
assert(xid_ptr != 0 && xid_len > 0);
- if (_enq_busy || _deq_busy || _commit_busy)
- return RHM_IORES_BUSY;
+ if (_enq_busy || _deq_busy || _commit_busy) {
+ std::ostringstream oss;
+ oss << "RHM_IORES_BUSY: abort while part way through another op:";
+ oss << " _enq_busy=" << (_enq_busy?"T":"F");
+ oss << " _deq_busy=" << (_deq_busy?"T":"F");
+ oss << " _commit_busy=" << (_commit_busy?"T":"F");
+ throw jexception(oss.str()); // TODO: complete exception
+ }
iores res = pre_write_check(WMGR_ABORT, dtokp);
if (res != RHM_IORES_SUCCESS)
@@ -348,11 +402,11 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
- void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0)
@@ -392,7 +446,7 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le
dtokp->set_wstate(data_tok::ABORT_PART);
file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
- flush_check(res, cont, done);
+ flush_check(res, cont, done, rid);
}
if (dtokp->wstate() >= data_tok::ABORT_SUBM)
_abort_busy = false;
@@ -400,13 +454,21 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le
}
iores
-wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len)
+wmgr::commit(data_tok* dtokp,
+ const void* const xid_ptr,
+ const std::size_t xid_len)
{
// commit and abort MUST have a valid xid
assert(xid_ptr != 0 && xid_len > 0);
- if (_enq_busy || _deq_busy || _abort_busy)
- return RHM_IORES_BUSY;
+ if (_enq_busy || _deq_busy || _abort_busy) {
+ std::ostringstream oss;
+ oss << "RHM_IORES_BUSY: commit while part way through another op:";
+ oss << " _enq_busy=" << (_enq_busy?"T":"F");
+ oss << " _deq_busy=" << (_deq_busy?"T":"F");
+ oss << " _abort_busy=" << (_abort_busy?"T":"F");
+ throw jexception(oss.str()); // TODO: complete exception
+ }
iores res = pre_write_check(WMGR_COMMIT, dtokp);
if (res != RHM_IORES_SUCCESS)
@@ -438,11 +500,11 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
- void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0)
@@ -475,7 +537,7 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l
}
else // txn dequeue
{
- int16_t fid;
+ uint64_t fid;
short eres = _emap.get_remove_pfid(itr->_drid, fid, true);
if (eres < enq_map::EMAP_OK) // fail
{
@@ -509,7 +571,7 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l
dtokp->set_wstate(data_tok::COMMIT_PART);
file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
- flush_check(res, cont, done);
+ flush_check(res, cont, done, rid);
}
if (dtokp->wstate() >= data_tok::COMMIT_SUBM)
_commit_busy = false;
@@ -517,29 +579,34 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l
}
void
-wmgr::file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem)
+wmgr::file_header_check(const uint64_t rid,
+ const bool cont,
+ const uint32_t rec_dblks_rem)
{
if (_lfc.isEmpty()) // File never written (i.e. no header or data)
{
std::size_t fro = 0;
if (cont) {
- bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will fit within this journal file
- bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will exactly fill this journal file
+ bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will fit within this journal file
+ bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will exactly fill this journal file
if (file_fit && !file_full) {
- fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS)) * JRNL_DBLK_SIZE_BYTES;
+ fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS)) * QLS_DBLK_SIZE_BYTES;
}
} else {
- fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES;
+ fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
}
_lfc.asyncFileHeaderWrite(_ioctx, 0, rid, fro);
+ _aio_evt_rem++;
}
}
void
-wmgr::flush_check(iores& res, bool& cont, bool& done)
+wmgr::flush_check(iores& res,
+ bool& cont,
+ bool& done, const uint64_t /*rid*/) // DEBUG
{
// Is page is full, flush
- if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS)
+ if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS)
{
res = write_flush();
assert(res == RHM_IORES_SUCCESS);
@@ -558,6 +625,7 @@ wmgr::flush_check(iores& res, bool& cont, bool& done)
if (!done) {
cont = true;
}
+//std::cout << "***** wmgr::flush_check(): GET NEXT FILE: rid=0x" << std::hex << rid << std::dec << " res=" << iores_str(res) << " cont=" << (cont?"T":"F") << " done=" << (done?"T":"F") << std::endl; // DEBUG
}
}
}
@@ -580,28 +648,28 @@ wmgr::write_flush()
// Don't bother flushing an empty page or one that is still in state AIO_PENDING
if (_cached_offset_dblks)
{
- if (_page_cb_arr[_pg_index]._state == AIO_PENDING)
+ if (_page_cb_arr[_pg_index]._state == AIO_PENDING) {
+//std::cout << "#"; // DEBUG
res = RHM_IORES_PAGE_AIOWAIT;
- else
- {
+ } else {
if (_page_cb_arr[_pg_index]._state != IN_USE)
{
std::ostringstream oss;
oss << "pg_index=" << _pg_index << " state=" << _page_cb_arr[_pg_index].state_str();
- throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr",
- "write_flush");
+ throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", "write_flush");
}
// Send current page using AIO
- // In manual flushes, dblks may not coincide with sblks, add filler records ("RHMx")
- // if necessary.
+ // In manual flushes, dblks may not coincide with sblks, add filler records ("RHMx") if necessary.
dblk_roundup();
- std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE_BYTES;
+ std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * QLS_DBLK_SIZE_BYTES;
aio_cb* aiocbp = &_aio_cb_arr[_pg_index];
_lfc.asyncPageWrite(_ioctx, aiocbp, (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks);
+ _page_cb_arr[_pg_index]._state = AIO_PENDING;
_aio_evt_rem++;
+//std::cout << "." << _aio_evt_rem << std::flush; // DEBUG
_cached_offset_dblks = 0;
_jc->instr_incr_outstanding_aio_cnt();
@@ -610,7 +678,7 @@ wmgr::write_flush()
_page_cb_arr[_pg_index]._state = IN_USE;
}
}
- get_events(UNUSED, 0);
+ get_events(0, false);
if (_page_cb_arr[_pg_index]._state == UNUSED)
_page_cb_arr[_pg_index]._state = IN_USE;
return res;
@@ -620,17 +688,19 @@ void
wmgr::get_next_file()
{
_pg_cntr = 0;
+//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::endl; // DEBUG
_lfc.pullEmptyFileFromEfp();
}
int32_t
-wmgr::get_events(page_state state, timespec* const timeout, bool flush)
+wmgr::get_events(timespec* const timeout,
+ bool flush)
{
if (_aio_evt_rem == 0) // no events to get
return 0;
int ret = 0;
- if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0)
+ if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem, _aio_event_arr, timeout)) < 0)
{
if (ret == -EINTR) // Interrupted by signal
return 0;
@@ -652,6 +722,7 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush)
throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "wmgr", "get_events");
}
_aio_evt_rem--;
+//std::cout << "'" << _aio_evt_rem; // DEBUG
aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
long aioret = (long)_aio_event_arr[i].res;
@@ -671,6 +742,7 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush)
}
if (pcbp) // Page writes have pcb
{
+//std::cout << "p"; // DEBUG
uint32_t s = pcbp->_pdtokl->size();
std::vector<data_tok*> dtokl;
dtokl.reserve(s);
@@ -754,7 +826,8 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush)
// Clean up this pcb's data_tok list
pcbp->_pdtokl->clear();
- pcbp->_state = state;
+ pcbp->_state = UNUSED;
+//std::cout << "c" << pcbp->_index << pcbp->state_str(); // DEBUG
// Perform AIO return callback
if (_cbp && tot_data_toks)
@@ -762,10 +835,10 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush)
}
else // File header writes have no pcb
{
+//std::cout << "f"; // DEBUG
file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf;
- _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS);
+ _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS);
_lfc.decrOutstandingAioOperationCount(fhp->_file_number);
- //fcntlp->set_wr_fhdr_aio_outstanding(false); // TODO: Do we need this?
}
}
@@ -784,7 +857,9 @@ wmgr::is_txn_synced(const std::string& xid)
}
void
-wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, const uint16_t wcache_num_pages)
+wmgr::initialize(aio_callback* const cbp,
+ const uint32_t wcache_pgsize_sblks,
+ const uint16_t wcache_num_pages)
{
pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
@@ -796,9 +871,11 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, co
}
iores
-wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp,
- const std::size_t /*xidsize*/, const std::size_t /*dsize*/, const bool /*external*/
- ) const
+wmgr::pre_write_check(const _op_type op,
+ const data_tok* const dtokp,
+ const std::size_t /*xidsize*/,
+ const std::size_t /*dsize*/,
+ const bool /*external*/) const
{
// Check status of current file
// TODO: Replace for LFC
@@ -861,11 +938,12 @@ wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp,
}
void
-wmgr::dequeue_check(const std::string& xid, const uint64_t drid)
+wmgr::dequeue_check(const std::string& xid,
+ const uint64_t drid)
{
// First check emap
bool found = false;
- int16_t fid;
+ uint64_t fid;
short eres = _emap.get_pfid(drid, fid);
if (eres < enq_map::EMAP_OK) { // fail
if (eres == enq_map::EMAP_RID_NOT_FOUND) {
@@ -891,13 +969,13 @@ void
wmgr::dblk_roundup()
{
const uint32_t xmagic = QLS_EMPTY_MAGIC;
- uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE_DBLKS) * JRNL_SBLK_SIZE_DBLKS;
+ uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, QLS_SBLK_SIZE_DBLKS) * QLS_SBLK_SIZE_DBLKS;
while (_cached_offset_dblks < wdblks)
{
- void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic));
-#ifdef RHM_CLEAN
- std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic));
+#ifdef QLS_CLEAN
+ std::memset((char*)wptr + sizeof(xmagic), QLS_CLEAN_CHAR, QLS_DBLK_SIZE_BYTES - sizeof(xmagic));
#endif
_pg_offset_dblks++;
_cached_offset_dblks++;
@@ -907,14 +985,15 @@ wmgr::dblk_roundup()
void
wmgr::rotate_page()
{
- _page_cb_arr[_pg_index]._state = AIO_PENDING;
- if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS)
+//std::cout << "^^^^^ wmgr::rotate_page() " << status_str() << " pi=" << _pg_index; // DEBUG
+ if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS)
{
_pg_offset_dblks = 0;
_pg_cntr++;
}
if (++_pg_index >= _cache_num_pages)
_pg_index = 0;
+//std::cout << "->" << _pg_index << std::endl; // DEBUG
}
void
@@ -928,7 +1007,7 @@ wmgr::status_str() const
std::ostringstream oss;
oss << "wmgr: pi=" << _pg_index << " pc=" << _pg_cntr;
oss << " po=" << _pg_offset_dblks << " aer=" << _aio_evt_rem;
- oss << " edac:" << (_enq_busy?"T":"F") << (_deq_busy?"T":"F");
+ oss << " edac=" << (_enq_busy?"T":"F") << (_deq_busy?"T":"F");
oss << (_abort_busy?"T":"F") << (_commit_busy?"T":"F");
oss << " ps=[";
for (int i=0; i<_cache_num_pages; i++)
@@ -938,11 +1017,10 @@ wmgr::status_str() const
case UNUSED: oss << "-"; break;
case IN_USE: oss << "U"; break;
case AIO_PENDING: oss << "A"; break;
- case AIO_COMPLETE: oss << "*"; break;
default: oss << _page_cb_arr[i]._state;
}
}
- oss << "] " << _lfc.status(0);
+ oss << "] ";
return oss.str();
}