diff options
Diffstat (limited to 'storage/ndb/src/old_files/rep/adapters/AppNDB.cpp')
-rw-r--r-- | storage/ndb/src/old_files/rep/adapters/AppNDB.cpp | 583 |
1 files changed, 583 insertions, 0 deletions
diff --git a/storage/ndb/src/old_files/rep/adapters/AppNDB.cpp b/storage/ndb/src/old_files/rep/adapters/AppNDB.cpp new file mode 100644 index 00000000000..05f6d52807f --- /dev/null +++ b/storage/ndb/src/old_files/rep/adapters/AppNDB.cpp @@ -0,0 +1,583 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "AppNDB.hpp" +#include <ConfigRetriever.hpp> +#include <AttributeHeader.hpp> +#include <NdbOperation.hpp> +#include <NdbDictionaryImpl.hpp> + +#include <signaldata/RepImpl.hpp> +#include <TransporterFacade.hpp> +#include <trigger_definitions.h> +#include <rep/storage/GCIPage.hpp> +#include <rep/storage/GCIBuffer.hpp> +#include <rep/rep_version.hpp> + +/***************************************************************************** + * Constructor / Destructor / Init + *****************************************************************************/ + +AppNDB::~AppNDB() +{ + delete m_tableInfoPs; + delete m_ndb; + m_tableInfoPs = 0; +} + +AppNDB::AppNDB(GCIContainer * gciContainer, RepState * repState) +{ + m_gciContainer = gciContainer; + m_repState = repState; + m_cond = NdbCondition_Create(); + m_started = true; +} + +void +AppNDB::init(const char* connectString) { + + // NdbThread_SetConcurrencyLevel(1+ 2); + m_ndb = new Ndb(""); + + m_ndb->useFullyQualifiedNames(false); + + m_ndb->setConnectString(connectString); + /** + * @todo Set proper max no of transactions?? needed?? Default 12?? + */ + m_ndb->init(2048); + m_dict = m_ndb->getDictionary(); + + m_ownNodeId = m_ndb->getNodeId(); + + ndbout << "-- NDB Cluster -- REP node " << m_ownNodeId << " -- Version " + << REP_VERSION_ID << " --" << endl; + ndbout_c("Connecting to NDB Cluster..."); + if (m_ndb->waitUntilReady() != 0){ + REPABORT("NDB Cluster not ready for connections"); + } + ndbout_c("Phase 1 (AppNDB): Connection 1 to NDB Cluster opened (Applier)"); + + m_tableInfoPs = new TableInfoPs(); + + m_applierThread = NdbThread_Create(runAppNDB_C, + (void**)this, + 32768, + "AppNDBThread", + NDB_THREAD_PRIO_LOW); +} + + +/***************************************************************************** + * Threads + *****************************************************************************/ + +extern "C" +void* +runAppNDB_C(void * me) +{ + ((AppNDB *) me)->threadMainAppNDB(); + NdbThread_Exit(0); + return me; +} + +void +AppNDB::threadMainAppNDB() { + MetaRecord * mr; + LogRecord * lr; + GCIBuffer::iterator * itBuffer; + GCIPage::iterator * itPage; + GCIBuffer * buffer; + GCIPage * page; + Uint32 gci=0; + + bool force; + while(true){ + + m_gciBufferList.lock(); + if(m_gciBufferList.size()==0) + NdbCondition_Wait(m_cond, m_gciBufferList.getMutex()); + m_gciBufferList.unlock(); + + /** + * Do nothing if we are not started! + */ + if(!m_started) + continue; + + if(m_gciBufferList.size()>0) { + m_gciBufferList.lock(); + buffer = m_gciBufferList[0]; + assert(buffer!=0); + if(buffer==0) { + m_gciBufferList.unlock(); +// stopApplier(GrepError::REP_APPLY_NULL_GCIBUFFER); + return; + } + m_gciBufferList.unlock(); + + RLOG(("Applying %d:[%d]", buffer->getId(), buffer->getGCI())); + gci = buffer->getGCI(); + /** + * Do stuff with buffer + */ + + force = buffer->m_force; + itBuffer = new GCIBuffer::iterator(buffer); + page = itBuffer->first(); + + Record * record; + while(page!=0 && m_started) { + + itPage = new GCIPage::iterator(page); + record = itPage->first(); + + while(record!=0 && m_started) { + switch(Record::RecordType(record->recordType)) { + case Record::META: + mr = (MetaRecord*)record; + if(applyMetaRecord(mr, gci) < 0){ + /** + * If we fail with a meta record then + * we should fail the replication! + */ + //stopApplier(GrepError::REP_APPLY_METARECORD_FAILED); + } + break; + case Record::LOG: + lr = (LogRecord*)record; + if(applyLogRecord(lr, force, gci) < 0) { + /** + * If we fail to apply a log record AND + * we have sent a ref to repstate event, + * then we should not try to apply another one! + */ +// stopApplier(GrepError::REP_APPLY_LOGRECORD_FAILED); + } + break; + default: + REPABORT("Illegal record type"); + }; + record = itPage->next(); + } + delete itPage; + itPage = 0; + page = itBuffer->next(); + } + + m_gciBufferList.erase(0, true); + /** + * "callback" to RepState to send REP_INSERT_GCIBUFFER_CONF + */ + m_repState->eventInsertConf(buffer->getGCI(), buffer->getId()); + delete itBuffer; + itBuffer = 0; + mr = 0; + lr = 0; + page = 0; + buffer = 0; + } + } + + +} + +void AppNDB::startApplier(){ + m_started = true; +} + + +void AppNDB::stopApplier(GrepError::Code err){ + m_started = false; + m_repState->eventInsertRef(0,0,0, err); +} + + +GrepError::Code +AppNDB::applyBuffer(Uint32 nodeGrp, Uint32 epoch, Uint32 force) +{ + m_gciBufferList.lock(); + + GCIBuffer * buffer = m_gciContainer->getGCIBuffer(epoch, nodeGrp); + if (buffer == NULL) { + RLOG(("WARNING! Request to apply NULL buffer %d[%d]. Force %d", + nodeGrp, epoch, force)); + return GrepError::NO_ERROR; + } + if (!buffer->isComplete()) { + RLOG(("WARNING! Request to apply non-complete buffer %d[%d]. Force %d", + nodeGrp, epoch, force)); + return GrepError::REP_APPLY_NONCOMPLETE_GCIBUFFER; + } + buffer->m_force = force; + + assert(buffer!=0); + m_gciBufferList.push_back(buffer, false); + NdbCondition_Broadcast(m_cond); + m_gciBufferList.unlock(); + return GrepError::NO_ERROR; +} + +int +AppNDB::applyLogRecord(LogRecord* lr, bool force, Uint32 gci) +{ +#if 0 + RLOG(("Applying log record (force %d, Op %d, GCI %d)", + force, lr->operation, gci)); +#endif + + int retries =0; + retry: + if(retries == 10) { + m_repState->eventInsertRef(gci, 0, lr->tableId, + GrepError::REP_APPLIER_EXECUTE_TRANSACTION); + return -1; + } + NdbConnection * trans = m_ndb->startTransaction(); + if (trans == NULL) { + /** + * Transaction could not be started + * @todo Handle the error by: + * 1. Return error code + * 2. Print log message + * 3. On higher level indicate that DB has been tainted + */ + ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); + reportNdbError("Cannot start transaction!", trans->getNdbError()); + m_repState->eventInsertRef(gci, 0, 0, + GrepError::REP_APPLIER_START_TRANSACTION); + REPABORT("Can not start transaction"); + } + + /** + * Resolve table name based on table id + */ + const Uint32 tableId = lr->tableId; + const char * tableName = m_tableInfoPs->getTableName(tableId); + + /** + * Close trans and return if it is systab_0. + */ + if (tableId == 0) { + RLOG(("WARNING! System table log record received")); + m_ndb->closeTransaction(trans); + return -1; + } + + if (tableName==0) { + /** + * Table probably does not exist + * (Under normal operation this should not happen + * since log records should not appear unless the + * table has been created.) + * + * @todo Perhaps the table is not cached due to a restart, + * so let's check in the dictionary if it exists. + */ + m_ndb->closeTransaction(trans); + m_repState->eventInsertRef(gci, 0, tableId, + GrepError::REP_APPLIER_NO_TABLE); + return -1; + } + + const NdbDictionary::Table * table = m_dict->getTable(tableName); + + NdbOperation * op = trans->getNdbOperation(tableName); + if (op == NULL) { + ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); + reportNdbError("Cannot get NdbOperation record", + trans->getNdbError()); + m_repState->eventInsertRef(gci,0,tableId, + GrepError::REP_APPLIER_NO_OPERATION); + REPABORT("Can not get NdbOperation record"); + } + + int check=0; + switch(lr->operation) { + case TriggerEvent::TE_INSERT: // INSERT + check = op->insertTuple(); + break; + case TriggerEvent::TE_DELETE: // DELETE + check = op->deleteTuple(); + break; + case TriggerEvent::TE_UPDATE: // UPDATE + if (force) { + check = op->writeTuple(); + } else { + check = op->updateTuple(); + } + break; + case TriggerEvent::TE_CUSTOM: //SCAN + check = op->writeTuple(); + break; + default: + m_ndb->closeTransaction(trans); + return -1; + }; + + if (check<0) { + ndbout_c("AppNDB: Something is weird"); + } + + /** + * @todo index inside LogRecord struct somewhat prettier + * Now it 4 (sizeof(Uint32)), and 9 the position inside the struct + * where the data starts. + */ + AttributeHeader * ah=(AttributeHeader *)((char *)lr + sizeof(Uint32) * 9); + AttributeHeader *end = (AttributeHeader *)(ah + lr->attributeHeaderWSize); + Uint32 * dataPtr = (Uint32 *)(end); + + /** + * @note attributeheader for operaration insert includes a duplicate + * p.k. The quick fix for this problem/bug is to skip the first set of + * of p.k, and start from the other set of P.Ks. Data is duplicated for + * the p.k. + */ + if (lr->operation == 0) { + for(int i = 0; i< table->getNoOfPrimaryKeys(); i++) { + ah+=ah->getHeaderSize(); + dataPtr = dataPtr + ah->getDataSize(); + } + } + + while (ah < end) { + const NdbDictionary::Column * column = + table->getColumn(ah->getAttributeId()); + /** + * @todo: Here is a limitation. I don't care if it is a tuplekey + * that is autogenerated or an ordinary pk. I just whack it in. + * However, this must be examined. + */ + if(column->getPrimaryKey()) { + if(op->equal(ah->getAttributeId(), (const char *)dataPtr) < 0) { + ndbout_c("AppNDB: Equal failed id %d op %d name %s, gci %d force %d", + ah->getAttributeId(), + lr->operation, + column->getName(), gci, force); + reportNdbError("Equal!", trans->getNdbError()); + } + + } else { + if(op->setValue(ah->getAttributeId(), (const char *)dataPtr) < 0) + ndbout_c("AppNDB: setvalue failed id %d op %d name %s, gci %d force %d", + ah->getAttributeId(), + lr->operation, + column->getName(), gci, force); + } + + dataPtr = dataPtr + ah->getDataSize(); + ah = ah + ah->getHeaderSize() ; + } + + if(trans->execute(Commit) != 0) { + /** + * Transaction commit failure + */ + const NdbError err = trans->getNdbError(); + m_ndb->closeTransaction(trans); + switch(err.status){ + case NdbError::Success: + { + m_repState->eventInsertRef(gci, 0, tableId, + GrepError::REP_APPLIER_EXECUTE_TRANSACTION); + return -1; + } + break; + case NdbError::TemporaryError: + { + NdbSleep_MilliSleep(50); + retries++; + goto retry; + } + break; + case NdbError::UnknownResult: + { + ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); + reportNdbError("Execute transaction failed!", + trans->getNdbError()); + m_repState->eventInsertRef(gci, 0, tableId, + GrepError::REP_APPLIER_EXECUTE_TRANSACTION); + return -1; + } + break; + case NdbError::PermanentError: + { + if(err.code == 626) { + if(force && lr->operation == TriggerEvent::TE_DELETE) /**delete*/ { + /**tuple was not found. Ignore this, since + * we are trying to apply a "delete a tuple"-log record before + * having applied the scan data. + */ + return -1; + } + } + + ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); reportNdbError("Execute transaction failed!", + trans->getNdbError()); + ndbout_c("\n\nAppNDB: RepNode will now crash."); + m_ndb->closeTransaction(trans); + m_repState->eventInsertRef(gci, 0, tableId, + GrepError::REP_APPLIER_EXECUTE_TRANSACTION); + return -1; + } + break; + } + } + + /** + * No errors. Close transaction and continue in applierThread. + */ + m_ndb->closeTransaction(trans); + return 1; +} + + +int +AppNDB::applyMetaRecord(MetaRecord* mr, Uint32 gci) +{ + /** + * Validate table id + */ + Uint32 tableId = mr->tableId; + if (tableId==0) { + RLOG(("WARNING! Meta record contained record with tableId 0")); + return 0; + } + + /** + * Prepare meta record + */ + NdbDictionary::Table * table = prepareMetaRecord(mr); + if(table == 0) { + RLOG(("WARNING! Prepare table meta record failed for table %d", tableId)); + m_dict->getNdbError(); + m_repState->eventInsertRef(gci,0,tableId, + GrepError::REP_APPLIER_PREPARE_TABLE); + return -1; + } + + /** + * Table does not exist in TableInfoPs -> add it + */ + if(m_tableInfoPs->getTableName(tableId)==0) { + RLOG(("Table %d:%s added to m_tableInfoPs", tableId, table->getName())); + m_tableInfoPs->insert(tableId,table->getName()); + } + + /** + * Validate that table does not exist in Dict + */ + + const NdbDictionary::Table * tmpTable = m_dict->getTable(table->getName()); + if(tmpTable !=0) { + /** + * Oops, a table with the same name exists + */ + if(tmpTable->getObjectVersion()!=table->getObjectVersion()) { + char buf[100]; + sprintf(buf,"WARNING! Another version of table %d:%s already exists." + "Currently, we dont support versions, so will abort now!", + tableId, table->getName()); + + REPABORT(buf); + + } + RLOG(("WARNING! An identical table %d:%s already exists.", + tableId, table->getName())); + return -1; + } + + + /** + * @todo WARNING! Should scan table MR for columns that are not supported + */ + /* + NdbDictionary::Column * column; + + for(int i=0; i<table->getNoOfColumns(); i++) { + column = table->getColumn(i); + if(column->getAutoIncrement()) { + reportWarning(table->getName(), column->getName(), + "Uses AUTOINCREMENT of PK"); + } + } + */ + + + /** + * Create table + */ + if(m_dict->createTable(*table)<0) { + ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); + reportNdbError("Create table failed!", m_dict->getNdbError()); + m_repState->eventCreateTableRef(gci, + tableId, + table->getName(), + GrepError::REP_APPLIER_CREATE_TABLE); + return -1; + } + + RLOG(("Table %d:%s created", tableId, table->getName())); + return 0; +} + +NdbDictionary::Table* +AppNDB::prepareMetaRecord(MetaRecord* mr) { + NdbTableImpl * tmp = 0; + NdbDictionary::Table * table =0; + Uint32 * data =(Uint32*)( ((char*)mr + sizeof(Uint32)*6)); + int res = NdbDictInterface::parseTableInfo(&tmp, data, mr->dataLen, + m_ndb->usingFullyQualifiedNames()); + if(res == 0) { + table = tmp; + return table; + } else{ + return 0; + } +} + +void +AppNDB::reportNdbError(const char * msg, const NdbError & err) { + ndbout_c("%s : Error code %d , error message %s", + msg, err.code, + (err.message ? err.message : "")); +} + +void +AppNDB::reportWarning(const char * tableName, const char * message) { + ndbout_c("WARNING: Table %s, %s", tableName, message); +} + +void +AppNDB::reportWarning(const char * tableName, const char * columnName, + const char * message) { + ndbout_c("WARNING: Table %s, column %s, %s", tableName, columnName,message); +} + +int +AppNDB::dropTable(Uint32 tableId) +{ + char * tableName = m_tableInfoPs->getTableName(tableId); + if(tableName == 0) return -1; + ndbout_c("AppNDB: Dropping table "); + if(m_dict->dropTable(tableName) != 0) { + reportNdbError("Failed dropping table",m_dict->getNdbError()); + return -1; + } + m_tableInfoPs->del(tableId); + return 1; +} |