TAO_Transport Class Reference

Generic definitions for the Transport class. More...

#include <Transport.h>

Collaboration diagram for TAO_Transport:
Collaboration graph
[legend]

List of all members.

Classes

struct  Drain_Result

Public Types

enum  Drain_Result_Enum { DR_ERROR = -1, DR_OK = 0, DR_QUEUE_EMPTY = 1, DR_WOULDBLOCK = 2 }

Public Member Functions

 TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core, size_t input_cdr_size=ACE_CDR::DEFAULT_BUFSIZE)
 Default creator, requires the tag value be supplied.
virtual ~TAO_Transport (void)
 Destructor.
CORBA::ULong tag (void) const
 Return the protocol tag.
TAO_ORB_Coreorb_core (void) const
 Access the ORB that owns this connection.
TAO_Transport_Mux_Strategytms (void) const
 Get the TAO_Tranport_Mux_Strategy used by this object.
TAO_Wait_Strategywait_strategy (void) const
 Return the TAO_Wait_Strategy used by this object.
Drain_Result handle_output (TAO::Transport::Drain_Constraints const &c)
 Callback method to reactively drain the outgoing data queue.
int bidirectional_flag (void) const
 Get the bidirectional flag.
void bidirectional_flag (int flag)
 Set the bidirectional flag.
void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry)
 Set the Cache Map entry.
TAO::Transport_Cache_Manager::HASH_MAP_ENTRYcache_map_entry (void)
 Get the Cache Map entry.
size_t id (void) const
 Set and Get the identifier for this transport instance.
void id (size_t id)
TAO::Connection_Role opened_as (void) const
void opened_as (TAO::Connection_Role)
unsigned long purging_order (void) const
void purging_order (unsigned long value)
bool queue_is_empty (void)
 Check if there are messages pending in the queue.
bool register_if_necessary (void)
 Register with the reactor via the wait strategy.
void provide_handler (TAO::Connection_Handler_Set &handlers)
 Added event handler to the handlers set.
bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers)
virtual int register_handler (void)
 Register the handler with the reactor.
virtual int remove_handler (void)
 Remove the handler from the reactor.
virtual ssize_t send (iovec *iov, int iovcnt, size_t &bytes_transferred, ACE_Time_Value const *timeout)=0
 Write the complete Message_Block chain to the connection.
virtual ssize_t recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0
 Read len bytes from into buf.
Control connection lifecycle

These methods are routed through the TMS object. The TMS strategies implement them correctly.



bool idle_after_send (void)
bool idle_after_reply (void)
virtual void close_connection (void)
 Call the implementation method after obtaining the lock.

Template methods

The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below.



class TAO_Reactive_Flushing_Strategy
class TAO_Leader_Follower_Flushing_Strategy
class TAO_Thread_Per_Connection_Handler
CORBA::ULong const tag_
 IOP protocol tag.
TAO_ORB_Core *const orb_core_
 Global orbcore resource.
TAO::Transport_Cache_Manager::HASH_MAP_ENTRYcache_map_entry_
TAO_Transport_Mux_Strategytms_
TAO_Wait_Strategyws_
 Strategy for waiting for the reply after sending the request.
int bidirectional_flag_
TAO::Connection_Role opening_connection_role_
TAO_Queued_Messagehead_
 Implement the outgoing data queue.
TAO_Queued_Messagetail_
TAO_Incoming_Message_Queue incoming_message_queue_
 Queue of the consolidated, incoming messages..
TAO::Incoming_Message_Stack incoming_message_stack_
ACE_Time_Value current_deadline_
long flush_timer_id_
 The timer ID.
TAO_Transport_Timer transport_timer_
 The adapter used to receive timeout callbacks from the Reactor.
ACE_Lockhandler_lock_
size_t id_
 A unique identifier for the transport.
unsigned long purging_order_
 Used by the LRU, LFU and FIFO Connection Purging Strategies.
size_t recv_buffer_size_
 Size of the buffer received.
size_t sent_byte_count_
 Number of bytes sent.
bool is_connected_
bool connection_closed_on_read_
TAO_GIOP_Message_Basemessaging_object_
 Our messaging object.
TAO_Codeset_Translator_Basechar_translator_
 Additional member values required to support codeset translation.
TAO_Codeset_Translator_Basewchar_translator_
CORBA::Boolean tcs_set_
bool first_request_
ACE_Message_Blockpartial_message_
 Holds the partial GIOP message (if there is one).
TAO::Transport::Stats * stats_
 Statistics.
bool flush_in_post_open_
 Indicate that flushing needs to be done in post_open().
TAO_SYNCH_MUTEX output_cdr_mutex_
 lock for synchronizing Transport OutputCDR access
void messaging_init (TAO_GIOP_Message_Version const &version)
virtual int tear_listen_point_list (TAO_InputCDR &cdr)
virtual bool post_connect_hook (void)
 Hooks that can be overridden in concrete transports.
ACE_Event_Handler::Reference_Count add_reference (void)
 Memory management routines.
ACE_Event_Handler::Reference_Count remove_reference (void)
TAO_GIOP_Message_Basemessaging_object (void)
virtual ACE_Event_Handlerevent_handler_i (void)=0
bool is_connected (void) const
 Is this transport really connected.
bool connection_closed_on_read (void) const
 Was a connection seen as closed during a read.
bool post_open (size_t id)
 Perform all the actions when this transport get opened.
void pre_close (void)
 do what needs to be done when closing the transport
TAO_Connection_Handlerconnection_handler (void)
 Get the connection handler for this transport.
TAO_OutputCDRout_stream (void)
 Accessor for the output CDR stream.
TAO_SYNCH_MUTEX & output_cdr_lock (void)
 Accessor for synchronizing Transport OutputCDR access.
bool can_be_purged (void)
 Can the transport be purged?
virtual void set_bidir_context_info (TAO_Operation_Details &opdetails)
int generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output)
virtual int generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg)
int recache_transport (TAO_Transport_Descriptor_Interface *desc)
 Recache ourselves in the cache.
virtual int handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0)
 Callback to read incoming data.
virtual int send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, TAO_Message_Semantics message_semantics, ACE_Time_Value *max_time_wait)=0
virtual int send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, TAO_ServerRequest *request=0, TAO_Message_Semantics message_semantics=TAO_Message_Semantics(), ACE_Time_Value *max_time_wait=0)=0
virtual int send_message_shared (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
 Sent the contents of message_block.
int format_queue_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time, TAO_Stub *stub)
int send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0)
int send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, TAO::Transport::Drain_Constraints const &dc)
 Send a message block chain, assuming the lock is held.
int purge_entry (void)
 Cache management.
int make_idle (void)
 Cache management.
int update_transport (void)
 Cache management.
int handle_timeout (const ACE_Time_Value &current_time, const void *act)
size_t recv_buffer_size (void) const
 Accessor to recv_buffer_size_.
size_t sent_byte_count (void) const
 Accessor to sent_byte_count_.
TAO_Codeset_Translator_Basechar_translator (void) const
 CodeSet Negotiation - Get the char codeset translator factory.
TAO_Codeset_Translator_Basewchar_translator (void) const
 CodeSet Negotiation - Get the wchar codeset translator factory.
void char_translator (TAO_Codeset_Translator_Base *)
 CodeSet negotiation - Set the char codeset translator factory.
void wchar_translator (TAO_Codeset_Translator_Base *)
 CodeSet negotiation - Set the wchar codeset translator factory.
void assign_translators (TAO_InputCDR *, TAO_OutputCDR *)
void clear_translators (TAO_InputCDR *, TAO_OutputCDR *)
CORBA::Boolean is_tcs_set () const
 Return true if the tcs has been set.
void first_request_sent (bool flag=false)
 Set the state of the first_request_ to flag.
bool first_request () const
 Get the first request flag.
void send_connection_closed_notifications (void)
TAO::Transport::Stats * stats (void) const
 Transport statistics.
virtual TAO_Connection_Handlerconnection_handler_i (void)=0
int process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)
int send_message_shared_i (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int queue_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time, bool back=true)
ACE_Time_Value const * io_timeout (TAO::Transport::Drain_Constraints const &dc) const
 Re-factor computation of I/O timeouts based on operation timeouts. Depending on the wait strategy, we need to timeout I/O operations or not. For example, if we are using a non-blocking strategy, we want to pass 0 to all I/O operations, and rely on the ACE_NONBLOCK settings on the underlying sockets. However, for blocking strategies we want to pass the operation timeouts, to respect the application level policies.
int notify_reactor_now (void)
TAO::Transport_Cache_Managertransport_cache_manager (void)
 Helper method that returns the Transport Cache Manager.
Drain_Result drain_queue (TAO::Transport::Drain_Constraints const &dc)
 Send some of the data in the queue.
Drain_Result drain_queue_i (TAO::Transport::Drain_Constraints const &dc)
 Implement drain_queue() assuming the lock is held.
bool queue_is_empty_i (void) const
 Check if there are messages pending in the queue.
Drain_Result drain_queue_helper (int &iovcnt, iovec iov[], TAO::Transport::Drain_Constraints const &dc)
 A helper routine used in drain_queue_i().
int schedule_output_i (void)
 Schedule handle_output() callbacks.
int cancel_output_i (void)
 Cancel handle_output() callbacks.
void cleanup_queue (size_t byte_count)
 Cleanup the queue.
void cleanup_queue_i ()
 Cleanup the complete queue.
bool check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush)
 Check if the buffering constraints have been reached.
int send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time)
int flush_timer_pending (void) const
 Check if the flush timer is still pending.
void reset_flush_timer (void)
void report_invalid_event_handler (const char *caller)
 Print out error messages if the event handler is not valid.
int handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data)
int handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time)
int handle_input_parse_extra_messages (ACE_Message_Block &message_block)
int consolidate_enqueue_message (TAO_Queued_Data *qd)
int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)
int process_queue_head (TAO_Resume_Handle &rh)
int notify_reactor (void)
void send_connection_closed_notifications_i (void)
 Assume the lock is held.
void allocate_partial_message_block (void)
bool using_blocking_io_for_synch_messages () const
bool using_blocking_io_for_asynch_messages () const

Detailed Description

Generic definitions for the Transport class.

The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!

The main responsibility of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for all if not all its I/O the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.

The outgoing data path:

One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.

Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.

Out of order messages:

TAO provides explicit policies to send 'urgent' messages. Such messages may put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.

Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.

Waiting threads:

One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggest that simply using a ACE_Message_Queue would not be enough: there is a significant amount of ancillary information, to keep on each message that the Message_Block class does not provide room for.

Blocking I/O is still attractive for some applications. First, my eliminating the Reactor overhead performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.

Timeouts:

Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.

Conclusions:

The outgoing data path consist in several components:

The Transport object provides a single method to send request messages (send_request_message ()).

The incoming data path:

One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that needs to be given due consideration. They are

Parsing messages (GIOP) & processing the message:

The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB are done by the messaging protocol.

Design forces and Challenges

To keep things as efficient as possible for medium sized requests, it would be good to minimize data copying and locking along the incoming path ie. from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following (a) Data is bigger than the buffer that we have on stack (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages

We solve the problems as follows

(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, data will be copied into new buffer being able to hold full message and is queued; succeeding events will read data from socket and write directly into this buffer. Otherwise, if if the message in local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.

(b) If buffer with incomplete message has been enqueued, while trying to do the above, the reactor will call us back when the handle becomes read ready. The read-operation will copy data directly into the enqueued buffer. If the message has bee read completely the message is sent to the higher layers of the ORB for processing.

(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.

Sending Replies

We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We run in to problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.

Solution to the nesting problem

The solution that we (plan to) adopt is pretty straight forward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usages of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".

See Also:

http://htmlpreview.github.com/?https://github.com/DOCGroup/ACE_TAO/blob/master/TAO/docs/pluggable_protocols/index.html

Definition at line 320 of file Transport.h.


Member Enumeration Documentation

Enumerator:
DR_ERROR 
DR_OK 
DR_QUEUE_EMPTY 
DR_WOULDBLOCK 

Definition at line 366 of file Transport.h.

00367     {
00368       DR_ERROR = -1,
00369       DR_OK = 0,
00370       DR_QUEUE_EMPTY = 1, // used internally, not returned from drain_queue()
00371       DR_WOULDBLOCK = 2
00372     };


Constructor & Destructor Documentation

TAO_Transport::TAO_Transport ( CORBA::ULong  tag,
TAO_ORB_Core orb_core,
size_t  input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE 
)

Default creator, requires the tag value be supplied.

Definition at line 123 of file Transport.cpp.

00126   : tag_ (tag)
00127   , orb_core_ (orb_core)
00128   , cache_map_entry_ (0)
00129   , tms_ (0)
00130   , ws_ (0)
00131   , bidirectional_flag_ (-1)
00132   , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
00133   , head_ (0)
00134   , tail_ (0)
00135   , incoming_message_queue_ (orb_core)
00136   , current_deadline_ (ACE_Time_Value::zero)
00137   , flush_timer_id_ (-1)
00138   , transport_timer_ (this)
00139   , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
00140   , id_ ((size_t) this)
00141   , purging_order_ (0)
00142   , recv_buffer_size_ (0)
00143   , sent_byte_count_ (0)
00144   , is_connected_ (false)
00145   , connection_closed_on_read_ (false)
00146   , messaging_object_ (0)
00147   , char_translator_ (0)
00148   , wchar_translator_ (0)
00149   , tcs_set_ (0)
00150   , first_request_ (true)
00151   , partial_message_ (0)
00152 #if TAO_HAS_SENDFILE == 1
00153     // The ORB has been configured to use the MMAP allocator, meaning
00154     // we could/should use sendfile() to send data.  Cast once rather
00155     // here rather than during each send.  This assumes that all
00156     // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator
00157     // instance as the underlying output CDR buffer allocator.
00158   , mmap_allocator_ (
00159       dynamic_cast<TAO_MMAP_Allocator *> (
00160         orb_core->output_cdr_buffer_allocator ()))
00161 #endif  /* TAO_HAS_SENDFILE==1 */
00162 #if TAO_HAS_TRANSPORT_CURRENT == 1
00163   , stats_ (0)
00164 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00165   , flush_in_post_open_ (false)
00166 {
00167   ACE_NEW (this->messaging_object_,
00168             TAO_GIOP_Message_Base (orb_core,
00169                                    this,
00170                                    input_cdr_size));
00171 
00172   TAO_Client_Strategy_Factory *cf =
00173     this->orb_core_->client_factory ();
00174 
00175   // Create WS now.
00176   this->ws_ = cf->create_wait_strategy (this);
00177 
00178   // Create TMS now.
00179   this->tms_ = cf->create_transport_mux_strategy (this);
00180 
00181 #if TAO_HAS_TRANSPORT_CURRENT == 1
00182   // Allocate stats
00183   ACE_NEW_THROW_EX (this->stats_,
00184                     TAO::Transport::Stats,
00185                     CORBA::NO_MEMORY ());
00186 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00187 
00188   /*
00189    * Hook to add code that initializes components that
00190    * belong to the concrete protocol implementation.
00191    * Further additions to this Transport class will
00192    * need to add code *before* this hook.
00193    */
00194   //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK
00195 }

TAO_Transport::~TAO_Transport ( void   )  [virtual]

Destructor.

Definition at line 197 of file Transport.cpp.

00198 {
00199   if (TAO_debug_level > 9)
00200     {
00201       TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::~Transport\n"),
00202                   this->id_
00203                   ));
00204     }
00205 
00206   delete this->messaging_object_;
00207 
00208   delete this->ws_;
00209 
00210   delete this->tms_;
00211 
00212   delete this->handler_lock_;
00213 
00214   if (!this->is_connected_)
00215     {
00216       // When we have a not connected transport we could have buffered
00217       // messages on this transport which we have to cleanup now.
00218       this->cleanup_queue_i();
00219     }
00220 
00221   // Release the partial message block, however we may
00222   // have never allocated one.
00223   ACE_Message_Block::release (this->partial_message_);
00224 
00225   // By the time the destructor is reached here all the connection stuff
00226   // *must* have been cleaned up.
00227 
00228   // The following assert is needed for the test "Bug_2494_Regression".
00229   // See the bugzilla bug #2494 for details.
00230   ACE_ASSERT (this->queue_is_empty_i ());
00231   ACE_ASSERT (this->cache_map_entry_ == 0);
00232 
00233 #if TAO_HAS_TRANSPORT_CURRENT == 1
00234   delete this->stats_;
00235 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00236 
00237   /*
00238    * Hook to add code that cleans up components
00239    * belong to the concrete protocol implementation.
00240    * Further additions to this Transport class will
00241    * need to add code *before* this hook.
00242    */
00243   //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK
00244 }


Member Function Documentation

ACE_Event_Handler::Reference_Count TAO_Transport::add_reference ( void   ) 

Memory management routines.

Forwards to event handler.

Definition at line 2742 of file Transport.cpp.

02743 {
02744   return this->event_handler_i ()->add_reference ();
02745 }

void TAO_Transport::allocate_partial_message_block ( void   )  [private]

Allocate a partial message block and store it in our partial_message_ data member.

Definition at line 2872 of file Transport.cpp.

02873 {
02874   if (this->partial_message_ == 0)
02875     {
02876       // This value must be at least large enough to hold a GIOP message
02877       // header plus a GIOP fragment header
02878       size_t const partial_message_size =
02879         this->messaging_object ()->header_length ();
02880        // + this->messaging_object ()->fragment_header_length ();
02881        // deprecated, conflicts with not-single_read_opt.
02882 
02883       ACE_NEW (this->partial_message_,
02884                ACE_Message_Block (partial_message_size));
02885     }
02886 }

void TAO_Transport::assign_translators ( TAO_InputCDR inp,
TAO_OutputCDR outp 
)

Use the Transport's codeset factories to set the translator for input and output CDRs.

Definition at line 2712 of file Transport.cpp.

02713 {
02714   if (this->char_translator_)
02715     {
02716       this->char_translator_->assign (inp);
02717       this->char_translator_->assign (outp);
02718     }
02719   if (this->wchar_translator_)
02720     {
02721       this->wchar_translator_->assign (inp);
02722       this->wchar_translator_->assign (outp);
02723     }
02724 }

void TAO_Transport::bidirectional_flag ( int  flag  ) 

Set the bidirectional flag.

Definition at line 42 of file Transport.inl.

00043 {
00044   this->bidirectional_flag_ = flag;
00045 }

int TAO_Transport::bidirectional_flag ( void   )  const

Get the bidirectional flag.

Definition at line 36 of file Transport.inl.

00037 {
00038   return this->bidirectional_flag_;
00039 }

TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * TAO_Transport::cache_map_entry ( void   ) 

Get the Cache Map entry.

Definition at line 60 of file Transport.inl.

00061 {
00062   return this->cache_map_entry_;
00063 }

void TAO_Transport::cache_map_entry ( TAO::Transport_Cache_Manager::HASH_MAP_ENTRY entry  ) 

Set the Cache Map entry.

Definition at line 66 of file Transport.inl.

00068 {
00069   // Sync with TAO_Transport::purge_entry()
00070   ACE_GUARD (ACE_Lock, ace_mon, *this->handler_lock_);
00071   this->cache_map_entry_ = entry;
00072 }

bool TAO_Transport::can_be_purged ( void   ) 

Can the transport be purged?

Definition at line 571 of file Transport.cpp.

00572 {
00573   return !this->tms_->has_request ();
00574 }

int TAO_Transport::cancel_output_i ( void   )  [private]

Cancel handle_output() callbacks.

Definition at line 940 of file Transport.cpp.

00941 {
00942   ACE_Event_Handler * const eh = this->event_handler_i ();
00943   ACE_Reactor *const reactor = eh->reactor ();
00944 
00945   if (TAO_debug_level > 3)
00946     {
00947       TAOLIB_DEBUG ((LM_DEBUG,
00948          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00949          this->id ()));
00950     }
00951 
00952   return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00953 }

void TAO_Transport::char_translator ( TAO_Codeset_Translator_Base tf  ) 

CodeSet negotiation - Set the char codeset translator factory.

Definition at line 148 of file Transport.inl.

00149 {
00150   this->char_translator_ = tf;
00151   this->tcs_set_ = 1;
00152 }

TAO_Codeset_Translator_Base * TAO_Transport::char_translator ( void   )  const

CodeSet Negotiation - Get the char codeset translator factory.

Definition at line 136 of file Transport.inl.

00137 {
00138   return this->char_translator_;
00139 }

bool TAO_Transport::check_buffering_constraints_i ( TAO_Stub stub,
bool &  must_flush 
) [private]

Check if the buffering constraints have been reached.

Definition at line 1305 of file Transport.cpp.

01306 {
01307   // First let's compute the size of the queue:
01308   size_t msg_count = 0;
01309   size_t total_bytes = 0;
01310 
01311   for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01312     {
01313       ++msg_count;
01314       total_bytes += i->message_length ();
01315     }
01316 
01317   bool set_timer = false;
01318   ACE_Time_Value new_deadline;
01319 
01320   TAO::Transport_Queueing_Strategy *queue_strategy =
01321     stub->transport_queueing_strategy ();
01322 
01323   bool constraints_reached = true;
01324 
01325   if (queue_strategy)
01326     {
01327       constraints_reached =
01328         queue_strategy->buffering_constraints_reached (stub,
01329                                                        msg_count,
01330                                                        total_bytes,
01331                                                        must_flush,
01332                                                        this->current_deadline_,
01333                                                        set_timer,
01334                                                        new_deadline);
01335     }
01336   else
01337     {
01338       must_flush = false;
01339     }
01340 
01341   // ... set the new timer, also cancel any previous timers ...
01342   // Check for connected state since this method may be called
01343   // before the connection is established and than there will be no
01344   // reactor available yet.
01345   if (set_timer && this->is_connected_)
01346     {
01347       ACE_Event_Handler *eh = this->event_handler_i ();
01348       ACE_Reactor * const reactor = eh->reactor ();
01349       this->current_deadline_ = new_deadline;
01350       ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday ();
01351 
01352       if (this->flush_timer_pending ())
01353         {
01354           reactor->cancel_timer (this->flush_timer_id_);
01355         }
01356 
01357       this->flush_timer_id_ =
01358         reactor->schedule_timer (&this->transport_timer_,
01359                                  &this->current_deadline_,
01360                                  delay);
01361     }
01362 
01363   return constraints_reached;
01364 }

void TAO_Transport::cleanup_queue ( size_t  byte_count  )  [private]

Cleanup the queue.

Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out.

Definition at line 1258 of file Transport.cpp.

01259 {
01260   while (!this->queue_is_empty_i () && byte_count > 0)
01261     {
01262       TAO_Queued_Message *i = this->head_;
01263 
01264       if (TAO_debug_level > 4)
01265         {
01266           TAOLIB_DEBUG ((LM_DEBUG,
01267              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01268              ACE_TEXT ("byte_count = %d\n"),
01269              this->id (), byte_count));
01270         }
01271 
01272       // Update the state of the first message
01273       i->bytes_transferred (byte_count);
01274 
01275       if (TAO_debug_level > 4)
01276         {
01277           TAOLIB_DEBUG ((LM_DEBUG,
01278              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01279              ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01280              this->id (), byte_count, i->all_data_sent (),
01281              i->message_length ()));
01282         }
01283 
01284       // ... if all the data was sent the message must be removed from
01285       // the queue...
01286       if (i->all_data_sent ())
01287         {
01288           i->remove_from_list (this->head_, this->tail_);
01289           i->destroy ();
01290         }
01291       else if (byte_count == 0)
01292         {
01293           // If we have sent out a full message block, but we are not
01294           // finished with this message, we need to do something with the
01295           // message block chain held by our output stream.  If we don't,
01296           // another thread can attempt to service this transport and end
01297           // up resetting the output stream which will release the
01298           // message that we haven't finished sending.
01299           i->copy_if_necessary (this->out_stream ().begin ());
01300         }
01301     }
01302 }

void TAO_Transport::cleanup_queue_i (  )  [private]

Cleanup the complete queue.

Definition at line 1215 of file Transport.cpp.

01216 {
01217   if (TAO_debug_level > 4)
01218     {
01219       TAOLIB_DEBUG ((LM_DEBUG,
01220          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01221          ACE_TEXT ("cleaning up complete queue\n"),
01222          this->id ()));
01223     }
01224 
01225   size_t byte_count = 0;
01226   int msg_count = 0;
01227 
01228   // Cleanup all messages
01229   while (!this->queue_is_empty_i ())
01230     {
01231       TAO_Queued_Message *i = this->head_;
01232 
01233       if (TAO_debug_level > 4)
01234         {
01235           byte_count += i->message_length();
01236           ++msg_count;
01237         }
01238        // @@ This is a good point to insert a flag to indicate that a
01239        //    CloseConnection message was successfully received.
01240       i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01241                         this->orb_core_->leader_follower ());
01242 
01243       i->remove_from_list (this->head_, this->tail_);
01244 
01245       i->destroy ();
01246     }
01247 
01248   if (TAO_debug_level > 4)
01249     {
01250       TAOLIB_DEBUG ((LM_DEBUG,
01251                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01252                   ACE_TEXT ("discarded %d messages, %u bytes.\n"),
01253                   this->id (), msg_count, byte_count));
01254     }
01255 }

void TAO_Transport::clear_translators ( TAO_InputCDR inp,
TAO_OutputCDR outp 
)

It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be.

Definition at line 2727 of file Transport.cpp.

02728 {
02729   if (inp)
02730     {
02731       inp->char_translator (0);
02732       inp->wchar_translator (0);
02733     }
02734   if (outp)
02735     {
02736       outp->char_translator (0);
02737       outp->wchar_translator (0);
02738     }
02739 }

void TAO_Transport::close_connection ( void   )  [virtual]

Call the implementation method after obtaining the lock.

Definition at line 361 of file Transport.cpp.

00362 {
00363   this->connection_handler_i ()->close_connection ();
00364 }

bool TAO_Transport::connection_closed_on_read ( void   )  const

Was a connection seen as closed during a read.

Definition at line 2925 of file Transport.cpp.

02926 {
02927   return connection_closed_on_read_;
02928 }

TAO_Connection_Handler * TAO_Transport::connection_handler ( void   ) 

Get the connection handler for this transport.

Definition at line 188 of file Transport.inl.

00189 {
00190   return this->connection_handler_i();
00191 }

virtual TAO_Connection_Handler* TAO_Transport::connection_handler_i ( void   )  [protected, pure virtual]

These classes need privileged access to:

int TAO_Transport::consolidate_enqueue_message ( TAO_Queued_Data qd  )  [private]
Returns:
-1 error, otherwise 0

Definition at line 1873 of file Transport.cpp.

01874 {
01875   // consolidate message on top of stack, only for fragmented messages
01876 
01877   // paranoid check
01878   if (q_data->missing_data () != 0)
01879     {
01880        return -1;
01881     }
01882 
01883   if (q_data->more_fragments () ||
01884       q_data->msg_type () == GIOP::Fragment)
01885     {
01886       TAO_Queued_Data *new_q_data = 0;
01887 
01888       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01889         {
01890         case -1: // error
01891           return -1;
01892 
01893         case 0:  // returning consolidated message in new_q_data
01894           if (!new_q_data)
01895             {
01896               if (TAO_debug_level > 0)
01897                 {
01898                   TAOLIB_ERROR ((LM_ERROR,
01899                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01900                      ACE_TEXT ("error, consolidated message is NULL\n"),
01901                      this->id ()));
01902                 }
01903               return -1;
01904             }
01905 
01906           if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01907             {
01908               TAO_Queued_Data::release (new_q_data);
01909               return -1;
01910             }
01911           break;
01912 
01913         case 1:  // fragment has been stored in messaging_oject()
01914           break;
01915         }
01916     }
01917   else
01918     {
01919       if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01920         {
01921           TAO_Queued_Data::release (q_data);
01922           return -1;
01923         }
01924     }
01925 
01926   return 0; // success
01927 }

int TAO_Transport::consolidate_process_message ( TAO_Queued_Data qd,
TAO_Resume_Handle rh 
) [private]
Returns:
-1 error, otherwise 0

Definition at line 1786 of file Transport.cpp.

01788 {
01789   // paranoid check
01790   if (q_data->missing_data () != 0)
01791     {
01792       if (TAO_debug_level > 0)
01793         {
01794            TAOLIB_ERROR ((LM_ERROR,
01795               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01796               ACE_TEXT ("missing data\n"),
01797               this->id ()));
01798         }
01799        return -1;
01800     }
01801 
01802   if (q_data->more_fragments () ||
01803       q_data->msg_type () == GIOP::Fragment)
01804     {
01805       // consolidate message on top of stack, only for fragmented messages
01806       TAO_Queued_Data *new_q_data = 0;
01807 
01808       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01809         {
01810         case -1: // error
01811           return -1;
01812 
01813         case 0:  // returning consolidated message in q_data
01814           if (!new_q_data)
01815             {
01816               if (TAO_debug_level > 0)
01817                 {
01818                   TAOLIB_ERROR ((LM_ERROR,
01819                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01820                      ACE_TEXT ("error, consolidated message is NULL\n"),
01821                      this->id ()));
01822                 }
01823               return -1;
01824             }
01825 
01826 
01827           if (this->process_parsed_messages (new_q_data, rh) == -1)
01828             {
01829               TAO_Queued_Data::release (new_q_data);
01830 
01831               if (TAO_debug_level > 0)
01832                 {
01833                   TAOLIB_ERROR ((LM_ERROR,
01834                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01835                      ACE_TEXT ("error processing consolidated message\n"),
01836                      this->id ()));
01837                 }
01838               return -1;
01839             }
01840 
01841           TAO_Queued_Data::release (new_q_data);
01842 
01843           break;
01844 
01845         case 1:  // fragment has been stored in messaging_oject()
01846           break;
01847         }
01848     }
01849   else
01850     {
01851       if (this->process_parsed_messages (q_data, rh) == -1)
01852         {
01853           TAO_Queued_Data::release (q_data);
01854 
01855           if (TAO_debug_level > 0)
01856             {
01857               TAOLIB_ERROR ((LM_ERROR,
01858                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01859                  ACE_TEXT ("error processing message\n"),
01860                  this->id ()));
01861             }
01862           return -1;
01863         }
01864 
01865       TAO_Queued_Data::release (q_data);
01866 
01867     }
01868 
01869   return 0;
01870 }

TAO_Transport::Drain_Result TAO_Transport::drain_queue ( TAO::Transport::Drain_Constraints const &  dc  )  [private]

Send some of the data in the queue.

As the outgoing data is drained this method is invoked to send as much of the current message as possible.

Definition at line 997 of file Transport.cpp.

00998 {
00999   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, DR_ERROR);
01000   Drain_Result const retval = this->drain_queue_i (dc);
01001 
01002   if (retval == DR_QUEUE_EMPTY)
01003     {
01004       // ... there is no current message or it was completely
01005       // sent, cancel output...
01006       TAO_Flushing_Strategy *flushing_strategy =
01007         this->orb_core ()->flushing_strategy ();
01008 
01009       flushing_strategy->cancel_output (this);
01010 
01011       return DR_OK;
01012     }
01013 
01014   return retval;
01015 }

TAO_Transport::Drain_Result TAO_Transport::drain_queue_helper ( int &  iovcnt,
iovec  iov[],
TAO::Transport::Drain_Constraints const &  dc 
) [private]

A helper routine used in drain_queue_i().

Definition at line 1018 of file Transport.cpp.

01020 {
01021   // As a side-effect, this decrements the timeout() pointed-to value by
01022   // the time used in this function.  That might be important as there are
01023   // potentially long running system calls invoked from here.
01024   TAO::ORB_Countdown_Time countdown(dc.timeout());
01025 
01026   size_t byte_count = 0;
01027 
01028   // ... send the message ...
01029   ssize_t retval = -1;
01030 
01031 #if TAO_HAS_SENDFILE == 1
01032   if (this->mmap_allocator_)
01033     retval = this->sendfile (this->mmap_allocator_,
01034                              iov,
01035                              iovcnt,
01036                              byte_count,
01037                              dc);
01038   else
01039 #endif  /* TAO_HAS_SENDFILE==1 */
01040     retval = this->send (iov, iovcnt, byte_count,
01041                          this->io_timeout (dc));
01042 
01043   if (TAO_debug_level > 9)
01044     {
01045       dump_iov (iov, iovcnt, this->id (),
01046                 byte_count, ACE_TEXT("drain_queue_helper"));
01047     }
01048 
01049   if (retval == 0)
01050     {
01051       if (TAO_debug_level > 4)
01052         {
01053           TAOLIB_DEBUG ((LM_DEBUG,
01054              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
01055              ACE_TEXT ("send() returns 0\n"),
01056              this->id ()));
01057         }
01058       return DR_ERROR;
01059     }
01060   else if (retval == -1)
01061     {
01062       if (TAO_debug_level > 4)
01063         {
01064           TAOLIB_DEBUG ((LM_DEBUG,
01065              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
01066              ACE_TEXT ("error during send() (errno: %d) - %m\n"),
01067              this->id (), ACE_ERRNO_GET));
01068         }
01069 
01070       if (errno == EWOULDBLOCK || errno == EAGAIN)
01071         {
01072           return DR_WOULDBLOCK;
01073         }
01074 
01075       return DR_ERROR;
01076     }
01077 
01078   // ... now we need to update the queue, removing elements
01079   // that have been sent, and updating the last element if it
01080   // was only partially sent ...
01081   this->cleanup_queue (byte_count);
01082   iovcnt = 0;
01083 
01084   // ... start over, how do we guarantee progress?  Because if
01085   // no bytes are sent send() can only return 0 or -1
01086 
01087   // Total no. of bytes sent for a send call
01088   this->sent_byte_count_ += byte_count;
01089 
01090   if (TAO_debug_level > 4)
01091     {
01092       TAOLIB_DEBUG ((LM_DEBUG,
01093          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
01094          ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
01095          this->id(), byte_count, this->queue_is_empty_i ()));
01096     }
01097 
01098   return DR_QUEUE_EMPTY;
01099   // drain_queue_i will check if the queue is actually empty
01100 }

TAO_Transport::Drain_Result TAO_Transport::drain_queue_i ( TAO::Transport::Drain_Constraints const &  dc  )  [private]

Implement drain_queue() assuming the lock is held.

Definition at line 1103 of file Transport.cpp.

01104 {
01105   // This is the vector used to send data, it must be declared outside
01106   // the loop because after the loop there may still be data to be
01107   // sent
01108   int iovcnt = 0;
01109 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01110   iovec iov[ACE_IOV_MAX] = { { 0 , 0 } };
01111 #else
01112   iovec iov[ACE_IOV_MAX];
01113 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
01114 
01115   // We loop over all the elements in the queue ...
01116   TAO_Queued_Message *i = this->head_;
01117 
01118   // Reset the value so that the counting is done for each new send
01119   // call.
01120   this->sent_byte_count_ = 0;
01121 
01122   // Avoid calling this expensive function each time through the loop. Instead
01123   // we'll assume that the time is unlikely to change much during the loop.
01124   // If we are forced to send in the loop then we'll recompute the time.
01125   ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
01126 
01127   while (i != 0)
01128     {
01129       if (i->is_expired (now))
01130         {
01131           if (TAO_debug_level > 3)
01132           {
01133             TAOLIB_DEBUG ((LM_DEBUG,
01134               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01135               ACE_TEXT ("Discarding expired queued message.\n"),
01136               this->id ()));
01137           }
01138           TAO_Queued_Message *next = i->next ();
01139           i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
01140                             this->orb_core_->leader_follower ());
01141           i->remove_from_list (this->head_, this->tail_);
01142           i->destroy ();
01143           i = next;
01144           continue;
01145         }
01146       // ... each element fills the iovector ...
01147       i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
01148 
01149       // ... the vector is full, no choice but to send some data out.
01150       // We need to loop because a single message can span multiple
01151       // IOV_MAX elements ...
01152       if (iovcnt == ACE_IOV_MAX)
01153         {
01154           Drain_Result const retval =
01155             this->drain_queue_helper (iovcnt, iov, dc);
01156 
01157           if (TAO_debug_level > 4)
01158             {
01159               TAOLIB_DEBUG ((LM_DEBUG,
01160                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01161                  ACE_TEXT ("helper retval = %d\n"),
01162                  this->id (), static_cast<int> (retval.dre_)));
01163             }
01164 
01165           if (retval != DR_QUEUE_EMPTY)
01166             {
01167               return retval;
01168             }
01169 
01170           now = ACE_High_Res_Timer::gettimeofday_hr ();
01171 
01172           i = this->head_;
01173           continue;
01174         }
01175       // ... notice that this line is only reached if there is still
01176       // room in the iovector ...
01177       i = i->next ();
01178     }
01179 
01180   if (iovcnt != 0)
01181     {
01182       Drain_Result const retval = this->drain_queue_helper (iovcnt, iov, dc);
01183 
01184       if (TAO_debug_level > 4)
01185         {
01186           TAOLIB_DEBUG ((LM_DEBUG,
01187               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01188               ACE_TEXT ("helper retval = %d\n"),
01189               this->id (), static_cast<int> (retval.dre_)));
01190         }
01191 
01192       if (retval != DR_QUEUE_EMPTY)
01193         {
01194           return retval;
01195         }
01196     }
01197 
01198   if (this->queue_is_empty_i ())
01199     {
01200       if (this->flush_timer_pending ())
01201         {
01202           ACE_Event_Handler *eh = this->event_handler_i ();
01203           ACE_Reactor * const reactor = eh->reactor ();
01204           reactor->cancel_timer (this->flush_timer_id_);
01205           this->reset_flush_timer ();
01206         }
01207 
01208       return DR_QUEUE_EMPTY;
01209     }
01210 
01211   return DR_OK;
01212 }

virtual ACE_Event_Handler* TAO_Transport::event_handler_i ( void   )  [pure virtual]

Return the event handler used to receive notifications from the Reactor. Normally a concrete TAO_Transport object has-a ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fullfilled by an instance of ACE_Svc_Handler.

Todo:
Since we only use a limited functionality of ACE_Svc_Handler we could probably implement a generic adapter class (TAO_Transport_Event_Handler or something), this will reduce footprint and simplify the process of implementing a pluggable protocol.
Todo:
This method has to be renamed to event_handler()
bool TAO_Transport::first_request ( void   )  const

Get the first request flag.

Definition at line 175 of file Transport.inl.

00176 {
00177   return this->first_request_;
00178 }

void TAO_Transport::first_request_sent ( bool  flag = false  ) 

Set the state of the first_request_ to flag.

Definition at line 169 of file Transport.inl.

00170 {
00171   this->first_request_ = flag;
00172 }

int TAO_Transport::flush_timer_pending ( void   )  const [private]

Check if the flush timer is still pending.

Definition at line 113 of file Transport.inl.

00114 {
00115   return this->flush_timer_id_ != -1;
00116 }

int TAO_Transport::format_queue_message ( TAO_OutputCDR stream,
ACE_Time_Value max_wait_time,
TAO_Stub stub 
)

Format and queue a message for stream

Parameters:
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.

Definition at line 631 of file Transport.cpp.

00634 {
00635   if (this->messaging_object ()->format_message (stream, stub, 0) != 0)
00636     return -1;
00637 
00638   if (this->queue_message_i (stream.begin (), max_wait_time) != 0)
00639     return -1;
00640 
00641   // check the buffering constraints to see what must be done in post_open()
00642   bool must_flush = false;
00643   this->flush_in_post_open_ |=
00644     this->check_buffering_constraints_i (stub,
00645                                          must_flush);
00646 
00647   return 0;
00648 }

int TAO_Transport::generate_locate_request ( TAO_Target_Specification spec,
TAO_Operation_Details opdetails,
TAO_OutputCDR output 
)

This is a request for the transport object to write a LocateRequest header before it is sent out.

Definition at line 497 of file Transport.cpp.

00501 {
00502   if (this->messaging_object ()->generate_locate_request_header (opdetails,
00503                                                                  spec,
00504                                                                  output) == -1)
00505     {
00506       if (TAO_debug_level > 0)
00507         {
00508           TAOLIB_ERROR ((LM_ERROR,
00509                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00510                       ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00511                       this->id ()));
00512         }
00513 
00514       return -1;
00515     }
00516 
00517   return 0;
00518 }

int TAO_Transport::generate_request_header ( TAO_Operation_Details opd,
TAO_Target_Specification spec,
TAO_OutputCDR msg 
) [virtual]

This is a request for the transport object to write a request header before it sends out the request

Definition at line 521 of file Transport.cpp.

00525 {
00526   if (this->messaging_object ()->generate_request_header (opdetails,
00527                                                           spec,
00528                                                           output) == -1)
00529     {
00530       if (TAO_debug_level > 0)
00531         {
00532           TAOLIB_ERROR ((LM_ERROR,
00533                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_request_header, ")
00534                       ACE_TEXT ("error while marshalling the Request header\n"),
00535                       this->id()));
00536         }
00537 
00538       return -1;
00539     }
00540 
00541   return 0;
00542 }

int TAO_Transport::handle_input ( TAO_Resume_Handle rh,
ACE_Time_Value max_wait_time = 0 
) [virtual]

Callback to read incoming data.

The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.

Todo:
the method name is confusing! Calling it handle_input() would probably make things easier to understand and follow!

Once a complete message is read the Transport class delegates on the Messaging layer to invoke the right upcall (on the server) or the TAO_Reply_Dispatcher (on the client side).

Parameters:
max_wait_time In some cases the I/O is synchronous, e.g. a thread-per-connection server or when Wait_On_Read is enabled. In those cases a maximum read time can be specified.

All the methods relevant to the incoming data path of the ORB are defined below

Definition at line 1712 of file Transport.cpp.

01714 {
01715   if (TAO_debug_level > 3)
01716     {
01717       TAOLIB_DEBUG ((LM_DEBUG,
01718          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01719          this->id ()));
01720     }
01721 
01722   // First try to process messages of the head of the incoming queue.
01723   int const retval = this->process_queue_head (rh);
01724 
01725   if (retval <= 0)
01726     {
01727       if (retval == -1)
01728         {
01729           if (TAO_debug_level > 2)
01730             {
01731               TAOLIB_ERROR ((LM_ERROR,
01732                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01733                  ACE_TEXT ("error while parsing the head of the queue\n"),
01734                  this->id()));
01735 
01736             }
01737           return -1;
01738         }
01739       else
01740         {
01741           // retval == 0
01742 
01743           // Processed a message in queue successfully. This
01744           // thread must return to thread-pool now.
01745           return 0;
01746         }
01747     }
01748 
01749   TAO_Queued_Data *q_data = 0;
01750 
01751   if (this->incoming_message_stack_.top (q_data) != -1
01752       && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED)
01753     {
01754       /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete  */
01755       if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01756         {
01757           if (TAO_debug_level > 0)
01758             {
01759               TAOLIB_ERROR ((LM_ERROR,
01760                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01761                  ACE_TEXT ("error consolidating incoming message\n"),
01762                  this->id ()));
01763             }
01764           return -1;
01765         }
01766     }
01767   else
01768     {
01769       if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01770         {
01771           if (TAO_debug_level > 0)
01772             {
01773               TAOLIB_ERROR ((LM_ERROR,
01774                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01775                  ACE_TEXT ("error parsing incoming message\n"),
01776                  this->id ()));
01777             }
01778           return -1;
01779         }
01780     }
01781 
01782   return 0;
01783 }

int TAO_Transport::handle_input_missing_data ( TAO_Resume_Handle rh,
ACE_Time_Value max_wait_time,
TAO_Queued_Data q_data 
) [private]

Is invoked by handle_input operation. It consolidate message on top of incoming_message_stack. The amount of missing data is known and recv operation copies data directly into message buffer, as much as a single recv-invocation provides.

Definition at line 1930 of file Transport.cpp.

01933 {
01934   // paranoid check
01935   if (q_data == 0)
01936     {
01937       return -1;
01938     }
01939 
01940   if (TAO_debug_level > 3)
01941     {
01942       TAOLIB_DEBUG ((LM_DEBUG,
01943          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01944          ACE_TEXT ("enter (missing data == %d)\n"),
01945          this->id (), q_data->missing_data ()));
01946     }
01947 
01948   size_t const recv_size = q_data->missing_data ();
01949 
01950   if (q_data->msg_block ()->space() < recv_size)
01951     {
01952       // make sure the message_block has enough space
01953       size_t const message_size = recv_size + q_data->msg_block ()->length();
01954 
01955       if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1)
01956         {
01957           return -1;
01958         }
01959     }
01960 
01961   // Saving the size of the received buffer in case any one needs to
01962   // get the size of the message that is received in the
01963   // context. Obviously the value will be changed for each recv call
01964   // and the user is supposed to invoke the accessor only in the
01965   // invocation context to get meaningful information.
01966   this->recv_buffer_size_ = recv_size;
01967 
01968   // Read the message into the existing message block on heap
01969   ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(),
01970                                 recv_size,
01971                                 max_wait_time);
01972 
01973   if (n <= 0)
01974     {
01975       return ACE_Utils::truncate_cast<int> (n);
01976     }
01977 
01978   if (TAO_debug_level > 3)
01979     {
01980       TAOLIB_DEBUG ((LM_DEBUG,
01981          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01982          ACE_TEXT ("read bytes %d\n"),
01983          this->id (), n));
01984     }
01985 
01986   q_data->msg_block ()->wr_ptr(n);
01987   q_data->missing_data (q_data->missing_data () - n);
01988 
01989   if (q_data->missing_data () == 0)
01990     {
01991       // paranoid check
01992       if (this->incoming_message_stack_.pop (q_data) == -1)
01993         {
01994           return -1;
01995         }
01996 
01997       if (this->consolidate_process_message (q_data, rh) == -1)
01998         {
01999           return -1;
02000         }
02001     }
02002 
02003   return 0;
02004 }

int TAO_Transport::handle_input_parse_data ( TAO_Resume_Handle rh,
ACE_Time_Value max_wait_time 
) [private]

Is invoked by handle_input operation. It parses new messages from input stream or consolidates messages whose header has been partially read, the message size being unknown so far. It parses as much data as a single recv-invocation provides.

Definition at line 2051 of file Transport.cpp.

02053 {
02054   if (TAO_debug_level > 3)
02055     {
02056       TAOLIB_DEBUG ((LM_DEBUG,
02057          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02058          ACE_TEXT ("enter\n"),
02059          this->id ()));
02060     }
02061 
02062   // The buffer on the stack which will be used to hold the input
02063   // messages, ACE_CDR::MAX_ALIGNMENT compensates the
02064   // memory-alignment. This improves performance with SUN-Java-ORB-1.4
02065   // and higher that sends fragmented requests of size 1024 bytes.
02066   char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
02067 
02068 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
02069   (void) ACE_OS::memset (buf,
02070                          '\0',
02071                          sizeof buf);
02072 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
02073 
02074   // Create a data block
02075   ACE_Data_Block db (sizeof (buf),
02076                      ACE_Message_Block::MB_DATA,
02077                      buf,
02078                      this->orb_core_->input_cdr_buffer_allocator (),
02079                      this->orb_core_->locking_strategy (),
02080                      ACE_Message_Block::DONT_DELETE,
02081                      this->orb_core_->input_cdr_dblock_allocator ());
02082 
02083   // Create a message block
02084   ACE_Message_Block message_block (&db,
02085                                    ACE_Message_Block::DONT_DELETE,
02086                                    this->orb_core_->input_cdr_msgblock_allocator ());
02087 
02088   // Align the message block
02089   ACE_CDR::mb_align (&message_block);
02090 
02091   size_t recv_size = 0; // Note: unsigned integer
02092 
02093   // Pointer to newly parsed message
02094   TAO_Queued_Data *q_data = 0;
02095 
02096   // Optimizing access of constants
02097   size_t const header_length = this->messaging_object ()->header_length ();
02098 
02099   // Paranoid check
02100   if (header_length > message_block.space ())
02101     {
02102       return -1;
02103     }
02104 
02105   if (this->orb_core_->orb_params ()->single_read_optimization ())
02106     {
02107       recv_size = message_block.space ();
02108     }
02109   else
02110     {
02111       // Single read optimization has been de-activated. That means
02112       // that we need to read from transport the GIOP header first
02113       // before the payload. This codes first checks the incoming
02114       // stack for partial messages which needs to be
02115       // consolidated. Otherwise we are in new cycle, reading complete
02116       // GIOP header of new incoming message.
02117       if (this->incoming_message_stack_.top (q_data) != -1
02118            && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
02119         {
02120           // There is a partial message on incoming_message_stack_
02121           // whose length is unknown so far. We need to consolidate
02122           // the GIOP header to get to know the payload size,
02123           recv_size = header_length - q_data->msg_block ()->length ();
02124         }
02125       else
02126         {
02127           // Read amount of data forming GIOP header of new incoming
02128           // message.
02129           recv_size = header_length;
02130         }
02131       // POST: 0 <= recv_size <= header_length
02132     }
02133   // POST: 0 <= recv_size <= message_block->space ()
02134 
02135   // If we have a partial message, copy it into our message block and
02136   // clear out the partial message.
02137   if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
02138     {
02139       // (*) Copy back the partial message into current read-buffer,
02140       // verify that the read-strategy of "recv_size" bytes is not
02141       // exceeded. The latter check guarantees that recv_size does not
02142       // roll-over and keeps in range
02143       // 0<=recv_size<=message_block->space()
02144       if (this->partial_message_->length () <= recv_size &&
02145           message_block.copy (this->partial_message_->rd_ptr (),
02146                               this->partial_message_->length ()) == 0)
02147         {
02148 
02149           recv_size -= this->partial_message_->length ();
02150           // reset is done later to avoid problem in case of EWOULDBLOCK
02151           // or EAGAIN errno
02152         }
02153       else
02154         {
02155           return -1;
02156         }
02157     }
02158   // POST: 0 <= recv_size <= buffer_space
02159 
02160   if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
02161     {
02162       // This event would cause endless looping, trying frequently to
02163       // read zero bytes from stream.  This might happen, if TAOs
02164       // protocol implementation is not correct and tries to read data
02165       // beyond header without "single_read_optimazation" being
02166       // activated.
02167       if (TAO_debug_level > 0)
02168         {
02169           TAOLIB_ERROR ((LM_ERROR,
02170              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02171              ACE_TEXT ("Error - endless loop detection, closing connection"),
02172              this->id ()));
02173         }
02174       if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
02175         {
02176           this->partial_message_->reset ();
02177         }
02178       return -1;
02179     }
02180 
02181   // Saving the size of the received buffer in case any one needs to
02182   // get the size of the message thats received in the
02183   // context. Obviously the value will be changed for each recv call
02184   // and the user is supposed to invoke the accessor only in the
02185   // invocation context to get meaningful information.
02186   this->recv_buffer_size_ = recv_size;
02187 
02188   // Read the message into the message block that we have created on
02189   // the stack.
02190   ssize_t const n = this->recv (message_block.wr_ptr (),
02191                                 recv_size,
02192                                 max_wait_time);
02193 
02194   // If there is an error return to the reactor..
02195   // do not reset partial message in case of n == 0 (EWOULDBLOCK || EAGAIN),
02196   // we will need it during next try
02197   if (n <= 0)
02198     {
02199       if ((n < 0) &&
02200           (this->partial_message_ != 0 && this->partial_message_->length () > 0))
02201         {
02202           this->partial_message_->reset ();
02203         }
02204 
02205       return ACE_Utils::truncate_cast<int> (n);
02206     }
02207 
02208   if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
02209     {
02210       this->partial_message_->reset ();
02211     }
02212 
02213   if (TAO_debug_level > 3)
02214     {
02215       TAOLIB_DEBUG ((LM_DEBUG,
02216          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02217          ACE_TEXT ("read %d bytes\n"),
02218          this->id (), n));
02219     }
02220 
02221   // Set the write pointer in the stack buffer
02222   message_block.wr_ptr (n);
02223 
02224   //
02225   // STACK PROCESSING OR MESSAGE CONSOLIDATION
02226   //
02227 
02228   // PRE: data in buffer is aligned && message_block.length() > 0
02229 
02230   if (this->incoming_message_stack_.top (q_data) != -1
02231       && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
02232     {
02233       //
02234       // MESSAGE CONSOLIDATION
02235       //
02236 
02237       // Partial message on incoming_message_stack_ needs to be
02238       // consolidated.  The message header could not be parsed so far
02239       // and therefor the message size is unknown yet. Consolidating
02240       // the message destroys the memory alignment of succeeding
02241       // messages sharing the buffer, for that reason consolidation
02242       // and stack based processing are mutial exclusive.
02243       if (this->messaging_object ()->consolidate_node (q_data,
02244                                                        message_block) == -1)
02245         {
02246            if (TAO_debug_level > 0)
02247             {
02248                 TAOLIB_ERROR ((LM_ERROR,
02249                    ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02250                    ACE_TEXT ("error consolidating message from input buffer\n"),
02251                    this->id () ));
02252              }
02253            return -1;
02254         }
02255 
02256       // Complete message are to be enqueued and later processed
02257       if (q_data->missing_data () == 0)
02258         {
02259           if (this->incoming_message_stack_.pop (q_data) == -1)
02260             {
02261               return -1;
02262             }
02263 
02264           if (this->consolidate_enqueue_message (q_data) == -1)
02265             {
02266               return -1;
02267             }
02268         }
02269 
02270       if (message_block.length () > 0
02271           && this->handle_input_parse_extra_messages (message_block) == -1)
02272         {
02273           return -1;
02274         }
02275 
02276       // In any case try to process the enqueued messages
02277       if (this->process_queue_head (rh) == -1)
02278         {
02279           return -1;
02280         }
02281     }
02282   else
02283     {
02284       //
02285       // STACK PROCESSING (critical path)
02286       //
02287 
02288       // Process the first message in buffer on stack
02289 
02290       // (PRE: first message resides in aligned memory) Make a node of
02291       // the message-block..
02292 
02293       TAO_Queued_Data qd (&message_block,
02294                           this->orb_core_->transport_message_buffer_allocator ());
02295 
02296       size_t mesg_length  = 0;
02297 
02298       if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1
02299           || (qd.missing_data () == 0
02300               && mesg_length > message_block.length ()) )
02301         {
02302           // extracting message failed
02303           return -1;
02304         }
02305       // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
02306       // This prevents seeking rd_ptr behind the wr_ptr
02307 
02308       if (qd.missing_data () != 0 ||
02309           qd.more_fragments () ||
02310           qd.msg_type () == GIOP::Fragment)
02311         {
02312           if (qd.missing_data () == 0)
02313             {
02314               // Dealing with a fragment
02315               TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd);
02316 
02317               if (nqd == 0)
02318                 {
02319                   return -1;
02320                 }
02321 
02322               // mark the end of message in new buffer
02323               char* end_mark = nqd->msg_block ()->rd_ptr ()
02324                              + mesg_length;
02325               nqd->msg_block ()->wr_ptr (end_mark);
02326 
02327               // move the read pointer forward in old buffer
02328               message_block.rd_ptr (mesg_length);
02329 
02330               // enqueue the message
02331               if (this->consolidate_enqueue_message (nqd) == -1)
02332                 {
02333                   return -1;
02334                 }
02335 
02336               if (message_block.length () > 0
02337                   && this->handle_input_parse_extra_messages (message_block) == -1)
02338                 {
02339                   return -1;
02340                 }
02341 
02342               // In any case try to process the enqueued messages
02343               if (this->process_queue_head (rh) == -1)
02344                 {
02345                   return -1;
02346                 }
02347             }
02348           else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED)
02349             {
02350               // Incomplete message, must be the last one in buffer
02351 
02352               if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED &&
02353                   qd.missing_data () > message_block.space ())
02354                 {
02355                   // Re-Allocate correct size on heap
02356                   if (ACE_CDR::grow (qd.msg_block (),
02357                                      message_block.length ()
02358                                      + qd.missing_data ()) == -1)
02359                     {
02360                       return -1;
02361                     }
02362                 }
02363 
02364               TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd);
02365 
02366               if (nqd == 0)
02367                 {
02368                   return -1;
02369                 }
02370 
02371               // move read-pointer to end of buffer
02372               message_block.rd_ptr (message_block.length());
02373 
02374               this->incoming_message_stack_.push (nqd);
02375             }
02376         }
02377       else
02378         {
02379           //
02380           // critical path
02381           //
02382 
02383           // We cant process the message on stack right now. First we
02384           // have got to parse extra messages from message_block,
02385           // putting them into queue.  When this is done we can return
02386           // to process this message, and notifying other threads to
02387           // process the messages in queue.
02388           char * end_marker = message_block.rd_ptr ()
02389                             + mesg_length;
02390 
02391           if (message_block.length () > mesg_length)
02392             {
02393               // There are more message in data stream to be parsed.
02394               // Safe the rd_ptr to restore later.
02395               char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02396 
02397               // Skip parsed message, jump to next message in buffer
02398               // PRE: mesg_length <= message_block.length ()
02399               message_block.rd_ptr (mesg_length);
02400 
02401               // Extract remaining messages and enqueue them for later
02402               // heap processing
02403               if (this->handle_input_parse_extra_messages (message_block) == -1)
02404                 {
02405                   return -1;
02406                 }
02407 
02408               // correct the wr_ptr using the end_marker to point to the
02409               // end of the first message else the code after this will
02410               // see the full stream with all the messages
02411               message_block.wr_ptr (end_marker);
02412 
02413               // Restore rd_ptr
02414               message_block.rd_ptr (rd_ptr_stack_mesg);
02415             }
02416 
02417           // The following if-else has been copied from
02418           // process_queue_head().  While process_queue_head()
02419           // processes message on heap, here we will process a message
02420           // on stack.
02421 
02422           // Now that we have one message on stack to be processed,
02423           // check whether we have one more message in the queue...
02424           if (this->incoming_message_queue_.queue_length () > 0)
02425             {
02426               if (TAO_debug_level > 0)
02427                 {
02428                   TAOLIB_DEBUG ((LM_DEBUG,
02429                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02430                      ACE_TEXT ("notify reactor\n"),
02431                      this->id ()));
02432                 }
02433 
02434               int const retval = this->notify_reactor ();
02435 
02436               if (retval == 1)
02437                 {
02438                   // Let the class know that it doesn't need to resume  the
02439                   // handle..
02440                   rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02441                 }
02442               else if (retval < 0)
02443                 return -1;
02444             }
02445           else
02446             {
02447               // As there are no further messages in queue just resume
02448               // the handle. Set the flag incase someone had reset the flag..
02449               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02450             }
02451 
02452           // PRE: incoming_message_queue is empty
02453           if (this->process_parsed_messages (&qd, rh) == -1)
02454             {
02455               return -1;
02456             }
02457           // move the rd_ptr tp position of end_marker
02458           message_block.rd_ptr (end_marker);
02459         }
02460     }
02461 
02462   // Now that all cases have been processed, there might be kept some data
02463   // in buffer that needs to be safed for next "handle_input" invocations.
02464    if (message_block.length () > 0)
02465      {
02466        if (this->partial_message_ == 0)
02467          {
02468            this->allocate_partial_message_block ();
02469          }
02470 
02471        if (this->partial_message_ != 0 &&
02472            this->partial_message_->copy (message_block.rd_ptr (),
02473                                          message_block.length ()) == 0)
02474          {
02475            message_block.rd_ptr (message_block.length ());
02476          }
02477        else
02478          {
02479            return -1;
02480          }
02481      }
02482 
02483    return 0;
02484 }

int TAO_Transport::handle_input_parse_extra_messages ( ACE_Message_Block message_block  )  [private]

Is invoked by handle_input_parse_data. Parses all messages remaining in message_block.

Definition at line 2008 of file Transport.cpp.

02010 {
02011   // store buffer status of last extraction: -1 parse error, 0
02012   // incomplete message header in buffer, 1 complete messages header
02013   // parsed
02014   int buf_status = 0;
02015 
02016   TAO_Queued_Data *q_data = 0;     // init
02017 
02018   // parse buffer until all messages have been extracted, consolidate
02019   // and enqueue complete messages, if the last message being parsed
02020   // has missin data, it is stays on top of incoming_message_stack.
02021   while (message_block.length () > 0 &&
02022          (buf_status = this->messaging_object ()->extract_next_message
02023           (message_block, q_data)) != -1 &&
02024          q_data != 0) // paranoid check
02025     {
02026       if (q_data->missing_data () == 0)
02027         {
02028           if (this->consolidate_enqueue_message (q_data) == -1)
02029             {
02030               return -1;
02031             }
02032         }
02033       else  // incomplete message read, probably the last message in buffer
02034         {
02035           // can not fail
02036           this->incoming_message_stack_.push (q_data);
02037         }
02038 
02039       q_data = 0; // reset
02040     } // while
02041 
02042   if (buf_status == -1)
02043     {
02044       return -1;
02045     }
02046 
02047   return 0;
02048 }

TAO_Transport::Drain_Result TAO_Transport::handle_output ( TAO::Transport::Drain_Constraints const &  dc  ) 

Callback method to reactively drain the outgoing data queue.

Methods called and used in the output path of the ORB.

Definition at line 599 of file Transport.cpp.

00600 {
00601   if (TAO_debug_level > 3)
00602     {
00603       TAOLIB_DEBUG ((LM_DEBUG,
00604                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output")
00605                   ACE_TEXT (" - block_on_io=%d, timeout=%d.%06d\n"),
00606                   this->id (),
00607                   dc.block_on_io(),
00608                   dc.timeout() ? dc.timeout()->sec() : static_cast<time_t> (-1),
00609                   dc.timeout() ? dc.timeout()->usec() : -1 ));
00610     }
00611 
00612   // The flushing strategy (potentially via the Reactor) wants to send
00613   // more data, first check if there is a current message that needs
00614   // more sending...
00615   Drain_Result const retval = this->drain_queue (dc);
00616 
00617   if (TAO_debug_level > 3)
00618     {
00619       TAOLIB_DEBUG ((LM_DEBUG,
00620                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00621                   ACE_TEXT ("drain_queue returns %d/%d\n"),
00622                   this->id (),
00623                   static_cast<int> (retval.dre_), ACE_ERRNO_GET));
00624     }
00625 
00626   // Any errors are returned directly to the Reactor
00627   return retval;
00628 }

int TAO_Transport::handle_timeout ( const ACE_Time_Value current_time,
const void *  act 
)

The timeout callback, invoked when any of the timers related to this transport expire.

Parameters:
current_time The current time as reported from the Reactor
act The Asynchronous Completion Token. Currently it is interpreted as follows:

  • If the ACT is the address of this->current_deadline_ the queueing timeout has expired and the queue should start flushing.
Returns:
Returns 0 if there are no problems, -1 if there is an error
Todo:
In the future this function could be used to expire messages (oneways) that have been sitting for too long on the queue.

This is the only legal ACT in the current configuration....

Definition at line 956 of file Transport.cpp.

00958 {
00959   if (TAO_debug_level > 6)
00960     {
00961       TAOLIB_DEBUG ((LM_DEBUG,
00962          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_timeout, ")
00963          ACE_TEXT ("timer expired\n"),
00964          this->id ()));
00965     }
00966 
00967   /// This is the only legal ACT in the current configuration....
00968   if (act != &this->current_deadline_)
00969     {
00970       return -1;
00971     }
00972 
00973   if (this->flush_timer_pending ())
00974     {
00975       // The timer is always a oneshot timer, so mark is as not
00976       // pending.
00977       this->reset_flush_timer ();
00978 
00979       TAO_Flushing_Strategy *flushing_strategy =
00980         this->orb_core ()->flushing_strategy ();
00981       int const result = flushing_strategy->schedule_output (this);
00982       if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00983         {
00984           typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00985           TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00986           ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00987           if (flushing_strategy->flush_transport (this, 0) == -1) {
00988             return -1;
00989           }
00990         }
00991     }
00992 
00993   return 0;
00994 }

void TAO_Transport::id ( size_t  id  ) 

Definition at line 97 of file Transport.inl.

00098 {
00099   this->id_ = id;
00100 }

size_t TAO_Transport::id ( void   )  const

Set and Get the identifier for this transport instance.

If not set, this will return an integer representation of the this pointer for the instance on which it's called.

Definition at line 91 of file Transport.inl.

00092 {
00093   return this->id_;
00094 }

bool TAO_Transport::idle_after_reply ( void   ) 

Request is sent and the reply is received. Idle the transport now.

Definition at line 275 of file Transport.cpp.

00276 {
00277   return this->tms ()->idle_after_reply ();
00278 }

bool TAO_Transport::idle_after_send ( void   ) 

Request has been just sent, but the reply is not received. Idle the transport now.

Definition at line 269 of file Transport.cpp.

00270 {
00271   return this->tms ()->idle_after_send ();
00272 }

ACE_Time_Value const * TAO_Transport::io_timeout ( TAO::Transport::Drain_Constraints const &  dc  )  const [protected]

Re-factor computation of I/O timeouts based on operation timeouts. Depending on the wait strategy, we need to timeout I/O operations or not. For example, if we are using a non-blocking strategy, we want to pass 0 to all I/O operations, and rely on the ACE_NONBLOCK settings on the underlying sockets. However, for blocking strategies we want to pass the operation timeouts, to respect the application level policies.

This function was introduced as part of the fixes for bug 3647.

Definition at line 2894 of file Transport.cpp.

02896 {
02897   if (dc.block_on_io())
02898   {
02899     return dc.timeout();
02900   }
02901   if (this->wait_strategy()->can_process_upcalls())
02902   {
02903     return 0;
02904   }
02905   return dc.timeout();
02906 }

bool TAO_Transport::is_connected ( void   )  const

Is this transport really connected.

Definition at line 181 of file Transport.inl.

00182 {
00183   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false);
00184   return this->is_connected_;
00185 }

CORBA::Boolean TAO_Transport::is_tcs_set ( void   )  const

Return true if the tcs has been set.

CodeSet negotiation.

Definition at line 163 of file Transport.inl.

00164 {
00165   return tcs_set_;
00166 }

int TAO_Transport::make_idle ( void   ) 

Cache management.

Definition at line 577 of file Transport.cpp.

00578 {
00579   if (TAO_debug_level > 3)
00580     {
00581       TAOLIB_DEBUG ((LM_DEBUG,
00582                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00583                   this->id ()));
00584     }
00585 
00586   return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00587 }

void TAO_Transport::messaging_init ( TAO_GIOP_Message_Version const &  version  ) 

Initializing the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.

Definition at line 2766 of file Transport.cpp.

02767 {
02768   this->messaging_object ()->init (version.major, version.minor);
02769 }

TAO_GIOP_Message_Base * TAO_Transport::messaging_object ( void   ) 

Return the messaging object that is used to format the data that needs to be sent.

Definition at line 126 of file Transport.inl.

00127 {
00128   return this->messaging_object_;
00129 }

int TAO_Transport::notify_reactor ( void   )  [private]

These classes need privileged access to:

Definition at line 292 of file Transport.inl.

00293 {
00294   if (!this->ws_->is_registered ())
00295     {
00296       return 0;
00297     }
00298 
00299   return this->notify_reactor_now ();
00300 }

int TAO_Transport::notify_reactor_now ( void   )  [protected]

These classes need privileged access to:

Definition at line 2674 of file Transport.cpp.

02675 {
02676   ACE_Event_Handler *eh = this->event_handler_i ();
02677 
02678   // Get the reactor associated with the event handler
02679   ACE_Reactor *reactor = this->orb_core ()->reactor ();
02680 
02681   if (TAO_debug_level > 0)
02682     {
02683       TAOLIB_DEBUG ((LM_DEBUG,
02684          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02685          ACE_TEXT ("notify to Reactor\n"),
02686          this->id ()));
02687     }
02688 
02689   // Send a notification to the reactor...
02690   int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK);
02691 
02692   if (retval < 0 && TAO_debug_level > 2)
02693     {
02694       // @todo: need to think about what is the action that
02695       // we can take when we get here.
02696       TAOLIB_ERROR ((LM_ERROR,
02697          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02698          ACE_TEXT ("notify to the reactor failed..\n"),
02699          this->id ()));
02700     }
02701 
02702   return 1;
02703 }

void TAO_Transport::opened_as ( TAO::Connection_Role  role  ) 

Definition at line 54 of file Transport.inl.

00055 {
00056   this->opening_connection_role_ = role;
00057 }

TAO::Connection_Role TAO_Transport::opened_as ( void   )  const

Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.

Definition at line 48 of file Transport.inl.

00049 {
00050   return this->opening_connection_role_;
00051 }

TAO_ORB_Core * TAO_Transport::orb_core ( void   )  const

Access the ORB that owns this connection.

Definition at line 17 of file Transport.inl.

00018 {
00019   return this->orb_core_;
00020 }

TAO_OutputCDR & TAO_Transport::out_stream ( void   ) 

Accessor for the output CDR stream.

Definition at line 2754 of file Transport.cpp.

02755 {
02756   return this->messaging_object ()->out_stream ();
02757 }

TAO_SYNCH_MUTEX & TAO_Transport::output_cdr_lock ( void   ) 

Accessor for synchronizing Transport OutputCDR access.

Definition at line 2760 of file Transport.cpp.

02761 {
02762   return this->output_cdr_mutex_;
02763 }

bool TAO_Transport::post_connect_hook ( void   )  [virtual]

Hooks that can be overridden in concrete transports.

These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)

Note:
The methods are not made const with a reason.

Definition at line 327 of file Transport.cpp.

00328 {
00329   return true;
00330 }

bool TAO_Transport::post_open ( size_t  id  ) 

Perform all the actions when this transport get opened.

Definition at line 2795 of file Transport.cpp.

02796 {
02797   if (TAO_debug_level > 9)
02798     {
02799       TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport::post_open, ")
02800                   ACE_TEXT ("tport id changed from %d to %d\n"), this->id_, id));
02801     }
02802   this->id_ = id;
02803 
02804   // When we have data in our outgoing queue schedule ourselves
02805   // for output
02806   if (!this->queue_is_empty_i ())
02807     {
02808       // If the wait strategy wants us to be registered with the reactor
02809       // then we do so. If registration is required and it succeeds,
02810       // #REFCOUNT# becomes two.
02811       if (this->wait_strategy ()->register_handler () == 0)
02812         {
02813           if (this->flush_in_post_open_)
02814             {
02815               TAO_Flushing_Strategy *flushing_strategy =
02816                 this->orb_core ()->flushing_strategy ();
02817 
02818               if (flushing_strategy == 0)
02819                 throw CORBA::INTERNAL ();
02820 
02821               this->flush_in_post_open_ = false;
02822               (void)flushing_strategy->schedule_output (this);
02823             }
02824         }
02825       else
02826         {
02827           // Registration failures.
02828 
02829           // Purge from the connection cache, if we are not in the cache, this
02830           // just does nothing.
02831           (void) this->purge_entry ();
02832 
02833           // Close the handler.
02834           (void) this->close_connection ();
02835 
02836           if (TAO_debug_level > 0)
02837             {
02838               TAOLIB_ERROR ((LM_ERROR,
02839                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open , ")
02840                      ACE_TEXT ("could not register the transport ")
02841                      ACE_TEXT ("in the reactor.\n"),
02842                      this->id ()));
02843             }
02844 
02845           return false;
02846         }
02847     }
02848 
02849   {
02850     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false);
02851     this->is_connected_ = true;
02852   }
02853 
02854   if (TAO_debug_level > 9 && !this->cache_map_entry_)
02855     {
02856       TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open")
02857                             ACE_TEXT (", cache_map_entry_ is 0\n"), this->id_));
02858     }
02859 
02860   this->transport_cache_manager ().mark_connected (this->cache_map_entry_,
02861                                                    true);
02862 
02863   // update transport cache to make this entry available
02864   this->transport_cache_manager ().set_entry_state (
02865     this->cache_map_entry_,
02866     TAO::ENTRY_IDLE_AND_PURGABLE);
02867 
02868   return true;
02869 }

void TAO_Transport::pre_close ( void   ) 

do what needs to be done when closing the transport

Definition at line 2772 of file Transport.cpp.

02773 {
02774   if (TAO_debug_level > 9)
02775     {
02776       TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::pre_close\n"),
02777                   this->id_));
02778     }
02779   // @TODO: something needs to be done with is_connected_. Checking it is
02780   // guarded by a mutex, but setting it is not. Until the need for mutexed
02781   // protection is required, the transport cache is holding its own copy
02782   // of the is_connected_ flag, so that during cache lookups the cache
02783   // manager doesn't need to be burdened by the lock in is_connected().
02784   this->is_connected_ = false;
02785   this->transport_cache_manager ().mark_connected (this->cache_map_entry_,
02786                                                    false);
02787   this->purge_entry ();
02788   {
02789     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
02790     this->cleanup_queue_i ();
02791   }
02792 }

int TAO_Transport::process_parsed_messages ( TAO_Queued_Data qd,
TAO_Resume_Handle rh 
) [protected]

Process the message by sending it to the higher layers of the ORB.

Definition at line 2488 of file Transport.cpp.

02490 {
02491   if (TAO_debug_level > 7)
02492     {
02493       TAOLIB_DEBUG ((LM_DEBUG,
02494          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02495          ACE_TEXT ("entering (missing data == %d)\n"),
02496          this->id(), qd->missing_data ()));
02497     }
02498 
02499 #if TAO_HAS_TRANSPORT_CURRENT == 1
02500   // Update stats, if any
02501   if (this->stats_ != 0)
02502     this->stats_->messages_received (qd->msg_block ()->length ());
02503 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
02504 
02505   switch (qd->msg_type ())
02506   {
02507     case GIOP::CloseConnection:
02508     {
02509       if (TAO_debug_level > 0)
02510         {
02511           TAOLIB_DEBUG ((LM_DEBUG,
02512              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02513              ACE_TEXT ("received CloseConnection message - %m\n"),
02514              this->id()));
02515         }
02516 
02517       // Return a "-1" so that the next stage can take care of
02518       // closing connection and the necessary memory management.
02519       return -1;
02520     }
02521     break;
02522     case GIOP::Request:
02523     case GIOP::LocateRequest:
02524     {
02525       // Let us resume the handle before we go ahead to process the
02526       // request. This will open up the handle for other threads.
02527       rh.resume_handle ();
02528 
02529       if (this->messaging_object ()->process_request_message (this, qd) == -1)
02530         {
02531           // Return a "-1" so that the next stage can take care of
02532           // closing connection and the necessary memory management.
02533           return -1;
02534         }
02535     }
02536     break;
02537     case GIOP::Reply:
02538     case GIOP::LocateReply:
02539     {
02540       rh.resume_handle ();
02541 
02542       TAO_Pluggable_Reply_Params params (this);
02543 
02544       if (this->messaging_object ()->process_reply_message (params, qd) == -1)
02545         {
02546           if (TAO_debug_level > 0)
02547             {
02548               TAOLIB_ERROR ((LM_ERROR,
02549                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02550                  ACE_TEXT ("error in process_reply_message - %m\n"),
02551                  this->id ()));
02552             }
02553 
02554           return -1;
02555         }
02556 
02557     }
02558     break;
02559     case GIOP::CancelRequest:
02560     {
02561       // The associated request might be incomplete residing
02562       // fragmented in messaging object. We must make sure the
02563       // resources allocated by fragments are released.
02564       if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02565         {
02566           if (TAO_debug_level > 0)
02567             {
02568               TAOLIB_ERROR ((LM_ERROR,
02569                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02570                  ACE_TEXT ("error processing CancelRequest\n"),
02571                  this->id ()));
02572             }
02573         }
02574 
02575       // We are not able to cancel requests being processed already;
02576       // this is declared as optional feature by CORBA, and TAO does
02577       // not support this currently.
02578 
02579       // Just continue processing, CancelRequest does not mean to cut
02580       // off the connection.
02581     }
02582     break;
02583     case GIOP::MessageError:
02584     {
02585       if (TAO_debug_level > 0)
02586         {
02587           TAOLIB_ERROR ((LM_ERROR,
02588              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02589              ACE_TEXT ("received MessageError, closing connection\n"),
02590              this->id ()));
02591         }
02592       return -1;
02593     }
02594     break;
02595     case GIOP::Fragment:
02596     {
02597       // Nothing to be done.
02598     }
02599     break;
02600   }
02601 
02602   // If not, just return back..
02603   return 0;
02604 }

int TAO_Transport::process_queue_head ( TAO_Resume_Handle rh  )  [private]

These classes need privileged access to:

Definition at line 2607 of file Transport.cpp.

02608 {
02609   if (TAO_debug_level > 3)
02610     {
02611       TAOLIB_DEBUG ((LM_DEBUG,
02612          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02613          this->id (), this->incoming_message_queue_.queue_length () ));
02614     }
02615 
02616   // See if  message in queue ...
02617   if (this->incoming_message_queue_.queue_length () > 0)
02618     {
02619       // Get the message on the head of the queue..
02620       TAO_Queued_Data *qd =
02621         this->incoming_message_queue_.dequeue_head ();
02622 
02623       if (TAO_debug_level > 3)
02624         {
02625           TAOLIB_DEBUG ((LM_DEBUG,
02626              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02627              ACE_TEXT ("the size of the queue is [%d]\n"),
02628              this->id (),
02629              this->incoming_message_queue_.queue_length()));
02630         }
02631       // Now that we have pulled out out one message out of the queue,
02632       // check whether we have one more message in the queue...
02633       if (this->incoming_message_queue_.queue_length () > 0)
02634         {
02635           if (TAO_debug_level > 0)
02636             {
02637               TAOLIB_DEBUG ((LM_DEBUG,
02638                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02639                  ACE_TEXT ("notify reactor\n"),
02640                  this->id ()));
02641             }
02642 
02643           int const retval = this->notify_reactor ();
02644 
02645           if (retval == 1)
02646             {
02647               // Let the class know that it doesn't need to resume  the
02648               // handle..
02649               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02650             }
02651           else if (retval < 0)
02652             return -1;
02653         }
02654       else
02655         {
02656           // As we are ready to process the last message just resume
02657           // the handle. Set the flag incase someone had reset the flag..
02658           rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02659         }
02660 
02661       // Process the message...
02662       int const retval = this->process_parsed_messages (qd, rh);
02663 
02664       // Delete the Queued_Data..
02665       TAO_Queued_Data::release (qd);
02666 
02667       return retval;
02668     }
02669 
02670   return 1;
02671 }

bool TAO_Transport::provide_blockable_handler ( TAO::Connection_Handler_Set handlers  ) 

Add event handlers corresponding to transports that have RW wait strategy to the handlers set. Called by the cache when the ORB is shutting down.

Parameters:
handlers The TAO_Connection_Handler_Set into which the transport should place its handler if the transport has RW strategy on.
Returns:
true indicates a handler was added to the handler set. false indocates that the transport did not have a blockable handler that could be added.

Definition at line 255 of file Transport.cpp.

00256 {
00257   if (this->ws_->non_blocking () ||
00258       this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00259     return false;
00260 
00261   (void) this->add_reference ();
00262 
00263   h.insert (this->connection_handler_i ());
00264 
00265   return true;
00266 }

void TAO_Transport::provide_handler ( TAO::Connection_Handler_Set handlers  ) 

Added event handler to the handlers set.

Called by the cache when the cache is closing.

Parameters:
handlers The TAO_Connection_Handler_Set into which the transport should place its handler

Definition at line 247 of file Transport.cpp.

00248 {
00249   (void) this->add_reference ();
00250 
00251   handlers.insert (this->connection_handler_i ());
00252 }

int TAO_Transport::purge_entry ( void   ) 

Cache management.

Definition at line 557 of file Transport.cpp.

00558 {
00559   if (TAO_debug_level > 3)
00560     {
00561       TAOLIB_DEBUG ((LM_DEBUG,
00562                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::purge_entry, ")
00563                   ACE_TEXT ("entry is %@\n"),
00564                   this->id (), this->cache_map_entry_));
00565     }
00566 
00567   return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00568 }

void TAO_Transport::purging_order ( unsigned long  value  ) 

Definition at line 81 of file Transport.inl.

00082 {
00083   // This should only be called by the Transport Cache Manager when
00084   // it is holding it's lock.
00085   // The transport should still be here since the cache manager still
00086   // has a reference to it.
00087   this->purging_order_ = value;
00088 }

unsigned long TAO_Transport::purging_order ( void   )  const

Get and Set the purging order. The purging strategy uses the set version to set the purging order.

Definition at line 75 of file Transport.inl.

00076 {
00077   return this->purging_order_;
00078 }

bool TAO_Transport::queue_is_empty ( void   ) 

Check if there are messages pending in the queue.

Returns:
true if the queue is empty

Definition at line 103 of file Transport.inl.

00104 {
00105   ACE_GUARD_RETURN (ACE_Lock,
00106                     ace_mon,
00107                     *this->handler_lock_,
00108                     false);
00109   return this->queue_is_empty_i ();
00110 }

bool TAO_Transport::queue_is_empty_i ( void   )  const [private]

Check if there are messages pending in the queue.

This version assumes that the lock is already held. Use with care!

Returns:
true if the queue is empty

Definition at line 5 of file Transport.inl.

00006 {
00007   return (this->head_ == 0);
00008 }

int TAO_Transport::queue_message_i ( const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time,
bool  back = true 
) [protected]

Queue a message for message_block

Parameters:
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.
back If true, the message will be pushed to the back of the queue. If false, the message will be pushed to the front of the queue.

Definition at line 1686 of file Transport.cpp.

01688 {
01689   TAO_Queued_Message *queued_message = 0;
01690   ACE_NEW_RETURN (queued_message,
01691                   TAO_Asynch_Queued_Message (message_block,
01692                                              this->orb_core_,
01693                                              max_wait_time,
01694                                              0,
01695                                              true),
01696                   -1);
01697   if (back) {
01698     queued_message->push_back (this->head_, this->tail_);
01699   }
01700   else {
01701     queued_message->push_front (this->head_, this->tail_);
01702   }
01703 
01704   return 0;
01705 }

int TAO_Transport::recache_transport ( TAO_Transport_Descriptor_Interface desc  ) 

Recache ourselves in the cache.

Todo:

Ideally the following should be inline.

purge_entry has a return value, use it

Definition at line 547 of file Transport.cpp.

00548 {
00549   // First purge our entry
00550   this->purge_entry ();
00551 
00552   // Then add ourselves to the cache
00553   return this->transport_cache_manager ().cache_transport (desc, this);
00554 }

virtual ssize_t TAO_Transport::recv ( char *  buffer,
size_t  len,
const ACE_Time_Value timeout = 0 
) [pure virtual]

Read len bytes from into buf.

This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.

Parameters:
buffer ORB allocated buffer where the data should be
timeout The ACE_Time_Value *s is just a place holder for now. It is not clear this this is the best place to specify this. The actual timeout values will be kept in the Policies.
size_t TAO_Transport::recv_buffer_size ( void   )  const

Accessor to recv_buffer_size_.

Definition at line 194 of file Transport.inl.

00195 {
00196   return this->recv_buffer_size_;
00197 }

int TAO_Transport::register_handler ( void   )  [virtual]

Register the handler with the reactor.

Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.

Todo:
I think this method is pretty much useless, the connections are *always* registered with the Reactor, except in thread-per-connection mode. In that case putting the connection in the Reactor would produce unpredictable results anyway.

Definition at line 367 of file Transport.cpp.

00368 {
00369   if (TAO_debug_level > 4)
00370     {
00371       TAOLIB_DEBUG ((LM_DEBUG,
00372                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00373                   this->id ()));
00374     }
00375 
00376   ACE_Reactor * const r = this->orb_core_->reactor ();
00377 
00378   // @@note: This should be okay since the register handler call will
00379   // not make a nested call into the transport.
00380   ACE_GUARD_RETURN (ACE_Lock,
00381                     ace_mon,
00382                     *this->handler_lock_,
00383                     false);
00384 
00385   if (r == this->event_handler_i ()->reactor () &&
00386       (this->wait_strategy ()->non_blocking () ||
00387        !this->orb_core ()->client_factory ()->use_cleanup_options ()))
00388     {
00389       if (TAO_debug_level > 6)
00390         {
00391           TAOLIB_DEBUG ((LM_DEBUG,
00392                          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler - ")
00393                          ACE_TEXT ("already registered with reactor\n"),
00394                          this->id ()));
00395         }
00396 
00397       return 0;
00398     }
00399 
00400   if (TAO_debug_level > 6)
00401     {
00402       TAOLIB_DEBUG ((LM_DEBUG,
00403                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler - ")
00404                   ACE_TEXT ("registering event handler with reactor\n"),
00405                   this->id ()));
00406     }
00407 
00408   // Set the flag in the Connection Handler and in the Wait Strategy
00409   // @@Maybe we should set these flags after registering with the
00410   // reactor. What if the  registration fails???
00411   this->ws_->is_registered (true);
00412 
00413   // Register the handler with the reactor
00414   return r->register_handler (this->event_handler_i (),
00415                               ACE_Event_Handler::READ_MASK);
00416 }

bool TAO_Transport::register_if_necessary ( void   ) 

Register with the reactor via the wait strategy.

Definition at line 333 of file Transport.cpp.

00334 {
00335   if (this->is_connected_ &&
00336       this->wait_strategy ()->register_handler () == -1)
00337     {
00338       // Registration failures.
00339       if (TAO_debug_level > 0)
00340         {
00341           TAOLIB_ERROR ((LM_ERROR,
00342                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_if_necessary, ")
00343                       ACE_TEXT ("could not register the transport ")
00344                       ACE_TEXT ("in the reactor.\n"),
00345                       this->id ()));
00346         }
00347 
00348       // Purge from the connection cache, if we are not in the cache, this
00349       // just does nothing.
00350       (void) this->purge_entry ();
00351 
00352       // Close the handler.
00353       (void) this->close_connection ();
00354 
00355       return false;
00356     }
00357   return true;
00358 }

int TAO_Transport::remove_handler ( void   )  [virtual]

Remove the handler from the reactor.

Definition at line 419 of file Transport.cpp.

00420 {
00421   if (TAO_debug_level > 4)
00422     {
00423       TAOLIB_DEBUG ((LM_DEBUG,
00424                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler\n"),
00425                   this->id ()));
00426     }
00427 
00428   ACE_Reactor * const r = this->orb_core_->reactor ();
00429 
00430   // @@note: This should be okay since the remove handler call will
00431   // not make a nested call into the transport.
00432   ACE_GUARD_RETURN (ACE_Lock,
00433                     ace_mon,
00434                     *this->handler_lock_,
00435                     false);
00436 
00437 
00438   if (this->event_handler_i ()->reactor () == 0)
00439     {
00440       return 0;
00441     }
00442 
00443   if (TAO_debug_level > 6)
00444     {
00445       TAOLIB_DEBUG ((LM_DEBUG,
00446                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler - ")
00447                   ACE_TEXT ("removing event handler from reactor\n"),
00448                   this->id ()));
00449     }
00450 
00451   // Set the flag in the Wait Strategy
00452   this->ws_->is_registered (false);
00453 
00454   // Remove the handler from the reactor
00455   if (r->remove_handler (this->event_handler_i (),
00456                          ACE_Event_Handler::READ_MASK|
00457                          ACE_Event_Handler::DONT_CALL) == -1)
00458     {
00459       if (TAO_debug_level > 0)
00460         TAOLIB_ERROR ((LM_ERROR,
00461                     ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler - ")
00462                     ACE_TEXT ("reactor->remove_handler failed\n"),
00463                     this->id ()));
00464       return -1;
00465     }
00466   else
00467     {
00468       // reset the reactor property of the event handler or
00469       // Transport::register_handler() will not re-register
00470       // when called after us again.
00471       this->event_handler_i ()->reactor (0);
00472       return 0;
00473     }
00474 }

ACE_Event_Handler::Reference_Count TAO_Transport::remove_reference ( void   ) 

These classes need privileged access to:

Definition at line 2748 of file Transport.cpp.

02749 {
02750   return this->event_handler_i ()->remove_reference ();
02751 }

void TAO_Transport::report_invalid_event_handler ( const char *  caller  )  [private]

Print out error messages if the event handler is not valid.

Definition at line 1367 of file Transport.cpp.

01368 {
01369   if (TAO_debug_level > 0)
01370     {
01371       TAOLIB_DEBUG ((LM_DEBUG,
01372          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01373          ACE_TEXT ("(%C) no longer associated with handler [tag=%d]\n"),
01374          this->id (), caller, this->tag_));
01375     }
01376 }

void TAO_Transport::reset_flush_timer ( void   )  [private]

The flush timer expired or was explicitly cancelled, mark it as not pending

Definition at line 119 of file Transport.inl.

00120 {
00121   this->flush_timer_id_ = -1;
00122   this->current_deadline_ = ACE_Time_Value::zero;
00123 }

int TAO_Transport::schedule_output_i ( void   )  [private]

Schedule handle_output() callbacks.

Definition at line 886 of file Transport.cpp.

00887 {
00888   ACE_Event_Handler * const eh = this->event_handler_i ();
00889   ACE_Reactor * const reactor = eh->reactor ();
00890 
00891   if (reactor == 0)
00892     {
00893       if (TAO_debug_level > 1)
00894         {
00895           TAOLIB_ERROR ((LM_ERROR,
00896                       ACE_TEXT ("TAO (%P|%t) - ")
00897                       ACE_TEXT ("Transport[%d]::schedule_output_i, ")
00898                       ACE_TEXT ("no reactor,")
00899                       ACE_TEXT ("returning -1\n"),
00900                       this->id ()));
00901         }
00902       return -1;
00903     }
00904 
00905   // Check to see if our event handler is still registered with the
00906   // reactor.  It's possible for another thread to have run close_connection()
00907   // since we last used the event handler.
00908   ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00909   if (found)
00910     {
00911       found->remove_reference ();
00912 
00913       if (found != eh)
00914         {
00915           if (TAO_debug_level > 3)
00916             {
00917               TAOLIB_ERROR ((LM_ERROR,
00918                           ACE_TEXT ("TAO (%P|%t) - ")
00919                           ACE_TEXT ("Transport[%d]::schedule_output_i ")
00920                           ACE_TEXT ("event handler not found in reactor,")
00921                           ACE_TEXT ("returning -1\n"),
00922                           this->id ()));
00923             }
00924 
00925           return -1;
00926         }
00927     }
00928 
00929   if (TAO_debug_level > 3)
00930     {
00931       TAOLIB_DEBUG ((LM_DEBUG,
00932          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00933          this->id ()));
00934     }
00935 
00936   return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00937 }

virtual ssize_t TAO_Transport::send ( iovec iov,
int  iovcnt,
size_t bytes_transferred,
ACE_Time_Value const *  timeout 
) [pure virtual]

Write the complete Message_Block chain to the connection.

This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.

Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class. Using the code factored out into ACE.

Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.

Parameters:
iov contains the data that must be sent.
timeout is the maximum time that the application is willing to wait for the data to be sent, useful in platforms that implement timed writes. The timeout value is obtained from the policies set by the application.
bytes_transferred should return the total number of bytes successfully transferred before the connection blocked. This is required because in some platforms and/or protocols multiple system calls may be required to send the chain of message blocks. The first few calls can work successfully, but the final one can fail or signal a flow control situation (via EAGAIN). In this case the ORB expects the function to return -1, errno to be appropriately set and this argument to return the number of bytes already on the OS I/O subsystem.

This call can also fail if the transport instance is no longer associated with a connection (e.g., the connection handler closed down). In that case, it returns -1 and sets errno to ENOENT.

int TAO_Transport::send_asynchronous_message_i ( TAO_Stub stub,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [private]

Send an asynchronous message, i.e. do not block until the message is on the wire

Definition at line 1435 of file Transport.cpp.

01438 {
01439   // Let's figure out if the message should be queued without trying
01440   // to send first:
01441   bool try_sending_first = true;
01442 
01443   bool const queue_empty = this->queue_is_empty_i ();
01444 
01445   TAO::Transport_Queueing_Strategy *queue_strategy =
01446     stub->transport_queueing_strategy ();
01447 
01448   if (!queue_empty)
01449     {
01450       try_sending_first = false;
01451     }
01452   else if (queue_strategy)
01453     {
01454       if (queue_strategy->must_queue (queue_empty))
01455         {
01456           try_sending_first = false;
01457         }
01458     }
01459 
01460   bool partially_sent = false;
01461   bool timeout_encountered = false;
01462 
01463   TAO::Transport::Drain_Constraints dc(
01464       max_wait_time, this->using_blocking_io_for_asynch_messages());
01465 
01466   if (try_sending_first)
01467     {
01468       ssize_t n = 0;
01469       size_t byte_count = 0;
01470       // ... in this case we must try to send the message first ...
01471 
01472       size_t const total_length = message_block->total_length ();
01473 
01474       if (TAO_debug_level > 6)
01475         {
01476           TAOLIB_DEBUG ((LM_DEBUG,
01477              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01478              ACE_TEXT ("trying to send the message (ml = %d)\n"),
01479              this->id (), total_length));
01480         }
01481 
01482       // @@ I don't think we want to hold the mutex here, however if
01483       // we release it we need to recheck the status of the transport
01484       // after we return... once I understand the final form for this
01485       // code I will re-visit this decision
01486       n = this->send_message_block_chain_i (message_block,
01487                                             byte_count,
01488                                             dc);
01489 
01490       if (n == -1)
01491         {
01492           // ... if this is just an EWOULDBLOCK we must schedule the
01493           // message for later, if it is ETIME we still have to send
01494           // the complete message, because cutting off the message at
01495           // this point will destroy the synchronization with the
01496           // server ...
01497           if (errno != EWOULDBLOCK && errno != ETIME)
01498             {
01499               if (TAO_debug_level > 0)
01500                 {
01501                   TAOLIB_ERROR ((LM_ERROR,
01502                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01503                      ACE_TEXT ("fatal error in ")
01504                      ACE_TEXT ("send_message_block_chain_i - %m\n"),
01505                      this->id ()));
01506                 }
01507               return -1;
01508             }
01509         }
01510 
01511       // ... let's figure out if the complete message was sent ...
01512       if (total_length == byte_count)
01513         {
01514           // Done, just return.  Notice that there are no allocations
01515           // or copies up to this point (though some fancy calling
01516           // back and forth).
01517           // This is the common case for the critical path, it should
01518           // be fast.
01519           return 0;
01520         }
01521 
01522       if (byte_count > 0)
01523       {
01524         partially_sent = true;
01525       }
01526 
01527       // If it was partially sent, then push to front of queue and don't flush
01528       if (n == -1 && errno == ETIME)
01529       {
01530         timeout_encountered = true;
01531         if (byte_count == 0)
01532         {
01533           //This request has timed out and none of it was sent to the transport
01534           //We can't return -1 here, since that would end up closing the tranpsort
01535           if (TAO_debug_level > 2)
01536             {
01537               TAOLIB_DEBUG ((LM_DEBUG,
01538                           ACE_TEXT ("TAO (%P|%t) - ")
01539                           ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01540                           ACE_TEXT ("timeout encountered before any bytes sent\n"),
01541                           this->id ()));
01542             }
01543           throw ::CORBA::TIMEOUT (
01544             CORBA::SystemException::_tao_minor_code (
01545               TAO_TIMEOUT_SEND_MINOR_CODE,
01546               ETIME),
01547             CORBA::COMPLETED_NO);
01548         }
01549       }
01550 
01551       if (TAO_debug_level > 6)
01552         {
01553           TAOLIB_DEBUG ((LM_DEBUG,
01554              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01555              ACE_TEXT ("partial send %d / %d bytes\n"),
01556              this->id (), byte_count, total_length));
01557         }
01558 
01559       // ... part of the data was sent, need to figure out what piece
01560       // of the message block chain must be queued ...
01561       while (message_block != 0 && message_block->length () == 0)
01562         {
01563           message_block = message_block->cont ();
01564         }
01565 
01566       // ... at least some portion of the message block chain should
01567       // remain ...
01568     }
01569 
01570   // ... either the message must be queued or we need to queue it
01571   // because it was not completely sent out ...
01572 
01573   ACE_Time_Value *wait_time = (partially_sent ? 0: max_wait_time);
01574   if (this->queue_message_i (message_block, wait_time, !partially_sent)
01575       == -1)
01576     {
01577       if (TAO_debug_level > 0)
01578         {
01579           TAOLIB_DEBUG ((LM_DEBUG,
01580                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01581                       ACE_TEXT ("send_asynchronous_message_i, ")
01582                       ACE_TEXT ("cannot queue message for  - %m\n"),
01583                       this->id ()));
01584         }
01585       return -1;
01586     }
01587 
01588   if (TAO_debug_level > 6)
01589     {
01590       TAOLIB_DEBUG ((LM_DEBUG,
01591          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01592          ACE_TEXT ("message is queued\n"),
01593          this->id ()));
01594     }
01595 
01596   if (timeout_encountered && partially_sent)
01597     {
01598       //Must close down the transport here since we can't guarantee the
01599       //integrity of the GIOP stream (the next send may try to write to
01600       //the socket before looking at the queue).
01601       if (TAO_debug_level > 0)
01602         {
01603           TAOLIB_DEBUG ((LM_DEBUG,
01604                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01605                       ACE_TEXT ("send_asynchronous_message_i, ")
01606                       ACE_TEXT ("timeout after partial send, closing.\n"),
01607                       this->id ()));
01608         }
01609       return -1;
01610     }
01611   else if (!timeout_encountered)
01612     {
01613       // We can't flush if we have already encountered a timeout
01614       // ... if the queue is full we need to activate the output on the
01615       // queue ...
01616       bool must_flush = false;
01617       const bool constraints_reached =
01618         this->check_buffering_constraints_i (stub,
01619                                              must_flush);
01620 
01621       // ... but we also want to activate it if the message was partially
01622       // sent.... Plus, when we use the blocking flushing strategy the
01623       // queue is flushed as a side-effect of 'schedule_output()'
01624 
01625       TAO_Flushing_Strategy *flushing_strategy =
01626         this->orb_core ()->flushing_strategy ();
01627 
01628       if (constraints_reached || try_sending_first)
01629         {
01630           int const result = flushing_strategy->schedule_output (this);
01631           if (result == TAO_Flushing_Strategy::MUST_FLUSH)
01632             {
01633               must_flush = true;
01634             }
01635         }
01636 
01637       if (must_flush)
01638         {
01639           if (TAO_debug_level > 0)
01640             {
01641               TAOLIB_DEBUG ((LM_DEBUG,
01642                           ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01643                           ACE_TEXT ("send_asynchronous_message_i, ")
01644                           ACE_TEXT ("flushing transport.\n"),
01645                           this->id ()));
01646             }
01647 
01648           size_t sent_byte = sent_byte_count_;
01649           int ret = 0;
01650           {
01651             typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01652             TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01653             ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01654             ret = flushing_strategy->flush_transport (this, max_wait_time);
01655           }
01656 
01657           if (ret == -1)
01658             {
01659               if (errno == ETIME)
01660                 {
01661                   if (sent_byte == sent_byte_count_) // if nothing was actually flushed
01662                     {
01663                       //This request has timed out and none of it was sent to the transport
01664                       //We can't return -1 here, since that would end up closing the tranpsort
01665                       if (TAO_debug_level > 2)
01666                         {
01667                           TAOLIB_DEBUG ((LM_DEBUG,
01668                                       ACE_TEXT ("TAO (%P|%t) - ")
01669                                       ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01670                                       ACE_TEXT ("2 timeout encountered before any bytes sent\n"),
01671                                       this->id ()));
01672                         }
01673                       throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code
01674                                               (TAO_TIMEOUT_SEND_MINOR_CODE, ETIME),
01675                                               CORBA::COMPLETED_NO);
01676                     }
01677                 }
01678               return -1;
01679             }
01680         }
01681     }
01682   return 0;
01683 }

void TAO_Transport::send_connection_closed_notifications ( void   ) 

Notify all the components inside a Transport when the underlying connection is closed.

Definition at line 1379 of file Transport.cpp.

01380 {
01381   {
01382     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01383 
01384     this->send_connection_closed_notifications_i ();
01385   }
01386 
01387   this->tms ()->connection_closed ();
01388 }

void TAO_Transport::send_connection_closed_notifications_i ( void   )  [private]

Assume the lock is held.

Definition at line 1391 of file Transport.cpp.

01392 {
01393   this->cleanup_queue_i ();
01394 }

virtual int TAO_Transport::send_message ( TAO_OutputCDR stream,
TAO_Stub stub = 0,
TAO_ServerRequest request = 0,
TAO_Message_Semantics  message_semantics = TAO_Message_Semantics(),
ACE_Time_Value max_time_wait = 0 
) [pure virtual]

This method formats the stream and then sends the message on the transport. Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value.

int TAO_Transport::send_message_block_chain ( const ACE_Message_Block message_block,
size_t bytes_transferred,
ACE_Time_Value max_wait_time = 0 
)

This is a very specialized interface to send a simple chain of messages through the Transport. The only place we use this interface is in GIOP_Message_Base.cpp, to send error messages (i.e., an indication that we received a malformed GIOP message,) and to close the connection.

Definition at line 651 of file Transport.cpp.

00654 {
00655   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00656 
00657   TAO::Transport::Drain_Constraints dc(
00658       max_wait_time, true);
00659 
00660   return this->send_message_block_chain_i (mb,
00661                                            bytes_transferred,
00662                                            dc);
00663 }

int TAO_Transport::send_message_block_chain_i ( const ACE_Message_Block message_block,
size_t bytes_transferred,
TAO::Transport::Drain_Constraints const &  dc 
)

Send a message block chain, assuming the lock is held.

Definition at line 666 of file Transport.cpp.

00669 {
00670   size_t const total_length = mb->total_length ();
00671 
00672   // We are going to block, so there is no need to clone
00673   // the message block.
00674   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00675 
00676   synch_message.push_back (this->head_, this->tail_);
00677 
00678   Drain_Result const n = this->drain_queue_i (dc);
00679 
00680   if (n == DR_ERROR)
00681     {
00682       synch_message.remove_from_list (this->head_, this->tail_);
00683       return -1; // Error while sending...
00684     }
00685   else if (n == DR_QUEUE_EMPTY)
00686     {
00687       bytes_transferred = total_length;
00688       return 1;  // Empty queue, message was sent..
00689     }
00690 
00691   // Remove the temporary message from the queue...
00692   synch_message.remove_from_list (this->head_, this->tail_);
00693 
00694   bytes_transferred = total_length - synch_message.message_length ();
00695 
00696   return 0;
00697 }

int TAO_Transport::send_message_shared ( TAO_Stub stub,
TAO_Message_Semantics  message_semantics,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [virtual]

Sent the contents of message_block.

Parameters:
stub The object reference used for this operation, useful to obtain the current policies.
message_semantics If this is set to TAO_TWO_REQUEST this method will block until the operation is completely written on the wire. If it is set to other values this operation could return.
message_block The CDR encapsulation of the GIOP message that must be sent. The message may consist of multiple Message Blocks chained through the cont() field.
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.

Definition at line 297 of file Transport.cpp.

00301 {
00302   int result = 0;
00303 
00304   {
00305     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00306 
00307     result =
00308       this->send_message_shared_i (stub, message_semantics,
00309                                    message_block, max_wait_time);
00310   }
00311 
00312   if (result == -1)
00313     {
00314       // The connection needs to be closed here.
00315       // In the case of a partially written message this is the only way to cleanup
00316       //  the physical connection as well as the Transport. An EOF on the remote end
00317       //  will cancel the partially received message.
00318       this->close_connection ();
00319     }
00320 
00321   return result;
00322 }

int TAO_Transport::send_message_shared_i ( TAO_Stub stub,
TAO_Message_Semantics  message_semantics,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [protected]

Implement send_message_shared() assuming the handler_lock_ is held.

Definition at line 1397 of file Transport.cpp.

01401 {
01402   int ret = 0;
01403 
01404 #if TAO_HAS_TRANSPORT_CURRENT == 1
01405   size_t const message_length = message_block->length ();
01406 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01407 
01408   switch (message_semantics.type_)
01409     {
01410       case TAO_Message_Semantics::TAO_TWOWAY_REQUEST:
01411         ret = this->send_synchronous_message_i (message_block, max_wait_time);
01412         break;
01413 
01414       case TAO_Message_Semantics::TAO_REPLY:
01415         ret = this->send_reply_message_i (message_block, max_wait_time);
01416         break;
01417 
01418       case TAO_Message_Semantics::TAO_ONEWAY_REQUEST:
01419         ret = this->send_asynchronous_message_i (stub,
01420                                                  message_block,
01421                                                  max_wait_time);
01422         break;
01423     }
01424 
01425 #if TAO_HAS_TRANSPORT_CURRENT == 1
01426   // "Count" the message, only if no error was encountered.
01427   if (ret != -1 && this->stats_ != 0)
01428     this->stats_->messages_sent (message_length);
01429 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01430 
01431   return ret;
01432 }

int TAO_Transport::send_reply_message_i ( const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [private]

Send a reply message, i.e. do not block until the message is on the wire, but just return after adding them to the queue.

Definition at line 795 of file Transport.cpp.

00797 {
00798   // Don't clone now.. We could be sent in one shot!
00799   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00800 
00801   synch_message.push_back (this->head_, this->tail_);
00802 
00803   int const n =
00804     this->send_synch_message_helper_i (synch_message, max_wait_time);
00805 
00806   // What about partially sent messages.
00807   if (n == -1 || n == 1)
00808     {
00809       return n;
00810     }
00811 
00812   if (TAO_debug_level > 3)
00813     {
00814       TAOLIB_DEBUG ((LM_DEBUG,
00815          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00816          ACE_TEXT ("preparing to add to queue before leaving\n"),
00817          this->id ()));
00818     }
00819 
00820   // Till this point we shouldn't have any copying and that is the
00821   // point anyway. Now, remove the node from the list
00822   synch_message.remove_from_list (this->head_, this->tail_);
00823 
00824   // Clone the node that we have.
00825   TAO_Queued_Message *msg =
00826     synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00827 
00828   // Stick it in the queue
00829   msg->push_back (this->head_, this->tail_);
00830 
00831   TAO_Flushing_Strategy *flushing_strategy =
00832     this->orb_core ()->flushing_strategy ();
00833 
00834   int const result = flushing_strategy->schedule_output (this);
00835 
00836   if (result == -1)
00837     {
00838       if (TAO_debug_level > 5)
00839         {
00840           TAOLIB_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00841                       "message_i, dequeuing msg due to schedule_output "
00842                       "failure\n", this->id ()));
00843         }
00844       msg->remove_from_list (this->head_, this->tail_);
00845       msg->destroy ();
00846     }
00847   else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00848     {
00849       typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00850       TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00851       ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00852       (void) flushing_strategy->flush_transport (this, 0);
00853     }
00854 
00855   return 1;
00856 }

virtual int TAO_Transport::send_request ( TAO_Stub stub,
TAO_ORB_Core orb_core,
TAO_OutputCDR stream,
TAO_Message_Semantics  message_semantics,
ACE_Time_Value max_time_wait 
) [pure virtual]

Prepare the waiting and demuxing strategy to receive a reply for a new request. Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions: suppose the ORB is running in a multi-threaded configuration, thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) setup the ORB to receive the reply 3) wait for the request

but in this case thread B may receive the reply between step (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) setup the ORB to receive the reply 2) send the request 3) wait for the reply

The following method encapsulates this idiom.

Todo:
This is generic code, it should be factored out into the Transport class.
int TAO_Transport::send_synch_message_helper_i ( TAO_Synch_Queued_Message s,
ACE_Time_Value max_wait_time 
) [private]

A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods.

Definition at line 859 of file Transport.cpp.

00861 {
00862   TAO::Transport::Drain_Constraints dc(
00863       max_wait_time, this->using_blocking_io_for_synch_messages());
00864 
00865   Drain_Result const n = this->drain_queue_i (dc);
00866 
00867   if (n == DR_ERROR)
00868     {
00869       synch_message.remove_from_list (this->head_, this->tail_);
00870       return -1; // Error while sending...
00871     }
00872   else if (n == DR_QUEUE_EMPTY)
00873     {
00874       return 1;  // Empty queue, message was sent..
00875     }
00876 
00877   if (synch_message.all_data_sent ())
00878     {
00879       return 1;
00880     }
00881 
00882   return 0;
00883 }

int TAO_Transport::send_synchronous_message_i ( const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [private]

Send a synchronous message, i.e. block until the message is on the wire

Definition at line 700 of file Transport.cpp.

00702 {
00703   // We are going to block, so there is no need to clone
00704   // the message block.
00705   size_t const total_length = mb->total_length ();
00706   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00707 
00708   synch_message.push_back (this->head_, this->tail_);
00709 
00710   int const result = this->send_synch_message_helper_i (synch_message,
00711                                                         max_wait_time);
00712   if (result == -1 && errno == ETIME)
00713     {
00714       if (total_length == synch_message.message_length ()) //none was sent
00715         {
00716           if (TAO_debug_level > 2)
00717             {
00718               TAOLIB_DEBUG ((LM_DEBUG,
00719                           ACE_TEXT ("TAO (%P|%t) - ")
00720                           ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ")
00721                           ACE_TEXT ("timeout encountered before any bytes sent\n"),
00722                           this->id ()));
00723             }
00724           throw ::CORBA::TIMEOUT (
00725             CORBA::SystemException::_tao_minor_code (
00726               TAO_TIMEOUT_SEND_MINOR_CODE,
00727               ETIME),
00728             CORBA::COMPLETED_NO);
00729         }
00730       else
00731         {
00732           return -1;
00733         }
00734     }
00735   else if(result == -1 || result == 1)
00736     {
00737       return result;
00738     }
00739 
00740   TAO_Flushing_Strategy *flushing_strategy =
00741     this->orb_core ()->flushing_strategy ();
00742   if (flushing_strategy->schedule_output (this) == -1)
00743     {
00744       synch_message.remove_from_list (this->head_, this->tail_);
00745       if (TAO_debug_level > 0)
00746         {
00747           TAOLIB_ERROR ((LM_ERROR,
00748                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
00749                       ACE_TEXT ("send_synchronous_message_i, ")
00750                       ACE_TEXT ("error while scheduling flush - %m\n"),
00751                       this->id ()));
00752         }
00753       return -1;
00754     }
00755 
00756   // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
00757   // because we're always going to flush anyway.
00758 
00759   // Release the mutex, other threads may modify the queue as we
00760   // block for a long time writing out data.
00761   int flush_result;
00762   {
00763     typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00764     TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00765     ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00766 
00767     flush_result = flushing_strategy->flush_message (this,
00768                                                      &synch_message,
00769                                                      max_wait_time);
00770   }
00771 
00772   if (flush_result == -1)
00773     {
00774       synch_message.remove_from_list (this->head_, this->tail_);
00775 
00776       // We don't need to do anything special for the timeout case.
00777       // The connection is going to get closed and the Transport destroyed.
00778       // The only thing to do maybe is to empty the queue.
00779 
00780       if (TAO_debug_level > 0)
00781         {
00782           TAOLIB_ERROR ((LM_ERROR,
00783              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00784              ACE_TEXT ("error while sending message - %m\n"),
00785              this->id ()));
00786         }
00787 
00788       return -1;
00789     }
00790 
00791   return 1;
00792 }

size_t TAO_Transport::sent_byte_count ( void   )  const

Accessor to sent_byte_count_.

Definition at line 200 of file Transport.inl.

00201 {
00202   return this->sent_byte_count_;
00203 }

void TAO_Transport::set_bidir_context_info ( TAO_Operation_Details opdetails  )  [virtual]

These classes need privileged access to:

Definition at line 2889 of file Transport.cpp.

02890 {
02891 }

TAO::Transport::Stats * TAO_Transport::stats ( void   )  const

Transport statistics.

Definition at line 208 of file Transport.inl.

00209 {
00210   return this->stats_;
00211 }

CORBA::ULong TAO_Transport::tag ( void   )  const

Return the protocol tag.

The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details.

Definition at line 11 of file Transport.inl.

00012 {
00013   return this->tag_;
00014 }

int TAO_Transport::tear_listen_point_list ( TAO_InputCDR cdr  )  [virtual]

Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints

Definition at line 291 of file Transport.cpp.

00292 {
00293   ACE_NOTSUP_RETURN (-1);
00294 }

TAO_Transport_Mux_Strategy * TAO_Transport::tms ( void   )  const

Get the TAO_Tranport_Mux_Strategy used by this object.

The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Enough is to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions.

Definition at line 23 of file Transport.inl.

00024 {
00025   return tms_;
00026 }

TAO::Transport_Cache_Manager & TAO_Transport::transport_cache_manager ( void   )  [private]

Helper method that returns the Transport Cache Manager.

Definition at line 2706 of file Transport.cpp.

02707 {
02708   return this->orb_core_->lane_resources ().transport_cache ();
02709 }

int TAO_Transport::update_transport ( void   ) 

Cache management.

Definition at line 590 of file Transport.cpp.

00591 {
00592   return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00593 }

bool TAO_Transport::using_blocking_io_for_asynch_messages ( void   )  const [private]

Return true if blocking I/O should be used for sending asynchronous (AMI calls, non-blocking oneways, responses to operations, etc.) messages. This is determined based on the current flushing strategy.

Definition at line 2919 of file Transport.cpp.

02920 {
02921   return false;
02922 }

bool TAO_Transport::using_blocking_io_for_synch_messages ( void   )  const [private]

Return true if blocking I/O should be used for sending synchronous (two-way, reliable oneways, etc.) messages. This is determined based on the current flushing and waiting strategies.

Definition at line 2909 of file Transport.cpp.

02910 {
02911   if (this->wait_strategy()->can_process_upcalls())
02912   {
02913     return false;
02914   }
02915   return true;
02916 }

TAO_Wait_Strategy * TAO_Transport::wait_strategy ( void   )  const

Return the TAO_Wait_Strategy used by this object.

The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Enough is to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol.

Definition at line 30 of file Transport.inl.

00031 {
00032   return this->ws_;
00033 }

void TAO_Transport::wchar_translator ( TAO_Codeset_Translator_Base tf  ) 

CodeSet negotiation - Set the wchar codeset translator factory.

Definition at line 155 of file Transport.inl.

00156 {
00157   this->wchar_translator_ = tf;
00158   this->tcs_set_ = 1;
00159 }

TAO_Codeset_Translator_Base * TAO_Transport::wchar_translator ( void   )  const

CodeSet Negotiation - Get the wchar codeset translator factory.

Definition at line 142 of file Transport.inl.

00143 {
00144   return this->wchar_translator_;
00145 }


Friends And Related Function Documentation

These classes need privileged access to:

Definition at line 951 of file Transport.h.

friend class TAO_Reactive_Flushing_Strategy [friend]

These classes need privileged access to:

Definition at line 950 of file Transport.h.

friend class TAO_Thread_Per_Connection_Handler [friend]

Needs priveleged access to event_handler_i ()

Definition at line 955 of file Transport.h.


Member Data Documentation

Use to check if bidirectional info has been synchronized with the peer. Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional. The flag is used as follows: + We dont want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *dont* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such a things from happening.

The value of this flag will be 0 if the client sends info and 1 if the server receives the info.

Definition at line 1127 of file Transport.h.

Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around.

Definition at line 1099 of file Transport.h.

Additional member values required to support codeset translation.

@Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapulation ie. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. if I have to be more cynical we can move this to the connection_handler and it may more sense with the DSCP stuff around there. Do you agree?

Definition at line 1211 of file Transport.h.

Track if connection was seen as closed during a read so that invocation can optionally be retried using a different profile. Note that this could result in violate the "at most once" CORBA semantics.

Definition at line 1191 of file Transport.h.

The queue will start draining no later than <queeing_deadline_> if* the deadline is

Definition at line 1144 of file Transport.h.

First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection.

Definition at line 1223 of file Transport.h.

Indicate that flushing needs to be done in post_open().

Definition at line 1243 of file Transport.h.

The timer ID.

Definition at line 1147 of file Transport.h.

ACE_Lock* TAO_Transport::handler_lock_ [mutable, protected]

Lock that insures that activities that *might* use handler-related resources (such as a connection handler) get serialized. This is an ACE_Lock that gets initialized from TAO_ORB_Core::resource_factory()->create_cached_connection_lock(). This way, one can use a lock appropriate for the type of system, i.e., a null lock for single-threaded systems, and a real lock for multi-threaded systems.

Definition at line 1161 of file Transport.h.

Implement the outgoing data queue.

Definition at line 1132 of file Transport.h.

A unique identifier for the transport.

This never *never* changes over the lifespan, so we don't have to worry about locking it.

HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection.

Definition at line 1171 of file Transport.h.

Queue of the consolidated, incoming messages..

Definition at line 1136 of file Transport.h.

TAO::Incoming_Message_Stack TAO_Transport::incoming_message_stack_ [protected]

Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_"

Definition at line 1140 of file Transport.h.

bool TAO_Transport::is_connected_ [protected]

Is this transport really connected or not. In case of oneways with SYNC_NONE Policy we don't wait until the connection is ready and we buffer the requests in this transport until the connection is ready

Definition at line 1185 of file Transport.h.

Our messaging object.

Definition at line 1196 of file Transport.h.

These classes need privileged access to:

Definition at line 1129 of file Transport.h.

Global orbcore resource.

Definition at line 1095 of file Transport.h.

TAO_SYNCH_MUTEX TAO_Transport::output_cdr_mutex_ [mutable, private]

lock for synchronizing Transport OutputCDR access

Definition at line 1246 of file Transport.h.

Holds the partial GIOP message (if there is one).

Definition at line 1226 of file Transport.h.

unsigned long TAO_Transport::purging_order_ [protected]

Used by the LRU, LFU and FIFO Connection Purging Strategies.

Definition at line 1174 of file Transport.h.

Size of the buffer received.

Definition at line 1177 of file Transport.h.

Number of bytes sent.

Definition at line 1180 of file Transport.h.

TAO::Transport::Stats* TAO_Transport::stats_ [private]

Statistics.

Definition at line 1239 of file Transport.h.

CORBA::ULong const TAO_Transport::tag_ [protected]

IOP protocol tag.

Definition at line 1092 of file Transport.h.

These classes need privileged access to:

Definition at line 1133 of file Transport.h.

The tcs_set_ flag indicates that negotiation has occurred and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be.

Definition at line 1217 of file Transport.h.

Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request.

Definition at line 1103 of file Transport.h.

The adapter used to receive timeout callbacks from the Reactor.

Definition at line 1150 of file Transport.h.

These classes need privileged access to:

Definition at line 1212 of file Transport.h.

Strategy for waiting for the reply after sending the request.

Definition at line 1106 of file Transport.h.


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for TAO by  doxygen 1.6.1