summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.cpp
blob: 69f5e906f1dd5a233ad345b5b7b2c0e0cf686866 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// $Id$

#include "orbsvcs/FtRtEvent/EventChannel/Basic_Replication_Strategy.h"
#include "orbsvcs/FtRtEvent/EventChannel/GroupInfoPublisher.h"
#include "orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel.h"
#include "orbsvcs/FtRtEvent/EventChannel/Request_Context_Repository.h"
#include "../Utils/Log.h"

ACE_RCSID (EventChannel,
           Basic_Replication_Strategy,
           "$Id$")

TAO_BEGIN_VERSIONED_NAMESPACE_DECL

/// Construct the strategy with the sequence counter at zero.
///
/// @param mt  When true a mutex is allocated; when false no mutex is
///            created and the acquire/release methods return 0 without
///            locking.
///
/// The mutex must be recursive: if a second replicate_request() is issued
/// while the first replicate_request() is still waiting for its reply, a
/// non-recursive mutex would deadlock.
Basic_Replication_Strategy::Basic_Replication_Strategy(bool mt)
  : sequence_num_(0),
    mutex_(mt ? new ACE_SYNCH_RECURSIVE_MUTEX : 0)
{
}

/// Release the mutex allocated by the constructor, if any.
Basic_Replication_Strategy::~Basic_Replication_Strategy()
{
  // Deleting a null pointer is a no-op, so no check is needed for the
  // case where no mutex was created.
  delete this->mutex_;
}

void
Basic_Replication_Strategy::check_validity(void)
{
    FTRT::SequenceNumber seq_no = Request_Context_Repository().get_sequence_number();

    TAO_FTRTEC::Log(1 , "check_validity : sequence no = %d\n", sequence_num_);

    if (this->sequence_num_ == 0) {
      // this is the first set_update received from the primary
      // sync the sequence number with the primary
      this->sequence_num_ = seq_no;
    }
    else if (seq_no != this->sequence_num_+1) {
      // out of sync, we missed some set_update() request already
      // throw exception
      //            client_interceptor_->sequence_num_--;
      FTRT::OutOfSequence exception;
      exception.current = this->sequence_num_;
      TAO_FTRTEC::Log(3, "Throwing FTRT::OutOfSequence (old sequence_num_ = %d)\n", this->sequence_num_);
      throw FTRT::OutOfSequence(exception);
    }
    else
      this->sequence_num_++;
}

/// Send @a state to @a successor with the two-way set_update() operation,
/// retrying for as long as the call fails with CORBA::COMM_FAILURE whose
/// minor code is 6.
///
/// @param successor  The replica that should receive the update.
/// @param state      The state to replicate.
///
/// @throws CORBA::COMM_FAILURE (minor code other than 6) and any other
///         exception raised by set_update() are propagated unchanged.
void twoway_set_update(FtRtecEventChannelAdmin::EventChannel_var successor,
                       const FTRT::State& state)
{
  bool finished;
  do {
    // Reset on every attempt: previously the flag stayed false after the
    // first retryable failure, so a retry that succeeded was followed by
    // yet another set_update() call instead of ending the loop.
    finished = true;
    try {
      successor->set_update(state);
    }
    catch (const CORBA::COMM_FAILURE& ex) {
      // Minor code 6 is treated as retryable (its exact meaning is not
      // visible in this file); anything else is passed to the caller.
      if (ex.minor() != 6)
        throw;
      finished = false;
    }
  } while (!finished);
}

/// Forward @a state to the successor replica, if there is one.
///
/// When a successor exists, the (possibly incremented) sequence number and
/// the decremented transaction depth are stored in the request context
/// before the update is sent. A transaction depth greater than 1 uses the
/// retrying two-way call; otherwise a one-way call is made and any CORBA
/// exception from it is ignored.
///
/// @param state     The state to replicate.
/// @param rollback  Unused by this strategy.
/// @param oid       Unused by this strategy.
///
/// @throws FTRT::TransactionDepthTooHigh when there is no successor but the
///         transaction depth is greater than 1.
void
Basic_Replication_Strategy::replicate_request(
  const FTRT::State& state,
  RollbackOperation rollback,
  const FtRtecEventChannelAdmin::ObjectId& oid)
{
  ACE_UNUSED_ARG(rollback);
  ACE_UNUSED_ARG(oid);

  FTRT::TransactionDepth depth =
    Request_Context_Repository().get_transaction_depth();

  GroupInfoPublisherBase * publisher = GroupInfoPublisher::instance();
  FtRtecEventChannelAdmin::EventChannel_var successor = publisher->successor();

  // No successor: nothing to replicate to. That is only an error when the
  // request asked for more than one level of replication.
  if (CORBA::is_nil(successor.in()))
    {
      if (depth > 1)
        {
          TAO_FTRTEC::Log(3, "Throwing FTRT::TransactionDepthTooHigh\n");
          throw FTRT::TransactionDepthTooHigh();
        }
      return;
    }

  // Only the primary advances the sequence number; other replicas forward
  // whatever value they currently hold.
  if (publisher->is_primary())
    this->sequence_num_++;

  TAO_FTRTEC::Log(1, "replicate_request : sequence no = %d\n", sequence_num_);
  Request_Context_Repository().set_sequence_number(sequence_num_);

  Request_Context_Repository().set_transaction_depth(depth-1);

  if (depth > 1)
    {
      twoway_set_update(successor, state);
      return;
    }

  // Best-effort one-way update: failures are deliberately ignored.
  try
    {
      successor->oneway_set_update(state);
    }
  catch (const CORBA::Exception&)
    {
    }
}

/// Forward an add_member() request to the successor replica, retrying for
/// as long as the call fails with CORBA::COMM_FAILURE whose minor code is 6.
///
/// @param info                      Description of the member being added.
/// @param object_group_ref_version  Version passed through to the
///                                  successor's add_member().
///
/// @throws CORBA::COMM_FAILURE (minor code other than 6) and any other
///         exception raised by the successor are propagated unchanged.
void
Basic_Replication_Strategy::add_member(const FTRT::ManagerInfo & info,
                                       CORBA::ULong object_group_ref_version)
{
  // NOTE(review): unlike replicate_request(), the successor is not checked
  // for nil here; presumably callers guarantee one exists. TODO confirm.
  FtRtecEventChannelAdmin::EventChannel_var successor = GroupInfoPublisher::instance()->successor();
  bool finished;
  do {
    // Reset on every attempt: previously the flag stayed false after the
    // first retryable failure, so a retry that succeeded was followed by
    // yet another add_member() call instead of ending the loop.
    finished = true;
    try {
      successor->add_member(info, object_group_ref_version);
    }
    catch (const CORBA::COMM_FAILURE& ex) {
      // Minor code 6 is treated as retryable (its exact meaning is not
      // visible in this file); anything else is passed to the caller.
      if (ex.minor() != 6)
        throw;
      finished = false;
    }
  } while (!finished);
}

/// Acquire the strategy's mutex via acquire_read().
/// @return The mutex's result, or 0 when no mutex was allocated.
int  Basic_Replication_Strategy::acquire_read (void)
{
  // No mutex was created (constructed with mt == false): nothing to lock.
  if (!mutex_)
    return 0;

  return mutex_->acquire_read();
}

/// Acquire the strategy's mutex via acquire_write().
/// @return The mutex's result, or 0 when no mutex was allocated.
int  Basic_Replication_Strategy::acquire_write (void)
{
  // No mutex was created (constructed with mt == false): nothing to lock.
  if (!mutex_)
    return 0;

  return mutex_->acquire_write();
}

/// Release the strategy's mutex.
/// @return The mutex's result, or 0 when no mutex was allocated.
int  Basic_Replication_Strategy::release (void)
{
  // No mutex was created (constructed with mt == false): nothing to unlock.
  if (!mutex_)
    return 0;

  return mutex_->release();
}

TAO_END_VERSIONED_NAMESPACE_DECL