diff options
Diffstat (limited to 'ndb/src/rep/adapters/AppNDB.cpp')
-rw-r--r-- | ndb/src/rep/adapters/AppNDB.cpp | 581 |
1 files changed, 0 insertions, 581 deletions
diff --git a/ndb/src/rep/adapters/AppNDB.cpp b/ndb/src/rep/adapters/AppNDB.cpp deleted file mode 100644 index abb146d921f..00000000000 --- a/ndb/src/rep/adapters/AppNDB.cpp +++ /dev/null @@ -1,581 +0,0 @@ -/* Copyright (C) 2003 MySQL AB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include "AppNDB.hpp" -#include <ConfigRetriever.hpp> -#include <AttributeHeader.hpp> -#include <NdbOperation.hpp> -#include <NdbDictionaryImpl.hpp> - -#include <signaldata/RepImpl.hpp> -#include <TransporterFacade.hpp> -#include <trigger_definitions.h> -#include <rep/storage/GCIPage.hpp> -#include <rep/storage/GCIBuffer.hpp> -#include <rep/rep_version.hpp> - -/***************************************************************************** - * Constructor / Destructor / Init - *****************************************************************************/ - -AppNDB::~AppNDB() -{ - delete m_tableInfoPs; - delete m_ndb; - m_tableInfoPs = 0; -} - -AppNDB::AppNDB(GCIContainer * gciContainer, RepState * repState) -{ - m_gciContainer = gciContainer; - m_repState = repState; - m_cond = NdbCondition_Create(); - m_started = true; -} - -void -AppNDB::init(const char* connectString) { - - // NdbThread_SetConcurrencyLevel(1+ 2); - Ndb::useFullyQualifiedNames(false); - - m_ndb = new Ndb(""); - m_ndb->setConnectString(connectString); - /** - * @todo Set proper max no of transactions?? needed?? Default 12?? - */ - m_ndb->init(2048); - m_dict = m_ndb->getDictionary(); - - m_ownNodeId = m_ndb->getNodeId(); - - ndbout << "-- NDB Cluster -- REP node " << m_ownNodeId << " -- Version " - << REP_VERSION_ID << " --" << endl; - ndbout_c("Connecting to NDB Cluster..."); - if (m_ndb->waitUntilReady() != 0){ - REPABORT("NDB Cluster not ready for connections"); - } - ndbout_c("Phase 1 (AppNDB): Connection 1 to NDB Cluster opened (Applier)"); - - m_tableInfoPs = new TableInfoPs(); - - m_applierThread = NdbThread_Create(runAppNDB_C, - (void**)this, - 32768, - "AppNDBThread", - NDB_THREAD_PRIO_LOW); -} - - -/***************************************************************************** - * Threads - *****************************************************************************/ - -extern "C" -void* -runAppNDB_C(void * me) -{ - ((AppNDB *) me)->threadMainAppNDB(); - NdbThread_Exit(0); - return me; -} - -void -AppNDB::threadMainAppNDB() { - MetaRecord * mr; - LogRecord * lr; - GCIBuffer::iterator * itBuffer; - GCIPage::iterator * itPage; - GCIBuffer * buffer; - GCIPage * page; - Uint32 gci=0; - - bool force; - while(true){ - - m_gciBufferList.lock(); - if(m_gciBufferList.size()==0) - NdbCondition_Wait(m_cond, m_gciBufferList.getMutex()); - m_gciBufferList.unlock(); - - /** - * Do nothing if we are not started! - */ - if(!m_started) - continue; - - if(m_gciBufferList.size()>0) { - m_gciBufferList.lock(); - buffer = m_gciBufferList[0]; - assert(buffer!=0); - if(buffer==0) { - m_gciBufferList.unlock(); -// stopApplier(GrepError::REP_APPLY_NULL_GCIBUFFER); - return; - } - m_gciBufferList.unlock(); - - RLOG(("Applying %d:[%d]", buffer->getId(), buffer->getGCI())); - gci = buffer->getGCI(); - /** - * Do stuff with buffer - */ - - force = buffer->m_force; - itBuffer = new GCIBuffer::iterator(buffer); - page = itBuffer->first(); - - Record * record; - while(page!=0 && m_started) { - - itPage = new GCIPage::iterator(page); - record = itPage->first(); - - while(record!=0 && m_started) { - switch(Record::RecordType(record->recordType)) { - case Record::META: - mr = (MetaRecord*)record; - if(applyMetaRecord(mr, gci) < 0){ - /** - * If we fail with a meta record then - * we should fail the replication! - */ - //stopApplier(GrepError::REP_APPLY_METARECORD_FAILED); - } - break; - case Record::LOG: - lr = (LogRecord*)record; - if(applyLogRecord(lr, force, gci) < 0) { - /** - * If we fail to apply a log record AND - * we have sent a ref to repstate event, - * then we should not try to apply another one! - */ -// stopApplier(GrepError::REP_APPLY_LOGRECORD_FAILED); - } - break; - default: - REPABORT("Illegal record type"); - }; - record = itPage->next(); - } - delete itPage; - itPage = 0; - page = itBuffer->next(); - } - - m_gciBufferList.erase(0, true); - /** - * "callback" to RepState to send REP_INSERT_GCIBUFFER_CONF - */ - m_repState->eventInsertConf(buffer->getGCI(), buffer->getId()); - delete itBuffer; - itBuffer = 0; - mr = 0; - lr = 0; - page = 0; - buffer = 0; - } - } - - -} - -void AppNDB::startApplier(){ - m_started = true; -} - - -void AppNDB::stopApplier(GrepError::Code err){ - m_started = false; - m_repState->eventInsertRef(0,0,0, err); -} - - -GrepError::Code -AppNDB::applyBuffer(Uint32 nodeGrp, Uint32 epoch, Uint32 force) -{ - m_gciBufferList.lock(); - - GCIBuffer * buffer = m_gciContainer->getGCIBuffer(epoch, nodeGrp); - if (buffer == NULL) { - RLOG(("WARNING! Request to apply NULL buffer %d[%d]. Force %d", - nodeGrp, epoch, force)); - return GrepError::NO_ERROR; - } - if (!buffer->isComplete()) { - RLOG(("WARNING! Request to apply non-complete buffer %d[%d]. Force %d", - nodeGrp, epoch, force)); - return GrepError::REP_APPLY_NONCOMPLETE_GCIBUFFER; - } - buffer->m_force = force; - - assert(buffer!=0); - m_gciBufferList.push_back(buffer, false); - NdbCondition_Broadcast(m_cond); - m_gciBufferList.unlock(); - return GrepError::NO_ERROR; -} - -int -AppNDB::applyLogRecord(LogRecord* lr, bool force, Uint32 gci) -{ -#if 0 - RLOG(("Applying log record (force %d, Op %d, GCI %d)", - force, lr->operation, gci)); -#endif - - int retries =0; - retry: - if(retries == 10) { - m_repState->eventInsertRef(gci, 0, lr->tableId, - GrepError::REP_APPLIER_EXECUTE_TRANSACTION); - return -1; - } - NdbConnection * trans = m_ndb->startTransaction(); - if (trans == NULL) { - /** - * Transaction could not be started - * @todo Handle the error by: - * 1. Return error code - * 2. Print log message - * 3. On higher level indicate that DB has been tainted - */ - ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); - reportNdbError("Cannot start transaction!", trans->getNdbError()); - m_repState->eventInsertRef(gci, 0, 0, - GrepError::REP_APPLIER_START_TRANSACTION); - REPABORT("Can not start transaction"); - } - - /** - * Resolve table name based on table id - */ - const Uint32 tableId = lr->tableId; - const char * tableName = m_tableInfoPs->getTableName(tableId); - - /** - * Close trans and return if it is systab_0. - */ - if (tableId == 0) { - RLOG(("WARNING! System table log record received")); - m_ndb->closeTransaction(trans); - return -1; - } - - if (tableName==0) { - /** - * Table probably does not exist - * (Under normal operation this should not happen - * since log records should not appear unless the - * table has been created.) - * - * @todo Perhaps the table is not cached due to a restart, - * so let's check in the dictionary if it exists. - */ - m_ndb->closeTransaction(trans); - m_repState->eventInsertRef(gci, 0, tableId, - GrepError::REP_APPLIER_NO_TABLE); - return -1; - } - - const NdbDictionary::Table * table = m_dict->getTable(tableName); - - NdbOperation * op = trans->getNdbOperation(tableName); - if (op == NULL) { - ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); - reportNdbError("Cannot get NdbOperation record", - trans->getNdbError()); - m_repState->eventInsertRef(gci,0,tableId, - GrepError::REP_APPLIER_NO_OPERATION); - REPABORT("Can not get NdbOperation record"); - } - - int check=0; - switch(lr->operation) { - case TriggerEvent::TE_INSERT: // INSERT - check = op->insertTuple(); - break; - case TriggerEvent::TE_DELETE: // DELETE - check = op->deleteTuple(); - break; - case TriggerEvent::TE_UPDATE: // UPDATE - if (force) { - check = op->writeTuple(); - } else { - check = op->updateTuple(); - } - break; - case TriggerEvent::TE_CUSTOM: //SCAN - check = op->writeTuple(); - break; - default: - m_ndb->closeTransaction(trans); - return -1; - }; - - if (check<0) { - ndbout_c("AppNDB: Something is weird"); - } - - /** - * @todo index inside LogRecord struct somewhat prettier - * Now it 4 (sizeof(Uint32)), and 9 the position inside the struct - * where the data starts. - */ - AttributeHeader * ah=(AttributeHeader *)((char *)lr + sizeof(Uint32) * 9); - AttributeHeader *end = (AttributeHeader *)(ah + lr->attributeHeaderWSize); - Uint32 * dataPtr = (Uint32 *)(end); - - /** - * @note attributeheader for operaration insert includes a duplicate - * p.k. The quick fix for this problem/bug is to skip the first set of - * of p.k, and start from the other set of P.Ks. Data is duplicated for - * the p.k. - */ - if (lr->operation == 0) { - for(int i = 0; i< table->getNoOfPrimaryKeys(); i++) { - ah+=ah->getHeaderSize(); - dataPtr = dataPtr + ah->getDataSize(); - } - } - - while (ah < end) { - const NdbDictionary::Column * column = - table->getColumn(ah->getAttributeId()); - /** - * @todo: Here is a limitation. I don't care if it is a tuplekey - * that is autogenerated or an ordinary pk. I just whack it in. - * However, this must be examined. - */ - if(column->getPrimaryKey()) { - if(op->equal(ah->getAttributeId(), (const char *)dataPtr) < 0) { - ndbout_c("AppNDB: Equal failed id %d op %d name %s, gci %d force %d", - ah->getAttributeId(), - lr->operation, - column->getName(), gci, force); - reportNdbError("Equal!", trans->getNdbError()); - } - - } else { - if(op->setValue(ah->getAttributeId(), (const char *)dataPtr) < 0) - ndbout_c("AppNDB: setvalue failed id %d op %d name %s, gci %d force %d", - ah->getAttributeId(), - lr->operation, - column->getName(), gci, force); - } - - dataPtr = dataPtr + ah->getDataSize(); - ah = ah + ah->getHeaderSize() ; - } - - if(trans->execute(Commit) != 0) { - /** - * Transaction commit failure - */ - const NdbError err = trans->getNdbError(); - m_ndb->closeTransaction(trans); - switch(err.status){ - case NdbError::Success: - { - m_repState->eventInsertRef(gci, 0, tableId, - GrepError::REP_APPLIER_EXECUTE_TRANSACTION); - return -1; - } - break; - case NdbError::TemporaryError: - { - NdbSleep_MilliSleep(50); - retries++; - goto retry; - } - break; - case NdbError::UnknownResult: - { - ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); - reportNdbError("Execute transaction failed!", - trans->getNdbError()); - m_repState->eventInsertRef(gci, 0, tableId, - GrepError::REP_APPLIER_EXECUTE_TRANSACTION); - return -1; - } - break; - case NdbError::PermanentError: - { - if(err.code == 626) { - if(force && lr->operation == TriggerEvent::TE_DELETE) /**delete*/ { - /**tuple was not found. Ignore this, since - * we are trying to apply a "delete a tuple"-log record before - * having applied the scan data. - */ - return -1; - } - } - - ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); reportNdbError("Execute transaction failed!", - trans->getNdbError()); - ndbout_c("\n\nAppNDB: RepNode will now crash."); - m_ndb->closeTransaction(trans); - m_repState->eventInsertRef(gci, 0, tableId, - GrepError::REP_APPLIER_EXECUTE_TRANSACTION); - return -1; - } - break; - } - } - - /** - * No errors. Close transaction and continue in applierThread. - */ - m_ndb->closeTransaction(trans); - return 1; -} - - -int -AppNDB::applyMetaRecord(MetaRecord* mr, Uint32 gci) -{ - /** - * Validate table id - */ - Uint32 tableId = mr->tableId; - if (tableId==0) { - RLOG(("WARNING! Meta record contained record with tableId 0")); - return 0; - } - - /** - * Prepare meta record - */ - NdbDictionary::Table * table = prepareMetaRecord(mr); - if(table == 0) { - RLOG(("WARNING! Prepare table meta record failed for table %d", tableId)); - m_dict->getNdbError(); - m_repState->eventInsertRef(gci,0,tableId, - GrepError::REP_APPLIER_PREPARE_TABLE); - return -1; - } - - /** - * Table does not exist in TableInfoPs -> add it - */ - if(m_tableInfoPs->getTableName(tableId)==0) { - RLOG(("Table %d:%s added to m_tableInfoPs", tableId, table->getName())); - m_tableInfoPs->insert(tableId,table->getName()); - } - - /** - * Validate that table does not exist in Dict - */ - - const NdbDictionary::Table * tmpTable = m_dict->getTable(table->getName()); - if(tmpTable !=0) { - /** - * Oops, a table with the same name exists - */ - if(tmpTable->getObjectVersion()!=table->getObjectVersion()) { - char buf[100]; - sprintf(buf,"WARNING! Another version of table %d:%s already exists." - "Currently, we dont support versions, so will abort now!", - tableId, table->getName()); - - REPABORT(buf); - - } - RLOG(("WARNING! An identical table %d:%s already exists.", - tableId, table->getName())); - return -1; - } - - - /** - * @todo WARNING! Should scan table MR for columns that are not supported - */ - /* - NdbDictionary::Column * column; - - for(int i=0; i<table->getNoOfColumns(); i++) { - column = table->getColumn(i); - if(column->getAutoIncrement()) { - reportWarning(table->getName(), column->getName(), - "Uses AUTOINCREMENT of PK"); - } - } - */ - - - /** - * Create table - */ - if(m_dict->createTable(*table)<0) { - ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); - reportNdbError("Create table failed!", m_dict->getNdbError()); - m_repState->eventCreateTableRef(gci, - tableId, - table->getName(), - GrepError::REP_APPLIER_CREATE_TABLE); - return -1; - } - - RLOG(("Table %d:%s created", tableId, table->getName())); - return 0; -} - -NdbDictionary::Table* -AppNDB::prepareMetaRecord(MetaRecord* mr) { - NdbTableImpl * tmp = 0; - NdbDictionary::Table * table =0; - Uint32 * data =(Uint32*)( ((char*)mr + sizeof(Uint32)*6)); - int res = NdbDictInterface::parseTableInfo(&tmp, data, mr->dataLen); - if(res == 0) { - table = tmp; - return table; - } else{ - return 0; - } -} - -void -AppNDB::reportNdbError(const char * msg, const NdbError & err) { - ndbout_c("%s : Error code %d , error message %s", - msg, err.code, - (err.message ? err.message : "")); -} - -void -AppNDB::reportWarning(const char * tableName, const char * message) { - ndbout_c("WARNING: Table %s, %s", tableName, message); -} - -void -AppNDB::reportWarning(const char * tableName, const char * columnName, - const char * message) { - ndbout_c("WARNING: Table %s, column %s, %s", tableName, columnName,message); -} - -int -AppNDB::dropTable(Uint32 tableId) -{ - char * tableName = m_tableInfoPs->getTableName(tableId); - if(tableName == 0) return -1; - ndbout_c("AppNDB: Dropping table "); - if(m_dict->dropTable(tableName) != 0) { - reportNdbError("Failed dropping table",m_dict->getNdbError()); - return -1; - } - m_tableInfoPs->del(tableId); - return 1; -} |