summaryrefslogtreecommitdiff
path: root/ACE/apps/JAWS2/JAWS/Concurrency.h
blob: 85f495facb53b8a5708d01e606ee0bac8aa0a140 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/* -*- c++ -*- */
#ifndef JAWS_CONCURRENCY_H
#define JAWS_CONCURRENCY_H

#include "ace/Singleton.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "ace/Task.h"
#include "ace/Synch_Traits.h"

#include "JAWS/Export.h"
#include "JAWS/Jaws_IO.h"

class JAWS_Data_Block;
class JAWS_Dispatch_Policy;
class JAWS_Reaper;

class JAWS_Export JAWS_Concurrency_Base : public ACE_Task<ACE_SYNCH>
  // = TITLE
  //     Base class for different concurrency models
  //
  // = DESCRIPTION
  //     Provides a default implementaion of the virtual put() method
  //     which calls putq(), but can be overloaded to do something
  //     synchronously, such as call put_next().

{
public:
  JAWS_Concurrency_Base (void);
  ~JAWS_Concurrency_Base (void);

  virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
  virtual int svc (void);

  virtual int svc_loop (JAWS_Data_Block *db);
  // in thread pool, this is an infinite loop
  // in thread per request, it is a single iteration

  virtual int svc_hook (JAWS_Data_Block *db);
  // does the work of following the pipeline tasks

  virtual int activate_hook (void);
  // callback for IO_Handler when accept completes

  virtual ACE_Message_Block *singleton_mb (void);

protected:
  int mb_acquired_;
  ACE_Message_Block *mb_;
  JAWS_Reaper *reaper_;
  ACE_SYNCH_MUTEX lock_;
};

class JAWS_Export JAWS_Dispatcher
  // = TITLE
  //     The class that is responsible to delivering events to the
  //     appropriate concurrency mechanism.
  //
  // = DESCRIPTION
  //     JAWS_IO_Handler calls into the dispatcher so that the completed
  //     IO can find a thread to take care of it.
{
public:
  JAWS_Dispatcher (void);

  int dispatch (ACE_Message_Block *mb);
  JAWS_Dispatch_Policy *policy (void);
  JAWS_Dispatch_Policy *policy (JAWS_Dispatch_Policy *p);

private:
  JAWS_Dispatch_Policy *policy_;
};

class JAWS_Export JAWS_Thread_Pool_Task : public JAWS_Concurrency_Base
  // = TITLE
  //     Used to implement Thread Pool Concurrency Strategy
  //
  // = DESCRIPTION
  //     This task is created to hold a pool of threads that receive
  //     requests through the message queue.
{
public:
  virtual int make (long flags, int nthreads, int maxthreads);
  // Initiate the thread_pool task

private:
  long flags_;
  int nthreads_;
  int maxthreads_;
};

class JAWS_Export JAWS_Thread_Per_Task : public JAWS_Concurrency_Base
  // = TITLE
  //     Used to implement Thread Per Request Concurrency Strategy
  //
  // = DESCRIPTION
  //     As each new message arrives from the queue, a new thread is
  //     spawned to handle it.  This is done by overloading put to call
  //     activate.
{
public:
  virtual int make (long flags, int maxthreads);
  // Initiate the thread_per task

  virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);

  virtual int svc_loop (JAWS_Data_Block *db);
  // a single iteration

  virtual int activate_hook (void);
  // callback for IO_Handler when accept completes

private:
  long flags_;
  int maxthreads_;
};

typedef ACE_Singleton<JAWS_Dispatcher, ACE_SYNCH_MUTEX>
        JAWS_Dispatcher_Singleton;

typedef ACE_Singleton<JAWS_Thread_Pool_Task, ACE_SYNCH_MUTEX>
        JAWS_Thread_Pool_Singleton;

typedef ACE_Singleton<JAWS_Thread_Per_Task, ACE_SYNCH_MUTEX>
        JAWS_Thread_Per_Singleton;

#endif /* !defined (JAWS_CONCURRENCY_H) */