summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Notify_MT_Worker_Task.cpp
blob: 8c9ce78ab3b92af221a32c092c9a68e3190ad042 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// $Id$

#include "Notify_MT_Worker_Task.h"
#include "Notify_Command.h"
#include "Notify_AdminProperties.h"
#include "Notify_Buffering_Strategy.h"

ACE_RCSID(Notify, Notify_MT_Worker_Task, "$Id$")

// Remember the thread-activation parameters; they are handed to
// activate () later, from init_task ().  The buffering strategy and
// the queue-length pointer start out null and are filled in by
// init_task () as well.
TAO_Notify_MT_Worker_Task::TAO_Notify_MT_Worker_Task (int n_threads,
                                                      long flags,
                                                      int force_active,
                                                      long priority)
  : buffering_strategy_ (0),
    queue_length_ (0),
    n_threads_ (n_threads),
    flags_ (flags),
    force_active_ (force_active),
    priority_ (priority)
{
  // No-Op.
}

// Release the buffering strategy allocated by init_task ().  Deleting
// a null pointer is harmless, so an un-initialized task is fine too.
TAO_Notify_MT_Worker_Task::~TAO_Notify_MT_Worker_Task ()
{
  delete buffering_strategy_;
}

// Prepare the task for use: remember the shared queue-length counter,
// build the buffering strategy and spawn the worker threads.
//
// @param admin_properties  Source of the queue-length counter and of
//                          the maximum queue length.  Must not be null.
// @return 0 on success, -1 if the argument is null, if the strategy
//         cannot be allocated, or if thread activation fails.
int
TAO_Notify_MT_Worker_Task::init_task (TAO_Notify_AdminProperties* const admin_properties)
{
  if (admin_properties == 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("(%P|%t) TAO_Notify_MT_Worker_Task::init_task: ")
                       ACE_TEXT ("null admin properties\n")), -1);

  // Keep the pointer to the global event count; svc () decrements it
  // for every message it dequeues.
  this->queue_length_ = admin_properties->queue_length ();

  // Create and fully configure the buffering strategy *before* any
  // worker thread is started.  This way process_event () never sees a
  // null or half-configured strategy, even when activation fails below
  // and the caller subsequently invokes shutdown ().
  TAO_Notify_Buffering_Strategy *strategy = 0;
  ACE_NEW_RETURN (strategy,
                  TAO_Notify_Buffering_Strategy (admin_properties->queue_length ()),
                  -1);

  strategy->max_queue_length (admin_properties->max_queue_length ());

  // temporary
  strategy->order_policy (CosNotification::PriorityOrder);

  this->buffering_strategy_ = strategy;

  // Make us an Active Object.
  if (this->activate (flags_, n_threads_, force_active_, priority_) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("activate failed")), -1);

  return 0;
}

// Stop the worker.  The CORBA environment argument is not used.
void
TAO_Notify_MT_Worker_Task::shutdown (CORBA::Environment& /*ACE_TRY_ENV*/)
{
  // close () queues a shutdown command and then waits on the task's
  // threads.  Its return status is deliberately not propagated because
  // this method returns void.
  const u_long close_flags = 0;
  this->close (close_flags);
}

// Queue a shutdown command for the worker and wait on the task's
// threads.  The flags argument is ignored.
//
// @return the result of wait (), or -1 if queueing the shutdown
//         command raised a CORBA exception.
int
TAO_Notify_MT_Worker_Task::close (u_long)
{
  if (TAO_debug_level > 0)
    {
      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) close of worker\n")));
    }

  // The command goes through the normal event path, so it is subject
  // to the buffering strategy like any other command.
  TAO_Notify_Shutdown_Command *shutdown_command =
    new TAO_Notify_Shutdown_Command ();

  ACE_DECLARE_NEW_CORBA_ENV;
  this->process_event (shutdown_command, ACE_TRY_ENV);
  ACE_CHECK_RETURN (-1);

  const int wait_status = this->wait ();
  return wait_status;
}

// Hand a command to the buffering strategy, which places it on this
// task's message queue.
//
// @param mb          The command to queue.
// @param ACE_TRY_ENV CORBA environment used to report exceptions.
// @param tv          Timeout forwarded unchanged to the strategy.
// @return 0 on success; -1 if the task has no buffering strategy
//         (init_task () has not run) or if the strategy raised an
//         exception.  On the "no strategy" path <mb> is not touched
//         and remains with the caller.
int
TAO_Notify_MT_Worker_Task::process_event (TAO_Notify_Command *mb, CORBA::Environment& ACE_TRY_ENV, ACE_Time_Value *tv)
{
  // The constructor leaves the strategy null and only init_task ()
  // creates it; shutdown ()/close () can still route through here on a
  // task that was never initialized, so guard against dereferencing 0.
  if (this->buffering_strategy_ == 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("(%P|%t) TAO_Notify_MT_Worker_Task::process_event: ")
                       ACE_TEXT ("no buffering strategy, task not initialized\n")),
                      -1);

  // Execute the buffering strategy.
  this->buffering_strategy_->execute (this->msg_queue (), mb, ACE_TRY_ENV, tv);
  ACE_CHECK_RETURN (-1);

  return 0;
}

int
TAO_Notify_MT_Worker_Task::svc (void)
{
  int done = 0;
  while (!done)
    {
      ACE_TRY_NEW_ENV
        {
          ACE_Message_Block *mb;
          if (this->getq (mb) == -1)
            if (ACE_OS::last_error () == ESHUTDOWN)
              return 0;
          else
            ACE_ERROR ((LM_ERROR,
                        "EC (%P|%t) getq error in Dispatching Queue\n"));

          // Decrement the global event count.
          (*this->queue_length_)--;

          ACE_DEBUG ((LM_DEBUG, "removing from queue\n"));
          TAO_Notify_Command *command =
            ACE_dynamic_cast(TAO_Notify_Command*, mb);

          if (command == 0)
            {
              ACE_Message_Block::release (mb);
              continue;
            }

          int result = command->execute (ACE_TRY_ENV);
          ACE_TRY_CHECK;

          ACE_Message_Block::release (mb);

          if (result == -1)
            done = 1;
        }
      ACE_CATCHANY
        {
          ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                               "EC (%P|%t) exception in dispatching queue");
        }
      ACE_ENDTRY;
    }
  return 0;
}

/**************************************************************************/

// Construct a shutdown command.  Both arguments passed on to the
// TAO_Notify_Command base class are zero.
TAO_Notify_Shutdown_Command::TAO_Notify_Shutdown_Command (void)
  : TAO_Notify_Command (0, 0)
{
  // No-Op.
}

// Executing a shutdown command does no work; it only reports -1.
// The CORBA environment argument is not used.
int
TAO_Notify_Shutdown_Command::execute (CORBA::Environment& /*ACE_TRY_ENV*/)
{
  // TAO_Notify_MT_Worker_Task::svc () leaves its loop when a command's
  // execute () yields -1.
  const int stop_worker = -1;
  return stop_worker;
}
/**************************************************************************/