/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "Channel.hpp" Channel::Channel() { reset(); } Channel::~Channel() { /** * Destroy list of selected tables */ for(Uint32 i=0; i < m_selectedTables.size(); i++) { delete m_selectedTables[i]; m_selectedTables[i] = 0; } m_selectedTables=0; } void Channel::reset() { for (Uint32 i=0; ionlyLeft(GREP_SYSTEM_TABLE_MAX_RANGE); i->onlyUpToValue(m_stopEpochId); if (i->isEmpty()) return false; add(SSReq, nodeGrp, *i); invariant(); return true; } bool Channel::requestApply(Uint32 nodeGrp, Uint32 * epoch) { invariant(); Interval tmp1, tmp2; // tmp2 = SS - AppReq - App intervalLeftMinus(state[nodeGrp][SS], state[nodeGrp][AppReq], &tmp1); intervalLeftMinus(tmp1, state[nodeGrp][App], &tmp2); tmp2.onlyUpToValue(m_stopEpochId); if (tmp2.isEmpty()) return false; tmp2.onlyLeft(1); // Check that all GCI Buffers for epoch exists in SS for (Uint32 i=0; iisEmpty()) return false; i->onlyLeft(GREP_SYSTEM_TABLE_MAX_RANGE); invariant(); add(DelReq, nodeGrp, *i); invariant(); return true; } void Channel::add(Position pos, Uint32 nodeGrp, const Interval i) { Interval r; intervalAdd(state[nodeGrp][pos], i, &r); state[nodeGrp][pos].set(r); } void Channel::clear(Position p, Uint32 nodeGrp, const Interval i) { Interval r; intervalLeftMinus(state[nodeGrp][p], i, &r); state[nodeGrp][p].set(r); } bool Channel::isSynchable(Uint32 nodeGrp) { return true; /* @todo This should be implemented... Interval tmp1, tmp2; intervalAdd(state[nodeGrp][PS], state[nodeGrp][SSReq], &tmp1); intervalAdd(tmp1, state[nodeGrp][SSReq], &tmp2); intervalAdd(tmp2, state[nodeGrp][SS], &tmp1); intervalAdd(tmp1, state[nodeGrp][AppReq], &tmp2); intervalAdd(tmp2, state[nodeGrp][App], &tmp1); if (intervalInclude(state[nodeGrp][PS], tmp1.right())) return true; else return false; */ } /** * Return the cut of all App:s. */ void Channel::getFullyAppliedEpochs(Interval * interval) { if (m_noOfNodeGroups < 1) { *interval = emptyInterval; return; } *interval = universeInterval; for (Uint32 i=0; ifirst() < state[i][App].first()) { interval->setFirst(state[i][App].first()); } if (state[i][App].last() < interval->last()) { interval->setLast(state[i][App].last()); } } interval->normalize(); return; } /** * Return true if it is ok to remove the subscription and then stop channel */ bool Channel::isStoppable() { /** * Check that AppReq are empty for all nodegrps */ for (Uint32 i=0; i interval.last()) { RLOG(("Stop disallowed. AppReq empty. Stop %d, LastApplied %d", m_stopEpochId, interval.last())); return false; } return true; } GrepError::Code Channel::setStopEpochId(Uint32 n) { /** * If n equal to zero, use next possible epoch (max(App, AppReq)) */ if (n == 0) { for (Uint32 i=0; i n) ? state[i][App].last() : n; n = (state[i][AppReq].last() > n) ? state[i][AppReq].last() : n; } } /** * If n >= max(App, AppReq) then set value, else return error code */ for (Uint32 i=0; iMAX_TAB_NAME_SIZE) return GrepError::REP_NOT_PROPER_TABLE; /** * No of separators are the number of table_name_separator found in tableName * since a table is defined as //tablename. * if noOfSeparators is not equal to 2, then it is not a valid * table name. */ Uint32 noOfSeps = 0; if(strlen(tableName) < 5) return GrepError::REP_NOT_PROPER_TABLE; for(Uint32 i =0; i < strlen(tableName); i++) if(tableName[i]==table_name_separator) noOfSeps++; if(noOfSeps!=2) return GrepError::REP_NOT_PROPER_TABLE; table * t= new table(tableName); for(Uint32 i=0; itableName)==0) return GrepError::REP_TABLE_ALREADY_SELECTED; } m_selectedTables.push_back(t); return GrepError::NO_ERROR; } GrepError::Code Channel::removeTable(const char * tableName) { if(strlen(tableName)>MAX_TAB_NAME_SIZE) return GrepError::REP_NOT_PROPER_TABLE; /** * No of separators are the number of table_name_separator found in tableName * since a table is defined as //tablename. * If noOfSeparators is not equal to 2, * then it is not a valid table name. */ Uint32 noOfSeps = 0; if(strlen(tableName) < 5) return GrepError::REP_NOT_PROPER_TABLE; for(Uint32 i =0; i < strlen(tableName); i++) if(tableName[i]==table_name_separator) noOfSeps++; if(noOfSeps!=2) return GrepError::REP_NOT_PROPER_TABLE; for(Uint32 i=0; itableName)==0) { delete m_selectedTables[i]; m_selectedTables.erase(i); return GrepError::NO_ERROR; } } return GrepError::REP_TABLE_NOT_FOUND; } void Channel::printTables() { if(m_selectedTables.size() == 0) ndbout_c("| ALL TABLES " " |"); else { for(Uint32 i=0; itableName); } } Vector * Channel::getSelectedTables() { if(m_selectedTables.size() == 0) return 0; return &m_selectedTables; } /***************************************************************************** * PRINT *****************************************************************************/ void Channel::print(Position pos) { switch(pos){ case PS: ndbout << "PS Rep"; break; case SSReq: ndbout << "Tra-Req"; break; case SS: ndbout << "SS Rep"; break; case AppReq: ndbout << "App-Req"; break; case App: ndbout << "Applied"; break; case DelReq: ndbout << "Del-Req"; break; default: REPABORT("Unknown replication position"); } } void Channel::print() { for (Uint32 i=0; i