// $Id$ #define ACE_BUILD_SVC_DLL #include "ace/Get_Opt.h" #include "Token_Handler.h" ACE_RCSID(lib, Token_Handler, "$Id$") #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Acceptor; template class ACE_Accept_Strategy; template class ACE_Concurrency_Strategy; template class ACE_Creation_Strategy; template class ACE_Schedule_All_Reactive_Strategy; template class ACE_Scheduling_Strategy; template class ACE_Strategy_Acceptor; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Acceptor #pragma instantiate ACE_Accept_Strategy #pragma instantiate ACE_Concurrency_Strategy #pragma instantiate ACE_Creation_Strategy #pragma instantiate ACE_Schedule_All_Reactive_Strategy #pragma instantiate ACE_Scheduling_Strategy #pragma instantiate ACE_Strategy_Acceptor #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ int ACE_Token_Acceptor::parse_args (int argc, char *argv[]) { ACE_TRACE ("ACE_Token_Acceptor::parse_args"); this->service_port_ = ACE_DEFAULT_SERVER_PORT; ACE_LOG_MSG->open ("Token Service"); ACE_Get_Opt get_opt (argc, argv, "p:", 0); for (int c; (c = get_opt ()) != -1; ) { switch (c) { case 'p': this->service_port_ = ACE_OS::atoi (get_opt.optarg); break; default: ACE_ERROR_RETURN ((LM_ERROR, "%n:\n[-p server-port]\n%a", 1), -1); } } this->service_addr_.set (this->service_port_); return 0; } int ACE_Token_Acceptor::init (int argc, char *argv[]) { ACE_TRACE ("ACE_Token_Acceptor::init"); // Use the options hook to parse the command line arguments and set // options. this->parse_args (argc, argv); // Set the acceptor endpoint into listen mode (use the Singleton // global Reactor...). if (this->open (this->service_addr_, ACE_Reactor::instance (), 0, 0, 0, &this->scheduling_strategy_, "Token Server", "ACE token service") == -1) ACE_ERROR_RETURN ((LM_ERROR, "%n: %p on port %d\n", "acceptor::open failed", this->service_addr_.get_port_number ()), -1); // Ignore SIGPIPE so that each can handle this on its // own. ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE); ACE_UNUSED_ARG (sig); ACE_INET_Addr server_addr; if (this->acceptor ().get_local_addr (server_addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_remote_addr"), -1); ACE_DEBUG ((LM_DEBUG, "starting up Token Server at port %d on handle %d\n", server_addr.get_port_number (), this->acceptor ().get_handle ())); return 0; } // The following is a "Factory" used by the ACE_Service_Config and // svc.conf file to dynamically initialize the state of the Naming // Server. ACE_SVC_FACTORY_DEFINE (ACE_Token_Acceptor) // Default constructor. ACE_Token_Handler::ACE_Token_Handler (ACE_Thread_Manager *tm) : ACE_Svc_Handler (tm), collection_ (1), timeout_id_ (0) { ACE_TRACE ("ACE_Token_Handler::ACE_Token_Handler"); } // Create and send a reply to the client. /* VIRTUAL */ int ACE_Token_Handler::send_reply (ACE_UINT32 err) { ACE_TRACE ("ACE_Token_Handler::send_reply"); void *buf; size_t len; ssize_t n; this->token_reply_.errnum (err); len = this->token_reply_.encode (buf); n = this->peer ().send (buf, len); if (n != (ssize_t) len) ACE_ERROR_RETURN ((LM_ERROR, "%p, expected len = %d, actual len = %d\n", "send failed", len, n), -1); else return 0; } // Acquire the token. /* VIRTUAL */ int ACE_Token_Handler::acquire (ACE_Token_Proxy *proxy) { ACE_TRACE ("ACE_Token_Handler::acquire"); ACE_DEBUG ((LM_DEBUG, "in acquire for client id = %s\n", proxy->client_id ())); // @@ add notify in token request reply if (proxy->acquire (0, 0, ACE_Synch_Options::asynch) == -1) { if (errno != EWOULDBLOCK) // bad bad bad return this->send_reply (errno); // acquire would block if (request_options_[ACE_Synch_Options::USE_TIMEOUT] == 1) { // check for polling if ((request_options_.timeout ().sec () == 0) && (request_options_.timeout ().usec () == 0)) return this->send_reply (EWOULDBLOCK); // schedule a timer this->timeout_id_ = this->reactor ()->schedule_timer (this, (void *) proxy, request_options_.timeout ()); if (timeout_id_ == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "schedule_timer")); return this->send_reply (errno); } } // send no reply. wait until we acquire it or until the timer // goes off. return 0; } else // success return this->send_reply (0); } // Try to acquire the token. Never block. /* VIRTUAL */ int ACE_Token_Handler::try_acquire (ACE_Token_Proxy *proxy) { ACE_TRACE ("ACE_Token_Handler::try_acquire"); ACE_DEBUG ((LM_DEBUG, "in try_acquire for client id = %s\n", proxy->client_id ())); // @@ add notify in token request reply if (proxy->tryacquire () == -1) return this->send_reply (errno); else return this->send_reply (0); } // Release the token and allow the next client that is waiting to // proceed. /* VIRTUAL */ int ACE_Token_Handler::release (ACE_Token_Proxy *proxy) { ACE_TRACE ("ACE_Token_Handler::release"); ACE_DEBUG ((LM_DEBUG, "in release for client id = %s\n", proxy->client_id ())); if (proxy->release (ACE_Synch_Options::asynch) == -1) // oops, it failed return this->send_reply (ACE_LOG_MSG->errnum ()); // success if (this->timeout_id_ != 0) { this->reactor ()->cancel_timer (timeout_id_); this->timeout_id_ = 0; } return this->send_reply (0); } // Yield the token if any clients are waiting, otherwise keep the // token. /* VIRTUAL */ int ACE_Token_Handler::renew (ACE_Token_Proxy *proxy) { ACE_TRACE ("ACE_Token_Handler::renew"); ACE_DEBUG ((LM_DEBUG, "in renew for client id = %s\n", proxy->client_id ())); if (proxy->renew (token_request_.requeue_position (), ACE_Synch_Options::asynch) == -1) { int result = ACE_LOG_MSG->errnum (); if (result != EWOULDBLOCK) // bad bad bad return this->send_reply (result); // acquire would block if (request_options_[ACE_Synch_Options::USE_TIMEOUT] == 1) { this->timeout_id_ = this->reactor ()->schedule_timer (this, 0, request_options_.timeout ()); if (timeout_id_ == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "schedule_timer")); return this->send_reply (ACE_LOG_MSG->errnum ()); } } // Send no reply. wait until we acquire it or until the timer // goes off. return 0; } else // Success, we still hold the token. return this->send_reply (0); } /* VIRTUAL */ int ACE_Token_Handler::remove (ACE_Token_Proxy *proxy) { ACE_TRACE ("ACE_Token_Handler::remove"); ACE_DEBUG ((LM_DEBUG, "in remove for client id = %s\n", proxy->client_id ())); ACE_ERROR ((LM_ERROR, "sorry: ACE_Token_Handler::remove() is not implemented")); return this->send_reply (ENOTSUP); } // Enable clients to limit the amount of time they'll wait for a // token. /* VIRTUAL */ int ACE_Token_Handler::handle_timeout (const ACE_Time_Value &, const void *tp) { ACE_TRACE ("ACE_Token_Handler::handle_timeout"); this->timeout_id_ = 0; // @@ add a try acquire here! // Try to acquire the token, but if we can't get it immediately // then abandon the wait. // if (this->try_acquire (&token_entry) == -1) // return this->abandon (token_entry); ACE_Token_Proxy *proxy = (ACE_Token_Proxy *) tp; ACE_DEBUG ((LM_DEBUG, "in handle_timeout for client id = %s\n", proxy->client_id ())); // Remove ourselves from the waiter list. proxy->release (); this->send_reply (ETIME); return 0; } // Dispatch the appropriate operation to handle the client request. ACE_Token_Proxy * ACE_Token_Handler::get_proxy (void) { ACE_TRACE ("ACE_Token_Handler::get_proxy"); // See if the proxy already exists in the collection. ACE_Token_Proxy *proxy = collection_.is_member (token_request_.token_name ()); // If not, create one. if (proxy == 0) { proxy = this->create_proxy (); // Put the new_proxy in this client_id's collection. if (collection_.insert (*proxy) == -1) ACE_ERROR_RETURN ((LM_ERROR, "insert failed\n"), 0); // Delete our copy (one was created in the collection). delete proxy; proxy = collection_.is_member (token_request_.token_name ()); if (proxy == 0) ACE_ERROR_RETURN ((LM_ERROR, "is_member failed\n"), 0); // Set the client_id (it was set to 1 since we're // single-threaded. proxy->client_id (token_request_.client_id ()); } return proxy; } ACE_Token_Proxy * ACE_Token_Handler::create_proxy (void) { ACE_TRACE ("ACE_Token_Handler::new_proxy"); ACE_Token_Proxy *proxy; switch (token_request_.token_type ()) { case ACE_Tokens::RWLOCK: if (token_request_.proxy_type () == ACE_RW_Token::READER) ACE_NEW_RETURN (proxy, ACE_TS_RLock (token_request_.token_name (), this), 0); else ACE_NEW_RETURN (proxy, ACE_TS_WLock (token_request_.token_name (), this), 0); break; case ACE_Tokens::MUTEX: ACE_NEW_RETURN (proxy, ACE_TS_Mutex (token_request_.token_name (), this), 0); break; default: // Nonexistent token type. errno = EINVAL; return 0; } // Check for failed new. if (proxy == 0) errno = ENOMEM; return proxy; } int ACE_Token_Handler::dispatch (void) { ACE_TRACE ("ACE_Token_Handler::dispatch"); ACE_Token_Proxy *proxy = this->get_proxy (); if (proxy == 0) return -1; // Dispatch the appropriate request. switch (this->token_request_.operation_type ()) { case ACE_Token_Request::ACQUIRE: return this->acquire (proxy); case ACE_Token_Request::TRY_ACQUIRE: return this->try_acquire (proxy); case ACE_Token_Request::RELEASE: return this->release (proxy); case ACE_Token_Request::RENEW: return this->renew (proxy); case ACE_Token_Request::REMOVE: return this->remove (proxy); default: ACE_ERROR_RETURN ((LM_ERROR, "invalid type = %d\n", this->token_request_.operation_type ()), -1); /* NOTREACHED */ } } // Receive, frame, and decode the client's request. // Note, this method should use non-blocking I/O. /* VIRTUAL */ int ACE_Token_Handler::recv_request (void) { ACE_TRACE ("ACE_Token_Handler::recv_request"); ssize_t n; // Read the first 4 bytes to get the length of the message // This implementation assumes that the first 4 bytes are // the length of the message. n = this->peer ().recv ((void *) &this->token_request_, sizeof (ACE_UINT32)); switch (n) { case -1: /* FALLTHROUGH */ default: ACE_ERROR ((LM_ERROR, "%p got %d bytes, expected %d bytes\n", "recv failed", n, sizeof (ACE_UINT32))); /* FALLTHROUGH */ case 0: // We've shutdown unexpectedly, let's abandon the connection. this->abandon (0); return -1; /* NOTREACHED */ case sizeof (ACE_UINT32): { // Transform the length into host byte order. ssize_t length = this->token_request_.length (); // Do a sanity check on the length of the message. if (length > (ssize_t) sizeof this->token_request_) { ACE_ERROR ((LM_ERROR, "length %d too long\n", length)); return this->abandon (1); } // Receive the rest of the request message. // @@ beware of blocking read!!!. n = this->peer ().recv ((void *) (((char *) &this->token_request_) + sizeof (ACE_UINT32)), length - sizeof (ACE_UINT32)); // Subtract off the size of the part we skipped over... if (n != (length - (ssize_t) sizeof (ACE_UINT32))) { ACE_ERROR ((LM_ERROR, "%p expected %d, got %d\n", "invalid length", length, n)); return this->abandon (1); } // Decode the request into host byte order. if (this->token_request_.decode () == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "decode failed")); return this->abandon (1); } // if (OS::debug) this->token_request_.dump (); } } return 0; } // Callback method invoked by the ACE_Reactor when // events arrive from the client. /* VIRTUAL */ int ACE_Token_Handler::handle_input (ACE_HANDLE) { ACE_TRACE ("ACE_Token_Handler::handle_input"); ACE_DEBUG ((LM_DEBUG, "****************** in handle_input\n")); if (this->recv_request () == -1) return -1; else return this->dispatch (); } void ACE_Token_Handler::sleep_hook (void) { ACE_TRACE ("ACE_Token_Handler::sleep_hook"); // @@ what should we do? return; } void ACE_Token_Handler::token_acquired (ACE_TPQ_Entry *) { ACE_TRACE ("ACE_Token_Handler::token_acquired"); if (this->timeout_id_ != 0) { this->reactor ()->cancel_timer (this->timeout_id_); this->timeout_id_ = 0; } this->send_reply (0); } int ACE_Token_Handler::abandon (int send_error) { ACE_TRACE ("ACE_Token_Handler::abandon"); // Release ownership or remove us from the waiter list. if (this->timeout_id_ != 0) { this->reactor ()->cancel_timer (timeout_id_); this->timeout_id_ = 0; } // @@ release all tokens collection_.release (); if (send_error) return this->send_reply (EIO); else return -1; } // ************************************************************ // ************************************************************ // ************************************************************ ACE_TS_Mutex::ACE_TS_Mutex (const char *name, ACE_Token_Handler *th) : ACE_Local_Mutex (name, 0, 1), // The 1 is debug. th_ (th) { ACE_TRACE ("ACE_TS_Mutex::ACE_TS_Mutex"); } ACE_TS_Mutex::ACE_TS_Mutex (const ACE_TS_Mutex &m) : ACE_Local_Mutex (m), th_ (m.th_) { ACE_TRACE ("ACE_TS_Mutex::ACE_TS_Mutex"); this->open (m.name (), m.ignore_deadlock_, m.debug_); } void ACE_TS_Mutex::sleep_hook (void) { ACE_TRACE ("ACE_TS_Mutex::sleep_hook"); th_->sleep_hook (); return; } void ACE_TS_Mutex::token_acquired (ACE_TPQ_Entry *e) { ACE_TRACE ("ACE_TS_Mutex::token_acquired"); // Notify the token handler. th_->token_acquired (e); return; } ACE_Token_Proxy * ACE_TS_Mutex::clone (void) const { ACE_TRACE ("ACE_TS_Mutex::clone"); ACE_Token_Proxy *temp; ACE_NEW_RETURN (temp, ACE_TS_Mutex (*this), 0); return temp; } // ************************************************************ ACE_TS_RLock::ACE_TS_RLock (const char *name, ACE_Token_Handler *th) : ACE_Local_RLock (name, 0, 1), // The 1 is debug. th_ (th) { ACE_TRACE ("ACE_TS_RLock::ACE_TS_RLock"); } ACE_TS_RLock::ACE_TS_RLock (const ACE_TS_RLock &r) : ACE_Local_RLock (r), th_ (r.th_) { ACE_TRACE ("ACE_TS_RLock::ACE_TS_RLock"); this->open (r.name (), r.ignore_deadlock_, r.debug_); } void ACE_TS_RLock::sleep_hook (void) { ACE_TRACE ("ACE_TS_RLock::sleep_hook"); th_->sleep_hook (); return; } void ACE_TS_RLock::token_acquired (ACE_TPQ_Entry *e) { ACE_TRACE ("ACE_TS_RLock::token_acquired"); // Notify the token handler. th_->token_acquired (e); return; } ACE_Token_Proxy * ACE_TS_RLock::clone (void) const { ACE_TRACE ("ACE_TS_RLock::clone"); ACE_Token_Proxy *temp; ACE_NEW_RETURN (temp, ACE_TS_RLock (*this), 0); return temp; } // ************************************************************ ACE_TS_WLock::ACE_TS_WLock (const char *name, ACE_Token_Handler *th) : ACE_Local_WLock (name, 0, 1), // The 1 is debug. th_ (th) { ACE_TRACE ("ACE_TS_WLock::ACE_TS_WLock"); } ACE_TS_WLock::ACE_TS_WLock (const ACE_TS_WLock &w) : ACE_Local_WLock (w), th_ (w.th_) { ACE_TRACE ("ACE_TS_WLock::ACE_TS_WLock"); this->open (w.name (), w.ignore_deadlock_, w.debug_); } void ACE_TS_WLock::sleep_hook (void) { ACE_TRACE ("ACE_TS_WLock::sleep_hook"); th_->sleep_hook (); return; } void ACE_TS_WLock::token_acquired (ACE_TPQ_Entry *e) { ACE_TRACE ("ACE_TS_WLock::token_acquired"); // Notify the token handler. th_->token_acquired (e); return; } ACE_Token_Proxy * ACE_TS_WLock::clone (void) const { ACE_TRACE ("ACE_TS_WLock::clone"); ACE_Token_Proxy *temp; ACE_NEW_RETURN (temp, ACE_TS_WLock (*this), 0); return temp; }