summaryrefslogtreecommitdiff
path: root/ndb/tools/restore/consumer_restore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/tools/restore/consumer_restore.cpp')
-rw-r--r--ndb/tools/restore/consumer_restore.cpp674
1 files changed, 0 insertions, 674 deletions
diff --git a/ndb/tools/restore/consumer_restore.cpp b/ndb/tools/restore/consumer_restore.cpp
deleted file mode 100644
index d72b82569e2..00000000000
--- a/ndb/tools/restore/consumer_restore.cpp
+++ /dev/null
@@ -1,674 +0,0 @@
-/* Copyright (C) 2003 MySQL AB
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-
-#include "consumer_restore.hpp"
-#include <NdbSleep.h>
-
-extern FilteredNdbOut err;
-extern FilteredNdbOut info;
-extern FilteredNdbOut debug;
-
-static void callback(int, NdbTransaction*, void*);
-
-extern const char * g_connect_string;
-bool
-BackupRestore::init()
-{
- release();
-
- if (!m_restore && !m_restore_meta)
- return true;
-
- m_cluster_connection = new Ndb_cluster_connection(g_connect_string);
- if(m_cluster_connection->connect(12, 5, 1) != 0)
- {
- return -1;
- }
-
- m_ndb = new Ndb(m_cluster_connection);
-
- if (m_ndb == NULL)
- return false;
-
- m_ndb->init(1024);
- if (m_ndb->waitUntilReady(30) != 0)
- {
- err << "Failed to connect to ndb!!" << endl;
- return false;
- }
- info << "Connected to ndb!!" << endl;
-
- m_callback = new restore_callback_t[m_parallelism];
-
- if (m_callback == 0)
- {
- err << "Failed to allocate callback structs" << endl;
- return false;
- }
-
- m_free_callback= m_callback;
- for (Uint32 i= 0; i < m_parallelism; i++) {
- m_callback[i].restore= this;
- m_callback[i].connection= 0;
- if (i > 0)
- m_callback[i-1].next= &(m_callback[i]);
- }
- m_callback[m_parallelism-1].next = 0;
-
- return true;
-}
-
-void BackupRestore::release()
-{
- if (m_ndb)
- {
- delete m_ndb;
- m_ndb= 0;
- }
-
- if (m_callback)
- {
- delete [] m_callback;
- m_callback= 0;
- }
-
- if (m_cluster_connection)
- {
- delete m_cluster_connection;
- m_cluster_connection= 0;
- }
-}
-
-BackupRestore::~BackupRestore()
-{
- release();
-}
-
-static
-int
-match_blob(const char * name){
- int cnt, id1, id2;
- char buf[256];
- if((cnt = sscanf(name, "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){
- return id1;
- }
-
- return -1;
-}
-
-const NdbDictionary::Table*
-BackupRestore::get_table(const NdbDictionary::Table* tab){
- if(m_cache.m_old_table == tab)
- return m_cache.m_new_table;
- m_cache.m_old_table = tab;
-
- int cnt, id1, id2;
- char db[256], schema[256];
- if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d",
- db, schema, &id1, &id2)) == 4){
- m_ndb->setDatabaseName(db);
- m_ndb->setSchemaName(schema);
-
- BaseString::snprintf(db, sizeof(db), "NDB$BLOB_%d_%d",
- m_new_tables[id1]->getTableId(), id2);
-
- m_cache.m_new_table = m_ndb->getDictionary()->getTable(db);
-
- } else {
- m_cache.m_new_table = m_new_tables[tab->getTableId()];
- }
- assert(m_cache.m_new_table);
- return m_cache.m_new_table;
-}
-
-bool
-BackupRestore::finalize_table(const TableS & table){
- bool ret= true;
- if (!m_restore && !m_restore_meta)
- return ret;
- if (table.have_auto_inc())
- {
- Uint64 max_val= table.get_max_auto_val();
- Uint64 auto_val= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable));
- if (max_val+1 > auto_val || auto_val == ~(Uint64)0)
- ret= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), max_val+1, false);
- }
- return ret;
-}
-
-bool
-BackupRestore::table(const TableS & table){
- if (!m_restore && !m_restore_meta)
- return true;
-
- const char * name = table.getTableName();
-
- /**
- * Ignore blob tables
- */
- if(match_blob(name) >= 0)
- return true;
-
- const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* table.m_dictTable);
- if(tmptab.m_indexType != NdbDictionary::Index::Undefined){
- m_indexes.push_back(table.m_dictTable);
- return true;
- }
-
- BaseString tmp(name);
- Vector<BaseString> split;
- if(tmp.split(split, "/") != 3){
- err << "Invalid table name format " << name << endl;
- return false;
- }
-
- m_ndb->setDatabaseName(split[0].c_str());
- m_ndb->setSchemaName(split[1].c_str());
-
- NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
- if(m_restore_meta){
- NdbDictionary::Table copy(*table.m_dictTable);
-
- copy.setName(split[2].c_str());
-
- if (dict->createTable(copy) == -1)
- {
- err << "Create table " << table.getTableName() << " failed: "
- << dict->getNdbError() << endl;
- return false;
- }
- info << "Successfully restored table " << table.getTableName()<< endl ;
- }
-
- const NdbDictionary::Table* tab = dict->getTable(split[2].c_str());
- if(tab == 0){
- err << "Unable to find table: " << split[2].c_str() << endl;
- return false;
- }
- if(m_restore_meta){
- m_ndb->setAutoIncrementValue(tab, ~(Uint64)0, false);
- }
- const NdbDictionary::Table* null = 0;
- m_new_tables.fill(table.m_dictTable->getTableId(), null);
- m_new_tables[table.m_dictTable->getTableId()] = tab;
- return true;
-}
-
-bool
-BackupRestore::endOfTables(){
- if(!m_restore_meta)
- return true;
-
- NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
- for(size_t i = 0; i<m_indexes.size(); i++){
- NdbTableImpl & indtab = NdbTableImpl::getImpl(* m_indexes[i]);
-
- BaseString tmp(indtab.m_primaryTable.c_str());
- Vector<BaseString> split;
- if(tmp.split(split, "/") != 3){
- err << "Invalid table name format " << indtab.m_primaryTable.c_str()
- << endl;
- return false;
- }
-
- m_ndb->setDatabaseName(split[0].c_str());
- m_ndb->setSchemaName(split[1].c_str());
-
- const NdbDictionary::Table * prim = dict->getTable(split[2].c_str());
- if(prim == 0){
- err << "Unable to find base table \"" << split[2].c_str()
- << "\" for index "
- << indtab.getName() << endl;
- return false;
- }
- NdbTableImpl& base = NdbTableImpl::getImpl(*prim);
- NdbIndexImpl* idx;
- int id;
- char idxName[255], buf[255];
- if(sscanf(indtab.getName(), "%[^/]/%[^/]/%d/%s",
- buf, buf, &id, idxName) != 4){
- err << "Invalid index name format " << indtab.getName() << endl;
- return false;
- }
- if(NdbDictInterface::create_index_obj_from_table(&idx, &indtab, &base))
- {
- err << "Failed to create index " << idxName
- << " on " << split[2].c_str() << endl;
- return false;
- }
- idx->setName(idxName);
- if(dict->createIndex(* idx) != 0)
- {
- delete idx;
- err << "Failed to create index " << idxName
- << " on " << split[2].c_str() << endl
- << dict->getNdbError() << endl;
-
- return false;
- }
- delete idx;
- info << "Successfully created index " << idxName
- << " on " << split[2].c_str() << endl;
- }
- return true;
-}
-
-void BackupRestore::tuple(const TupleS & tup)
-{
- if (!m_restore)
- return;
-
- while (m_free_callback == 0)
- {
- assert(m_transactions == m_parallelism);
- // send-poll all transactions
- // close transaction is done in callback
- m_ndb->sendPollNdb(3000, 1);
- }
-
- restore_callback_t * cb = m_free_callback;
-
- if (cb == 0)
- assert(false);
-
- m_free_callback = cb->next;
- cb->retries = 0;
- cb->tup = tup; // must do copy!
- tuple_a(cb);
-
-}
-
-void BackupRestore::tuple_a(restore_callback_t *cb)
-{
- while (cb->retries < 10)
- {
- /**
- * start transactions
- */
- cb->connection = m_ndb->startTransaction();
- if (cb->connection == NULL)
- {
- /*
- if (errorHandler(cb))
- {
- continue;
- }
- */
- exitHandler();
- } // if
-
- const TupleS &tup = cb->tup;
- const NdbDictionary::Table * table = get_table(tup.getTable()->m_dictTable);
-
- NdbOperation * op = cb->connection->getNdbOperation(table);
-
- if (op == NULL)
- {
- if (errorHandler(cb))
- continue;
- exitHandler();
- } // if
-
- if (op->writeTuple() == -1)
- {
- if (errorHandler(cb))
- continue;
- exitHandler();
- } // if
-
- int ret = 0;
- for (int j = 0; j < 2; j++)
- {
- for (int i = 0; i < tup.getNoOfAttributes(); i++)
- {
- const AttributeDesc * attr_desc = tup.getDesc(i);
- const AttributeData * attr_data = tup.getData(i);
- int size = attr_desc->size;
- int arraySize = attr_desc->arraySize;
- char * dataPtr = attr_data->string_value;
- Uint32 length = (size * arraySize) / 8;
-
- if (j == 0 && tup.getTable()->have_auto_inc(i))
- tup.getTable()->update_max_auto_val(dataPtr,size);
-
- if (attr_desc->m_column->getPrimaryKey())
- {
- if (j == 1) continue;
- ret = op->equal(i, dataPtr, length);
- }
- else
- {
- if (j == 0) continue;
- if (attr_data->null)
- ret = op->setValue(i, NULL, 0);
- else
- ret = op->setValue(i, dataPtr, length);
- }
- if (ret < 0) {
- ndbout_c("Column: %d type %d %d %d %d",i,
- attr_desc->m_column->getType(),
- size, arraySize, attr_data->size);
- break;
- }
- }
- if (ret < 0)
- break;
- }
- if (ret < 0)
- {
- if (errorHandler(cb))
- continue;
- exitHandler();
- }
-
- // Prepare transaction (the transaction is NOT yet sent to NDB)
- cb->connection->executeAsynchPrepare(NdbTransaction::Commit,
- &callback, cb);
- m_transactions++;
- return;
- }
- err << "Retried transaction " << cb->retries << " times.\nLast error"
- << m_ndb->getNdbError(cb->error_code) << endl
- << "...Unable to recover from errors. Exiting..." << endl;
- exitHandler();
-}
-
-void BackupRestore::cback(int result, restore_callback_t *cb)
-{
- m_transactions--;
-
- if (result < 0)
- {
- /**
- * Error. temporary or permanent?
- */
- if (errorHandler(cb))
- tuple_a(cb); // retry
- else
- {
- err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl;
- exitHandler();
- }
- }
- else
- {
- /**
- * OK! close transaction
- */
- m_ndb->closeTransaction(cb->connection);
- cb->connection= 0;
- cb->next= m_free_callback;
- m_free_callback= cb;
- m_dataCount++;
- }
-}
-
-/**
- * returns true if is recoverable,
- * Error handling based on hugo
- * false if it is an error that generates an abort.
- */
-bool BackupRestore::errorHandler(restore_callback_t *cb)
-{
- NdbError error= cb->connection->getNdbError();
- m_ndb->closeTransaction(cb->connection);
- cb->connection= 0;
-
- Uint32 sleepTime = 100 + cb->retries * 300;
-
- cb->retries++;
- cb->error_code = error.code;
-
- switch(error.status)
- {
- case NdbError::Success:
- return false;
- // ERROR!
- break;
-
- case NdbError::TemporaryError:
- NdbSleep_MilliSleep(sleepTime);
- return true;
- // RETRY
- break;
-
- case NdbError::UnknownResult:
- err << error << endl;
- return false;
- // ERROR!
- break;
-
- default:
- case NdbError::PermanentError:
- //ERROR
- err << error << endl;
- return false;
- break;
- }
- return false;
-}
-
-void BackupRestore::exitHandler()
-{
- release();
- exit(-1);
-}
-
-
-void
-BackupRestore::tuple_free()
-{
- if (!m_restore)
- return;
-
- // Poll all transactions
- while (m_transactions)
- {
- m_ndb->sendPollNdb(3000);
- }
-}
-
-void
-BackupRestore::endOfTuples()
-{
- tuple_free();
-}
-
-void
-BackupRestore::logEntry(const LogEntry & tup)
-{
- if (!m_restore)
- return;
-
- NdbTransaction * trans = m_ndb->startTransaction();
- if (trans == NULL)
- {
- // Deep shit, TODO: handle the error
- err << "Cannot start transaction" << endl;
- exit(-1);
- } // if
-
- const NdbDictionary::Table * table = get_table(tup.m_table->m_dictTable);
- NdbOperation * op = trans->getNdbOperation(table);
- if (op == NULL)
- {
- err << "Cannot get operation: " << trans->getNdbError() << endl;
- exit(-1);
- } // if
-
- int check = 0;
- switch(tup.m_type)
- {
- case LogEntry::LE_INSERT:
- check = op->insertTuple();
- break;
- case LogEntry::LE_UPDATE:
- check = op->updateTuple();
- break;
- case LogEntry::LE_DELETE:
- check = op->deleteTuple();
- break;
- default:
- err << "Log entry has wrong operation type."
- << " Exiting...";
- exit(-1);
- }
-
- for (Uint32 i= 0; i < tup.size(); i++)
- {
- const AttributeS * attr = tup[i];
- int size = attr->Desc->size;
- int arraySize = attr->Desc->arraySize;
- const char * dataPtr = attr->Data.string_value;
-
- if (tup.m_table->have_auto_inc(attr->Desc->attrId))
- tup.m_table->update_max_auto_val(dataPtr,size);
-
- const Uint32 length = (size / 8) * arraySize;
- if (attr->Desc->m_column->getPrimaryKey())
- op->equal(attr->Desc->attrId, dataPtr, length);
- else
- op->setValue(attr->Desc->attrId, dataPtr, length);
- }
-
- const int ret = trans->execute(NdbTransaction::Commit);
- if (ret != 0)
- {
- // Both insert update and delete can fail during log running
- // and it's ok
- // TODO: check that the error is either tuple exists or tuple does not exist?
- switch(tup.m_type)
- {
- case LogEntry::LE_INSERT:
- break;
- case LogEntry::LE_UPDATE:
- break;
- case LogEntry::LE_DELETE:
- break;
- }
- if (false)
- {
- err << "execute failed: " << trans->getNdbError() << endl;
- exit(-1);
- }
- }
-
- m_ndb->closeTransaction(trans);
- m_logCount++;
-}
-
-void
-BackupRestore::endOfLogEntrys()
-{
- if (!m_restore)
- return;
-
- info << "Restored " << m_dataCount << " tuples and "
- << m_logCount << " log entries" << endl;
-}
-
-/*
- * callback : This is called when the transaction is polled
- *
- * (This function must have three arguments:
- * - The result of the transaction,
- * - The NdbTransaction object, and
- * - A pointer to an arbitrary object.)
- */
-
-static void
-callback(int result, NdbTransaction* trans, void* aObject)
-{
- restore_callback_t *cb = (restore_callback_t *)aObject;
- (cb->restore)->cback(result, cb);
-}
-
-#if 0 // old tuple impl
-void
-BackupRestore::tuple(const TupleS & tup)
-{
- if (!m_restore)
- return;
- while (1)
- {
- NdbTransaction * trans = m_ndb->startTransaction();
- if (trans == NULL)
- {
- // Deep shit, TODO: handle the error
- ndbout << "Cannot start transaction" << endl;
- exit(-1);
- } // if
-
- const TableS * table = tup.getTable();
- NdbOperation * op = trans->getNdbOperation(table->getTableName());
- if (op == NULL)
- {
- ndbout << "Cannot get operation: ";
- ndbout << trans->getNdbError() << endl;
- exit(-1);
- } // if
-
- // TODO: check return value and handle error
- if (op->writeTuple() == -1)
- {
- ndbout << "writeTuple call failed: ";
- ndbout << trans->getNdbError() << endl;
- exit(-1);
- } // if
-
- for (int i = 0; i < tup.getNoOfAttributes(); i++)
- {
- const AttributeS * attr = tup[i];
- int size = attr->Desc->size;
- int arraySize = attr->Desc->arraySize;
- const char * dataPtr = attr->Data.string_value;
-
- const Uint32 length = (size * arraySize) / 8;
- if (attr->Desc->m_column->getPrimaryKey())
- op->equal(i, dataPtr, length);
- }
-
- for (int i = 0; i < tup.getNoOfAttributes(); i++)
- {
- const AttributeS * attr = tup[i];
- int size = attr->Desc->size;
- int arraySize = attr->Desc->arraySize;
- const char * dataPtr = attr->Data.string_value;
-
- const Uint32 length = (size * arraySize) / 8;
- if (!attr->Desc->m_column->getPrimaryKey())
- if (attr->Data.null)
- op->setValue(i, NULL, 0);
- else
- op->setValue(i, dataPtr, length);
- }
- int ret = trans->execute(NdbTransaction::Commit);
- if (ret != 0)
- {
- ndbout << "execute failed: ";
- ndbout << trans->getNdbError() << endl;
- exit(-1);
- }
- m_ndb->closeTransaction(trans);
- if (ret == 0)
- break;
- }
- m_dataCount++;
-}
-#endif
-
-template class Vector<NdbDictionary::Table*>;
-template class Vector<const NdbDictionary::Table*>;