diff options
author | jxh <jxh@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-05-28 05:20:11 +0000 |
---|---|---|
committer | jxh <jxh@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-05-28 05:20:11 +0000 |
commit | 5459ef677d39b0475edcf7317b53fe379358c273 (patch) | |
tree | ff2247407b20a66802297fb9195c763e7adbe01e /apps/JAWS/PROTOTYPE/JAWS | |
parent | a96447253b33e4bae683f06f09b2ad0b38496891 (diff) | |
download | ATCD-5459ef677d39b0475edcf7317b53fe379358c273.tar.gz |
Finally, a working framework!
Diffstat (limited to 'apps/JAWS/PROTOTYPE/JAWS')
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp | 42 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp | 25 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Data_Block.h | 18 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO.cpp | 28 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO.h | 32 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h | 4 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp | 14 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h | 12 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/JAWS.h | 5 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp | 7 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Server.cpp | 20 |
11 files changed, 138 insertions, 69 deletions
diff --git a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp b/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp index 3d96fce9c56..acb6e9b7b18 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp @@ -53,6 +53,8 @@ JAWS_Concurrency_Base::put (ACE_Message_Block *mb, ACE_Time_Value *tv) int JAWS_Concurrency_Base::svc (void) { + JAWS_TRACE ("JAWS_Concurrency_Base::svc"); + ACE_Message_Block *mb; // The message queue element JAWS_Data_Block *db; // Contains the task list @@ -60,10 +62,6 @@ JAWS_Concurrency_Base::svc (void) JAWS_IO_Handler *handler; // Keeps the state of the task JAWS_Pipeline_Handler *task; // The task itself - // Thread specific message block and data block - JAWS_Data_Block ts_db; - ACE_Message_Block ts_mb (&ts_db); - int result = 0; mb = this->singleton_mb (); @@ -74,17 +72,28 @@ JAWS_Concurrency_Base::svc (void) return -1; } + db = ACE_dynamic_cast (JAWS_Data_Block *, mb); + if (db == 0) + { + JAWS_TRACE ("JAWS_Concurrency_Base::svc, empty data block"); + return -1; + } + + // Thread specific message block and data block + JAWS_Data_Block *ts_db = new JAWS_Data_Block; + if (ts_db == 0) + { + ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Concurrency_Base::svc")); + return -1; + } + + ts_db->task (db->task ()); + ts_db->policy (db->policy ()); + for (;;) { // A NULL data block indicates that the thread should shut // itself down - db = ACE_dynamic_cast (JAWS_Data_Block *, mb->data_block ()); - if (db == 0) - { - JAWS_TRACE ("JAWS_Concurrency_Base::svc, empty data block"); - break; - } - policy = db->policy (); // Each time we iterate, we create a handler to maintain @@ -98,13 +107,13 @@ JAWS_Concurrency_Base::svc (void) // Set the initial task in the handler handler->task (db->task ()); + ts_db->io_handler (handler); - ts_db.task (db->task ()); - ts_db.policy (db->policy ()); - ts_db.io_handler (handler); do { + JAWS_TRACE ("JAWS_Concurrency_Base::svc, looping"); + // handler maintains the state of the protocol task = handler->task (); @@ -113,7 +122,7 @@ JAWS_Concurrency_Base::svc (void) break; // the task should set the handler to the appropriate next step - result = task->put (&ts_mb); + result = task->put (ts_db); if (result == 1) { @@ -138,11 +147,12 @@ JAWS_Concurrency_Base::svc (void) } while (result == 0); + result = 0; policy->ioh_factory ()->destroy_io_handler (handler); } - JAWS_TRACE ("JAWS_Concurrency_Base::svc, shutting down"); + ts_db->release (); return 0; } diff --git a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp index 47c2c7e3316..17d20bb6b04 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp @@ -3,6 +3,19 @@ #include "JAWS/Data_Block.h" #include "JAWS/Policy.h" +JAWS_Data_Block::JAWS_Data_Block (void) + : ACE_Message_Block (JAWS_DATA_BLOCK_SIZE), + io_handler_ (0), + policy_ (0), + task_ (0), + payload_ (0) +{ +} + +JAWS_Data_Block::~JAWS_Data_Block (void) +{ +} + JAWS_Pipeline_Handler * JAWS_Data_Block::task (void) { @@ -21,6 +34,12 @@ JAWS_Data_Block::policy (void) return this->policy_; } +void * +JAWS_Data_Block::payload (void) +{ + return this->payload_; +} + void JAWS_Data_Block::task (JAWS_Pipeline_Handler *taskp) { @@ -38,3 +57,9 @@ JAWS_Data_Block::policy (JAWS_Dispatch_Policy *policyp) { this->policy_ = policyp; } + +void +JAWS_Data_Block::payload (void *payloadp) +{ + this->payload_ = payloadp; +} diff --git a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h index e1641f88751..6414655c893 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h +++ b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h @@ -4,7 +4,8 @@ #if !defined (JAWS_DATA_BLOCK_H) #define JAWS_DATA_BLOCK_H -#include "ace/Singleton.h" +#include "ace/Message_Block.h" + #include "JAWS/Pipeline.h" class JAWS_IO_Handler; @@ -12,23 +13,32 @@ class JAWS_Dispatch_Policy; class JAWS_Data_Block; class JAWS_Pipeline_Handler; -class JAWS_Data_Block : public ACE_Data_Block +class JAWS_Data_Block : public ACE_Message_Block // = TITLE // Defines the communication unit between pipeline components { public: + JAWS_Data_Block (void); + ~JAWS_Data_Block (void); + JAWS_Pipeline_Handler *task (void); JAWS_IO_Handler *io_handler (void); JAWS_Dispatch_Policy *policy (void); + void *payload (void); void task (JAWS_Pipeline_Handler *taskp); - void io_handler (JAWS_IO_Handler * handlerp); - void policy (JAWS_Dispatch_Policy * policyp); + void io_handler (JAWS_IO_Handler *handlerp); + void policy (JAWS_Dispatch_Policy *policyp); + void payload (void *payloadp); + + enum { JAWS_DATA_BLOCK_SIZE = 8192 }; private: JAWS_IO_Handler *io_handler_; JAWS_Dispatch_Policy *policy_; JAWS_Pipeline_Handler *task_; + + void *payload_; }; #endif /* !defined (JAWS_DATA_BLOCK_H) */ diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO.cpp b/apps/JAWS/PROTOTYPE/JAWS/IO.cpp index b499e459448..a64b51d1c7e 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/IO.cpp @@ -1,5 +1,7 @@ // $Id$ +#include "JAWS/JAWS.h" + #include "JAWS/IO.h" #include "JAWS/IO_Handler.h" #include "JAWS/IO_Acceptor.h" @@ -62,6 +64,7 @@ void JAWS_Synch_IO::accept (JAWS_IO_Handler *ioh) { ACE_SOCK_Stream new_stream; + new_stream.set_handle (ACE_INVALID_HANDLE); if (this->acceptor_->accept (new_stream) == -1) ioh->accept_error (); else @@ -70,19 +73,22 @@ JAWS_Synch_IO::accept (JAWS_IO_Handler *ioh) void JAWS_Synch_IO::read (JAWS_IO_Handler *ioh, - ACE_Message_Block &mb, - int size) + ACE_Message_Block *mb, + unsigned int size) { + JAWS_TRACE ("JAWS_Synch_IO::read"); + ACE_SOCK_Stream stream; stream.set_handle (ioh->handle ()); - int result = stream.recv (mb.wr_ptr (), size); + int result = stream.recv (mb->wr_ptr (), size); if (result <= 0) ioh->read_error (); else { - mb.wr_ptr (result); + JAWS_TRACE ("JAWS_Synch_IO::read success"); + mb->wr_ptr (result); ioh->read_complete (mb); } } @@ -91,8 +97,8 @@ void JAWS_Synch_IO::receive_file (JAWS_IO_Handler *ioh, const char *filename, void *initial_data, - int initial_data_length, - int entire_length) + unsigned int initial_data_length, + unsigned int entire_length) { ACE_Filecache_Handle handle (filename, entire_length); @@ -125,9 +131,9 @@ void JAWS_Synch_IO::transmit_file (JAWS_IO_Handler *ioh, const char *filename, const char *header, - int header_size, + unsigned int header_size, const char *trailer, - int trailer_size) + unsigned int trailer_size) { ACE_Filecache_Handle handle (filename); @@ -183,7 +189,7 @@ JAWS_Synch_IO::transmit_file (JAWS_IO_Handler *ioh, void JAWS_Synch_IO::send_confirmation_message (JAWS_IO_Handler *ioh, const char *buffer, - int length) + unsigned int length) { this->send_message (ioh, buffer, length); ioh->confirmation_message_complete (); @@ -192,7 +198,7 @@ JAWS_Synch_IO::send_confirmation_message (JAWS_IO_Handler *ioh, void JAWS_Synch_IO::send_error_message (JAWS_IO_Handler *ioh, const char *buffer, - int length) + unsigned int length) { this->send_message (ioh, buffer, length); ioh->error_message_complete (); @@ -201,7 +207,7 @@ JAWS_Synch_IO::send_error_message (JAWS_IO_Handler *ioh, void JAWS_Synch_IO::send_message (JAWS_IO_Handler *ioh, const char *buffer, - int length) + unsigned int length) { ACE_SOCK_Stream stream; stream.set_handle (ioh->handle ()); diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO.h b/apps/JAWS/PROTOTYPE/JAWS/IO.h index 46dc24ff8c7..e7df09886fb 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO.h +++ b/apps/JAWS/PROTOTYPE/JAWS/IO.h @@ -54,33 +54,33 @@ public: // accept a passive connection virtual void read (JAWS_IO_Handler *ioh, - ACE_Message_Block& mb, - int size) = 0; + ACE_Message_Block *mb, + unsigned int size) = 0; // read from the handle size bytes into the message block. virtual void transmit_file (JAWS_IO_Handler *ioh, const char *filename, const char *header, - int header_size, + unsigned int header_size, const char *trailer, - int trailer_size) = 0; + unsigned int trailer_size) = 0; // send header, filename, trailer to the handle. virtual void receive_file (JAWS_IO_Handler *ioh, const char *filename, void *initial_data, - int initial_data_length, - int entire_length) = 0; + unsigned int initial_data_length, + unsigned int entire_length) = 0; // read data from the handle and store in filename. virtual void send_confirmation_message (JAWS_IO_Handler *ioh, const char *buffer, - int length) = 0; + unsigned int length) = 0; // send a confirmation message to the handle. virtual void send_error_message (JAWS_IO_Handler *ioh, const char *buffer, - int length) = 0; + unsigned int length) = 0; // send an error message to the handle. protected: @@ -104,33 +104,33 @@ public: virtual void accept (JAWS_IO_Handler *ioh); - void read (JAWS_IO_Handler *ioh, ACE_Message_Block& mb, int size); + void read (JAWS_IO_Handler *ioh, ACE_Message_Block *mb, unsigned int size); void transmit_file (JAWS_IO_Handler *ioh, const char *filename, const char *header, - int header_size, + unsigned int header_size, const char *trailer, - int trailer_size); + unsigned int trailer_size); void receive_file (JAWS_IO_Handler *ioh, const char *filename, void *initial_data, - int initial_data_length, - int entire_length); + unsigned int initial_data_length, + unsigned int entire_length); void send_confirmation_message (JAWS_IO_Handler *ioh, const char *buffer, - int length); + unsigned int length); void send_error_message (JAWS_IO_Handler *ioh, const char *buffer, - int length); + unsigned int length); protected: virtual void send_message (JAWS_IO_Handler *ioh, const char *buffer, - int length); + unsigned int length); }; typedef ACE_Singleton<JAWS_Synch_IO, ACE_SYNCH_MUTEX> diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h b/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h index 96bcdd9e37e..0fe82cb420c 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h +++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h @@ -17,11 +17,11 @@ class ACE_Proactor; class ACE_Reactor; -#if defined (ACE_HAS_THREAD_SAFE_ACCEPT) +#if defined (JAWS_HAS_THREAD_SAFE_ACCEPT) typedef ACE_LOCK_SOCK_Acceptor<ACE_SYNCH_NULL_MUTEX> JAWS_IO_SOCK_Acceptor; #else typedef ACE_LOCK_SOCK_Acceptor<ACE_SYNCH_MUTEX> JAWS_IO_SOCK_Acceptor; -#endif /* ACE_HAS_THREAD_SAFE_ACCEPT */ +#endif /* JAWS_HAS_THREAD_SAFE_ACCEPT */ class JAWS_IO_Acceptor { diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp index e72666e456e..7b54eba7ef3 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp @@ -1,9 +1,15 @@ // $Id$ +#include "JAWS/JAWS.h" + #include "JAWS/IO.h" #include "JAWS/IO_Handler.h" #include "JAWS/Data_Block.h" +JAWS_IO_Handler::~JAWS_IO_Handler (void) +{ +} + JAWS_IO_Handler_Factory::~JAWS_IO_Handler_Factory (void) { } @@ -23,6 +29,9 @@ JAWS_Synch_IO_Handler::~JAWS_Synch_IO_Handler (void) if (this->mb_) this->mb_->release (); this->mb_ = 0; + ACE_OS::close (this->handle_); + this->handle_ = ACE_INVALID_HANDLE; + this->status_ = 0; } void @@ -41,7 +50,7 @@ JAWS_Synch_IO_Handler::accept_error (void) } void -JAWS_Synch_IO_Handler::read_complete (ACE_Message_Block &data) +JAWS_Synch_IO_Handler::read_complete (ACE_Message_Block *data) { ACE_UNUSED_ARG (data); // We can call back into the pipeline task at this point @@ -144,6 +153,8 @@ JAWS_Synch_IO_Handler::status (void) JAWS_IO_Handler * JAWS_Synch_IO_Handler_Factory::create_io_handler (void) { + JAWS_TRACE ("JAWS_Synch_IO_Handler_Factory::create"); + JAWS_Synch_IO *io; JAWS_Synch_IO_Handler *handler; @@ -159,6 +170,7 @@ JAWS_Synch_IO_Handler_Factory::create_io_handler (void) void JAWS_Synch_IO_Handler_Factory::destroy_io_handler (JAWS_IO_Handler *handler) { + JAWS_TRACE ("JAWS_Synch_IO_Handler_Factory::destroy"); delete handler; } diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h index b9689fc0f13..386e3854f9d 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h +++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h @@ -43,6 +43,8 @@ class JAWS_IO_Handler // = DESCRIPTION { public: + virtual ~JAWS_IO_Handler (void); + virtual void task (JAWS_Pipeline_Handler *ph) = 0; virtual JAWS_Pipeline_Handler *task (void) = 0; @@ -55,16 +57,16 @@ public: // been established. #if 0 - virtual void connect_complete (ACE_Message_Block &) = 0; + virtual void connect_complete (ACE_Message_Block *) = 0; // This method is called by the IO class when new active connection has // been established. - virtual void connect_error (ACE_Message_Block &) = 0; + virtual void connect_error (ACE_Message_Block *) = 0; // This method is called by the IO class when new active connection has // been established. #endif - virtual void read_complete (ACE_Message_Block &data) = 0; + virtual void read_complete (ACE_Message_Block *data) = 0; // This method is called by the IO class when new client data shows // up. @@ -148,7 +150,7 @@ protected: virtual void accept_complete (ACE_HANDLE handle); virtual void accept_error (void); - virtual void read_complete (ACE_Message_Block &data); + virtual void read_complete (ACE_Message_Block *data); virtual void read_error (void); virtual void transmit_file_complete (void); virtual void transmit_file_error (int result); @@ -212,7 +214,7 @@ protected: virtual void accept_complete (void); virtual void accept_error (void); - virtual void read_complete (ACE_Message_Block &data); + virtual void read_complete (ACE_Message_Block *data); virtual void read_error (void); virtual void transmit_file_complete (void); virtual void transmit_file_error (int result); diff --git a/apps/JAWS/PROTOTYPE/JAWS/JAWS.h b/apps/JAWS/PROTOTYPE/JAWS/JAWS.h index bd42e62a017..eedd97847ff 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/JAWS.h +++ b/apps/JAWS/PROTOTYPE/JAWS/JAWS.h @@ -3,6 +3,7 @@ #if (JAWS_NTRACE == 1) # define JAWS_TRACE(X) #else -# define JAWS_TRACE(X) ACE_Trace ____ \ - (ASYS_TEXT (X), __LINE__, ASYS_TEXT (__FILE__)) +# define JAWS_TRACE(X) ACE_Trace ____ (ASYS_TEXT (X), \ + __LINE__, \ + ASYS_TEXT (__FILE__)) #endif /* JAWS_NTRACE */ diff --git a/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp b/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp index d1e20c697b0..692f412b082 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp @@ -14,7 +14,7 @@ JAWS_Pipeline_Handler::~JAWS_Pipeline_Handler (void) int JAWS_Pipeline_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv) { - JAWS_Data_Block *db = ACE_dynamic_cast (JAWS_Data_Block *, mb->data_block ()); + JAWS_Data_Block *db = ACE_dynamic_cast (JAWS_Data_Block *, mb); int status = this->handle_put (db, tv); @@ -39,7 +39,6 @@ JAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block *data, // JAWS_Data_Block should contain an INET_Addr and an IO JAWS_IO_Handler *handler = data->io_handler (); JAWS_Dispatch_Policy *policy = data->policy (); - JAWS_Pipeline_Handler *task = handler->task (); // data->policy ()->update (handler); @@ -56,9 +55,7 @@ JAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block *data, { result = 0; JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_OK"); - // At this point need to move to the next task in the pipeline! - // The framework should automatically call the next stage. - data->task (task); + // Move on to next stage in pipeline break; } case JAWS_IO_Handler::ACCEPT_ERROR: diff --git a/apps/JAWS/PROTOTYPE/JAWS/Server.cpp b/apps/JAWS/PROTOTYPE/JAWS/Server.cpp index 502b0258b7a..6cead554731 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Server.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/Server.cpp @@ -82,7 +82,13 @@ JAWS_Server::open (JAWS_Pipeline_Handler *protocol, if (policy == 0) policy = &this->policy_; - JAWS_Data_Block db; + JAWS_Data_Block *db = new JAWS_Data_Block; + if (db == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) JAWS_Server::open, could not create Data_Block\n")); + return -1; + } ACE_INET_Addr inet_addr (this->port_); JAWS_IO_Synch_Acceptor_Singleton::instance ()->open (inet_addr); @@ -90,21 +96,21 @@ JAWS_Server::open (JAWS_Pipeline_Handler *protocol, // initialize data block - db.task (JAWS_Pipeline_Accept_Task_Singleton::instance ()); - db.policy (policy); + db->task (JAWS_Pipeline_Accept_Task_Singleton::instance ()); + db->policy (policy); - db.task ()->next (protocol); + db->task ()->next (protocol); // The message block should contain an INET_Addr, and call the // io->accept (INET_Addr) method! - ACE_Message_Block mb (&db); - - policy->concurrency ()->put (&mb); + policy->concurrency ()->put (db); while (ACE_OS::thr_join (0, NULL) != -1) ; + db->release (); + return 0; } |