summaryrefslogtreecommitdiff
path: root/storage/ndb/src/old_files/rep/state/RepState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/old_files/rep/state/RepState.cpp')
-rw-r--r--storage/ndb/src/old_files/rep/state/RepState.cpp869
1 files changed, 869 insertions, 0 deletions
diff --git a/storage/ndb/src/old_files/rep/state/RepState.cpp b/storage/ndb/src/old_files/rep/state/RepState.cpp
new file mode 100644
index 00000000000..d8a50961a3c
--- /dev/null
+++ b/storage/ndb/src/old_files/rep/state/RepState.cpp
@@ -0,0 +1,869 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "RepState.hpp"
+
+#include <signaldata/SumaImpl.hpp>
+#include <NdbApiSignal.hpp>
+#include <Properties.hpp>
+//#define DBUG_REQUESTOR
+
+#ifdef DBUG_REQUESTOR
+#define DBUG_REQUESTOR_PRINT(X) ndbout_c(X);
+#else
+#define DBUG_REQUESTOR_PRINT(X)
+#endif
+
+/****************************************************************************
+ * Constructor / Destructor / Init
+ ****************************************************************************/
+RepState::RepState()
+{
+ m_connected = UNKNOWN;
+ m_repConnected = UNKNOWN;
+ m_mutex = NdbMutex_Create();
+ m_stopEpoch = 0;
+ m_subIdToRemove = 0;
+ m_subKeyToRemove = 0;
+}
+
+RepState::~RepState()
+{
+ NdbMutex_Destroy(m_mutex);
+}
+
+void
+RepState::setSubscriptionRequests(FuncRequestCreateSubscriptionId f1,
+ FuncRequestCreateSubscription f2,
+ FuncRequestRemoveSubscription f3)
+{
+ m_funcRequestCreateSubscriptionId = f1;
+ m_funcRequestCreateSubscription = f2;
+ m_funcRequestRemoveSubscription = f3;
+}
+
+void
+RepState::setIntervalRequests(FuncRequestTransfer f1,
+ FuncRequestApply f2,
+ FuncRequestDeleteSS f3,
+ FuncRequestDeletePS f4)
+{
+ m_funcRequestTransfer = f1;
+ m_funcRequestApply = f2;
+ m_funcRequestDeleteSS = f3;
+ m_funcRequestDeletePS = f4;
+}
+
+void
+RepState::setStartRequests(FuncRequestStartMetaLog * f5,
+ FuncRequestStartDataLog * f6,
+ FuncRequestStartMetaScan * f7,
+ FuncRequestStartDataScan * f8,
+ FuncRequestEpochInfo * f9)
+{
+ m_funcRequestStartMetaLog = f5;
+ m_funcRequestStartDataLog = f6;
+ m_funcRequestStartMetaScan = f7;
+ m_funcRequestStartDataScan = f8;
+ m_funcRequestEpochInfo = f9;
+}
+
+
+/****************************************************************************
+ * Private Helper functions
+ ****************************************************************************/
+
+void
+RepState::requestTransfer(NdbApiSignal * signal)
+{
+ DBUG_REQUESTOR_PRINT("RepState: Transfer calculations started");
+ for(Uint32 nodeGrp=0; nodeGrp<m_channel.getNoOfNodeGroups(); nodeGrp++) {
+ DBUG_REQUESTOR_PRINT("RepState: Transfer calc for node grp");
+ Interval i;
+ if (m_channel.requestTransfer(nodeGrp, &i)) {
+ m_funcRequestTransfer(m_extSender, signal, nodeGrp, i.first(), i.last());
+ }
+ }
+}
+
+void
+RepState::requestApply(NdbApiSignal * signal)
+{
+ DBUG_REQUESTOR_PRINT("RepState: Apply calculations started");
+ for(Uint32 nodeGrp=0; nodeGrp<m_channel.getNoOfNodeGroups(); nodeGrp++) {
+ DBUG_REQUESTOR_PRINT("RepState: Apply calc for node grp");
+ Uint32 gci;
+ if (m_channel.requestApply(nodeGrp, &gci)) {
+ Uint32 force = (m_channel.getState() == Channel::LOG) ? 0 : 1;
+ m_funcRequestApply(m_applier, signal, nodeGrp, gci, gci, force);
+ }
+ }
+}
+
+void
+RepState::requestDelete(NdbApiSignal * signal)
+{
+ DBUG_REQUESTOR_PRINT("RepState: Delete calculations started");
+ for(Uint32 nodeGrp=0; nodeGrp<m_channel.getNoOfNodeGroups(); nodeGrp++) {
+ DBUG_REQUESTOR_PRINT("RepState: Delete calc for node grp");
+ Interval i;
+ if (m_channel.requestDelete(nodeGrp, &i)){
+ m_funcRequestDeleteSS(m_gciContainer, signal, nodeGrp,
+ i.first(), i.last());
+ m_funcRequestDeletePS(m_extSender, signal, nodeGrp, i.first(), i.last());
+ }
+ }
+}
+
+void
+RepState::requestEpochInfo(NdbApiSignal * signal)
+{
+ DBUG_REQUESTOR_PRINT("RepState: Epoch Info calculations");
+ for(Uint32 nodeGrp=0; nodeGrp<m_channel.getNoOfNodeGroups(); nodeGrp++) {
+ m_funcRequestEpochInfo(m_extSender, signal, nodeGrp);
+ }
+}
+
+/****************************************************************************
+ * Public
+ ****************************************************************************/
+
+GrepError::Code
+RepState::add(Channel::Position s, Uint32 nodeGrp, const Interval i)
+{
+ m_channel.add(s, nodeGrp, i);
+
+ if(s == Channel::PS)
+ {
+ m_connected = CONNECTED;
+ m_connected_counter = 0;
+ }
+
+ Interval fullEpochs;
+ m_channel.getFullyAppliedEpochs(&fullEpochs);
+ if(s == Channel::App &&
+ m_channel.getState() == Channel::DATASCAN_COMPLETED &&
+ fullEpochs.last() >= m_channel.getDataScanEpochs().last() &&
+ fullEpochs.last() >= m_channel.getMetaScanEpochs().last())
+ {
+ RLOG(("[%d-%d] fully applied. Channel state changed to LOG",
+ fullEpochs.first(), fullEpochs.last()));
+ m_channel.setState(Channel::LOG);
+ disableAutoStart();
+ }
+
+ return GrepError::NO_ERROR;
+}
+
+GrepError::Code
+RepState::clear(Channel::Position s, Uint32 nodeGrp, const Interval i)
+{
+ m_channel.clear(s, nodeGrp, i);
+ return GrepError::NO_ERROR;
+}
+
+/****************************************************************************
+ * Execute
+ *
+ * This method should only be called from Requestor!
+ ****************************************************************************/
+
+GrepError::Code
+RepState::protectedExecute()
+{
+ GrepError::Code err;
+
+ NdbMutex_Lock(m_mutex);
+
+ NdbApiSignal* signal = m_extSender->getSignal();
+ if (signal == NULL) {
+ err = GrepError::COULD_NOT_ALLOCATE_MEM_FOR_SIGNAL;
+ } else {
+ err = execute(signal);
+ }
+ NdbMutex_Unlock(m_mutex);
+ return err;
+}
+
+GrepError::Code
+RepState::execute(NdbApiSignal* signal)
+{
+ Uint32 subId = m_channel.getSubId();
+ Uint32 subKey = m_channel.getSubKey();
+
+ if (!m_channel.m_requestorEnabled)
+ return GrepError::NO_ERROR;
+
+ /**
+ * @todo Should have subscriptions in here
+ */
+ requestEpochInfo(signal);
+
+ /**
+ * Update connected counter (Silence time)
+ */
+ m_connected_counter++;
+ if (m_connected_counter > REQUESTOR_EXECUTES_NEEDED_FOR_UNKNOWN_CONNECTION) {
+ m_connected = UNKNOWN;
+ }
+
+ switch (m_channel.getState())
+ {
+ case Channel::CONSISTENT:
+ if (isAutoStartEnabled()) {
+ switch (m_channel.getStateSub())
+ {
+ case Channel::NO_SUBSCRIPTION_EXISTS:
+ m_funcRequestCreateSubscriptionId(m_extSender, signal);
+ m_channel.setStateSub(Channel::CREATING_SUBSCRIPTION_ID);
+ break;
+
+ case Channel::CREATING_SUBSCRIPTION_ID:
+ break;
+
+ case Channel::SUBSCRIPTION_ID_CREATED:
+ if(m_channel.isSelective())
+ m_funcRequestCreateSubscription(m_extSender, signal,
+ m_channel.getSubId(),
+ m_channel.getSubKey(),
+ m_channel.getSelectedTables());
+ else
+ m_funcRequestCreateSubscription(m_extSender, signal,
+ m_channel.getSubId(),
+ m_channel.getSubKey(),
+ 0);
+ m_channel.setStateSub(Channel::STARTING_SUBSCRIPTION);
+ break;
+
+ case Channel::STARTING_SUBSCRIPTION:
+ break;
+
+ case Channel::SUBSCRIPTION_STARTED:
+ m_funcRequestStartMetaLog(m_extSender, signal,
+ m_channel.getSubId(),
+ m_channel.getSubKey());
+ m_channel.setState(Channel::METALOG_STARTING);
+ break;
+ }
+ }
+ break;
+
+ case Channel::METALOG_STARTING:
+ break;
+
+ case Channel::METALOG_STARTED:
+ if (isAutoStartEnabled()) {
+ m_funcRequestStartMetaScan(m_extSender, signal, subId, subKey);
+ m_channel.setState(Channel::METASCAN_STARTING);
+ }
+ break;
+
+ case Channel::METASCAN_STARTING:
+ break;
+
+ case Channel::METASCAN_COMPLETED:
+ if (isAutoStartEnabled()) {
+ m_funcRequestStartDataLog(m_extSender, signal, subId, subKey);
+ m_channel.setState(Channel::DATALOG_STARTING);
+ }
+ break;
+
+ case Channel::DATALOG_STARTING:
+ break;
+
+ case Channel::DATALOG_STARTED:
+ if (isAutoStartEnabled()) {
+ m_funcRequestStartDataScan(m_extSender, signal, subId, subKey);
+ m_channel.setState(Channel::DATASCAN_STARTING);
+ }
+ break;
+
+ case Channel::DATASCAN_STARTING:
+ break;
+
+ case Channel::DATASCAN_COMPLETED:
+ break;
+
+ case Channel::LOG:
+ if (m_channel.shouldStop()) {
+ disableTransfer();
+ m_channel.setState(Channel::STOPPING);
+ }
+ break;
+
+ case Channel::STOPPING:
+ if (m_channel.m_transferEnabled)
+ {
+ REPABORT("Illegal stopping state while transfer is still enabled");
+ }
+ /**
+ * check if channel has a subscription, if not,
+ * check if we have marked a subscription that we want to remove
+ * and remove it. This is used to clean up "dangling subscriptions"
+ * after various crashes
+ */
+ if(!m_channel.subscriptionExists())
+ {
+ if(m_subIdToRemove && m_subKeyToRemove)
+ {
+ m_funcRequestRemoveSubscription(m_extSender, signal,
+ m_subIdToRemove,
+ m_subKeyToRemove);
+ eventSubscriptionDeleted( m_subIdToRemove,
+ m_subKeyToRemove);
+ return GrepError::NO_ERROR;
+ }
+ else {
+ return GrepError::SUBSCRIPTION_ID_NOT_FOUND;
+ }
+ } else {
+ if (m_channel.isStoppable())
+ {
+
+ m_funcRequestRemoveSubscription(m_extSender, signal,
+ m_channel.getSubId(),
+ m_channel.getSubKey());
+ eventSubscriptionDeleted(m_channel.getSubId(),
+ m_channel.getSubKey());
+ }
+ else
+ return GrepError::CHANNEL_NOT_STOPPABLE;
+
+ }
+ break;
+
+ default:
+ REPABORT("Illegal replication state");
+ }
+ if (m_channel.m_transferEnabled) requestTransfer(signal);
+ if (m_channel.m_applyEnabled) requestApply(signal);
+ if (m_channel.m_deleteEnabled) requestDelete(signal);
+ return GrepError::NO_ERROR;
+}
+
+/****************************************************************************
+ * Request
+ *
+ * This method should only be called from Main Thread!
+ ****************************************************************************/
+
+GrepError::Code
+RepState::protectedRequest(GrepReq::Request req, Uint32 arg)
+{
+ return protectedRequest(req, arg, 0);
+}
+
+GrepError::Code
+RepState::protectedRequest(GrepReq::Request req, Uint32 arg1, Uint32 arg2)
+{
+ GrepError::Code code;
+ NdbMutex_Lock(m_mutex);
+
+ NdbApiSignal* signal = m_extSender->getSignal();
+ if (signal == NULL) {
+ code = GrepError::COULD_NOT_ALLOCATE_MEM_FOR_SIGNAL;
+ } else {
+ code = request(req, arg1, arg2, signal);
+ }
+
+ NdbMutex_Unlock(m_mutex);
+ return code;
+}
+
+GrepError::Code
+RepState::protectedAddTable(const char * fullTableName)
+{
+ GrepError::Code code;
+ NdbMutex_Lock(m_mutex);
+ code = m_channel.addTable(fullTableName);
+ NdbMutex_Unlock(m_mutex);
+ return code;
+}
+
+GrepError::Code
+RepState::protectedRemoveTable(const char * fullTableName)
+{
+ GrepError::Code code;
+ if(m_channel.getStateSub() != Channel::NO_SUBSCRIPTION_EXISTS)
+ return GrepError::START_ALREADY_IN_PROGRESS;
+ NdbMutex_Lock(m_mutex);
+ code = m_channel.removeTable(fullTableName);
+ NdbMutex_Unlock(m_mutex);
+ return code;
+}
+
+GrepError::Code
+RepState::request(GrepReq::Request request, Uint32 arg1, Uint32 arg2,
+ NdbApiSignal* signal)
+{
+ switch (request)
+ {
+ /*************************************************************************
+ * STATUS etc
+ *************************************************************************/
+
+ case GrepReq::STATUS:
+ printStatus();
+ break;
+
+ case GrepReq::REMOVE_BUFFERS:
+ return GrepError::NOT_YET_IMPLEMENTED;
+
+ /*************************************************************************
+ * START
+ *************************************************************************/
+
+ case GrepReq::CREATE_SUBSCR:
+ if (m_channel.getStateSub() != Channel::NO_SUBSCRIPTION_EXISTS)
+ return GrepError::SUBSCRIPTION_ID_ALREADY_EXIST;
+
+ m_funcRequestCreateSubscriptionId(m_extSender, signal);
+ m_channel.setStateSub(Channel::CREATING_SUBSCRIPTION_ID);
+ return GrepError::NO_ERROR;
+
+ case GrepReq::START_SUBSCR:
+ if (m_channel.getState() == Channel::STOPPING)
+ return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
+ if (m_channel.getStateSub() != Channel::SUBSCRIPTION_ID_CREATED)
+ return GrepError::SUBSCRIPTION_ID_NOT_FOUND;
+ if(m_channel.isSelective())
+ m_funcRequestCreateSubscription(m_extSender, signal,
+ m_channel.getSubId(),
+ m_channel.getSubKey(),
+ m_channel.getSelectedTables());
+ else
+ m_funcRequestCreateSubscription(m_extSender, signal,
+ m_channel.getSubId(),
+ m_channel.getSubKey(),
+ 0);
+ m_channel.setStateSub(Channel::STARTING_SUBSCRIPTION);
+ return GrepError::NO_ERROR;
+
+ case GrepReq::START_METALOG:
+ if (m_channel.getState() == Channel::STOPPING)
+ return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
+ if (m_channel.getStateSub() != Channel::SUBSCRIPTION_STARTED)
+ return GrepError::SUBSCRIPTION_NOT_STARTED;
+ if (m_channel.getState() != Channel::CONSISTENT)
+ return GrepError::START_OF_COMPONENT_IN_WRONG_STATE;
+
+ m_funcRequestStartMetaLog(m_extSender, signal,
+ m_channel.getSubId(),
+ m_channel.getSubKey());
+ m_channel.setState(Channel::METALOG_STARTING);
+ return GrepError::NO_ERROR;
+
+ case GrepReq::START_METASCAN:
+ if (m_channel.getState() == Channel::STOPPING)
+ return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
+ if (m_channel.getStateSub() != Channel::SUBSCRIPTION_STARTED)
+ return GrepError::SUBSCRIPTION_NOT_STARTED;
+ if (m_channel.getState() != Channel::METALOG_STARTED)
+ return GrepError::START_OF_COMPONENT_IN_WRONG_STATE;
+
+ m_funcRequestStartMetaScan(m_extSender, signal,
+ m_channel.getSubId(),
+ m_channel.getSubKey());
+ m_channel.setState(Channel::METASCAN_STARTING);
+ return GrepError::NO_ERROR;
+
+ case GrepReq::START_DATALOG:
+ if (m_channel.getState() == Channel::STOPPING)
+ return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
+ if (m_channel.getStateSub() != Channel::SUBSCRIPTION_STARTED)
+ return GrepError::SUBSCRIPTION_NOT_STARTED;
+ if (m_channel.getState() != Channel::METASCAN_COMPLETED)
+ return GrepError::START_OF_COMPONENT_IN_WRONG_STATE;
+
+ m_funcRequestStartDataLog(m_extSender, signal,
+ m_channel.getSubId(),
+ m_channel.getSubKey());
+ m_channel.setState(Channel::DATALOG_STARTING);
+ return GrepError::NO_ERROR;
+
+ case GrepReq::START_DATASCAN:
+ if (m_channel.getState() == Channel::STOPPING)
+ return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
+ if (m_channel.getStateSub() != Channel::SUBSCRIPTION_STARTED)
+ return GrepError::SUBSCRIPTION_NOT_STARTED;
+ if (m_channel.getState() != Channel::DATALOG_STARTED)
+ return GrepError::START_OF_COMPONENT_IN_WRONG_STATE;
+
+ m_funcRequestStartDataScan(m_extSender, signal,
+ m_channel.getSubId(),
+ m_channel.getSubKey());
+ m_channel.setState(Channel::DATASCAN_STARTING);
+ return GrepError::NO_ERROR;
+
+ case GrepReq::START_REQUESTOR:
+ enable();
+ return GrepError::NO_ERROR;
+
+ case GrepReq::START_TRANSFER:
+ if (m_channel.getState() == Channel::STOPPING)
+ return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
+ enableTransfer();
+ return GrepError::NO_ERROR;
+
+ case GrepReq::START_APPLY:
+ enableApply();
+ return GrepError::NO_ERROR;
+
+ case GrepReq::START_DELETE:
+ enableDelete();
+ return GrepError::NO_ERROR;
+
+ case GrepReq::START:
+ if (isAutoStartEnabled())
+ return GrepError::START_ALREADY_IN_PROGRESS;
+
+ enableAutoStart();
+ return GrepError::NO_ERROR;
+
+ /*************************************************************************
+ * STOP
+ *************************************************************************/
+
+ case GrepReq::STOP:
+ if (m_channel.getStateSub() == Channel::NO_SUBSCRIPTION_EXISTS)
+ return GrepError::SUBSCRIPTION_NOT_STARTED;
+ if (m_channel.getState() == Channel::STOPPING)
+ return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
+
+ if (arg1 == 0) {
+ /**
+ * Stop immediately
+ */
+ disableTransfer();
+ m_channel.setState(Channel::STOPPING);
+ m_channel.setStopEpochId(0);
+ return GrepError::NO_ERROR;
+ } else {
+ /**
+ * Set future stop epoch
+ */
+ return m_channel.setStopEpochId(arg1);
+ }
+
+ case GrepReq::STOP_SUBSCR:
+ {
+ if(m_subIdToRemove == 0 && m_subKeyToRemove == 0) {
+ m_subIdToRemove = arg1;
+ m_subKeyToRemove = arg2;
+ } else {
+ return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
+ }
+
+ if(m_channel.getSubId() != 0 && m_channel.getSubKey() != 0)
+ return GrepError::ILLEGAL_USE_OF_COMMAND;
+ if (m_channel.getState() == Channel::STOPPING)
+ return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
+ disableTransfer();
+ m_channel.setState(Channel::STOPPING);
+ return GrepError::NO_ERROR;
+ }
+ case GrepReq::STOP_METALOG:
+ case GrepReq::STOP_METASCAN:
+ case GrepReq::STOP_DATALOG:
+ case GrepReq::STOP_DATASCAN:
+ return GrepError::NOT_YET_IMPLEMENTED;
+
+ case GrepReq::STOP_REQUESTOR:
+ disable();
+ return GrepError::NO_ERROR;
+
+ case GrepReq::STOP_TRANSFER:
+ disableTransfer();
+ return GrepError::NO_ERROR;
+
+ case GrepReq::STOP_APPLY:
+ disableApply();
+ return GrepError::NO_ERROR;
+
+ case GrepReq::STOP_DELETE:
+ disableDelete();
+ return GrepError::NO_ERROR;
+
+ default:
+ ndbout_c("RepCommandInterpreter: Illegal request received");
+ return GrepError::NOT_YET_IMPLEMENTED;
+ }
+ return GrepError::NOT_YET_IMPLEMENTED;
+}
+
+/****************************************************************************
+ *
+ ****************************************************************************/
+
+/*
+GrepError::Code
+RepState::slowStop()
+{
+ switch(m_channel.getState())
+ {
+ case Channel::LOG:
+ m_channel.setState(Channel::LOG_SLOW_STOP);
+ return GrepError::NO_ERROR;
+ default:
+ return GrepError::REQUESTOR_ILLEGAL_STATE_FOR_SLOWSTOP;
+ }
+}
+
+GrepError::Code
+RepState::fastStop()
+{
+ switch(m_channel.getState())
+ {
+ case Channel::LOG:
+ m_channel.setState(Channel::LOG_FAST_STOP);
+ return GrepError::NO_ERROR;
+ default:
+ return GrepError::REQUESTOR_ILLEGAL_STATE_FOR_FASTSTOP;
+ }
+}
+*/
+
+/****************************************************************************
+ * Print Status
+ ****************************************************************************/
+
+static const char*
+headerText =
+"+-------------------------------------------------------------------------+\n"
+"| MySQL Replication Server |\n"
+"+-------------------------------------------------------------------------+\n"
+;
+
+static const char*
+channelHeaderText =
+"+-------------------------------------------------------------------------+\n"
+"| Applier Channel 1 Replication Status |\n"
+"+-------------------------------------------------------------------------+\n"
+;
+
+static const char*
+line =
+"+-------------------------------------------------------------------------+\n"
+;
+
+
+Properties *
+RepState::getStatus()
+{
+ Properties * prop = new Properties();
+ if(prop == NULL)
+ return NULL;
+ NdbMutex_Lock(m_mutex);
+
+ prop->put("nodegroups", (int)m_channel.getNoOfNodeGroups());
+// prop->put("epoch_state", m_channel.getEpochState());
+ NdbMutex_Unlock(m_mutex);
+ return prop;
+}
+
+
+Properties * RepState::query(QueryCounter counter, Uint32 replicationId)
+{
+ Properties * prop = new Properties();
+ if(prop == NULL)
+ return NULL;
+ NdbMutex_Lock(m_mutex);
+ if(counter != ~(Uint32)0)
+ getEpochState((Channel::Position)counter, prop );
+ prop->put("no_of_nodegroups", m_channel.getNoOfNodeGroups());
+ prop->put("subid", m_channel.getNoOfNodeGroups());
+ prop->put("subkey", m_channel.getSubKey());
+ prop->put("connected_db", m_connected);
+ prop->put("connected_rep", m_repConnected);
+ prop->put("state_sub", (int)m_channel.getStateSub());
+ prop->put("state", (int)m_channel.getState());
+
+ NdbMutex_Unlock(m_mutex);
+ return prop;
+
+}
+
+void
+RepState::getEpochState(Channel::Position pos, Properties * p)
+{
+ char first_buf[20];
+ char last_buf[20];
+ int pos_first = 0, pos_last = 0;
+ Uint32 first = 0, last = 0;
+ for(Uint32 i = 0; i < m_channel.getNoOfNodeGroups() ; i++)
+ {
+ m_channel.getEpochState(pos, i, &first, &last);
+ pos_first += sprintf(first_buf+pos_first,"%d%s",first,",");
+ pos_last += sprintf(last_buf+pos_last,"%d%s",last,",");
+ }
+/**
+ * remove trailing comma
+ */
+ pos_first--;
+ pos_last--;
+ first_buf[pos_first]= '\0';
+ last_buf[pos_last]= '\0';
+#if 0
+ sprintf(first_buf+pos_first,"","");
+ sprintf(last_buf + pos_last,"","");
+#endif
+
+ p->put("first", first_buf);
+ p->put("last", last_buf);
+
+}
+
+
+void
+RepState::printStatus()
+{
+ /***************************************************************************
+ * Global Status
+ ***************************************************************************/
+ ndbout << headerText;
+ switch (m_connected)
+ {
+ case CONNECTED:
+ ndbout << "| Source: Connected "; break;
+ case DISCONNECTED:
+ ndbout << "| Source: Disconnected "; break;
+ case CONNECTABLE:
+ ndbout << "| Source: Disconnected "; break;
+ default:
+ ndbout << "| Source: Unknown "; break;
+ }
+ switch (m_repConnected)
+ {
+ case CONNECTED:
+ ndbout << "(Rep: Connected) "; break;
+ case DISCONNECTED:
+ ndbout << "(Rep: Disconnected) "; break;
+ case CONNECTABLE:
+ ndbout << "(Rep: Disconnected) "; break;
+ default:
+ ndbout << "(Rep: Unknown) "; break;
+ }
+ ndbout << " |" << endl;
+ ndbout << "| Autostart: " << (isAutoStartEnabled() ? "On " : "Off")
+ << " ";
+ ndbout_c(" Silence time: %10u |", m_connected_counter);
+
+ /***************************************************************************
+ * Channel Status
+ ***************************************************************************/
+ ndbout << channelHeaderText;
+ switch(m_channel.getStateSub()) {
+ case Channel::NO_SUBSCRIPTION_EXISTS:
+ ndbout_c("| Subscription: Non-existing "
+ " |");
+ break;
+ case Channel::CREATING_SUBSCRIPTION_ID:
+ ndbout_c("| Subscription: Non-existing (Id is being created)"
+ " |");
+ break;
+ case Channel::SUBSCRIPTION_ID_CREATED:
+ ndbout_c("| Subscription: %-3d-%-6d in state: Not yet started "
+ " |",
+ m_channel.getSubId(), m_channel.getSubKey());
+ break;
+ case Channel::STARTING_SUBSCRIPTION:
+ ndbout_c("| Subscription: %-3d-%-6d in state: Being started "
+ " |",
+ m_channel.getSubId(), m_channel.getSubKey());
+ break;
+ case Channel::SUBSCRIPTION_STARTED:
+ ndbout_c("| Subscription: %-3d-%-6d in state: Started "
+ " |",
+ m_channel.getSubId(), m_channel.getSubKey());
+ break;
+ default:
+ REPABORT("Illegal subscription state");
+ }
+ ndbout << "| Stop epoch: ";
+ if (m_channel.getStopEpochId() == intervalMax) {
+ ndbout << "No stop defined ";
+ } else {
+ ndbout.print("%-10d ",
+ m_channel.getStopEpochId());
+ }
+ ndbout << " |" << endl;
+
+ ndbout << "| State: ";
+ switch(m_channel.getState())
+ {
+ case Channel::CONSISTENT:
+ ndbout << "Local database is subscription consistent ";
+ break;
+ case Channel::METALOG_STARTING:
+ ndbout << "Starting (Phase 1: Metalog starting) ";
+ break;
+ case Channel::METALOG_STARTED:
+ ndbout << "Starting (Phase 2: Metalog started) ";
+ break;
+ case Channel::METASCAN_STARTING:
+ ndbout << "Starting (Phase 3: Metascan starting) ";
+ break;
+ case Channel::METASCAN_COMPLETED:
+ ndbout << "Starting (Phase 4: Metascan completed) ";
+ break;
+ case Channel::DATALOG_STARTING:
+ ndbout << "Starting (Phase 5: Datalog starting) ";
+ break;
+ case Channel::DATALOG_STARTED:
+ ndbout << "Starting (Phase 6: Datalog started) ";
+ break;
+ case Channel::DATASCAN_STARTING:
+ ndbout << "Starting (Phase 7: Datascan completed) ";
+ break;
+ case Channel::DATASCAN_COMPLETED:
+ ndbout << "Starting (Phase 8: Datascan completed) ";
+ break;
+ case Channel::LOG:
+ ndbout << "Logging ";
+ break;
+ case Channel::STOPPING:
+ ndbout << "Stopping (Stopped when all epochs applied) ";
+ break;
+ }
+ ndbout << " |" << endl;
+
+/* @todo
+ ndbout_c("| Syncable: Yes/Scan/No/Unknown (Not implemented)"
+ " |");
+*/
+ ndbout << "| Requestor: " << (isEnabled() ? "On " : "Off")
+ << " (Transfer: " << (isTransferEnabled() ? "On, " : "Off, ")
+ << "Apply: " << (isApplyEnabled() ? "On, " : "Off, ")
+ << "Delete: " << (isDeleteEnabled() ? "On) " : "Off)")
+ << " |" << endl;
+ ndbout_c("| Tables being replicated using this channel: "
+ " |");
+ m_channel.printTables();
+
+ /**
+ * Print node groups
+ */
+ if (getNoOfNodeGroups() == 0)
+ {
+ ndbout_c("| No node groups are known. "
+ " |");
+ }
+ else
+ {
+ m_channel.print();
+ }
+ ndbout << line;
+}