Generic definitions for the Transport class. More...
#include <Transport.h>
Classes | |
struct | Drain_Result |
Public Types | |
enum | Drain_Result_Enum { DR_ERROR = -1, DR_OK = 0, DR_QUEUE_EMPTY = 1, DR_WOULDBLOCK = 2 } |
Public Member Functions | |
TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core, size_t input_cdr_size=ACE_CDR::DEFAULT_BUFSIZE) | |
Default creator, requires the tag value be supplied. | |
virtual | ~TAO_Transport (void) |
Destructor. | |
CORBA::ULong | tag (void) const |
Return the protocol tag. | |
TAO_ORB_Core * | orb_core (void) const |
Access the ORB that owns this connection. | |
TAO_Transport_Mux_Strategy * | tms (void) const |
Get the TAO_Transport_Mux_Strategy used by this object. | |
TAO_Wait_Strategy * | wait_strategy (void) const |
Return the TAO_Wait_Strategy used by this object. | |
Drain_Result | handle_output (TAO::Transport::Drain_Constraints const &c) |
Callback method to reactively drain the outgoing data queue. | |
int | bidirectional_flag (void) const |
Get the bidirectional flag. | |
void | bidirectional_flag (int flag) |
Set the bidirectional flag. | |
void | cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry) |
Set the Cache Map entry. | |
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry (void) |
Get the Cache Map entry. | |
size_t | id (void) const |
Set and Get the identifier for this transport instance. | |
void | id (size_t id) |
TAO::Connection_Role | opened_as (void) const |
void | opened_as (TAO::Connection_Role) |
unsigned long | purging_order (void) const |
void | purging_order (unsigned long value) |
bool | queue_is_empty (void) |
Check if there are messages pending in the queue. | |
bool | register_if_necessary (void) |
Register with the reactor via the wait strategy. | |
void | provide_handler (TAO::Connection_Handler_Set &handlers) |
Add event handler to the handlers set. | |
bool | provide_blockable_handler (TAO::Connection_Handler_Set &handlers) |
virtual int | register_handler (void) |
Register the handler with the reactor. | |
virtual int | remove_handler (void) |
Remove the handler from the reactor. | |
virtual ssize_t | send (iovec *iov, int iovcnt, size_t &bytes_transferred, ACE_Time_Value const *timeout)=0 |
Write the complete Message_Block chain to the connection. | |
virtual ssize_t | recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0 |
Read len bytes into buffer. | |
Control connection lifecycle | |
bool | idle_after_send (void) |
bool | idle_after_reply (void) |
virtual void | close_connection (void) |
Call the implementation method after obtaining the lock. | |
Template methods | |
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
class | TAO_Reactive_Flushing_Strategy |
class | TAO_Leader_Follower_Flushing_Strategy |
class | TAO_Thread_Per_Connection_Handler |
CORBA::ULong const | tag_ |
IOP protocol tag. | |
TAO_ORB_Core *const | orb_core_ |
Global orbcore resource. | |
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry_ |
TAO_Transport_Mux_Strategy * | tms_ |
TAO_Wait_Strategy * | ws_ |
Strategy for waiting for the reply after sending the request. | |
int | bidirectional_flag_ |
TAO::Connection_Role | opening_connection_role_ |
TAO_Queued_Message * | head_ |
Implement the outgoing data queue. | |
TAO_Queued_Message * | tail_ |
TAO_Incoming_Message_Queue | incoming_message_queue_ |
Queue of the consolidated, incoming messages. | |
TAO::Incoming_Message_Stack | incoming_message_stack_ |
ACE_Time_Value | current_deadline_ |
long | flush_timer_id_ |
The timer ID. | |
TAO_Transport_Timer | transport_timer_ |
The adapter used to receive timeout callbacks from the Reactor. | |
ACE_Lock * | handler_lock_ |
size_t | id_ |
A unique identifier for the transport. | |
unsigned long | purging_order_ |
Used by the LRU, LFU and FIFO Connection Purging Strategies. | |
size_t | recv_buffer_size_ |
Size of the buffer received. | |
size_t | sent_byte_count_ |
Number of bytes sent. | |
bool | is_connected_ |
bool | connection_closed_on_read_ |
TAO_GIOP_Message_Base * | messaging_object_ |
Our messaging object. | |
TAO_Codeset_Translator_Base * | char_translator_ |
Additional member values required to support codeset translation. | |
TAO_Codeset_Translator_Base * | wchar_translator_ |
CORBA::Boolean | tcs_set_ |
bool | first_request_ |
ACE_Message_Block * | partial_message_ |
Holds the partial GIOP message (if there is one). | |
TAO::Transport::Stats * | stats_ |
Statistics. | |
bool | flush_in_post_open_ |
Indicate that flushing needs to be done in post_open(). | |
TAO_SYNCH_MUTEX | output_cdr_mutex_ |
lock for synchronizing Transport OutputCDR access | |
void | messaging_init (TAO_GIOP_Message_Version const &version) |
virtual int | tear_listen_point_list (TAO_InputCDR &cdr) |
virtual bool | post_connect_hook (void) |
Hooks that can be overridden in concrete transports. | |
ACE_Event_Handler::Reference_Count | add_reference (void) |
Memory management routines. | |
ACE_Event_Handler::Reference_Count | remove_reference (void) |
TAO_GIOP_Message_Base * | messaging_object (void) |
virtual ACE_Event_Handler * | event_handler_i (void)=0 |
bool | is_connected (void) const |
Is this transport really connected. | |
bool | connection_closed_on_read (void) const |
Was a connection seen as closed during a read. | |
bool | post_open (size_t id) |
Perform all the actions when this transport gets opened. | |
void | pre_close (void) |
do what needs to be done when closing the transport | |
TAO_Connection_Handler * | connection_handler (void) |
Get the connection handler for this transport. | |
TAO_OutputCDR & | out_stream (void) |
Accessor for the output CDR stream. | |
TAO_SYNCH_MUTEX & | output_cdr_lock (void) |
Accessor for synchronizing Transport OutputCDR access. | |
bool | can_be_purged (void) |
Can the transport be purged? | |
virtual void | set_bidir_context_info (TAO_Operation_Details &opdetails) |
int | generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output) |
virtual int | generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg) |
int | recache_transport (TAO_Transport_Descriptor_Interface *desc) |
Recache ourselves in the cache. | |
virtual int | handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0) |
Callback to read incoming data. | |
virtual int | send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, TAO_Message_Semantics message_semantics, ACE_Time_Value *max_time_wait)=0 |
virtual int | send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, TAO_ServerRequest *request=0, TAO_Message_Semantics message_semantics=TAO_Message_Semantics(), ACE_Time_Value *max_time_wait=0)=0 |
virtual int | send_message_shared (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
Send the contents of message_block. | |
int | format_queue_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time, TAO_Stub *stub) |
int | send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0) |
int | send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, TAO::Transport::Drain_Constraints const &dc) |
Send a message block chain, assuming the lock is held. | |
int | purge_entry (void) |
Cache management. | |
int | make_idle (void) |
Cache management. | |
int | update_transport (void) |
Cache management. | |
int | handle_timeout (const ACE_Time_Value &current_time, const void *act) |
size_t | recv_buffer_size (void) const |
Accessor to recv_buffer_size_. | |
size_t | sent_byte_count (void) const |
Accessor to sent_byte_count_. | |
TAO_Codeset_Translator_Base * | char_translator (void) const |
CodeSet Negotiation - Get the char codeset translator factory. | |
TAO_Codeset_Translator_Base * | wchar_translator (void) const |
CodeSet Negotiation - Get the wchar codeset translator factory. | |
void | char_translator (TAO_Codeset_Translator_Base *) |
CodeSet negotiation - Set the char codeset translator factory. | |
void | wchar_translator (TAO_Codeset_Translator_Base *) |
CodeSet negotiation - Set the wchar codeset translator factory. | |
void | assign_translators (TAO_InputCDR *, TAO_OutputCDR *) |
void | clear_translators (TAO_InputCDR *, TAO_OutputCDR *) |
CORBA::Boolean | is_tcs_set () const |
Return true if the tcs has been set. | |
void | first_request_sent (bool flag=false) |
Set the state of the first_request_ to flag. | |
bool | first_request () const |
Get the first request flag. | |
void | send_connection_closed_notifications (void) |
TAO::Transport::Stats * | stats (void) const |
Transport statistics. | |
virtual TAO_Connection_Handler * | connection_handler_i (void)=0 |
int | process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
int | send_message_shared_i (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | queue_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time, bool back=true) |
ACE_Time_Value const * | io_timeout (TAO::Transport::Drain_Constraints const &dc) const |
Re-factor computation of I/O timeouts based on operation timeouts. Depending on the wait strategy, we need to timeout I/O operations or not. For example, if we are using a non-blocking strategy, we want to pass 0 to all I/O operations, and rely on the ACE_NONBLOCK settings on the underlying sockets. However, for blocking strategies we want to pass the operation timeouts, to respect the application level policies. | |
int | notify_reactor_now (void) |
TAO::Transport_Cache_Manager & | transport_cache_manager (void) |
Helper method that returns the Transport Cache Manager. | |
Drain_Result | drain_queue (TAO::Transport::Drain_Constraints const &dc) |
Send some of the data in the queue. | |
Drain_Result | drain_queue_i (TAO::Transport::Drain_Constraints const &dc) |
Implement drain_queue() assuming the lock is held. | |
bool | queue_is_empty_i (void) const |
Check if there are messages pending in the queue. | |
Drain_Result | drain_queue_helper (int &iovcnt, iovec iov[], TAO::Transport::Drain_Constraints const &dc) |
A helper routine used in drain_queue_i(). | |
int | schedule_output_i (void) |
Schedule handle_output() callbacks. | |
int | cancel_output_i (void) |
Cancel handle_output() callbacks. | |
void | cleanup_queue (size_t byte_count) |
Cleanup the queue. | |
void | cleanup_queue_i () |
Cleanup the complete queue. | |
bool | check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush) |
Check if the buffering constraints have been reached. | |
int | send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
int | send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time) |
int | flush_timer_pending (void) const |
Check if the flush timer is still pending. | |
void | reset_flush_timer (void) |
void | report_invalid_event_handler (const char *caller) |
Print out error messages if the event handler is not valid. | |
int | handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data) |
int | handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) |
int | handle_input_parse_extra_messages (ACE_Message_Block &message_block) |
int | consolidate_enqueue_message (TAO_Queued_Data *qd) |
int | consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
int | process_queue_head (TAO_Resume_Handle &rh) |
int | notify_reactor (void) |
void | send_connection_closed_notifications_i (void) |
Assume the lock is held. | |
void | allocate_partial_message_block (void) |
bool | using_blocking_io_for_synch_messages () const |
bool | using_blocking_io_for_asynch_messages () const |
Generic definitions for the Transport class.
The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!
The main responsibility of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for most if not all its I/O the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.
One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.
Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.
TAO provides explicit policies to send 'urgent' messages. Such messages may be put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.
Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.
One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggests that simply using an ACE_Message_Queue would not be enough: there is a significant amount of ancillary information to keep on each message that the Message_Block class does not provide room for.
Blocking I/O is still attractive for some applications. First, by eliminating the Reactor overhead performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.
Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.
The outgoing data path consists of several components:
The Transport object provides a single method to send request messages (send_request_message ()).
One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that need to be given due consideration. They are
The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB are done by the messaging protocol.
To keep things as efficient as possible for medium sized requests, it would be good to minimize data copying and locking along the incoming path, i.e., from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following: (a) Data is bigger than the buffer that we have on stack. (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages.
We solve the problems as follows
(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, data will be copied into a new buffer able to hold the full message and is queued; succeeding events will read data from socket and write directly into this buffer. Otherwise, if the message in local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.
(b) If buffer with incomplete message has been enqueued, while trying to do the above, the reactor will call us back when the handle becomes read ready. The read-operation will copy data directly into the enqueued buffer. If the message has been read completely, the message is sent to the higher layers of the ORB for processing.
(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.
We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We ran into problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.
The solution that we (plan to) adopt is pretty straightforward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usages of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".)
See Also:
Definition at line 320 of file Transport.h.
Definition at line 366 of file Transport.h.
00367 { 00368 DR_ERROR = -1, 00369 DR_OK = 0, 00370 DR_QUEUE_EMPTY = 1, // used internally, not returned from drain_queue() 00371 DR_WOULDBLOCK = 2 00372 };
TAO_Transport::TAO_Transport | ( | CORBA::ULong | tag, | |
TAO_ORB_Core * | orb_core, | |||
size_t | input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE | |||
) |
Default creator, requires the tag value be supplied.
Definition at line 123 of file Transport.cpp.
00126 : tag_ (tag) 00127 , orb_core_ (orb_core) 00128 , cache_map_entry_ (0) 00129 , tms_ (0) 00130 , ws_ (0) 00131 , bidirectional_flag_ (-1) 00132 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE) 00133 , head_ (0) 00134 , tail_ (0) 00135 , incoming_message_queue_ (orb_core) 00136 , current_deadline_ (ACE_Time_Value::zero) 00137 , flush_timer_id_ (-1) 00138 , transport_timer_ (this) 00139 , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) 00140 , id_ ((size_t) this) 00141 , purging_order_ (0) 00142 , recv_buffer_size_ (0) 00143 , sent_byte_count_ (0) 00144 , is_connected_ (false) 00145 , connection_closed_on_read_ (false) 00146 , messaging_object_ (0) 00147 , char_translator_ (0) 00148 , wchar_translator_ (0) 00149 , tcs_set_ (0) 00150 , first_request_ (true) 00151 , partial_message_ (0) 00152 #if TAO_HAS_SENDFILE == 1 00153 // The ORB has been configured to use the MMAP allocator, meaning 00154 // we could/should use sendfile() to send data. Cast once rather 00155 // here rather than during each send. This assumes that all 00156 // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator 00157 // instance as the underlying output CDR buffer allocator. 00158 , mmap_allocator_ ( 00159 dynamic_cast<TAO_MMAP_Allocator *> ( 00160 orb_core->output_cdr_buffer_allocator ())) 00161 #endif /* TAO_HAS_SENDFILE==1 */ 00162 #if TAO_HAS_TRANSPORT_CURRENT == 1 00163 , stats_ (0) 00164 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 00165 , flush_in_post_open_ (false) 00166 { 00167 ACE_NEW (this->messaging_object_, 00168 TAO_GIOP_Message_Base (orb_core, 00169 this, 00170 input_cdr_size)); 00171 00172 TAO_Client_Strategy_Factory *cf = 00173 this->orb_core_->client_factory (); 00174 00175 // Create WS now. 00176 this->ws_ = cf->create_wait_strategy (this); 00177 00178 // Create TMS now. 
00179 this->tms_ = cf->create_transport_mux_strategy (this); 00180 00181 #if TAO_HAS_TRANSPORT_CURRENT == 1 00182 // Allocate stats 00183 ACE_NEW_THROW_EX (this->stats_, 00184 TAO::Transport::Stats, 00185 CORBA::NO_MEMORY ()); 00186 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 00187 00188 /* 00189 * Hook to add code that initializes components that 00190 * belong to the concrete protocol implementation. 00191 * Further additions to this Transport class will 00192 * need to add code *before* this hook. 00193 */ 00194 //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK 00195 }
TAO_Transport::~TAO_Transport | ( | void | ) | [virtual] |
Destructor.
Definition at line 197 of file Transport.cpp.
00198 { 00199 if (TAO_debug_level > 9) 00200 { 00201 TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::~Transport\n"), 00202 this->id_ 00203 )); 00204 } 00205 00206 delete this->messaging_object_; 00207 00208 delete this->ws_; 00209 00210 delete this->tms_; 00211 00212 delete this->handler_lock_; 00213 00214 if (!this->is_connected_) 00215 { 00216 // When we have a not connected transport we could have buffered 00217 // messages on this transport which we have to cleanup now. 00218 this->cleanup_queue_i(); 00219 } 00220 00221 // Release the partial message block, however we may 00222 // have never allocated one. 00223 ACE_Message_Block::release (this->partial_message_); 00224 00225 // By the time the destructor is reached here all the connection stuff 00226 // *must* have been cleaned up. 00227 00228 // The following assert is needed for the test "Bug_2494_Regression". 00229 // See the bugzilla bug #2494 for details. 00230 ACE_ASSERT (this->queue_is_empty_i ()); 00231 ACE_ASSERT (this->cache_map_entry_ == 0); 00232 00233 #if TAO_HAS_TRANSPORT_CURRENT == 1 00234 delete this->stats_; 00235 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 00236 00237 /* 00238 * Hook to add code that cleans up components 00239 * belong to the concrete protocol implementation. 00240 * Further additions to this Transport class will 00241 * need to add code *before* this hook. 00242 */ 00243 //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK 00244 }
ACE_Event_Handler::Reference_Count TAO_Transport::add_reference | ( | void | ) |
Memory management routines.
Forwards to event handler.
Definition at line 2742 of file Transport.cpp.
02743 { 02744 return this->event_handler_i ()->add_reference (); 02745 }
void TAO_Transport::allocate_partial_message_block | ( | void | ) | [private] |
Allocate a partial message block and store it in our partial_message_ data member.
Definition at line 2872 of file Transport.cpp.
02873 { 02874 if (this->partial_message_ == 0) 02875 { 02876 // This value must be at least large enough to hold a GIOP message 02877 // header plus a GIOP fragment header 02878 size_t const partial_message_size = 02879 this->messaging_object ()->header_length (); 02880 // + this->messaging_object ()->fragment_header_length (); 02881 // deprecated, conflicts with not-single_read_opt. 02882 02883 ACE_NEW (this->partial_message_, 02884 ACE_Message_Block (partial_message_size)); 02885 } 02886 }
void TAO_Transport::assign_translators | ( | TAO_InputCDR * | inp, | |
TAO_OutputCDR * | outp | |||
) |
Use the Transport's codeset factories to set the translator for input and output CDRs.
Definition at line 2712 of file Transport.cpp.
02713 { 02714 if (this->char_translator_) 02715 { 02716 this->char_translator_->assign (inp); 02717 this->char_translator_->assign (outp); 02718 } 02719 if (this->wchar_translator_) 02720 { 02721 this->wchar_translator_->assign (inp); 02722 this->wchar_translator_->assign (outp); 02723 } 02724 }
void TAO_Transport::bidirectional_flag | ( | int | flag | ) |
Set the bidirectional flag.
Definition at line 42 of file Transport.inl.
00043 { 00044 this->bidirectional_flag_ = flag; 00045 }
int TAO_Transport::bidirectional_flag | ( | void | ) | const |
Get the bidirectional flag.
Definition at line 36 of file Transport.inl.
00037 { 00038 return this->bidirectional_flag_; 00039 }
TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * TAO_Transport::cache_map_entry | ( | void | ) |
Get the Cache Map entry.
Definition at line 60 of file Transport.inl.
00061 { 00062 return this->cache_map_entry_; 00063 }
void TAO_Transport::cache_map_entry | ( | TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | entry | ) |
Set the Cache Map entry.
Definition at line 66 of file Transport.inl.
00068 { 00069 // Sync with TAO_Transport::purge_entry() 00070 ACE_GUARD (ACE_Lock, ace_mon, *this->handler_lock_); 00071 this->cache_map_entry_ = entry; 00072 }
bool TAO_Transport::can_be_purged | ( | void | ) |
Can the transport be purged?
Definition at line 571 of file Transport.cpp.
00572 { 00573 return !this->tms_->has_request (); 00574 }
int TAO_Transport::cancel_output_i | ( | void | ) | [private] |
Cancel handle_output() callbacks.
Definition at line 940 of file Transport.cpp.
00941 { 00942 ACE_Event_Handler * const eh = this->event_handler_i (); 00943 ACE_Reactor *const reactor = eh->reactor (); 00944 00945 if (TAO_debug_level > 3) 00946 { 00947 TAOLIB_DEBUG ((LM_DEBUG, 00948 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"), 00949 this->id ())); 00950 } 00951 00952 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); 00953 }
void TAO_Transport::char_translator | ( | TAO_Codeset_Translator_Base * | tf | ) |
CodeSet negotiation - Set the char codeset translator factory.
Definition at line 148 of file Transport.inl.
00149 { 00150 this->char_translator_ = tf; 00151 this->tcs_set_ = 1; 00152 }
TAO_Codeset_Translator_Base * TAO_Transport::char_translator | ( | void | ) | const |
CodeSet Negotiation - Get the char codeset translator factory.
Definition at line 136 of file Transport.inl.
00137 { 00138 return this->char_translator_; 00139 }
bool TAO_Transport::check_buffering_constraints_i | ( | TAO_Stub * | stub, | |
bool & | must_flush | |||
) | [private] |
Check if the buffering constraints have been reached.
Definition at line 1305 of file Transport.cpp.
01306 { 01307 // First let's compute the size of the queue: 01308 size_t msg_count = 0; 01309 size_t total_bytes = 0; 01310 01311 for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) 01312 { 01313 ++msg_count; 01314 total_bytes += i->message_length (); 01315 } 01316 01317 bool set_timer = false; 01318 ACE_Time_Value new_deadline; 01319 01320 TAO::Transport_Queueing_Strategy *queue_strategy = 01321 stub->transport_queueing_strategy (); 01322 01323 bool constraints_reached = true; 01324 01325 if (queue_strategy) 01326 { 01327 constraints_reached = 01328 queue_strategy->buffering_constraints_reached (stub, 01329 msg_count, 01330 total_bytes, 01331 must_flush, 01332 this->current_deadline_, 01333 set_timer, 01334 new_deadline); 01335 } 01336 else 01337 { 01338 must_flush = false; 01339 } 01340 01341 // ... set the new timer, also cancel any previous timers ... 01342 // Check for connected state since this method may be called 01343 // before the connection is established and than there will be no 01344 // reactor available yet. 01345 if (set_timer && this->is_connected_) 01346 { 01347 ACE_Event_Handler *eh = this->event_handler_i (); 01348 ACE_Reactor * const reactor = eh->reactor (); 01349 this->current_deadline_ = new_deadline; 01350 ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday (); 01351 01352 if (this->flush_timer_pending ()) 01353 { 01354 reactor->cancel_timer (this->flush_timer_id_); 01355 } 01356 01357 this->flush_timer_id_ = 01358 reactor->schedule_timer (&this->transport_timer_, 01359 &this->current_deadline_, 01360 delay); 01361 } 01362 01363 return constraints_reached; 01364 }
void TAO_Transport::cleanup_queue | ( | size_t | byte_count | ) | [private] |
Cleanup the queue.
Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out.
Definition at line 1258 of file Transport.cpp.
01259 { 01260 while (!this->queue_is_empty_i () && byte_count > 0) 01261 { 01262 TAO_Queued_Message *i = this->head_; 01263 01264 if (TAO_debug_level > 4) 01265 { 01266 TAOLIB_DEBUG ((LM_DEBUG, 01267 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") 01268 ACE_TEXT ("byte_count = %d\n"), 01269 this->id (), byte_count)); 01270 } 01271 01272 // Update the state of the first message 01273 i->bytes_transferred (byte_count); 01274 01275 if (TAO_debug_level > 4) 01276 { 01277 TAOLIB_DEBUG ((LM_DEBUG, 01278 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") 01279 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"), 01280 this->id (), byte_count, i->all_data_sent (), 01281 i->message_length ())); 01282 } 01283 01284 // ... if all the data was sent the message must be removed from 01285 // the queue... 01286 if (i->all_data_sent ()) 01287 { 01288 i->remove_from_list (this->head_, this->tail_); 01289 i->destroy (); 01290 } 01291 else if (byte_count == 0) 01292 { 01293 // If we have sent out a full message block, but we are not 01294 // finished with this message, we need to do something with the 01295 // message block chain held by our output stream. If we don't, 01296 // another thread can attempt to service this transport and end 01297 // up resetting the output stream which will release the 01298 // message that we haven't finished sending. 01299 i->copy_if_necessary (this->out_stream ().begin ()); 01300 } 01301 } 01302 }
void TAO_Transport::cleanup_queue_i | ( | ) | [private] |
Cleanup the complete queue.
Definition at line 1215 of file Transport.cpp.
01216 { 01217 if (TAO_debug_level > 4) 01218 { 01219 TAOLIB_DEBUG ((LM_DEBUG, 01220 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") 01221 ACE_TEXT ("cleaning up complete queue\n"), 01222 this->id ())); 01223 } 01224 01225 size_t byte_count = 0; 01226 int msg_count = 0; 01227 01228 // Cleanup all messages 01229 while (!this->queue_is_empty_i ()) 01230 { 01231 TAO_Queued_Message *i = this->head_; 01232 01233 if (TAO_debug_level > 4) 01234 { 01235 byte_count += i->message_length(); 01236 ++msg_count; 01237 } 01238 // @@ This is a good point to insert a flag to indicate that a 01239 // CloseConnection message was successfully received. 01240 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED, 01241 this->orb_core_->leader_follower ()); 01242 01243 i->remove_from_list (this->head_, this->tail_); 01244 01245 i->destroy (); 01246 } 01247 01248 if (TAO_debug_level > 4) 01249 { 01250 TAOLIB_DEBUG ((LM_DEBUG, 01251 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") 01252 ACE_TEXT ("discarded %d messages, %u bytes.\n"), 01253 this->id (), msg_count, byte_count)); 01254 } 01255 }
void TAO_Transport::clear_translators | ( | TAO_InputCDR * | inp, | |
TAO_OutputCDR * | outp | |||
) |
It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be.
Definition at line 2727 of file Transport.cpp.
02728 { 02729 if (inp) 02730 { 02731 inp->char_translator (0); 02732 inp->wchar_translator (0); 02733 } 02734 if (outp) 02735 { 02736 outp->char_translator (0); 02737 outp->wchar_translator (0); 02738 } 02739 }
void TAO_Transport::close_connection | ( | void | ) | [virtual] |
Call the implementation method after obtaining the lock.
Definition at line 361 of file Transport.cpp.
00362 { 00363 this->connection_handler_i ()->close_connection (); 00364 }
bool TAO_Transport::connection_closed_on_read | ( | void | ) | const |
Whether the connection was seen as closed during a read.
Definition at line 2925 of file Transport.cpp.
02926 { 02927 return connection_closed_on_read_; 02928 }
TAO_Connection_Handler * TAO_Transport::connection_handler | ( | void | ) |
Get the connection handler for this transport.
Definition at line 188 of file Transport.inl.
00189 { 00190 return this->connection_handler_i(); 00191 }
virtual TAO_Connection_Handler* TAO_Transport::connection_handler_i | ( | void | ) | [protected, pure virtual] |
These classes need privileged access to:
int TAO_Transport::consolidate_enqueue_message | ( | TAO_Queued_Data * | qd | ) | [private] |
Definition at line 1873 of file Transport.cpp.
01874 { 01875 // consolidate message on top of stack, only for fragmented messages 01876 01877 // paranoid check 01878 if (q_data->missing_data () != 0) 01879 { 01880 return -1; 01881 } 01882 01883 if (q_data->more_fragments () || 01884 q_data->msg_type () == GIOP::Fragment) 01885 { 01886 TAO_Queued_Data *new_q_data = 0; 01887 01888 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) 01889 { 01890 case -1: // error 01891 return -1; 01892 01893 case 0: // returning consolidated message in new_q_data 01894 if (!new_q_data) 01895 { 01896 if (TAO_debug_level > 0) 01897 { 01898 TAOLIB_ERROR ((LM_ERROR, 01899 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ") 01900 ACE_TEXT ("error, consolidated message is NULL\n"), 01901 this->id ())); 01902 } 01903 return -1; 01904 } 01905 01906 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0) 01907 { 01908 TAO_Queued_Data::release (new_q_data); 01909 return -1; 01910 } 01911 break; 01912 01913 case 1: // fragment has been stored in messaging_oject() 01914 break; 01915 } 01916 } 01917 else 01918 { 01919 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0) 01920 { 01921 TAO_Queued_Data::release (q_data); 01922 return -1; 01923 } 01924 } 01925 01926 return 0; // success 01927 }
int TAO_Transport::consolidate_process_message | ( | TAO_Queued_Data * | qd, | |
TAO_Resume_Handle & | rh | |||
) | [private] |
Definition at line 1786 of file Transport.cpp.
01788 { 01789 // paranoid check 01790 if (q_data->missing_data () != 0) 01791 { 01792 if (TAO_debug_level > 0) 01793 { 01794 TAOLIB_ERROR ((LM_ERROR, 01795 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01796 ACE_TEXT ("missing data\n"), 01797 this->id ())); 01798 } 01799 return -1; 01800 } 01801 01802 if (q_data->more_fragments () || 01803 q_data->msg_type () == GIOP::Fragment) 01804 { 01805 // consolidate message on top of stack, only for fragmented messages 01806 TAO_Queued_Data *new_q_data = 0; 01807 01808 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) 01809 { 01810 case -1: // error 01811 return -1; 01812 01813 case 0: // returning consolidated message in q_data 01814 if (!new_q_data) 01815 { 01816 if (TAO_debug_level > 0) 01817 { 01818 TAOLIB_ERROR ((LM_ERROR, 01819 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01820 ACE_TEXT ("error, consolidated message is NULL\n"), 01821 this->id ())); 01822 } 01823 return -1; 01824 } 01825 01826 01827 if (this->process_parsed_messages (new_q_data, rh) == -1) 01828 { 01829 TAO_Queued_Data::release (new_q_data); 01830 01831 if (TAO_debug_level > 0) 01832 { 01833 TAOLIB_ERROR ((LM_ERROR, 01834 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01835 ACE_TEXT ("error processing consolidated message\n"), 01836 this->id ())); 01837 } 01838 return -1; 01839 } 01840 01841 TAO_Queued_Data::release (new_q_data); 01842 01843 break; 01844 01845 case 1: // fragment has been stored in messaging_oject() 01846 break; 01847 } 01848 } 01849 else 01850 { 01851 if (this->process_parsed_messages (q_data, rh) == -1) 01852 { 01853 TAO_Queued_Data::release (q_data); 01854 01855 if (TAO_debug_level > 0) 01856 { 01857 TAOLIB_ERROR ((LM_ERROR, 01858 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01859 ACE_TEXT ("error processing message\n"), 01860 this->id ())); 01861 } 01862 return -1; 01863 } 01864 01865 
TAO_Queued_Data::release (q_data); 01866 01867 } 01868 01869 return 0; 01870 }
TAO_Transport::Drain_Result TAO_Transport::drain_queue | ( | TAO::Transport::Drain_Constraints const & | dc | ) | [private] |
Send some of the data in the queue.
As the outgoing data is drained this method is invoked to send as much of the current message as possible.
Definition at line 997 of file Transport.cpp.
00998 { 00999 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, DR_ERROR); 01000 Drain_Result const retval = this->drain_queue_i (dc); 01001 01002 if (retval == DR_QUEUE_EMPTY) 01003 { 01004 // ... there is no current message or it was completely 01005 // sent, cancel output... 01006 TAO_Flushing_Strategy *flushing_strategy = 01007 this->orb_core ()->flushing_strategy (); 01008 01009 flushing_strategy->cancel_output (this); 01010 01011 return DR_OK; 01012 } 01013 01014 return retval; 01015 }
TAO_Transport::Drain_Result TAO_Transport::drain_queue_helper | ( | int & | iovcnt, | |
iovec | iov[], | |||
TAO::Transport::Drain_Constraints const & | dc | |||
) | [private] |
A helper routine used in drain_queue_i().
Definition at line 1018 of file Transport.cpp.
01020 { 01021 // As a side-effect, this decrements the timeout() pointed-to value by 01022 // the time used in this function. That might be important as there are 01023 // potentially long running system calls invoked from here. 01024 TAO::ORB_Countdown_Time countdown(dc.timeout()); 01025 01026 size_t byte_count = 0; 01027 01028 // ... send the message ... 01029 ssize_t retval = -1; 01030 01031 #if TAO_HAS_SENDFILE == 1 01032 if (this->mmap_allocator_) 01033 retval = this->sendfile (this->mmap_allocator_, 01034 iov, 01035 iovcnt, 01036 byte_count, 01037 dc); 01038 else 01039 #endif /* TAO_HAS_SENDFILE==1 */ 01040 retval = this->send (iov, iovcnt, byte_count, 01041 this->io_timeout (dc)); 01042 01043 if (TAO_debug_level > 9) 01044 { 01045 dump_iov (iov, iovcnt, this->id (), 01046 byte_count, ACE_TEXT("drain_queue_helper")); 01047 } 01048 01049 if (retval == 0) 01050 { 01051 if (TAO_debug_level > 4) 01052 { 01053 TAOLIB_DEBUG ((LM_DEBUG, 01054 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 01055 ACE_TEXT ("send() returns 0\n"), 01056 this->id ())); 01057 } 01058 return DR_ERROR; 01059 } 01060 else if (retval == -1) 01061 { 01062 if (TAO_debug_level > 4) 01063 { 01064 TAOLIB_DEBUG ((LM_DEBUG, 01065 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 01066 ACE_TEXT ("error during send() (errno: %d) - %m\n"), 01067 this->id (), ACE_ERRNO_GET)); 01068 } 01069 01070 if (errno == EWOULDBLOCK || errno == EAGAIN) 01071 { 01072 return DR_WOULDBLOCK; 01073 } 01074 01075 return DR_ERROR; 01076 } 01077 01078 // ... now we need to update the queue, removing elements 01079 // that have been sent, and updating the last element if it 01080 // was only partially sent ... 01081 this->cleanup_queue (byte_count); 01082 iovcnt = 0; 01083 01084 // ... start over, how do we guarantee progress? Because if 01085 // no bytes are sent send() can only return 0 or -1 01086 01087 // Total no. 
of bytes sent for a send call 01088 this->sent_byte_count_ += byte_count; 01089 01090 if (TAO_debug_level > 4) 01091 { 01092 TAOLIB_DEBUG ((LM_DEBUG, 01093 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 01094 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"), 01095 this->id(), byte_count, this->queue_is_empty_i ())); 01096 } 01097 01098 return DR_QUEUE_EMPTY; 01099 // drain_queue_i will check if the queue is actually empty 01100 }
TAO_Transport::Drain_Result TAO_Transport::drain_queue_i | ( | TAO::Transport::Drain_Constraints const & | dc | ) | [private] |
Implement drain_queue() assuming the lock is held.
Definition at line 1103 of file Transport.cpp.
01104 { 01105 // This is the vector used to send data, it must be declared outside 01106 // the loop because after the loop there may still be data to be 01107 // sent 01108 int iovcnt = 0; 01109 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) 01110 iovec iov[ACE_IOV_MAX] = { { 0 , 0 } }; 01111 #else 01112 iovec iov[ACE_IOV_MAX]; 01113 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ 01114 01115 // We loop over all the elements in the queue ... 01116 TAO_Queued_Message *i = this->head_; 01117 01118 // Reset the value so that the counting is done for each new send 01119 // call. 01120 this->sent_byte_count_ = 0; 01121 01122 // Avoid calling this expensive function each time through the loop. Instead 01123 // we'll assume that the time is unlikely to change much during the loop. 01124 // If we are forced to send in the loop then we'll recompute the time. 01125 ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr (); 01126 01127 while (i != 0) 01128 { 01129 if (i->is_expired (now)) 01130 { 01131 if (TAO_debug_level > 3) 01132 { 01133 TAOLIB_DEBUG ((LM_DEBUG, 01134 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 01135 ACE_TEXT ("Discarding expired queued message.\n"), 01136 this->id ())); 01137 } 01138 TAO_Queued_Message *next = i->next (); 01139 i->state_changed (TAO_LF_Event::LFS_TIMEOUT, 01140 this->orb_core_->leader_follower ()); 01141 i->remove_from_list (this->head_, this->tail_); 01142 i->destroy (); 01143 i = next; 01144 continue; 01145 } 01146 // ... each element fills the iovector ... 01147 i->fill_iov (ACE_IOV_MAX, iovcnt, iov); 01148 01149 // ... the vector is full, no choice but to send some data out. 01150 // We need to loop because a single message can span multiple 01151 // IOV_MAX elements ... 
01152 if (iovcnt == ACE_IOV_MAX) 01153 { 01154 Drain_Result const retval = 01155 this->drain_queue_helper (iovcnt, iov, dc); 01156 01157 if (TAO_debug_level > 4) 01158 { 01159 TAOLIB_DEBUG ((LM_DEBUG, 01160 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 01161 ACE_TEXT ("helper retval = %d\n"), 01162 this->id (), static_cast<int> (retval.dre_))); 01163 } 01164 01165 if (retval != DR_QUEUE_EMPTY) 01166 { 01167 return retval; 01168 } 01169 01170 now = ACE_High_Res_Timer::gettimeofday_hr (); 01171 01172 i = this->head_; 01173 continue; 01174 } 01175 // ... notice that this line is only reached if there is still 01176 // room in the iovector ... 01177 i = i->next (); 01178 } 01179 01180 if (iovcnt != 0) 01181 { 01182 Drain_Result const retval = this->drain_queue_helper (iovcnt, iov, dc); 01183 01184 if (TAO_debug_level > 4) 01185 { 01186 TAOLIB_DEBUG ((LM_DEBUG, 01187 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 01188 ACE_TEXT ("helper retval = %d\n"), 01189 this->id (), static_cast<int> (retval.dre_))); 01190 } 01191 01192 if (retval != DR_QUEUE_EMPTY) 01193 { 01194 return retval; 01195 } 01196 } 01197 01198 if (this->queue_is_empty_i ()) 01199 { 01200 if (this->flush_timer_pending ()) 01201 { 01202 ACE_Event_Handler *eh = this->event_handler_i (); 01203 ACE_Reactor * const reactor = eh->reactor (); 01204 reactor->cancel_timer (this->flush_timer_id_); 01205 this->reset_flush_timer (); 01206 } 01207 01208 return DR_QUEUE_EMPTY; 01209 } 01210 01211 return DR_OK; 01212 }
virtual ACE_Event_Handler* TAO_Transport::event_handler_i | ( | void | ) | [pure virtual] |
Return the event handler used to receive notifications from the Reactor. Normally a concrete TAO_Transport object has an ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far, this role is fulfilled by an instance of ACE_Svc_Handler.
bool TAO_Transport::first_request | ( | void | ) | const |
Get the first request flag.
Definition at line 175 of file Transport.inl.
00176 { 00177 return this->first_request_; 00178 }
void TAO_Transport::first_request_sent | ( | bool | flag = false |
) |
Set the state of the first_request_ to flag.
Definition at line 169 of file Transport.inl.
00170 { 00171 this->first_request_ = flag; 00172 }
int TAO_Transport::flush_timer_pending | ( | void | ) | const [private] |
Check if the flush timer is still pending.
Definition at line 113 of file Transport.inl.
00114 { 00115 return this->flush_timer_id_ != -1; 00116 }
int TAO_Transport::format_queue_message | ( | TAO_OutputCDR & | stream, | |
ACE_Time_Value * | max_wait_time, | |||
TAO_Stub * | stub | |||
) |
Format and queue the message held in stream.
max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. |
Definition at line 631 of file Transport.cpp.
00634 { 00635 if (this->messaging_object ()->format_message (stream, stub, 0) != 0) 00636 return -1; 00637 00638 if (this->queue_message_i (stream.begin (), max_wait_time) != 0) 00639 return -1; 00640 00641 // check the buffering constraints to see what must be done in post_open() 00642 bool must_flush = false; 00643 this->flush_in_post_open_ |= 00644 this->check_buffering_constraints_i (stub, 00645 must_flush); 00646 00647 return 0; 00648 }
int TAO_Transport::generate_locate_request | ( | TAO_Target_Specification & | spec, | |
TAO_Operation_Details & | opdetails, | |||
TAO_OutputCDR & | output | |||
) |
This is a request for the transport object to write a LocateRequest header before it is sent out.
Definition at line 497 of file Transport.cpp.
00501 { 00502 if (this->messaging_object ()->generate_locate_request_header (opdetails, 00503 spec, 00504 output) == -1) 00505 { 00506 if (TAO_debug_level > 0) 00507 { 00508 TAOLIB_ERROR ((LM_ERROR, 00509 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ") 00510 ACE_TEXT ("error while marshalling the LocateRequest header\n"), 00511 this->id ())); 00512 } 00513 00514 return -1; 00515 } 00516 00517 return 0; 00518 }
int TAO_Transport::generate_request_header | ( | TAO_Operation_Details & | opd, | |
TAO_Target_Specification & | spec, | |||
TAO_OutputCDR & | msg | |||
) | [virtual] |
This is a request for the transport object to write a request header before it sends out the request
Definition at line 521 of file Transport.cpp.
00525 { 00526 if (this->messaging_object ()->generate_request_header (opdetails, 00527 spec, 00528 output) == -1) 00529 { 00530 if (TAO_debug_level > 0) 00531 { 00532 TAOLIB_ERROR ((LM_ERROR, 00533 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_request_header, ") 00534 ACE_TEXT ("error while marshalling the Request header\n"), 00535 this->id())); 00536 } 00537 00538 return -1; 00539 } 00540 00541 return 0; 00542 }
int TAO_Transport::handle_input | ( | TAO_Resume_Handle & | rh, | |
ACE_Time_Value * | max_wait_time = 0 | |||
) | [virtual] |
Callback to read incoming data.
The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.
Once a complete message is read, the Transport class delegates to the Messaging layer to invoke the right upcall (on the server) or the TAO_Reply_Dispatcher (on the client side).
max_wait_time | In some cases the I/O is synchronous, e.g. a thread-per-connection server or when Wait_On_Read is enabled. In those cases a maximum read time can be specified. |
All the methods relevant to the incoming data path of the ORB are defined below
Definition at line 1712 of file Transport.cpp.
01714 { 01715 if (TAO_debug_level > 3) 01716 { 01717 TAOLIB_DEBUG ((LM_DEBUG, 01718 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"), 01719 this->id ())); 01720 } 01721 01722 // First try to process messages of the head of the incoming queue. 01723 int const retval = this->process_queue_head (rh); 01724 01725 if (retval <= 0) 01726 { 01727 if (retval == -1) 01728 { 01729 if (TAO_debug_level > 2) 01730 { 01731 TAOLIB_ERROR ((LM_ERROR, 01732 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01733 ACE_TEXT ("error while parsing the head of the queue\n"), 01734 this->id())); 01735 01736 } 01737 return -1; 01738 } 01739 else 01740 { 01741 // retval == 0 01742 01743 // Processed a message in queue successfully. This 01744 // thread must return to thread-pool now. 01745 return 0; 01746 } 01747 } 01748 01749 TAO_Queued_Data *q_data = 0; 01750 01751 if (this->incoming_message_stack_.top (q_data) != -1 01752 && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED) 01753 { 01754 /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */ 01755 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1) 01756 { 01757 if (TAO_debug_level > 0) 01758 { 01759 TAOLIB_ERROR ((LM_ERROR, 01760 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01761 ACE_TEXT ("error consolidating incoming message\n"), 01762 this->id ())); 01763 } 01764 return -1; 01765 } 01766 } 01767 else 01768 { 01769 if (this->handle_input_parse_data (rh, max_wait_time) == -1) 01770 { 01771 if (TAO_debug_level > 0) 01772 { 01773 TAOLIB_ERROR ((LM_ERROR, 01774 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01775 ACE_TEXT ("error parsing incoming message\n"), 01776 this->id ())); 01777 } 01778 return -1; 01779 } 01780 } 01781 01782 return 0; 01783 }
int TAO_Transport::handle_input_missing_data | ( | TAO_Resume_Handle & | rh, | |
ACE_Time_Value * | max_wait_time, | |||
TAO_Queued_Data * | q_data | |||
) | [private] |
Invoked by the handle_input operation. It consolidates the message on top of incoming_message_stack. The amount of missing data is known, and the recv operation copies data directly into the message buffer, as much as a single recv invocation provides.
Definition at line 1930 of file Transport.cpp.
01933 { 01934 // paranoid check 01935 if (q_data == 0) 01936 { 01937 return -1; 01938 } 01939 01940 if (TAO_debug_level > 3) 01941 { 01942 TAOLIB_DEBUG ((LM_DEBUG, 01943 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") 01944 ACE_TEXT ("enter (missing data == %d)\n"), 01945 this->id (), q_data->missing_data ())); 01946 } 01947 01948 size_t const recv_size = q_data->missing_data (); 01949 01950 if (q_data->msg_block ()->space() < recv_size) 01951 { 01952 // make sure the message_block has enough space 01953 size_t const message_size = recv_size + q_data->msg_block ()->length(); 01954 01955 if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1) 01956 { 01957 return -1; 01958 } 01959 } 01960 01961 // Saving the size of the received buffer in case any one needs to 01962 // get the size of the message that is received in the 01963 // context. Obviously the value will be changed for each recv call 01964 // and the user is supposed to invoke the accessor only in the 01965 // invocation context to get meaningful information. 
01966 this->recv_buffer_size_ = recv_size; 01967 01968 // Read the message into the existing message block on heap 01969 ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(), 01970 recv_size, 01971 max_wait_time); 01972 01973 if (n <= 0) 01974 { 01975 return ACE_Utils::truncate_cast<int> (n); 01976 } 01977 01978 if (TAO_debug_level > 3) 01979 { 01980 TAOLIB_DEBUG ((LM_DEBUG, 01981 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") 01982 ACE_TEXT ("read bytes %d\n"), 01983 this->id (), n)); 01984 } 01985 01986 q_data->msg_block ()->wr_ptr(n); 01987 q_data->missing_data (q_data->missing_data () - n); 01988 01989 if (q_data->missing_data () == 0) 01990 { 01991 // paranoid check 01992 if (this->incoming_message_stack_.pop (q_data) == -1) 01993 { 01994 return -1; 01995 } 01996 01997 if (this->consolidate_process_message (q_data, rh) == -1) 01998 { 01999 return -1; 02000 } 02001 } 02002 02003 return 0; 02004 }
int TAO_Transport::handle_input_parse_data | ( | TAO_Resume_Handle & | rh, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Invoked by the handle_input operation. It parses new messages from the input stream, or consolidates messages whose header has been only partially read, so that the message size is still unknown. It parses as much data as a single recv invocation provides.
Definition at line 2051 of file Transport.cpp.
02053 { 02054 if (TAO_debug_level > 3) 02055 { 02056 TAOLIB_DEBUG ((LM_DEBUG, 02057 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02058 ACE_TEXT ("enter\n"), 02059 this->id ())); 02060 } 02061 02062 // The buffer on the stack which will be used to hold the input 02063 // messages, ACE_CDR::MAX_ALIGNMENT compensates the 02064 // memory-alignment. This improves performance with SUN-Java-ORB-1.4 02065 // and higher that sends fragmented requests of size 1024 bytes. 02066 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT]; 02067 02068 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) 02069 (void) ACE_OS::memset (buf, 02070 '\0', 02071 sizeof buf); 02072 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ 02073 02074 // Create a data block 02075 ACE_Data_Block db (sizeof (buf), 02076 ACE_Message_Block::MB_DATA, 02077 buf, 02078 this->orb_core_->input_cdr_buffer_allocator (), 02079 this->orb_core_->locking_strategy (), 02080 ACE_Message_Block::DONT_DELETE, 02081 this->orb_core_->input_cdr_dblock_allocator ()); 02082 02083 // Create a message block 02084 ACE_Message_Block message_block (&db, 02085 ACE_Message_Block::DONT_DELETE, 02086 this->orb_core_->input_cdr_msgblock_allocator ()); 02087 02088 // Align the message block 02089 ACE_CDR::mb_align (&message_block); 02090 02091 size_t recv_size = 0; // Note: unsigned integer 02092 02093 // Pointer to newly parsed message 02094 TAO_Queued_Data *q_data = 0; 02095 02096 // Optimizing access of constants 02097 size_t const header_length = this->messaging_object ()->header_length (); 02098 02099 // Paranoid check 02100 if (header_length > message_block.space ()) 02101 { 02102 return -1; 02103 } 02104 02105 if (this->orb_core_->orb_params ()->single_read_optimization ()) 02106 { 02107 recv_size = message_block.space (); 02108 } 02109 else 02110 { 02111 // Single read optimization has been de-activated. That means 02112 // that we need to read from transport the GIOP header first 02113 // before the payload. 
This codes first checks the incoming 02114 // stack for partial messages which needs to be 02115 // consolidated. Otherwise we are in new cycle, reading complete 02116 // GIOP header of new incoming message. 02117 if (this->incoming_message_stack_.top (q_data) != -1 02118 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED) 02119 { 02120 // There is a partial message on incoming_message_stack_ 02121 // whose length is unknown so far. We need to consolidate 02122 // the GIOP header to get to know the payload size, 02123 recv_size = header_length - q_data->msg_block ()->length (); 02124 } 02125 else 02126 { 02127 // Read amount of data forming GIOP header of new incoming 02128 // message. 02129 recv_size = header_length; 02130 } 02131 // POST: 0 <= recv_size <= header_length 02132 } 02133 // POST: 0 <= recv_size <= message_block->space () 02134 02135 // If we have a partial message, copy it into our message block and 02136 // clear out the partial message. 02137 if (this->partial_message_ != 0 && this->partial_message_->length () > 0) 02138 { 02139 // (*) Copy back the partial message into current read-buffer, 02140 // verify that the read-strategy of "recv_size" bytes is not 02141 // exceeded. The latter check guarantees that recv_size does not 02142 // roll-over and keeps in range 02143 // 0<=recv_size<=message_block->space() 02144 if (this->partial_message_->length () <= recv_size && 02145 message_block.copy (this->partial_message_->rd_ptr (), 02146 this->partial_message_->length ()) == 0) 02147 { 02148 02149 recv_size -= this->partial_message_->length (); 02150 // reset is done later to avoid problem in case of EWOULDBLOCK 02151 // or EAGAIN errno 02152 } 02153 else 02154 { 02155 return -1; 02156 } 02157 } 02158 // POST: 0 <= recv_size <= buffer_space 02159 02160 if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0 02161 { 02162 // This event would cause endless looping, trying frequently to 02163 // read zero bytes from stream. 
This might happen, if TAOs 02164 // protocol implementation is not correct and tries to read data 02165 // beyond header without "single_read_optimazation" being 02166 // activated. 02167 if (TAO_debug_level > 0) 02168 { 02169 TAOLIB_ERROR ((LM_ERROR, 02170 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02171 ACE_TEXT ("Error - endless loop detection, closing connection"), 02172 this->id ())); 02173 } 02174 if (this->partial_message_ != 0 && this->partial_message_->length () > 0) 02175 { 02176 this->partial_message_->reset (); 02177 } 02178 return -1; 02179 } 02180 02181 // Saving the size of the received buffer in case any one needs to 02182 // get the size of the message thats received in the 02183 // context. Obviously the value will be changed for each recv call 02184 // and the user is supposed to invoke the accessor only in the 02185 // invocation context to get meaningful information. 02186 this->recv_buffer_size_ = recv_size; 02187 02188 // Read the message into the message block that we have created on 02189 // the stack. 02190 ssize_t const n = this->recv (message_block.wr_ptr (), 02191 recv_size, 02192 max_wait_time); 02193 02194 // If there is an error return to the reactor.. 
02195 // do not reset partial message in case of n == 0 (EWOULDBLOCK || EAGAIN), 02196 // we will need it during next try 02197 if (n <= 0) 02198 { 02199 if ((n < 0) && 02200 (this->partial_message_ != 0 && this->partial_message_->length () > 0)) 02201 { 02202 this->partial_message_->reset (); 02203 } 02204 02205 return ACE_Utils::truncate_cast<int> (n); 02206 } 02207 02208 if (this->partial_message_ != 0 && this->partial_message_->length () > 0) 02209 { 02210 this->partial_message_->reset (); 02211 } 02212 02213 if (TAO_debug_level > 3) 02214 { 02215 TAOLIB_DEBUG ((LM_DEBUG, 02216 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02217 ACE_TEXT ("read %d bytes\n"), 02218 this->id (), n)); 02219 } 02220 02221 // Set the write pointer in the stack buffer 02222 message_block.wr_ptr (n); 02223 02224 // 02225 // STACK PROCESSING OR MESSAGE CONSOLIDATION 02226 // 02227 02228 // PRE: data in buffer is aligned && message_block.length() > 0 02229 02230 if (this->incoming_message_stack_.top (q_data) != -1 02231 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED) 02232 { 02233 // 02234 // MESSAGE CONSOLIDATION 02235 // 02236 02237 // Partial message on incoming_message_stack_ needs to be 02238 // consolidated. The message header could not be parsed so far 02239 // and therefor the message size is unknown yet. Consolidating 02240 // the message destroys the memory alignment of succeeding 02241 // messages sharing the buffer, for that reason consolidation 02242 // and stack based processing are mutial exclusive. 
02243 if (this->messaging_object ()->consolidate_node (q_data, 02244 message_block) == -1) 02245 { 02246 if (TAO_debug_level > 0) 02247 { 02248 TAOLIB_ERROR ((LM_ERROR, 02249 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02250 ACE_TEXT ("error consolidating message from input buffer\n"), 02251 this->id () )); 02252 } 02253 return -1; 02254 } 02255 02256 // Complete message are to be enqueued and later processed 02257 if (q_data->missing_data () == 0) 02258 { 02259 if (this->incoming_message_stack_.pop (q_data) == -1) 02260 { 02261 return -1; 02262 } 02263 02264 if (this->consolidate_enqueue_message (q_data) == -1) 02265 { 02266 return -1; 02267 } 02268 } 02269 02270 if (message_block.length () > 0 02271 && this->handle_input_parse_extra_messages (message_block) == -1) 02272 { 02273 return -1; 02274 } 02275 02276 // In any case try to process the enqueued messages 02277 if (this->process_queue_head (rh) == -1) 02278 { 02279 return -1; 02280 } 02281 } 02282 else 02283 { 02284 // 02285 // STACK PROCESSING (critical path) 02286 // 02287 02288 // Process the first message in buffer on stack 02289 02290 // (PRE: first message resides in aligned memory) Make a node of 02291 // the message-block.. 
02292 02293 TAO_Queued_Data qd (&message_block, 02294 this->orb_core_->transport_message_buffer_allocator ()); 02295 02296 size_t mesg_length = 0; 02297 02298 if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1 02299 || (qd.missing_data () == 0 02300 && mesg_length > message_block.length ()) ) 02301 { 02302 // extracting message failed 02303 return -1; 02304 } 02305 // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length() 02306 // This prevents seeking rd_ptr behind the wr_ptr 02307 02308 if (qd.missing_data () != 0 || 02309 qd.more_fragments () || 02310 qd.msg_type () == GIOP::Fragment) 02311 { 02312 if (qd.missing_data () == 0) 02313 { 02314 // Dealing with a fragment 02315 TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd); 02316 02317 if (nqd == 0) 02318 { 02319 return -1; 02320 } 02321 02322 // mark the end of message in new buffer 02323 char* end_mark = nqd->msg_block ()->rd_ptr () 02324 + mesg_length; 02325 nqd->msg_block ()->wr_ptr (end_mark); 02326 02327 // move the read pointer forward in old buffer 02328 message_block.rd_ptr (mesg_length); 02329 02330 // enqueue the message 02331 if (this->consolidate_enqueue_message (nqd) == -1) 02332 { 02333 return -1; 02334 } 02335 02336 if (message_block.length () > 0 02337 && this->handle_input_parse_extra_messages (message_block) == -1) 02338 { 02339 return -1; 02340 } 02341 02342 // In any case try to process the enqueued messages 02343 if (this->process_queue_head (rh) == -1) 02344 { 02345 return -1; 02346 } 02347 } 02348 else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED) 02349 { 02350 // Incomplete message, must be the last one in buffer 02351 02352 if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED && 02353 qd.missing_data () > message_block.space ()) 02354 { 02355 // Re-Allocate correct size on heap 02356 if (ACE_CDR::grow (qd.msg_block (), 02357 message_block.length () 02358 + qd.missing_data ()) == -1) 02359 { 02360 return -1; 02361 } 02362 } 02363 02364 
TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd); 02365 02366 if (nqd == 0) 02367 { 02368 return -1; 02369 } 02370 02371 // move read-pointer to end of buffer 02372 message_block.rd_ptr (message_block.length()); 02373 02374 this->incoming_message_stack_.push (nqd); 02375 } 02376 } 02377 else 02378 { 02379 // 02380 // critical path 02381 // 02382 02383 // We cant process the message on stack right now. First we 02384 // have got to parse extra messages from message_block, 02385 // putting them into queue. When this is done we can return 02386 // to process this message, and notifying other threads to 02387 // process the messages in queue. 02388 char * end_marker = message_block.rd_ptr () 02389 + mesg_length; 02390 02391 if (message_block.length () > mesg_length) 02392 { 02393 // There are more message in data stream to be parsed. 02394 // Safe the rd_ptr to restore later. 02395 char *rd_ptr_stack_mesg = message_block.rd_ptr (); 02396 02397 // Skip parsed message, jump to next message in buffer 02398 // PRE: mesg_length <= message_block.length () 02399 message_block.rd_ptr (mesg_length); 02400 02401 // Extract remaining messages and enqueue them for later 02402 // heap processing 02403 if (this->handle_input_parse_extra_messages (message_block) == -1) 02404 { 02405 return -1; 02406 } 02407 02408 // correct the wr_ptr using the end_marker to point to the 02409 // end of the first message else the code after this will 02410 // see the full stream with all the messages 02411 message_block.wr_ptr (end_marker); 02412 02413 // Restore rd_ptr 02414 message_block.rd_ptr (rd_ptr_stack_mesg); 02415 } 02416 02417 // The following if-else has been copied from 02418 // process_queue_head(). While process_queue_head() 02419 // processes message on heap, here we will process a message 02420 // on stack. 02421 02422 // Now that we have one message on stack to be processed, 02423 // check whether we have one more message in the queue... 
02424 if (this->incoming_message_queue_.queue_length () > 0) 02425 { 02426 if (TAO_debug_level > 0) 02427 { 02428 TAOLIB_DEBUG ((LM_DEBUG, 02429 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02430 ACE_TEXT ("notify reactor\n"), 02431 this->id ())); 02432 } 02433 02434 int const retval = this->notify_reactor (); 02435 02436 if (retval == 1) 02437 { 02438 // Let the class know that it doesn't need to resume the 02439 // handle.. 02440 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); 02441 } 02442 else if (retval < 0) 02443 return -1; 02444 } 02445 else 02446 { 02447 // As there are no further messages in queue just resume 02448 // the handle. Set the flag incase someone had reset the flag.. 02449 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); 02450 } 02451 02452 // PRE: incoming_message_queue is empty 02453 if (this->process_parsed_messages (&qd, rh) == -1) 02454 { 02455 return -1; 02456 } 02457 // move the rd_ptr tp position of end_marker 02458 message_block.rd_ptr (end_marker); 02459 } 02460 } 02461 02462 // Now that all cases have been processed, there might be kept some data 02463 // in buffer that needs to be safed for next "handle_input" invocations. 02464 if (message_block.length () > 0) 02465 { 02466 if (this->partial_message_ == 0) 02467 { 02468 this->allocate_partial_message_block (); 02469 } 02470 02471 if (this->partial_message_ != 0 && 02472 this->partial_message_->copy (message_block.rd_ptr (), 02473 message_block.length ()) == 0) 02474 { 02475 message_block.rd_ptr (message_block.length ()); 02476 } 02477 else 02478 { 02479 return -1; 02480 } 02481 } 02482 02483 return 0; 02484 }
int TAO_Transport::handle_input_parse_extra_messages | ( | ACE_Message_Block & | message_block | ) | [private] |
Is invoked by handle_input_parse_data. Parses all messages remaining in message_block.
Definition at line 2008 of file Transport.cpp.
02010 { 02011 // store buffer status of last extraction: -1 parse error, 0 02012 // incomplete message header in buffer, 1 complete messages header 02013 // parsed 02014 int buf_status = 0; 02015 02016 TAO_Queued_Data *q_data = 0; // init 02017 02018 // parse buffer until all messages have been extracted, consolidate 02019 // and enqueue complete messages, if the last message being parsed 02020 // has missin data, it is stays on top of incoming_message_stack. 02021 while (message_block.length () > 0 && 02022 (buf_status = this->messaging_object ()->extract_next_message 02023 (message_block, q_data)) != -1 && 02024 q_data != 0) // paranoid check 02025 { 02026 if (q_data->missing_data () == 0) 02027 { 02028 if (this->consolidate_enqueue_message (q_data) == -1) 02029 { 02030 return -1; 02031 } 02032 } 02033 else // incomplete message read, probably the last message in buffer 02034 { 02035 // can not fail 02036 this->incoming_message_stack_.push (q_data); 02037 } 02038 02039 q_data = 0; // reset 02040 } // while 02041 02042 if (buf_status == -1) 02043 { 02044 return -1; 02045 } 02046 02047 return 0; 02048 }
TAO_Transport::Drain_Result TAO_Transport::handle_output | ( | TAO::Transport::Drain_Constraints const & | dc | ) |
Callback method to reactively drain the outgoing data queue.
Methods called and used in the output path of the ORB.
Definition at line 599 of file Transport.cpp.
00600 { 00601 if (TAO_debug_level > 3) 00602 { 00603 TAOLIB_DEBUG ((LM_DEBUG, 00604 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output") 00605 ACE_TEXT (" - block_on_io=%d, timeout=%d.%06d\n"), 00606 this->id (), 00607 dc.block_on_io(), 00608 dc.timeout() ? dc.timeout()->sec() : static_cast<time_t> (-1), 00609 dc.timeout() ? dc.timeout()->usec() : -1 )); 00610 } 00611 00612 // The flushing strategy (potentially via the Reactor) wants to send 00613 // more data, first check if there is a current message that needs 00614 // more sending... 00615 Drain_Result const retval = this->drain_queue (dc); 00616 00617 if (TAO_debug_level > 3) 00618 { 00619 TAOLIB_DEBUG ((LM_DEBUG, 00620 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ") 00621 ACE_TEXT ("drain_queue returns %d/%d\n"), 00622 this->id (), 00623 static_cast<int> (retval.dre_), ACE_ERRNO_GET)); 00624 } 00625 00626 // Any errors are returned directly to the Reactor 00627 return retval; 00628 }
int TAO_Transport::handle_timeout | ( | const ACE_Time_Value & | current_time, | |
const void * | act | |||
) |
The timeout callback, invoked when any of the timers related to this transport expire.
current_time | The current time as reported from the Reactor | |
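act | The Asynchronous Completion Token. Currently it is interpreted as follows: if the ACT is the address of this->current_deadline_, a pending flush timer (if any) is reset and output is scheduled with the flushing strategy; any other ACT is rejected and -1 is returned.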
|
This is the only legal ACT in the current configuration....
Definition at line 956 of file Transport.cpp.
00958 { 00959 if (TAO_debug_level > 6) 00960 { 00961 TAOLIB_DEBUG ((LM_DEBUG, 00962 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_timeout, ") 00963 ACE_TEXT ("timer expired\n"), 00964 this->id ())); 00965 } 00966 00967 /// This is the only legal ACT in the current configuration.... 00968 if (act != &this->current_deadline_) 00969 { 00970 return -1; 00971 } 00972 00973 if (this->flush_timer_pending ()) 00974 { 00975 // The timer is always a oneshot timer, so mark is as not 00976 // pending. 00977 this->reset_flush_timer (); 00978 00979 TAO_Flushing_Strategy *flushing_strategy = 00980 this->orb_core ()->flushing_strategy (); 00981 int const result = flushing_strategy->schedule_output (this); 00982 if (result == TAO_Flushing_Strategy::MUST_FLUSH) 00983 { 00984 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00985 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00986 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00987 if (flushing_strategy->flush_transport (this, 0) == -1) { 00988 return -1; 00989 } 00990 } 00991 } 00992 00993 return 0; 00994 }
void TAO_Transport::id | ( | size_t | id | ) |
Definition at line 97 of file Transport.inl.
size_t TAO_Transport::id | ( | void | ) | const |
Set and Get the identifier for this transport instance.
If not set, this will return an integer representation of the this
pointer for the instance on which it's called.
Definition at line 91 of file Transport.inl.
00092 { 00093 return this->id_; 00094 }
bool TAO_Transport::idle_after_reply | ( | void | ) |
Request is sent and the reply is received. Idle the transport now.
Definition at line 275 of file Transport.cpp.
00276 { 00277 return this->tms ()->idle_after_reply (); 00278 }
bool TAO_Transport::idle_after_send | ( | void | ) |
The request has just been sent, but the reply has not been received yet. Idle the transport now.
Definition at line 269 of file Transport.cpp.
00270 { 00271 return this->tms ()->idle_after_send (); 00272 }
ACE_Time_Value const * TAO_Transport::io_timeout | ( | TAO::Transport::Drain_Constraints const & | dc | ) | const [protected] |
Re-factor computation of I/O timeouts based on operation timeouts. Depending on the wait strategy, we need to time out I/O operations or not. For example, if we are using a non-blocking strategy, we want to pass 0 to all I/O operations, and rely on the ACE_NONBLOCK settings on the underlying sockets. However, for blocking strategies we want to pass the operation timeouts, to respect the application level policies.
This function was introduced as part of the fixes for bug 3647.
Definition at line 2894 of file Transport.cpp.
02896 { 02897 if (dc.block_on_io()) 02898 { 02899 return dc.timeout(); 02900 } 02901 if (this->wait_strategy()->can_process_upcalls()) 02902 { 02903 return 0; 02904 } 02905 return dc.timeout(); 02906 }
bool TAO_Transport::is_connected | ( | void | ) | const |
Is this transport really connected.
Definition at line 181 of file Transport.inl.
00182 { 00183 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false); 00184 return this->is_connected_; 00185 }
CORBA::Boolean TAO_Transport::is_tcs_set | ( | void | ) | const |
Return true if the tcs has been set.
CodeSet negotiation.
Definition at line 163 of file Transport.inl.
00164 { 00165 return tcs_set_; 00166 }
int TAO_Transport::make_idle | ( | void | ) |
Cache management.
Definition at line 577 of file Transport.cpp.
00578 { 00579 if (TAO_debug_level > 3) 00580 { 00581 TAOLIB_DEBUG ((LM_DEBUG, 00582 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"), 00583 this->id ())); 00584 } 00585 00586 return this->transport_cache_manager ().make_idle (this->cache_map_entry_); 00587 }
void TAO_Transport::messaging_init | ( | TAO_GIOP_Message_Version const & | version | ) |
Initializing the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.
Definition at line 2766 of file Transport.cpp.
02767 { 02768 this->messaging_object ()->init (version.major, version.minor); 02769 }
TAO_GIOP_Message_Base * TAO_Transport::messaging_object | ( | void | ) |
Return the messaging object that is used to format the data that needs to be sent.
Definition at line 126 of file Transport.inl.
00127 { 00128 return this->messaging_object_; 00129 }
int TAO_Transport::notify_reactor | ( | void | ) | [private] |
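Notify the reactor on behalf of this transport. Returns 0 without notifying if the wait strategy is not registered; otherwise returns the result of notify_reactor_now().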
Definition at line 292 of file Transport.inl.
00293 { 00294 if (!this->ws_->is_registered ()) 00295 { 00296 return 0; 00297 } 00298 00299 return this->notify_reactor_now (); 00300 }
int TAO_Transport::notify_reactor_now | ( | void | ) | [protected] |
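Send a READ_MASK notification for this transport's event handler to the ORB's reactor. Always returns 1; a failed notification is only logged (when TAO_debug_level > 2).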
Definition at line 2674 of file Transport.cpp.
02675 { 02676 ACE_Event_Handler *eh = this->event_handler_i (); 02677 02678 // Get the reactor associated with the event handler 02679 ACE_Reactor *reactor = this->orb_core ()->reactor (); 02680 02681 if (TAO_debug_level > 0) 02682 { 02683 TAOLIB_DEBUG ((LM_DEBUG, 02684 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") 02685 ACE_TEXT ("notify to Reactor\n"), 02686 this->id ())); 02687 } 02688 02689 // Send a notification to the reactor... 02690 int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK); 02691 02692 if (retval < 0 && TAO_debug_level > 2) 02693 { 02694 // @todo: need to think about what is the action that 02695 // we can take when we get here. 02696 TAOLIB_ERROR ((LM_ERROR, 02697 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") 02698 ACE_TEXT ("notify to the reactor failed..\n"), 02699 this->id ())); 02700 } 02701 02702 return 1; 02703 }
void TAO_Transport::opened_as | ( | TAO::Connection_Role | role | ) |
Definition at line 54 of file Transport.inl.
00055 { 00056 this->opening_connection_role_ = role; 00057 }
TAO::Connection_Role TAO_Transport::opened_as | ( | void | ) | const |
Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.
Definition at line 48 of file Transport.inl.
00049 { 00050 return this->opening_connection_role_; 00051 }
TAO_ORB_Core * TAO_Transport::orb_core | ( | void | ) | const |
Access the ORB that owns this connection.
Definition at line 17 of file Transport.inl.
00018 { 00019 return this->orb_core_; 00020 }
TAO_OutputCDR & TAO_Transport::out_stream | ( | void | ) |
Accessor for the output CDR stream.
Definition at line 2754 of file Transport.cpp.
02755 { 02756 return this->messaging_object ()->out_stream (); 02757 }
TAO_SYNCH_MUTEX & TAO_Transport::output_cdr_lock | ( | void | ) |
Accessor for synchronizing Transport OutputCDR access.
Definition at line 2760 of file Transport.cpp.
02761 { 02762 return this->output_cdr_mutex_; 02763 }
bool TAO_Transport::post_connect_hook | ( | void | ) | [virtual] |
Hooks that can be overridden in concrete transports.
These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)
Definition at line 327 of file Transport.cpp.
bool TAO_Transport::post_open | ( | size_t | id | ) |
Perform all the actions when this transport get opened.
Definition at line 2795 of file Transport.cpp.
02796 { 02797 if (TAO_debug_level > 9) 02798 { 02799 TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport::post_open, ") 02800 ACE_TEXT ("tport id changed from %d to %d\n"), this->id_, id)); 02801 } 02802 this->id_ = id; 02803 02804 // When we have data in our outgoing queue schedule ourselves 02805 // for output 02806 if (!this->queue_is_empty_i ()) 02807 { 02808 // If the wait strategy wants us to be registered with the reactor 02809 // then we do so. If registration is required and it succeeds, 02810 // #REFCOUNT# becomes two. 02811 if (this->wait_strategy ()->register_handler () == 0) 02812 { 02813 if (this->flush_in_post_open_) 02814 { 02815 TAO_Flushing_Strategy *flushing_strategy = 02816 this->orb_core ()->flushing_strategy (); 02817 02818 if (flushing_strategy == 0) 02819 throw CORBA::INTERNAL (); 02820 02821 this->flush_in_post_open_ = false; 02822 (void)flushing_strategy->schedule_output (this); 02823 } 02824 } 02825 else 02826 { 02827 // Registration failures. 02828 02829 // Purge from the connection cache, if we are not in the cache, this 02830 // just does nothing. 02831 (void) this->purge_entry (); 02832 02833 // Close the handler. 
02834 (void) this->close_connection (); 02835 02836 if (TAO_debug_level > 0) 02837 { 02838 TAOLIB_ERROR ((LM_ERROR, 02839 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open , ") 02840 ACE_TEXT ("could not register the transport ") 02841 ACE_TEXT ("in the reactor.\n"), 02842 this->id ())); 02843 } 02844 02845 return false; 02846 } 02847 } 02848 02849 { 02850 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false); 02851 this->is_connected_ = true; 02852 } 02853 02854 if (TAO_debug_level > 9 && !this->cache_map_entry_) 02855 { 02856 TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open") 02857 ACE_TEXT (", cache_map_entry_ is 0\n"), this->id_)); 02858 } 02859 02860 this->transport_cache_manager ().mark_connected (this->cache_map_entry_, 02861 true); 02862 02863 // update transport cache to make this entry available 02864 this->transport_cache_manager ().set_entry_state ( 02865 this->cache_map_entry_, 02866 TAO::ENTRY_IDLE_AND_PURGABLE); 02867 02868 return true; 02869 }
void TAO_Transport::pre_close | ( | void | ) |
Do what needs to be done when closing the transport.
Definition at line 2772 of file Transport.cpp.
02773 { 02774 if (TAO_debug_level > 9) 02775 { 02776 TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::pre_close\n"), 02777 this->id_)); 02778 } 02779 // @TODO: something needs to be done with is_connected_. Checking it is 02780 // guarded by a mutex, but setting it is not. Until the need for mutexed 02781 // protection is required, the transport cache is holding its own copy 02782 // of the is_connected_ flag, so that during cache lookups the cache 02783 // manager doesn't need to be burdened by the lock in is_connected(). 02784 this->is_connected_ = false; 02785 this->transport_cache_manager ().mark_connected (this->cache_map_entry_, 02786 false); 02787 this->purge_entry (); 02788 { 02789 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); 02790 this->cleanup_queue_i (); 02791 } 02792 }
int TAO_Transport::process_parsed_messages | ( | TAO_Queued_Data * | qd, | |
TAO_Resume_Handle & | rh | |||
) | [protected] |
Process the message by sending it to the higher layers of the ORB.
Definition at line 2488 of file Transport.cpp.
02490 { 02491 if (TAO_debug_level > 7) 02492 { 02493 TAOLIB_DEBUG ((LM_DEBUG, 02494 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02495 ACE_TEXT ("entering (missing data == %d)\n"), 02496 this->id(), qd->missing_data ())); 02497 } 02498 02499 #if TAO_HAS_TRANSPORT_CURRENT == 1 02500 // Update stats, if any 02501 if (this->stats_ != 0) 02502 this->stats_->messages_received (qd->msg_block ()->length ()); 02503 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 02504 02505 switch (qd->msg_type ()) 02506 { 02507 case GIOP::CloseConnection: 02508 { 02509 if (TAO_debug_level > 0) 02510 { 02511 TAOLIB_DEBUG ((LM_DEBUG, 02512 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02513 ACE_TEXT ("received CloseConnection message - %m\n"), 02514 this->id())); 02515 } 02516 02517 // Return a "-1" so that the next stage can take care of 02518 // closing connection and the necessary memory management. 02519 return -1; 02520 } 02521 break; 02522 case GIOP::Request: 02523 case GIOP::LocateRequest: 02524 { 02525 // Let us resume the handle before we go ahead to process the 02526 // request. This will open up the handle for other threads. 02527 rh.resume_handle (); 02528 02529 if (this->messaging_object ()->process_request_message (this, qd) == -1) 02530 { 02531 // Return a "-1" so that the next stage can take care of 02532 // closing connection and the necessary memory management. 
02533 return -1; 02534 } 02535 } 02536 break; 02537 case GIOP::Reply: 02538 case GIOP::LocateReply: 02539 { 02540 rh.resume_handle (); 02541 02542 TAO_Pluggable_Reply_Params params (this); 02543 02544 if (this->messaging_object ()->process_reply_message (params, qd) == -1) 02545 { 02546 if (TAO_debug_level > 0) 02547 { 02548 TAOLIB_ERROR ((LM_ERROR, 02549 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02550 ACE_TEXT ("error in process_reply_message - %m\n"), 02551 this->id ())); 02552 } 02553 02554 return -1; 02555 } 02556 02557 } 02558 break; 02559 case GIOP::CancelRequest: 02560 { 02561 // The associated request might be incomplete residing 02562 // fragmented in messaging object. We must make sure the 02563 // resources allocated by fragments are released. 02564 if (this->messaging_object ()->discard_fragmented_message (qd) == -1) 02565 { 02566 if (TAO_debug_level > 0) 02567 { 02568 TAOLIB_ERROR ((LM_ERROR, 02569 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02570 ACE_TEXT ("error processing CancelRequest\n"), 02571 this->id ())); 02572 } 02573 } 02574 02575 // We are not able to cancel requests being processed already; 02576 // this is declared as optional feature by CORBA, and TAO does 02577 // not support this currently. 02578 02579 // Just continue processing, CancelRequest does not mean to cut 02580 // off the connection. 02581 } 02582 break; 02583 case GIOP::MessageError: 02584 { 02585 if (TAO_debug_level > 0) 02586 { 02587 TAOLIB_ERROR ((LM_ERROR, 02588 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02589 ACE_TEXT ("received MessageError, closing connection\n"), 02590 this->id ())); 02591 } 02592 return -1; 02593 } 02594 break; 02595 case GIOP::Fragment: 02596 { 02597 // Nothing to be done. 02598 } 02599 break; 02600 } 02601 02602 // If not, just return back.. 02603 return 0; 02604 }
int TAO_Transport::process_queue_head | ( | TAO_Resume_Handle & | rh | ) | [private] |
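Dequeue the message at the head of the incoming message queue and process it, notifying the reactor first if further messages remain queued. Returns 1 if the queue is empty; otherwise returns the result of process_parsed_messages().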
Definition at line 2607 of file Transport.cpp.
02608 { 02609 if (TAO_debug_level > 3) 02610 { 02611 TAOLIB_DEBUG ((LM_DEBUG, 02612 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"), 02613 this->id (), this->incoming_message_queue_.queue_length () )); 02614 } 02615 02616 // See if message in queue ... 02617 if (this->incoming_message_queue_.queue_length () > 0) 02618 { 02619 // Get the message on the head of the queue.. 02620 TAO_Queued_Data *qd = 02621 this->incoming_message_queue_.dequeue_head (); 02622 02623 if (TAO_debug_level > 3) 02624 { 02625 TAOLIB_DEBUG ((LM_DEBUG, 02626 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") 02627 ACE_TEXT ("the size of the queue is [%d]\n"), 02628 this->id (), 02629 this->incoming_message_queue_.queue_length())); 02630 } 02631 // Now that we have pulled out out one message out of the queue, 02632 // check whether we have one more message in the queue... 02633 if (this->incoming_message_queue_.queue_length () > 0) 02634 { 02635 if (TAO_debug_level > 0) 02636 { 02637 TAOLIB_DEBUG ((LM_DEBUG, 02638 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") 02639 ACE_TEXT ("notify reactor\n"), 02640 this->id ())); 02641 } 02642 02643 int const retval = this->notify_reactor (); 02644 02645 if (retval == 1) 02646 { 02647 // Let the class know that it doesn't need to resume the 02648 // handle.. 02649 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); 02650 } 02651 else if (retval < 0) 02652 return -1; 02653 } 02654 else 02655 { 02656 // As we are ready to process the last message just resume 02657 // the handle. Set the flag incase someone had reset the flag.. 02658 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); 02659 } 02660 02661 // Process the message... 02662 int const retval = this->process_parsed_messages (qd, rh); 02663 02664 // Delete the Queued_Data.. 02665 TAO_Queued_Data::release (qd); 02666 02667 return retval; 02668 } 02669 02670 return 1; 02671 }
bool TAO_Transport::provide_blockable_handler | ( | TAO::Connection_Handler_Set & | handlers | ) |
Add event handlers corresponding to transports that have RW wait strategy to the handlers set. Called by the cache when the ORB is shutting down.
handlers | The TAO_Connection_Handler_Set into which the transport should place its handler if the transport has RW strategy on. |
Definition at line 255 of file Transport.cpp.
00256 { 00257 if (this->ws_->non_blocking () || 00258 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE) 00259 return false; 00260 00261 (void) this->add_reference (); 00262 00263 h.insert (this->connection_handler_i ()); 00264 00265 return true; 00266 }
void TAO_Transport::provide_handler | ( | TAO::Connection_Handler_Set & | handlers | ) |
Add the event handler to the handlers set.
Called by the cache when the cache is closing.
handlers | The TAO_Connection_Handler_Set into which the transport should place its handler |
Definition at line 247 of file Transport.cpp.
00248 { 00249 (void) this->add_reference (); 00250 00251 handlers.insert (this->connection_handler_i ()); 00252 }
int TAO_Transport::purge_entry | ( | void | ) |
Cache management.
Definition at line 557 of file Transport.cpp.
00558 { 00559 if (TAO_debug_level > 3) 00560 { 00561 TAOLIB_DEBUG ((LM_DEBUG, 00562 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::purge_entry, ") 00563 ACE_TEXT ("entry is %@\n"), 00564 this->id (), this->cache_map_entry_)); 00565 } 00566 00567 return this->transport_cache_manager ().purge_entry (this->cache_map_entry_); 00568 }
void TAO_Transport::purging_order | ( | unsigned long | value | ) |
Definition at line 81 of file Transport.inl.
00082 { 00083 // This should only be called by the Transport Cache Manager when 00084 // it is holding it's lock. 00085 // The transport should still be here since the cache manager still 00086 // has a reference to it. 00087 this->purging_order_ = value; 00088 }
unsigned long TAO_Transport::purging_order | ( | void | ) | const |
Get and Set the purging order. The purging strategy uses the set version to set the purging order.
Definition at line 75 of file Transport.inl.
00076 { 00077 return this->purging_order_; 00078 }
bool TAO_Transport::queue_is_empty | ( | void | ) |
Check if there are messages pending in the queue.
Definition at line 103 of file Transport.inl.
00104 { 00105 ACE_GUARD_RETURN (ACE_Lock, 00106 ace_mon, 00107 *this->handler_lock_, 00108 false); 00109 return this->queue_is_empty_i (); 00110 }
bool TAO_Transport::queue_is_empty_i | ( | void | ) | const [private] |
Check if there are messages pending in the queue.
This version assumes that the lock is already held. Use with care!
Definition at line 5 of file Transport.inl.
00006 { 00007 return (this->head_ == 0); 00008 }
int TAO_Transport::queue_message_i | ( | const ACE_Message_Block * | message_block, | |
ACE_Time_Value * | max_wait_time, | |||
bool | back = true | |||
) | [protected] |
Queue a message for message_block
max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. | |
back | If true, the message will be pushed to the back of the queue. If false, the message will be pushed to the front of the queue. |
Definition at line 1686 of file Transport.cpp.
01688 { 01689 TAO_Queued_Message *queued_message = 0; 01690 ACE_NEW_RETURN (queued_message, 01691 TAO_Asynch_Queued_Message (message_block, 01692 this->orb_core_, 01693 max_wait_time, 01694 0, 01695 true), 01696 -1); 01697 if (back) { 01698 queued_message->push_back (this->head_, this->tail_); 01699 } 01700 else { 01701 queued_message->push_front (this->head_, this->tail_); 01702 } 01703 01704 return 0; 01705 }
int TAO_Transport::recache_transport | ( | TAO_Transport_Descriptor_Interface * | desc | ) |
Recache ourselves in the cache.
Ideally the following should be inline.
purge_entry has a return value, use it
Definition at line 547 of file Transport.cpp.
00548 { 00549 // First purge our entry 00550 this->purge_entry (); 00551 00552 // Then add ourselves to the cache 00553 return this->transport_cache_manager ().cache_transport (desc, this); 00554 }
virtual ssize_t TAO_Transport::recv | ( | char * | buffer, | |
size_t | len, | |||
const ACE_Time_Value * | timeout = 0 | |||
) | [pure virtual] |
Read len bytes into buffer.
This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance concurrently.
buffer | ORB allocated buffer where the data should be stored | |
timeout | The ACE_Time_Value *s is just a place holder for now. It is not clear that this is the best place to specify this. The actual timeout values will be kept in the Policies. |
size_t TAO_Transport::recv_buffer_size | ( | void | ) | const |
Accessor to recv_buffer_size_.
Definition at line 194 of file Transport.inl.
00195 { 00196 return this->recv_buffer_size_; 00197 }
int TAO_Transport::register_handler | ( | void | ) | [virtual] |
Register the handler with the reactor.
Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.
Definition at line 367 of file Transport.cpp.
00368 { 00369 if (TAO_debug_level > 4) 00370 { 00371 TAOLIB_DEBUG ((LM_DEBUG, 00372 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"), 00373 this->id ())); 00374 } 00375 00376 ACE_Reactor * const r = this->orb_core_->reactor (); 00377 00378 // @@note: This should be okay since the register handler call will 00379 // not make a nested call into the transport. 00380 ACE_GUARD_RETURN (ACE_Lock, 00381 ace_mon, 00382 *this->handler_lock_, 00383 false); 00384 00385 if (r == this->event_handler_i ()->reactor () && 00386 (this->wait_strategy ()->non_blocking () || 00387 !this->orb_core ()->client_factory ()->use_cleanup_options ())) 00388 { 00389 if (TAO_debug_level > 6) 00390 { 00391 TAOLIB_DEBUG ((LM_DEBUG, 00392 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler - ") 00393 ACE_TEXT ("already registered with reactor\n"), 00394 this->id ())); 00395 } 00396 00397 return 0; 00398 } 00399 00400 if (TAO_debug_level > 6) 00401 { 00402 TAOLIB_DEBUG ((LM_DEBUG, 00403 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler - ") 00404 ACE_TEXT ("registering event handler with reactor\n"), 00405 this->id ())); 00406 } 00407 00408 // Set the flag in the Connection Handler and in the Wait Strategy 00409 // @@Maybe we should set these flags after registering with the 00410 // reactor. What if the registration fails??? 00411 this->ws_->is_registered (true); 00412 00413 // Register the handler with the reactor 00414 return r->register_handler (this->event_handler_i (), 00415 ACE_Event_Handler::READ_MASK); 00416 }
bool TAO_Transport::register_if_necessary | ( | void | ) |
Register with the reactor via the wait strategy.
Definition at line 333 of file Transport.cpp.
00334 { 00335 if (this->is_connected_ && 00336 this->wait_strategy ()->register_handler () == -1) 00337 { 00338 // Registration failures. 00339 if (TAO_debug_level > 0) 00340 { 00341 TAOLIB_ERROR ((LM_ERROR, 00342 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_if_necessary, ") 00343 ACE_TEXT ("could not register the transport ") 00344 ACE_TEXT ("in the reactor.\n"), 00345 this->id ())); 00346 } 00347 00348 // Purge from the connection cache, if we are not in the cache, this 00349 // just does nothing. 00350 (void) this->purge_entry (); 00351 00352 // Close the handler. 00353 (void) this->close_connection (); 00354 00355 return false; 00356 } 00357 return true; 00358 }
int TAO_Transport::remove_handler | ( | void | ) | [virtual] |
Remove the handler from the reactor.
Definition at line 419 of file Transport.cpp.
00420 { 00421 if (TAO_debug_level > 4) 00422 { 00423 TAOLIB_DEBUG ((LM_DEBUG, 00424 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler\n"), 00425 this->id ())); 00426 } 00427 00428 ACE_Reactor * const r = this->orb_core_->reactor (); 00429 00430 // @@note: This should be okay since the remove handler call will 00431 // not make a nested call into the transport. 00432 ACE_GUARD_RETURN (ACE_Lock, 00433 ace_mon, 00434 *this->handler_lock_, 00435 false); 00436 00437 00438 if (this->event_handler_i ()->reactor () == 0) 00439 { 00440 return 0; 00441 } 00442 00443 if (TAO_debug_level > 6) 00444 { 00445 TAOLIB_DEBUG ((LM_DEBUG, 00446 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler - ") 00447 ACE_TEXT ("removing event handler from reactor\n"), 00448 this->id ())); 00449 } 00450 00451 // Set the flag in the Wait Strategy 00452 this->ws_->is_registered (false); 00453 00454 // Remove the handler from the reactor 00455 if (r->remove_handler (this->event_handler_i (), 00456 ACE_Event_Handler::READ_MASK| 00457 ACE_Event_Handler::DONT_CALL) == -1) 00458 { 00459 if (TAO_debug_level > 0) 00460 TAOLIB_ERROR ((LM_ERROR, 00461 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler - ") 00462 ACE_TEXT ("reactor->remove_handler failed\n"), 00463 this->id ())); 00464 return -1; 00465 } 00466 else 00467 { 00468 // reset the reactor property of the event handler or 00469 // Transport::register_handler() will not re-register 00470 // when called after us again. 00471 this->event_handler_i ()->reactor (0); 00472 return 0; 00473 } 00474 }
ACE_Event_Handler::Reference_Count TAO_Transport::remove_reference | ( | void | ) |
Decrement the reference count by forwarding to the event handler's remove_reference().
Definition at line 2748 of file Transport.cpp.
02749 { 02750 return this->event_handler_i ()->remove_reference (); 02751 }
void TAO_Transport::report_invalid_event_handler | ( | const char * | caller | ) | [private] |
Print out error messages if the event handler is not valid.
Definition at line 1367 of file Transport.cpp.
01368 { 01369 if (TAO_debug_level > 0) 01370 { 01371 TAOLIB_DEBUG ((LM_DEBUG, 01372 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler") 01373 ACE_TEXT ("(%C) no longer associated with handler [tag=%d]\n"), 01374 this->id (), caller, this->tag_)); 01375 } 01376 }
void TAO_Transport::reset_flush_timer | ( | void | ) | [private] |
The flush timer expired or was explicitly cancelled, mark it as not pending
Definition at line 119 of file Transport.inl.
00120 { 00121 this->flush_timer_id_ = -1; 00122 this->current_deadline_ = ACE_Time_Value::zero; 00123 }
int TAO_Transport::schedule_output_i | ( | void | ) | [private] |
Schedule handle_output() callbacks.
Definition at line 886 of file Transport.cpp.
00887 { 00888 ACE_Event_Handler * const eh = this->event_handler_i (); 00889 ACE_Reactor * const reactor = eh->reactor (); 00890 00891 if (reactor == 0) 00892 { 00893 if (TAO_debug_level > 1) 00894 { 00895 TAOLIB_ERROR ((LM_ERROR, 00896 ACE_TEXT ("TAO (%P|%t) - ") 00897 ACE_TEXT ("Transport[%d]::schedule_output_i, ") 00898 ACE_TEXT ("no reactor,") 00899 ACE_TEXT ("returning -1\n"), 00900 this->id ())); 00901 } 00902 return -1; 00903 } 00904 00905 // Check to see if our event handler is still registered with the 00906 // reactor. It's possible for another thread to have run close_connection() 00907 // since we last used the event handler. 00908 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ()); 00909 if (found) 00910 { 00911 found->remove_reference (); 00912 00913 if (found != eh) 00914 { 00915 if (TAO_debug_level > 3) 00916 { 00917 TAOLIB_ERROR ((LM_ERROR, 00918 ACE_TEXT ("TAO (%P|%t) - ") 00919 ACE_TEXT ("Transport[%d]::schedule_output_i ") 00920 ACE_TEXT ("event handler not found in reactor,") 00921 ACE_TEXT ("returning -1\n"), 00922 this->id ())); 00923 } 00924 00925 return -1; 00926 } 00927 } 00928 00929 if (TAO_debug_level > 3) 00930 { 00931 TAOLIB_DEBUG ((LM_DEBUG, 00932 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"), 00933 this->id ())); 00934 } 00935 00936 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK); 00937 }
virtual ssize_t TAO_Transport::send | ( | iovec * | iov, | |
int | iovcnt, | |||
size_t & | bytes_transferred, | |||
ACE_Time_Value const * | timeout | |||
) | [pure virtual] |
Write the complete Message_Block chain to the connection.
This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance concurrently.
Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class, using the code factored out into ACE.
Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.
iov | contains the data that must be sent. | |
timeout | is the maximum time that the application is willing to wait for the data to be sent, useful in platforms that implement timed writes. The timeout value is obtained from the policies set by the application. | |
bytes_transferred | should return the total number of bytes successfully transferred before the connection blocked. This is required because in some platforms and/or protocols multiple system calls may be required to send the chain of message blocks. The first few calls can work successfully, but the final one can fail or signal a flow control situation (via EAGAIN). In this case the ORB expects the function to return -1, errno to be appropriately set and this argument to return the number of bytes already on the OS I/O subsystem. |
This call can also fail if the transport instance is no longer associated with a connection (e.g., the connection handler closed down). In that case, it returns -1 and sets errno to ENOENT.
int TAO_Transport::send_asynchronous_message_i | ( | TAO_Stub * | stub, | |
const ACE_Message_Block * | message_block, | |||
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Send an asynchronous message, i.e. do not block until the message is on the wire
Definition at line 1435 of file Transport.cpp.
01438 { 01439 // Let's figure out if the message should be queued without trying 01440 // to send first: 01441 bool try_sending_first = true; 01442 01443 bool const queue_empty = this->queue_is_empty_i (); 01444 01445 TAO::Transport_Queueing_Strategy *queue_strategy = 01446 stub->transport_queueing_strategy (); 01447 01448 if (!queue_empty) 01449 { 01450 try_sending_first = false; 01451 } 01452 else if (queue_strategy) 01453 { 01454 if (queue_strategy->must_queue (queue_empty)) 01455 { 01456 try_sending_first = false; 01457 } 01458 } 01459 01460 bool partially_sent = false; 01461 bool timeout_encountered = false; 01462 01463 TAO::Transport::Drain_Constraints dc( 01464 max_wait_time, this->using_blocking_io_for_asynch_messages()); 01465 01466 if (try_sending_first) 01467 { 01468 ssize_t n = 0; 01469 size_t byte_count = 0; 01470 // ... in this case we must try to send the message first ... 01471 01472 size_t const total_length = message_block->total_length (); 01473 01474 if (TAO_debug_level > 6) 01475 { 01476 TAOLIB_DEBUG ((LM_DEBUG, 01477 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01478 ACE_TEXT ("trying to send the message (ml = %d)\n"), 01479 this->id (), total_length)); 01480 } 01481 01482 // @@ I don't think we want to hold the mutex here, however if 01483 // we release it we need to recheck the status of the transport 01484 // after we return... once I understand the final form for this 01485 // code I will re-visit this decision 01486 n = this->send_message_block_chain_i (message_block, 01487 byte_count, 01488 dc); 01489 01490 if (n == -1) 01491 { 01492 // ... if this is just an EWOULDBLOCK we must schedule the 01493 // message for later, if it is ETIME we still have to send 01494 // the complete message, because cutting off the message at 01495 // this point will destroy the synchronization with the 01496 // server ... 
01497 if (errno != EWOULDBLOCK && errno != ETIME) 01498 { 01499 if (TAO_debug_level > 0) 01500 { 01501 TAOLIB_ERROR ((LM_ERROR, 01502 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01503 ACE_TEXT ("fatal error in ") 01504 ACE_TEXT ("send_message_block_chain_i - %m\n"), 01505 this->id ())); 01506 } 01507 return -1; 01508 } 01509 } 01510 01511 // ... let's figure out if the complete message was sent ... 01512 if (total_length == byte_count) 01513 { 01514 // Done, just return. Notice that there are no allocations 01515 // or copies up to this point (though some fancy calling 01516 // back and forth). 01517 // This is the common case for the critical path, it should 01518 // be fast. 01519 return 0; 01520 } 01521 01522 if (byte_count > 0) 01523 { 01524 partially_sent = true; 01525 } 01526 01527 // If it was partially sent, then push to front of queue and don't flush 01528 if (n == -1 && errno == ETIME) 01529 { 01530 timeout_encountered = true; 01531 if (byte_count == 0) 01532 { 01533 //This request has timed out and none of it was sent to the transport 01534 //We can't return -1 here, since that would end up closing the tranpsort 01535 if (TAO_debug_level > 2) 01536 { 01537 TAOLIB_DEBUG ((LM_DEBUG, 01538 ACE_TEXT ("TAO (%P|%t) - ") 01539 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ") 01540 ACE_TEXT ("timeout encountered before any bytes sent\n"), 01541 this->id ())); 01542 } 01543 throw ::CORBA::TIMEOUT ( 01544 CORBA::SystemException::_tao_minor_code ( 01545 TAO_TIMEOUT_SEND_MINOR_CODE, 01546 ETIME), 01547 CORBA::COMPLETED_NO); 01548 } 01549 } 01550 01551 if (TAO_debug_level > 6) 01552 { 01553 TAOLIB_DEBUG ((LM_DEBUG, 01554 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01555 ACE_TEXT ("partial send %d / %d bytes\n"), 01556 this->id (), byte_count, total_length)); 01557 } 01558 01559 // ... part of the data was sent, need to figure out what piece 01560 // of the message block chain must be queued ... 
01561 while (message_block != 0 && message_block->length () == 0) 01562 { 01563 message_block = message_block->cont (); 01564 } 01565 01566 // ... at least some portion of the message block chain should 01567 // remain ... 01568 } 01569 01570 // ... either the message must be queued or we need to queue it 01571 // because it was not completely sent out ... 01572 01573 ACE_Time_Value *wait_time = (partially_sent ? 0: max_wait_time); 01574 if (this->queue_message_i (message_block, wait_time, !partially_sent) 01575 == -1) 01576 { 01577 if (TAO_debug_level > 0) 01578 { 01579 TAOLIB_DEBUG ((LM_DEBUG, 01580 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 01581 ACE_TEXT ("send_asynchronous_message_i, ") 01582 ACE_TEXT ("cannot queue message for - %m\n"), 01583 this->id ())); 01584 } 01585 return -1; 01586 } 01587 01588 if (TAO_debug_level > 6) 01589 { 01590 TAOLIB_DEBUG ((LM_DEBUG, 01591 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01592 ACE_TEXT ("message is queued\n"), 01593 this->id ())); 01594 } 01595 01596 if (timeout_encountered && partially_sent) 01597 { 01598 //Must close down the transport here since we can't guarantee the 01599 //integrity of the GIOP stream (the next send may try to write to 01600 //the socket before looking at the queue). 01601 if (TAO_debug_level > 0) 01602 { 01603 TAOLIB_DEBUG ((LM_DEBUG, 01604 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 01605 ACE_TEXT ("send_asynchronous_message_i, ") 01606 ACE_TEXT ("timeout after partial send, closing.\n"), 01607 this->id ())); 01608 } 01609 return -1; 01610 } 01611 else if (!timeout_encountered) 01612 { 01613 // We can't flush if we have already encountered a timeout 01614 // ... if the queue is full we need to activate the output on the 01615 // queue ... 01616 bool must_flush = false; 01617 const bool constraints_reached = 01618 this->check_buffering_constraints_i (stub, 01619 must_flush); 01620 01621 // ... 
but we also want to activate it if the message was partially 01622 // sent.... Plus, when we use the blocking flushing strategy the 01623 // queue is flushed as a side-effect of 'schedule_output()' 01624 01625 TAO_Flushing_Strategy *flushing_strategy = 01626 this->orb_core ()->flushing_strategy (); 01627 01628 if (constraints_reached || try_sending_first) 01629 { 01630 int const result = flushing_strategy->schedule_output (this); 01631 if (result == TAO_Flushing_Strategy::MUST_FLUSH) 01632 { 01633 must_flush = true; 01634 } 01635 } 01636 01637 if (must_flush) 01638 { 01639 if (TAO_debug_level > 0) 01640 { 01641 TAOLIB_DEBUG ((LM_DEBUG, 01642 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 01643 ACE_TEXT ("send_asynchronous_message_i, ") 01644 ACE_TEXT ("flushing transport.\n"), 01645 this->id ())); 01646 } 01647 01648 size_t sent_byte = sent_byte_count_; 01649 int ret = 0; 01650 { 01651 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 01652 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 01653 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 01654 ret = flushing_strategy->flush_transport (this, max_wait_time); 01655 } 01656 01657 if (ret == -1) 01658 { 01659 if (errno == ETIME) 01660 { 01661 if (sent_byte == sent_byte_count_) // if nothing was actually flushed 01662 { 01663 //This request has timed out and none of it was sent to the transport 01664 //We can't return -1 here, since that would end up closing the tranpsort 01665 if (TAO_debug_level > 2) 01666 { 01667 TAOLIB_DEBUG ((LM_DEBUG, 01668 ACE_TEXT ("TAO (%P|%t) - ") 01669 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ") 01670 ACE_TEXT ("2 timeout encountered before any bytes sent\n"), 01671 this->id ())); 01672 } 01673 throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code 01674 (TAO_TIMEOUT_SEND_MINOR_CODE, ETIME), 01675 CORBA::COMPLETED_NO); 01676 } 01677 } 01678 return -1; 01679 } 01680 } 01681 } 01682 return 0; 01683 }
void TAO_Transport::send_connection_closed_notifications | ( | void | ) |
Notify all the components inside a Transport when the underlying connection is closed.
Definition at line 1379 of file Transport.cpp.
01380 { 01381 { 01382 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); 01383 01384 this->send_connection_closed_notifications_i (); 01385 } 01386 01387 this->tms ()->connection_closed (); 01388 }
void TAO_Transport::send_connection_closed_notifications_i | ( | void | ) | [private] |
Assume the lock is held.
Definition at line 1391 of file Transport.cpp.
01392 { 01393 this->cleanup_queue_i (); 01394 }
virtual int TAO_Transport::send_message | ( | TAO_OutputCDR & | stream, | |
TAO_Stub * | stub = 0 , |
|||
TAO_ServerRequest * | request = 0 , |
|||
TAO_Message_Semantics | message_semantics = TAO_Message_Semantics() , |
|||
ACE_Time_Value * | max_time_wait = 0 | |||
) | [pure virtual] |
This method formats the stream and then sends the message on the transport. Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value.
int TAO_Transport::send_message_block_chain | ( | const ACE_Message_Block * | message_block, | |
size_t & | bytes_transferred, | |||
ACE_Time_Value * | max_wait_time = 0 | |||
) |
This is a very specialized interface to send a simple chain of messages through the Transport. The only place we use this interface is in GIOP_Message_Base.cpp, to send error messages (i.e., an indication that we received a malformed GIOP message,) and to close the connection.
Definition at line 651 of file Transport.cpp.
00654 { 00655 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00656 00657 TAO::Transport::Drain_Constraints dc( 00658 max_wait_time, true); 00659 00660 return this->send_message_block_chain_i (mb, 00661 bytes_transferred, 00662 dc); 00663 }
int TAO_Transport::send_message_block_chain_i | ( | const ACE_Message_Block * | message_block, | |
size_t & | bytes_transferred, | |||
TAO::Transport::Drain_Constraints const & | dc | |||
) |
Send a message block chain, assuming the lock is held.
Definition at line 666 of file Transport.cpp.
00669 { 00670 size_t const total_length = mb->total_length (); 00671 00672 // We are going to block, so there is no need to clone 00673 // the message block. 00674 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00675 00676 synch_message.push_back (this->head_, this->tail_); 00677 00678 Drain_Result const n = this->drain_queue_i (dc); 00679 00680 if (n == DR_ERROR) 00681 { 00682 synch_message.remove_from_list (this->head_, this->tail_); 00683 return -1; // Error while sending... 00684 } 00685 else if (n == DR_QUEUE_EMPTY) 00686 { 00687 bytes_transferred = total_length; 00688 return 1; // Empty queue, message was sent.. 00689 } 00690 00691 // Remove the temporary message from the queue... 00692 synch_message.remove_from_list (this->head_, this->tail_); 00693 00694 bytes_transferred = total_length - synch_message.message_length (); 00695 00696 return 0; 00697 }
int TAO_Transport::send_message_shared | ( | TAO_Stub * | stub, | |
TAO_Message_Semantics | message_semantics, | |||
const ACE_Message_Block * | message_block, | |||
ACE_Time_Value * | max_wait_time | |||
) | [virtual] |
Send the contents of message_block.
stub | The object reference used for this operation, useful to obtain the current policies. | |
message_semantics | If this is set to TAO_TWOWAY_REQUEST this method will block until the operation is completely written on the wire. If it is set to other values this operation may return before the message is completely written. | |
message_block | The CDR encapsulation of the GIOP message that must be sent. The message may consist of multiple Message Blocks chained through the cont() field. | |
max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. |
Definition at line 297 of file Transport.cpp.
00301 { 00302 int result = 0; 00303 00304 { 00305 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00306 00307 result = 00308 this->send_message_shared_i (stub, message_semantics, 00309 message_block, max_wait_time); 00310 } 00311 00312 if (result == -1) 00313 { 00314 // The connection needs to be closed here. 00315 // In the case of a partially written message this is the only way to cleanup 00316 // the physical connection as well as the Transport. An EOF on the remote end 00317 // will cancel the partially received message. 00318 this->close_connection (); 00319 } 00320 00321 return result; 00322 }
int TAO_Transport::send_message_shared_i | ( | TAO_Stub * | stub, | |
TAO_Message_Semantics | message_semantics, | |||
const ACE_Message_Block * | message_block, | |||
ACE_Time_Value * | max_wait_time | |||
) | [protected] |
Implement send_message_shared() assuming the handler_lock_ is held.
Definition at line 1397 of file Transport.cpp.
01401 { 01402 int ret = 0; 01403 01404 #if TAO_HAS_TRANSPORT_CURRENT == 1 01405 size_t const message_length = message_block->length (); 01406 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01407 01408 switch (message_semantics.type_) 01409 { 01410 case TAO_Message_Semantics::TAO_TWOWAY_REQUEST: 01411 ret = this->send_synchronous_message_i (message_block, max_wait_time); 01412 break; 01413 01414 case TAO_Message_Semantics::TAO_REPLY: 01415 ret = this->send_reply_message_i (message_block, max_wait_time); 01416 break; 01417 01418 case TAO_Message_Semantics::TAO_ONEWAY_REQUEST: 01419 ret = this->send_asynchronous_message_i (stub, 01420 message_block, 01421 max_wait_time); 01422 break; 01423 } 01424 01425 #if TAO_HAS_TRANSPORT_CURRENT == 1 01426 // "Count" the message, only if no error was encountered. 01427 if (ret != -1 && this->stats_ != 0) 01428 this->stats_->messages_sent (message_length); 01429 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01430 01431 return ret; 01432 }
int TAO_Transport::send_reply_message_i | ( | const ACE_Message_Block * | message_block, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Send a reply message, i.e. do not block until the message is on the wire, but just return after adding it to the queue.
Definition at line 795 of file Transport.cpp.
00797 { 00798 // Don't clone now.. We could be sent in one shot! 00799 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00800 00801 synch_message.push_back (this->head_, this->tail_); 00802 00803 int const n = 00804 this->send_synch_message_helper_i (synch_message, max_wait_time); 00805 00806 // What about partially sent messages. 00807 if (n == -1 || n == 1) 00808 { 00809 return n; 00810 } 00811 00812 if (TAO_debug_level > 3) 00813 { 00814 TAOLIB_DEBUG ((LM_DEBUG, 00815 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ") 00816 ACE_TEXT ("preparing to add to queue before leaving\n"), 00817 this->id ())); 00818 } 00819 00820 // Till this point we shouldn't have any copying and that is the 00821 // point anyway. Now, remove the node from the list 00822 synch_message.remove_from_list (this->head_, this->tail_); 00823 00824 // Clone the node that we have. 00825 TAO_Queued_Message *msg = 00826 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ()); 00827 00828 // Stick it in the queue 00829 msg->push_back (this->head_, this->tail_); 00830 00831 TAO_Flushing_Strategy *flushing_strategy = 00832 this->orb_core ()->flushing_strategy (); 00833 00834 int const result = flushing_strategy->schedule_output (this); 00835 00836 if (result == -1) 00837 { 00838 if (TAO_debug_level > 5) 00839 { 00840 TAOLIB_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_" 00841 "message_i, dequeuing msg due to schedule_output " 00842 "failure\n", this->id ())); 00843 } 00844 msg->remove_from_list (this->head_, this->tail_); 00845 msg->destroy (); 00846 } 00847 else if (result == TAO_Flushing_Strategy::MUST_FLUSH) 00848 { 00849 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00850 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00851 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00852 (void) flushing_strategy->flush_transport (this, 0); 00853 } 00854 00855 return 1; 00856 }
virtual int TAO_Transport::send_request | ( | TAO_Stub * | stub, | |
TAO_ORB_Core * | orb_core, | |||
TAO_OutputCDR & | stream, | |||
TAO_Message_Semantics | message_semantics, | |||
ACE_Time_Value * | max_time_wait | |||
) | [pure virtual] |
Prepare the waiting and demuxing strategy to receive a reply for a new request. Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions: suppose the ORB is running in a multi-threaded configuration, thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) setup the ORB to receive the reply 3) wait for the reply
but in this case thread B may receive the reply between step (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) setup the ORB to receive the reply 2) send the request 3) wait for the reply
The following method encapsulates this idiom.
int TAO_Transport::send_synch_message_helper_i | ( | TAO_Synch_Queued_Message & | s, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods.
Definition at line 859 of file Transport.cpp.
00861 { 00862 TAO::Transport::Drain_Constraints dc( 00863 max_wait_time, this->using_blocking_io_for_synch_messages()); 00864 00865 Drain_Result const n = this->drain_queue_i (dc); 00866 00867 if (n == DR_ERROR) 00868 { 00869 synch_message.remove_from_list (this->head_, this->tail_); 00870 return -1; // Error while sending... 00871 } 00872 else if (n == DR_QUEUE_EMPTY) 00873 { 00874 return 1; // Empty queue, message was sent.. 00875 } 00876 00877 if (synch_message.all_data_sent ()) 00878 { 00879 return 1; 00880 } 00881 00882 return 0; 00883 }
int TAO_Transport::send_synchronous_message_i | ( | const ACE_Message_Block * | message_block, | |
ACE_Time_Value * | max_wait_time | |||
) | [private] |
Send a synchronous message, i.e. block until the message is on the wire
Definition at line 700 of file Transport.cpp.
00702 { 00703 // We are going to block, so there is no need to clone 00704 // the message block. 00705 size_t const total_length = mb->total_length (); 00706 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00707 00708 synch_message.push_back (this->head_, this->tail_); 00709 00710 int const result = this->send_synch_message_helper_i (synch_message, 00711 max_wait_time); 00712 if (result == -1 && errno == ETIME) 00713 { 00714 if (total_length == synch_message.message_length ()) //none was sent 00715 { 00716 if (TAO_debug_level > 2) 00717 { 00718 TAOLIB_DEBUG ((LM_DEBUG, 00719 ACE_TEXT ("TAO (%P|%t) - ") 00720 ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ") 00721 ACE_TEXT ("timeout encountered before any bytes sent\n"), 00722 this->id ())); 00723 } 00724 throw ::CORBA::TIMEOUT ( 00725 CORBA::SystemException::_tao_minor_code ( 00726 TAO_TIMEOUT_SEND_MINOR_CODE, 00727 ETIME), 00728 CORBA::COMPLETED_NO); 00729 } 00730 else 00731 { 00732 return -1; 00733 } 00734 } 00735 else if(result == -1 || result == 1) 00736 { 00737 return result; 00738 } 00739 00740 TAO_Flushing_Strategy *flushing_strategy = 00741 this->orb_core ()->flushing_strategy (); 00742 if (flushing_strategy->schedule_output (this) == -1) 00743 { 00744 synch_message.remove_from_list (this->head_, this->tail_); 00745 if (TAO_debug_level > 0) 00746 { 00747 TAOLIB_ERROR ((LM_ERROR, 00748 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 00749 ACE_TEXT ("send_synchronous_message_i, ") 00750 ACE_TEXT ("error while scheduling flush - %m\n"), 00751 this->id ())); 00752 } 00753 return -1; 00754 } 00755 00756 // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH, 00757 // because we're always going to flush anyway. 00758 00759 // Release the mutex, other threads may modify the queue as we 00760 // block for a long time writing out data. 
00761 int flush_result; 00762 { 00763 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00764 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00765 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00766 00767 flush_result = flushing_strategy->flush_message (this, 00768 &synch_message, 00769 max_wait_time); 00770 } 00771 00772 if (flush_result == -1) 00773 { 00774 synch_message.remove_from_list (this->head_, this->tail_); 00775 00776 // We don't need to do anything special for the timeout case. 00777 // The connection is going to get closed and the Transport destroyed. 00778 // The only thing to do maybe is to empty the queue. 00779 00780 if (TAO_debug_level > 0) 00781 { 00782 TAOLIB_ERROR ((LM_ERROR, 00783 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ") 00784 ACE_TEXT ("error while sending message - %m\n"), 00785 this->id ())); 00786 } 00787 00788 return -1; 00789 } 00790 00791 return 1; 00792 }
size_t TAO_Transport::sent_byte_count | ( | void | ) | const |
Accessor to sent_byte_count_.
Definition at line 200 of file Transport.inl.
00201 { 00202 return this->sent_byte_count_; 00203 }
void TAO_Transport::set_bidir_context_info | ( | TAO_Operation_Details & | opdetails | ) | [virtual] |
These classes need privileged access to:
Definition at line 2889 of file Transport.cpp.
TAO::Transport::Stats * TAO_Transport::stats | ( | void | ) | const |
Transport statistics.
Definition at line 208 of file Transport.inl.
00209 { 00210 return this->stats_; 00211 }
CORBA::ULong TAO_Transport::tag | ( | void | ) | const |
Return the protocol tag.
The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details.
Definition at line 11 of file Transport.inl.
00012 { 00013 return this->tag_; 00014 }
int TAO_Transport::tear_listen_point_list | ( | TAO_InputCDR & | cdr | ) | [virtual] |
Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints
Definition at line 291 of file Transport.cpp.
TAO_Transport_Mux_Strategy * TAO_Transport::tms | ( | void | ) | const |
Get the TAO_Transport_Mux_Strategy used by this object.
The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Suffice it to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions.
Definition at line 23 of file Transport.inl.
00024 { 00025 return tms_; 00026 }
TAO::Transport_Cache_Manager & TAO_Transport::transport_cache_manager | ( | void | ) | [private] |
Helper method that returns the Transport Cache Manager.
Definition at line 2706 of file Transport.cpp.
02707 { 02708 return this->orb_core_->lane_resources ().transport_cache (); 02709 }
int TAO_Transport::update_transport | ( | void | ) |
Cache management.
Definition at line 590 of file Transport.cpp.
00591 { 00592 return this->transport_cache_manager ().update_entry (this->cache_map_entry_); 00593 }
bool TAO_Transport::using_blocking_io_for_asynch_messages | ( | void | ) | const [private] |
Return true if blocking I/O should be used for sending asynchronous (AMI calls, non-blocking oneways, responses to operations, etc.) messages. This is determined based on the current flushing strategy.
Definition at line 2919 of file Transport.cpp.
bool TAO_Transport::using_blocking_io_for_synch_messages | ( | void | ) | const [private] |
Return true if blocking I/O should be used for sending synchronous (two-way, reliable oneways, etc.) messages. This is determined based on the current flushing and waiting strategies.
Definition at line 2909 of file Transport.cpp.
02910 { 02911 if (this->wait_strategy()->can_process_upcalls()) 02912 { 02913 return false; 02914 } 02915 return true; 02916 }
TAO_Wait_Strategy * TAO_Transport::wait_strategy | ( | void | ) | const |
Return the TAO_Wait_Strategy used by this object.
The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Suffice it to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol.
Definition at line 30 of file Transport.inl.
00031 { 00032 return this->ws_; 00033 }
void TAO_Transport::wchar_translator | ( | TAO_Codeset_Translator_Base * | tf | ) |
CodeSet negotiation - Set the wchar codeset translator factory.
Definition at line 155 of file Transport.inl.
00156 { 00157 this->wchar_translator_ = tf; 00158 this->tcs_set_ = 1; 00159 }
TAO_Codeset_Translator_Base * TAO_Transport::wchar_translator | ( | void | ) | const |
CodeSet Negotiation - Get the wchar codeset translator factory.
Definition at line 142 of file Transport.inl.
00143 { 00144 return this->wchar_translator_; 00145 }
friend class TAO_Leader_Follower_Flushing_Strategy [friend] |
These classes need privileged access to:
Definition at line 951 of file Transport.h.
friend class TAO_Reactive_Flushing_Strategy [friend] |
These classes need privileged access to:
Definition at line 950 of file Transport.h.
friend class TAO_Thread_Per_Connection_Handler [friend] |
Needs privileged access to event_handler_i ()
Definition at line 955 of file Transport.h.
int TAO_Transport::bidirectional_flag_ [protected] |
Used to check if bidirectional info has been synchronized with the peer. Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional? The flag is used as follows: + We don't want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *don't* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such a thing from happening.
The value of this flag will be 0 if the client sends info and 1 if the server receives the info.
Definition at line 1127 of file Transport.h.
Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around.
Definition at line 1099 of file Transport.h.
Additional member values required to support codeset translation.
@Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapsulation, i.e. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. If I have to be more cynical we can move this to the connection_handler and it may make more sense with the DSCP stuff around there. Do you agree?
Definition at line 1211 of file Transport.h.
bool TAO_Transport::connection_closed_on_read_ [protected] |
Track if connection was seen as closed during a read so that invocation can optionally be retried using a different profile. Note that this could violate the "at most once" CORBA semantics.
Definition at line 1191 of file Transport.h.
ACE_Time_Value TAO_Transport::current_deadline_ [protected] |
The queue will start draining no later than <queeing_deadline_> if* the deadline is
Definition at line 1144 of file Transport.h.
bool TAO_Transport::first_request_ [private] |
First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection.
Definition at line 1223 of file Transport.h.
bool TAO_Transport::flush_in_post_open_ [private] |
Indicate that flushing needs to be done in post_open().
Definition at line 1243 of file Transport.h.
long TAO_Transport::flush_timer_id_ [protected] |
The timer ID.
Definition at line 1147 of file Transport.h.
ACE_Lock* TAO_Transport::handler_lock_ [mutable, protected] |
Lock that ensures that activities that *might* use handler-related resources (such as a connection handler) get serialized. This is an ACE_Lock
that gets initialized from TAO_ORB_Core::resource_factory()->create_cached_connection_lock()
. This way, one can use a lock appropriate for the type of system, i.e., a null lock for single-threaded systems, and a real lock for multi-threaded systems.
Definition at line 1161 of file Transport.h.
TAO_Queued_Message* TAO_Transport::head_ [protected] |
Implement the outgoing data queue.
Definition at line 1132 of file Transport.h.
size_t TAO_Transport::id_ [protected] |
A unique identifier for the transport.
This never *never* changes over the lifespan, so we don't have to worry about locking it.
HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection.
Definition at line 1171 of file Transport.h.
Queue of the consolidated, incoming messages.
Definition at line 1136 of file Transport.h.
TAO::Incoming_Message_Stack TAO_Transport::incoming_message_stack_ [protected] |
Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_"
Definition at line 1140 of file Transport.h.
bool TAO_Transport::is_connected_ [protected] |
Is this transport really connected or not. In case of oneways with SYNC_NONE Policy we don't wait until the connection is ready and we buffer the requests in this transport until the connection is ready
Definition at line 1185 of file Transport.h.
Our messaging object.
Definition at line 1196 of file Transport.h.
These classes need privileged access to:
Definition at line 1129 of file Transport.h.
TAO_ORB_Core* const TAO_Transport::orb_core_ [protected] |
Global orbcore resource.
Definition at line 1095 of file Transport.h.
TAO_SYNCH_MUTEX TAO_Transport::output_cdr_mutex_ [mutable, private] |
lock for synchronizing Transport OutputCDR access
Definition at line 1246 of file Transport.h.
ACE_Message_Block* TAO_Transport::partial_message_ [private] |
Holds the partial GIOP message (if there is one).
Definition at line 1226 of file Transport.h.
unsigned long TAO_Transport::purging_order_ [protected] |
Used by the LRU, LFU and FIFO Connection Purging Strategies.
Definition at line 1174 of file Transport.h.
size_t TAO_Transport::recv_buffer_size_ [protected] |
Size of the buffer received.
Definition at line 1177 of file Transport.h.
size_t TAO_Transport::sent_byte_count_ [protected] |
Number of bytes sent.
Definition at line 1180 of file Transport.h.
TAO::Transport::Stats* TAO_Transport::stats_ [private] |
Statistics.
Definition at line 1239 of file Transport.h.
CORBA::ULong const TAO_Transport::tag_ [protected] |
IOP protocol tag.
Definition at line 1092 of file Transport.h.
TAO_Queued_Message* TAO_Transport::tail_ [protected] |
These classes need privileged access to:
Definition at line 1133 of file Transport.h.
CORBA::Boolean TAO_Transport::tcs_set_ [private] |
The tcs_set_ flag indicates that negotiation has occurred and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be.
Definition at line 1217 of file Transport.h.
TAO_Transport_Mux_Strategy* TAO_Transport::tms_ [protected] |
Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request.
Definition at line 1103 of file Transport.h.
TAO_Transport_Timer TAO_Transport::transport_timer_ [protected] |
The adapter used to receive timeout callbacks from the Reactor.
Definition at line 1150 of file Transport.h.
These classes need privileged access to:
Definition at line 1212 of file Transport.h.
TAO_Wait_Strategy* TAO_Transport::ws_ [protected] |
Strategy for waiting for the reply after sending the request.
Definition at line 1106 of file Transport.h.