summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp
blob: 51f1b14f39ef9baa69c46bfb8d67ab4c8ebd9cd5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// $Id$

#include "ThreadPool_Task.h"

#if ! defined (__ACE_INLINE__)
#include "ThreadPool_Task.inl"
#endif /* __ACE_INLINE__ */

ACE_RCSID(RT_Notify, TAO_NS_ThreadPool_Task, "$Id$")

#include "tao/debug.h"
#include "Properties.h"
#include "Method_Request_Shutdown.h"
#include "AdminProperties.h"

// Default constructor.  Initialises msg_queue_ from the object returned
// by msg_queue () and starts with null buffering-strategy and
// queue-length pointers; both are assigned by the thread-pool init ().
TAO_NS_ThreadPool_Task::TAO_NS_ThreadPool_Task (void)
  : msg_queue_ (*msg_queue ())
  , buffering_strategy_ (0)
  , queue_length_ (0)
{
}

// Destructor.  Releases the buffering strategy held by this task.
TAO_NS_ThreadPool_Task::~TAO_NS_ThreadPool_Task (void)
{
  // The pointer starts out null in the constructor; deleting a null
  // pointer is a no-op, so no guard is needed here.
  delete this->buffering_strategy_;
}

// Argument-vector flavour of init (): simply forwards to the
// ACE_Task<ACE_SYNCH> implementation and hands back its result.
int
TAO_NS_ThreadPool_Task::init (int argc, char **argv)
{
  const int base_result = this->ACE_Task<ACE_SYNCH>::init (argc, argv);
  return base_result;
}

// Admin-properties flavour of init (): delegates entirely to
// TAO_NS_Worker_Task::init ().  No threads are started here.
void
TAO_NS_ThreadPool_Task::init (TAO_NS_AdminProperties& admin_properties)
{
  this->TAO_NS_Worker_Task::init (admin_properties);
}

// Thread-pool flavour of init (): records the admin properties, creates
// the buffering strategy and then activates this task with
// tp_params.static_threads threads.
//
// @param tp_params         Thread-pool parameters; static_threads is the
//                          number of threads requested from activate ().
// @param admin_properties  Source of the queue-length counter and the
//                          argument handed to the buffering strategy.
//
// Raises CORBA::NO_MEMORY if the buffering strategy cannot be allocated
// and CORBA::BAD_PARAM if the task cannot be activated.
void
TAO_NS_ThreadPool_Task::init (const NotifyExt::ThreadPoolParams& tp_params, TAO_NS_AdminProperties& admin_properties  ACE_ENV_ARG_DECL)
{
  // Everything svc () and exec () rely on is set up *before* any thread
  // is spawned, so a worker can never observe a half-initialised task
  // and an allocation failure cannot leave threads running behind us.

  // Store the admin properties...
  this->queue_length_ = &admin_properties.queue_length ();

  ACE_NEW_THROW_EX (this->buffering_strategy_,
                    TAO_NS_Buffering_Strategy (this->msg_queue_, admin_properties),
                    CORBA::NO_MEMORY ());

  long flags = THR_NEW_LWP | THR_JOINABLE;

  flags |=
    TAO_NS_PROPERTIES::instance()->scope_policy () |
    TAO_NS_PROPERTIES::instance()->sched_policy ();

  // Become an active object.
  if (this->ACE_Task <ACE_SYNCH>::activate (flags,
                                            tp_params.static_threads,
                                            0,
                                            ACE_DEFAULT_THREAD_PRIORITY) == -1)
    {
      if (TAO_debug_level > 0)
        {
          if (ACE_OS::last_error () == EPERM)
            ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Insufficient privilege.\n")));
          else
            // The message used to end in "%a", ACE's terminate-the-process
            // directive, without supplying the exit-status argument it
            // reads.  That made the outcome depend on the debug level and
            // kept the exception below from ever being raised, so the
            // failure is now only logged and reported via BAD_PARAM.
            // NOTE(review): the priority printed is
            // tp_params.default_priority, whereas activation above uses
            // ACE_DEFAULT_THREAD_PRIORITY -- confirm which is intended.
            ACE_DEBUG ((LM_ERROR,
                        ACE_TEXT ("(%t) task activation at priority %d failed\n"),
                        tp_params.default_priority));
        }

      ACE_THROW (CORBA::BAD_PARAM ());
    }
}

// Submit a method request to this task.  The request itself is left
// with the caller; the object returned by its copy () is what gets
// handed to the buffering strategy.
void
TAO_NS_ThreadPool_Task::exec (TAO_NS_Method_Request& method_request)
{
  TAO_NS_Method_Request& queued_copy = *method_request.copy ();

  // Default-constructed time value passed along with the request.
  ACE_Time_Value timeout;
  this->buffering_strategy_->execute (queued_copy, &timeout);
}

// Thread entry point: repeatedly dequeues a message block, executes it
// as a TAO_NS_Method_Request and releases it.  The loop ends when getq ()
// fails with ESHUTDOWN or when a request's execute () returns -1.
// CORBA exceptions raised by a request are printed and the loop goes on.
//
// @return Always 0.
int
TAO_NS_ThreadPool_Task::svc (void)
{
  int done = 0;
  while (!done)
    {
      ACE_TRY_NEW_ENV
        {
          // Start from a null pointer so a failed getq () can never leave
          // an indeterminate value behind.
          ACE_Message_Block *mb = 0;

          if (this->getq (mb) == -1)
            {
              if (ACE_OS::last_error () == ESHUTDOWN)
                return 0;

              // Any other failure: report it and try again.  Nothing was
              // dequeued, so there is nothing to count, execute or release.
              ACE_ERROR ((LM_ERROR,
                          "EC (%P|%t) getq error in Dispatching Queue\n"));
            }
          else
            {
              // Decrement the global event count.  queue_length_ is a
              // pointer, so the counter it refers to must be decremented,
              // not the pointer itself; it stays null until the
              // thread-pool init () has stored the admin properties.
              if (this->queue_length_ != 0)
                (*this->queue_length_)--;

              if (TAO_debug_level > 0)
                ACE_DEBUG ((LM_DEBUG, "removing from queue\n"));

              TAO_NS_Method_Request *request =
                ACE_dynamic_cast (TAO_NS_Method_Request*, mb);

              int result = 0;

              // Blocks that are not method requests are released unexecuted.
              if (request != 0)
                {
                  result = request->execute (ACE_ENV_SINGLE_ARG_PARAMETER);
                  ACE_TRY_CHECK;
                }

              ACE_Message_Block::release (mb);

              // A request returning -1 asks this thread to stop.
              if (result == -1)
                done = 1;
            }
        }
      ACE_CATCHANY
        {
          ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                               "EC (%P|%t) exception in dispatching queue");
        }
      ACE_ENDTRY;
    }
  return 0;
}

void
TAO_NS_ThreadPool_Task::shutdown (void)
{
  this->msg_queue_.enqueue (new TAO_NS_Method_Request_Shutdown (this));

  // We can not wait for ourselves to quit
  if (this->thr_mgr ())
    {
      // call this->thr_mgr ()->task () in the main thread will assert ()
      // fail in ACE_Thread_Manager::thread_desc_self (void) so I get
      // task this way.
      ACE_Thread_Descriptor *mydesc = this->thr_mgr ()->thread_descriptor (ACE_OS::thr_self ());

      if (mydesc && mydesc->task () == this)
        return;
    }

  this->wait ();
  return;
}