summaryrefslogtreecommitdiff
path: root/apps/JAWS/PROTOTYPE/JAWS
diff options
context:
space:
mode:
authorjxh <jxh@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-05-28 05:20:11 +0000
committerjxh <jxh@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-05-28 05:20:11 +0000
commit5459ef677d39b0475edcf7317b53fe379358c273 (patch)
treeff2247407b20a66802297fb9195c763e7adbe01e /apps/JAWS/PROTOTYPE/JAWS
parenta96447253b33e4bae683f06f09b2ad0b38496891 (diff)
downloadATCD-5459ef677d39b0475edcf7317b53fe379358c273.tar.gz
Finally, a working framework!
Diffstat (limited to 'apps/JAWS/PROTOTYPE/JAWS')
-rw-r--r--apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp42
-rw-r--r--apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp25
-rw-r--r--apps/JAWS/PROTOTYPE/JAWS/Data_Block.h18
-rw-r--r--apps/JAWS/PROTOTYPE/JAWS/IO.cpp28
-rw-r--r--apps/JAWS/PROTOTYPE/JAWS/IO.h32
-rw-r--r--apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h4
-rw-r--r--apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp14
-rw-r--r--apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h12
-rw-r--r--apps/JAWS/PROTOTYPE/JAWS/JAWS.h5
-rw-r--r--apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp7
-rw-r--r--apps/JAWS/PROTOTYPE/JAWS/Server.cpp20
11 files changed, 138 insertions, 69 deletions
diff --git a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp b/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp
index 3d96fce9c56..acb6e9b7b18 100644
--- a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp
+++ b/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp
@@ -53,6 +53,8 @@ JAWS_Concurrency_Base::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
int
JAWS_Concurrency_Base::svc (void)
{
+ JAWS_TRACE ("JAWS_Concurrency_Base::svc");
+
ACE_Message_Block *mb; // The message queue element
JAWS_Data_Block *db; // Contains the task list
@@ -60,10 +62,6 @@ JAWS_Concurrency_Base::svc (void)
JAWS_IO_Handler *handler; // Keeps the state of the task
JAWS_Pipeline_Handler *task; // The task itself
- // Thread specific message block and data block
- JAWS_Data_Block ts_db;
- ACE_Message_Block ts_mb (&ts_db);
-
int result = 0;
mb = this->singleton_mb ();
@@ -74,17 +72,28 @@ JAWS_Concurrency_Base::svc (void)
return -1;
}
+ db = ACE_dynamic_cast (JAWS_Data_Block *, mb);
+ if (db == 0)
+ {
+ JAWS_TRACE ("JAWS_Concurrency_Base::svc, empty data block");
+ return -1;
+ }
+
+ // Thread specific message block and data block
+ JAWS_Data_Block *ts_db = new JAWS_Data_Block;
+ if (ts_db == 0)
+ {
+ ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Concurrency_Base::svc"));
+ return -1;
+ }
+
+ ts_db->task (db->task ());
+ ts_db->policy (db->policy ());
+
for (;;)
{
// A NULL data block indicates that the thread should shut
// itself down
- db = ACE_dynamic_cast (JAWS_Data_Block *, mb->data_block ());
- if (db == 0)
- {
- JAWS_TRACE ("JAWS_Concurrency_Base::svc, empty data block");
- break;
- }
-
policy = db->policy ();
// Each time we iterate, we create a handler to maintain
@@ -98,13 +107,13 @@ JAWS_Concurrency_Base::svc (void)
// Set the initial task in the handler
handler->task (db->task ());
+ ts_db->io_handler (handler);
- ts_db.task (db->task ());
- ts_db.policy (db->policy ());
- ts_db.io_handler (handler);
do
{
+ JAWS_TRACE ("JAWS_Concurrency_Base::svc, looping");
+
// handler maintains the state of the protocol
task = handler->task ();
@@ -113,7 +122,7 @@ JAWS_Concurrency_Base::svc (void)
break;
// the task should set the handler to the appropriate next step
- result = task->put (&ts_mb);
+ result = task->put (ts_db);
if (result == 1)
{
@@ -138,11 +147,12 @@ JAWS_Concurrency_Base::svc (void)
}
while (result == 0);
+ result = 0;
policy->ioh_factory ()->destroy_io_handler (handler);
}
- JAWS_TRACE ("JAWS_Concurrency_Base::svc, shutting down");
+ ts_db->release ();
return 0;
}
diff --git a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp
index 47c2c7e3316..17d20bb6b04 100644
--- a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp
+++ b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp
@@ -3,6 +3,19 @@
#include "JAWS/Data_Block.h"
#include "JAWS/Policy.h"
+JAWS_Data_Block::JAWS_Data_Block (void)
+ : ACE_Message_Block (JAWS_DATA_BLOCK_SIZE),
+ io_handler_ (0),
+ policy_ (0),
+ task_ (0),
+ payload_ (0)
+{
+}
+
+JAWS_Data_Block::~JAWS_Data_Block (void)
+{
+}
+
JAWS_Pipeline_Handler *
JAWS_Data_Block::task (void)
{
@@ -21,6 +34,12 @@ JAWS_Data_Block::policy (void)
return this->policy_;
}
+void *
+JAWS_Data_Block::payload (void)
+{
+ return this->payload_;
+}
+
void
JAWS_Data_Block::task (JAWS_Pipeline_Handler *taskp)
{
@@ -38,3 +57,9 @@ JAWS_Data_Block::policy (JAWS_Dispatch_Policy *policyp)
{
this->policy_ = policyp;
}
+
+void
+JAWS_Data_Block::payload (void *payloadp)
+{
+ this->payload_ = payloadp;
+}
diff --git a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h
index e1641f88751..6414655c893 100644
--- a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h
+++ b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h
@@ -4,7 +4,8 @@
#if !defined (JAWS_DATA_BLOCK_H)
#define JAWS_DATA_BLOCK_H
-#include "ace/Singleton.h"
+#include "ace/Message_Block.h"
+
#include "JAWS/Pipeline.h"
class JAWS_IO_Handler;
@@ -12,23 +13,32 @@ class JAWS_Dispatch_Policy;
class JAWS_Data_Block;
class JAWS_Pipeline_Handler;
-class JAWS_Data_Block : public ACE_Data_Block
+class JAWS_Data_Block : public ACE_Message_Block
// = TITLE
// Defines the communication unit between pipeline components
{
public:
+ JAWS_Data_Block (void);
+ ~JAWS_Data_Block (void);
+
JAWS_Pipeline_Handler *task (void);
JAWS_IO_Handler *io_handler (void);
JAWS_Dispatch_Policy *policy (void);
+ void *payload (void);
void task (JAWS_Pipeline_Handler *taskp);
- void io_handler (JAWS_IO_Handler * handlerp);
- void policy (JAWS_Dispatch_Policy * policyp);
+ void io_handler (JAWS_IO_Handler *handlerp);
+ void policy (JAWS_Dispatch_Policy *policyp);
+ void payload (void *payloadp);
+
+ enum { JAWS_DATA_BLOCK_SIZE = 8192 };
private:
JAWS_IO_Handler *io_handler_;
JAWS_Dispatch_Policy *policy_;
JAWS_Pipeline_Handler *task_;
+
+ void *payload_;
};
#endif /* !defined (JAWS_DATA_BLOCK_H) */
diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO.cpp b/apps/JAWS/PROTOTYPE/JAWS/IO.cpp
index b499e459448..a64b51d1c7e 100644
--- a/apps/JAWS/PROTOTYPE/JAWS/IO.cpp
+++ b/apps/JAWS/PROTOTYPE/JAWS/IO.cpp
@@ -1,5 +1,7 @@
// $Id$
+#include "JAWS/JAWS.h"
+
#include "JAWS/IO.h"
#include "JAWS/IO_Handler.h"
#include "JAWS/IO_Acceptor.h"
@@ -62,6 +64,7 @@ void
JAWS_Synch_IO::accept (JAWS_IO_Handler *ioh)
{
ACE_SOCK_Stream new_stream;
+ new_stream.set_handle (ACE_INVALID_HANDLE);
if (this->acceptor_->accept (new_stream) == -1)
ioh->accept_error ();
else
@@ -70,19 +73,22 @@ JAWS_Synch_IO::accept (JAWS_IO_Handler *ioh)
void
JAWS_Synch_IO::read (JAWS_IO_Handler *ioh,
- ACE_Message_Block &mb,
- int size)
+ ACE_Message_Block *mb,
+ unsigned int size)
{
+ JAWS_TRACE ("JAWS_Synch_IO::read");
+
ACE_SOCK_Stream stream;
stream.set_handle (ioh->handle ());
- int result = stream.recv (mb.wr_ptr (), size);
+ int result = stream.recv (mb->wr_ptr (), size);
if (result <= 0)
ioh->read_error ();
else
{
- mb.wr_ptr (result);
+ JAWS_TRACE ("JAWS_Synch_IO::read success");
+ mb->wr_ptr (result);
ioh->read_complete (mb);
}
}
@@ -91,8 +97,8 @@ void
JAWS_Synch_IO::receive_file (JAWS_IO_Handler *ioh,
const char *filename,
void *initial_data,
- int initial_data_length,
- int entire_length)
+ unsigned int initial_data_length,
+ unsigned int entire_length)
{
ACE_Filecache_Handle handle (filename, entire_length);
@@ -125,9 +131,9 @@ void
JAWS_Synch_IO::transmit_file (JAWS_IO_Handler *ioh,
const char *filename,
const char *header,
- int header_size,
+ unsigned int header_size,
const char *trailer,
- int trailer_size)
+ unsigned int trailer_size)
{
ACE_Filecache_Handle handle (filename);
@@ -183,7 +189,7 @@ JAWS_Synch_IO::transmit_file (JAWS_IO_Handler *ioh,
void
JAWS_Synch_IO::send_confirmation_message (JAWS_IO_Handler *ioh,
const char *buffer,
- int length)
+ unsigned int length)
{
this->send_message (ioh, buffer, length);
ioh->confirmation_message_complete ();
@@ -192,7 +198,7 @@ JAWS_Synch_IO::send_confirmation_message (JAWS_IO_Handler *ioh,
void
JAWS_Synch_IO::send_error_message (JAWS_IO_Handler *ioh,
const char *buffer,
- int length)
+ unsigned int length)
{
this->send_message (ioh, buffer, length);
ioh->error_message_complete ();
@@ -201,7 +207,7 @@ JAWS_Synch_IO::send_error_message (JAWS_IO_Handler *ioh,
void
JAWS_Synch_IO::send_message (JAWS_IO_Handler *ioh,
const char *buffer,
- int length)
+ unsigned int length)
{
ACE_SOCK_Stream stream;
stream.set_handle (ioh->handle ());
diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO.h b/apps/JAWS/PROTOTYPE/JAWS/IO.h
index 46dc24ff8c7..e7df09886fb 100644
--- a/apps/JAWS/PROTOTYPE/JAWS/IO.h
+++ b/apps/JAWS/PROTOTYPE/JAWS/IO.h
@@ -54,33 +54,33 @@ public:
// accept a passive connection
virtual void read (JAWS_IO_Handler *ioh,
- ACE_Message_Block& mb,
- int size) = 0;
+ ACE_Message_Block *mb,
+ unsigned int size) = 0;
// read from the handle size bytes into the message block.
virtual void transmit_file (JAWS_IO_Handler *ioh,
const char *filename,
const char *header,
- int header_size,
+ unsigned int header_size,
const char *trailer,
- int trailer_size) = 0;
+ unsigned int trailer_size) = 0;
// send header, filename, trailer to the handle.
virtual void receive_file (JAWS_IO_Handler *ioh,
const char *filename,
void *initial_data,
- int initial_data_length,
- int entire_length) = 0;
+ unsigned int initial_data_length,
+ unsigned int entire_length) = 0;
// read data from the handle and store in filename.
virtual void send_confirmation_message (JAWS_IO_Handler *ioh,
const char *buffer,
- int length) = 0;
+ unsigned int length) = 0;
// send a confirmation message to the handle.
virtual void send_error_message (JAWS_IO_Handler *ioh,
const char *buffer,
- int length) = 0;
+ unsigned int length) = 0;
// send an error message to the handle.
protected:
@@ -104,33 +104,33 @@ public:
virtual void accept (JAWS_IO_Handler *ioh);
- void read (JAWS_IO_Handler *ioh, ACE_Message_Block& mb, int size);
+ void read (JAWS_IO_Handler *ioh, ACE_Message_Block *mb, unsigned int size);
void transmit_file (JAWS_IO_Handler *ioh,
const char *filename,
const char *header,
- int header_size,
+ unsigned int header_size,
const char *trailer,
- int trailer_size);
+ unsigned int trailer_size);
void receive_file (JAWS_IO_Handler *ioh,
const char *filename,
void *initial_data,
- int initial_data_length,
- int entire_length);
+ unsigned int initial_data_length,
+ unsigned int entire_length);
void send_confirmation_message (JAWS_IO_Handler *ioh,
const char *buffer,
- int length);
+ unsigned int length);
void send_error_message (JAWS_IO_Handler *ioh,
const char *buffer,
- int length);
+ unsigned int length);
protected:
virtual void send_message (JAWS_IO_Handler *ioh,
const char *buffer,
- int length);
+ unsigned int length);
};
typedef ACE_Singleton<JAWS_Synch_IO, ACE_SYNCH_MUTEX>
diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h b/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h
index 96bcdd9e37e..0fe82cb420c 100644
--- a/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h
+++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h
@@ -17,11 +17,11 @@
class ACE_Proactor;
class ACE_Reactor;
-#if defined (ACE_HAS_THREAD_SAFE_ACCEPT)
+#if defined (JAWS_HAS_THREAD_SAFE_ACCEPT)
typedef ACE_LOCK_SOCK_Acceptor<ACE_SYNCH_NULL_MUTEX> JAWS_IO_SOCK_Acceptor;
#else
typedef ACE_LOCK_SOCK_Acceptor<ACE_SYNCH_MUTEX> JAWS_IO_SOCK_Acceptor;
-#endif /* ACE_HAS_THREAD_SAFE_ACCEPT */
+#endif /* JAWS_HAS_THREAD_SAFE_ACCEPT */
class JAWS_IO_Acceptor
{
diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp
index e72666e456e..7b54eba7ef3 100644
--- a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp
+++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp
@@ -1,9 +1,15 @@
// $Id$
+#include "JAWS/JAWS.h"
+
#include "JAWS/IO.h"
#include "JAWS/IO_Handler.h"
#include "JAWS/Data_Block.h"
+JAWS_IO_Handler::~JAWS_IO_Handler (void)
+{
+}
+
JAWS_IO_Handler_Factory::~JAWS_IO_Handler_Factory (void)
{
}
@@ -23,6 +29,9 @@ JAWS_Synch_IO_Handler::~JAWS_Synch_IO_Handler (void)
if (this->mb_)
this->mb_->release ();
this->mb_ = 0;
+ ACE_OS::close (this->handle_);
+ this->handle_ = ACE_INVALID_HANDLE;
+ this->status_ = 0;
}
void
@@ -41,7 +50,7 @@ JAWS_Synch_IO_Handler::accept_error (void)
}
void
-JAWS_Synch_IO_Handler::read_complete (ACE_Message_Block &data)
+JAWS_Synch_IO_Handler::read_complete (ACE_Message_Block *data)
{
ACE_UNUSED_ARG (data);
// We can call back into the pipeline task at this point
@@ -144,6 +153,8 @@ JAWS_Synch_IO_Handler::status (void)
JAWS_IO_Handler *
JAWS_Synch_IO_Handler_Factory::create_io_handler (void)
{
+ JAWS_TRACE ("JAWS_Synch_IO_Handler_Factory::create");
+
JAWS_Synch_IO *io;
JAWS_Synch_IO_Handler *handler;
@@ -159,6 +170,7 @@ JAWS_Synch_IO_Handler_Factory::create_io_handler (void)
void
JAWS_Synch_IO_Handler_Factory::destroy_io_handler (JAWS_IO_Handler *handler)
{
+ JAWS_TRACE ("JAWS_Synch_IO_Handler_Factory::destroy");
delete handler;
}
diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h
index b9689fc0f13..386e3854f9d 100644
--- a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h
+++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h
@@ -43,6 +43,8 @@ class JAWS_IO_Handler
// = DESCRIPTION
{
public:
+ virtual ~JAWS_IO_Handler (void);
+
virtual void task (JAWS_Pipeline_Handler *ph) = 0;
virtual JAWS_Pipeline_Handler *task (void) = 0;
@@ -55,16 +57,16 @@ public:
// been established.
#if 0
- virtual void connect_complete (ACE_Message_Block &) = 0;
+ virtual void connect_complete (ACE_Message_Block *) = 0;
// This method is called by the IO class when new active connection has
// been established.
- virtual void connect_error (ACE_Message_Block &) = 0;
+ virtual void connect_error (ACE_Message_Block *) = 0;
// This method is called by the IO class when new active connection has
// been established.
#endif
- virtual void read_complete (ACE_Message_Block &data) = 0;
+ virtual void read_complete (ACE_Message_Block *data) = 0;
// This method is called by the IO class when new client data shows
// up.
@@ -148,7 +150,7 @@ protected:
virtual void accept_complete (ACE_HANDLE handle);
virtual void accept_error (void);
- virtual void read_complete (ACE_Message_Block &data);
+ virtual void read_complete (ACE_Message_Block *data);
virtual void read_error (void);
virtual void transmit_file_complete (void);
virtual void transmit_file_error (int result);
@@ -212,7 +214,7 @@ protected:
virtual void accept_complete (void);
virtual void accept_error (void);
- virtual void read_complete (ACE_Message_Block &data);
+ virtual void read_complete (ACE_Message_Block *data);
virtual void read_error (void);
virtual void transmit_file_complete (void);
virtual void transmit_file_error (int result);
diff --git a/apps/JAWS/PROTOTYPE/JAWS/JAWS.h b/apps/JAWS/PROTOTYPE/JAWS/JAWS.h
index bd42e62a017..eedd97847ff 100644
--- a/apps/JAWS/PROTOTYPE/JAWS/JAWS.h
+++ b/apps/JAWS/PROTOTYPE/JAWS/JAWS.h
@@ -3,6 +3,7 @@
#if (JAWS_NTRACE == 1)
# define JAWS_TRACE(X)
#else
-# define JAWS_TRACE(X) ACE_Trace ____ \
- (ASYS_TEXT (X), __LINE__, ASYS_TEXT (__FILE__))
+# define JAWS_TRACE(X) ACE_Trace ____ (ASYS_TEXT (X), \
+ __LINE__, \
+ ASYS_TEXT (__FILE__))
#endif /* JAWS_NTRACE */
diff --git a/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp b/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp
index d1e20c697b0..692f412b082 100644
--- a/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp
+++ b/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp
@@ -14,7 +14,7 @@ JAWS_Pipeline_Handler::~JAWS_Pipeline_Handler (void)
int
JAWS_Pipeline_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
{
- JAWS_Data_Block *db = ACE_dynamic_cast (JAWS_Data_Block *, mb->data_block ());
+ JAWS_Data_Block *db = ACE_dynamic_cast (JAWS_Data_Block *, mb);
int status = this->handle_put (db, tv);
@@ -39,7 +39,6 @@ JAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block *data,
// JAWS_Data_Block should contain an INET_Addr and an IO
JAWS_IO_Handler *handler = data->io_handler ();
JAWS_Dispatch_Policy *policy = data->policy ();
- JAWS_Pipeline_Handler *task = handler->task ();
// data->policy ()->update (handler);
@@ -56,9 +55,7 @@ JAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block *data,
{
result = 0;
JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_OK");
- // At this point need to move to the next task in the pipeline!
- // The framework should automatically call the next stage.
- data->task (task);
+ // Move on to next stage in pipeline
break;
}
case JAWS_IO_Handler::ACCEPT_ERROR:
diff --git a/apps/JAWS/PROTOTYPE/JAWS/Server.cpp b/apps/JAWS/PROTOTYPE/JAWS/Server.cpp
index 502b0258b7a..6cead554731 100644
--- a/apps/JAWS/PROTOTYPE/JAWS/Server.cpp
+++ b/apps/JAWS/PROTOTYPE/JAWS/Server.cpp
@@ -82,7 +82,13 @@ JAWS_Server::open (JAWS_Pipeline_Handler *protocol,
if (policy == 0)
policy = &this->policy_;
- JAWS_Data_Block db;
+ JAWS_Data_Block *db = new JAWS_Data_Block;
+ if (db == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) JAWS_Server::open, could not create Data_Block\n"));
+ return -1;
+ }
ACE_INET_Addr inet_addr (this->port_);
JAWS_IO_Synch_Acceptor_Singleton::instance ()->open (inet_addr);
@@ -90,21 +96,21 @@ JAWS_Server::open (JAWS_Pipeline_Handler *protocol,
// initialize data block
- db.task (JAWS_Pipeline_Accept_Task_Singleton::instance ());
- db.policy (policy);
+ db->task (JAWS_Pipeline_Accept_Task_Singleton::instance ());
+ db->policy (policy);
- db.task ()->next (protocol);
+ db->task ()->next (protocol);
// The message block should contain an INET_Addr, and call the
// io->accept (INET_Addr) method!
- ACE_Message_Block mb (&db);
-
- policy->concurrency ()->put (&mb);
+ policy->concurrency ()->put (db);
while (ACE_OS::thr_join (0, NULL) != -1)
;
+ db->release ();
+
return 0;
}