00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_PUBLISHER_IMPL_H
00009 #define OPENDDS_DCPS_PUBLISHER_IMPL_H
00010
00011 #include "dds/DdsDcpsInfoUtilsC.h"
00012 #include "EntityImpl.h"
00013 #include "DataWriterImpl.h"
00014 #include "ace/Synch.h"
00015 #include "ace/Reverse_Lock_T.h"
00016
00017 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00018 #pragma once
00019 #endif
00020
00021 namespace OpenDDS {
00022 namespace DCPS {
00023 // Forward declarations: this header refers to these two types only through pointers.
00024 class DomainParticipantImpl;
00025 class Monitor;
00026
00027 /// @class PublisherImpl
00028 ///
00029 /// @brief Implementation class for the DDS::Publisher interface.
00030 ///
00031 /// Derives (virtually) from LocalObject<DDS::Publisher> and EntityImpl.
00032 /// It declares the data-writer factory operations (create_datawriter,
00033 /// delete_datawriter, lookup_datawriter) and the containers that track
00034 /// writers (datawriter_map_, publication_map_). Only get_pi_lock() is
00035 /// defined inline; every other member function is defined outside this
00036 /// header, so behavioural notes below are marked NOTE(review) where unverified.
00037 class OpenDDS_Dcps_Export PublisherImpl
00038 : public virtual LocalObject<DDS::Publisher>
00039 , public virtual EntityImpl {
00040 public:
00041 /// DataWriterImpl is granted access to the private members declared below.
00042 friend class DataWriterImpl;
00043 /// Constructor. NOTE(review): the arguments presumably initialise handle_, publisher_id_, qos_, listener_, listener_mask_ and participant_ - confirm in the .cpp.
00044 PublisherImpl(DDS::InstanceHandle_t handle,
00045 RepoId id,
00046 const DDS::PublisherQos& qos,
00047 DDS::PublisherListener_ptr a_listener,
00048 const DDS::StatusMask& mask,
00049 DomainParticipantImpl* participant);
00050 /// Virtual destructor.
00051 virtual ~PublisherImpl();
00052 /// Returns an instance handle. NOTE(review): presumably handle_ - confirm.
00053 virtual DDS::InstanceHandle_t get_instance_handle();
00054 /// Non-virtual query by instance handle. NOTE(review): presumably true when a writer of this publisher has that handle - confirm.
00055 bool contains_writer(DDS::InstanceHandle_t a_handle);
00056 /// Factory operation returning a DDS::DataWriter_ptr for the given topic, QoS, listener and status mask.
00057 virtual DDS::DataWriter_ptr create_datawriter(
00058 DDS::Topic_ptr a_topic,
00059 const DDS::DataWriterQos& qos,
00060 DDS::DataWriterListener_ptr a_listener,
00061 DDS::StatusMask mask);
00062 /// Counterpart of create_datawriter(); reports the outcome as a DDS::ReturnCode_t.
00063 virtual DDS::ReturnCode_t delete_datawriter(
00064 DDS::DataWriter_ptr a_datawriter);
00065 /// Looks a data writer up by topic name. NOTE(review): presumably served from datawriter_map_ - confirm.
00066 virtual DDS::DataWriter_ptr lookup_datawriter(
00067 const char* topic_name);
00068 /// NOTE(review): presumably deletes every data writer held by this publisher - confirm.
00069 virtual DDS::ReturnCode_t delete_contained_entities();
00070 /// Publisher QoS setter; takes the new policy set by const reference.
00071 virtual DDS::ReturnCode_t set_qos(
00072 const DDS::PublisherQos& qos);
00073 /// Publisher QoS getter; the result is written into the caller-supplied `qos`.
00074 virtual DDS::ReturnCode_t get_qos(
00075 DDS::PublisherQos& qos);
00076 /// Installs a listener together with its status mask (see listener_ / listener_mask_).
00077 virtual DDS::ReturnCode_t set_listener(
00078 DDS::PublisherListener_ptr a_listener,
00079 DDS::StatusMask mask);
00080 /// Returns a PublisherListener reference. NOTE(review): presumably listener_; ownership of the returned _ptr is not visible here.
00081 virtual DDS::PublisherListener_ptr get_listener();
00082 /// NOTE(review): presumably increments suspend_depth_count_ - confirm.
00083 virtual DDS::ReturnCode_t suspend_publications();
00084 /// NOTE(review): presumably the inverse of suspend_publications() - confirm.
00085 virtual DDS::ReturnCode_t resume_publications();
00086
00087 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00088 /// Coherent-change operations; declared only when OPENDDS_NO_OBJECT_MODEL_PROFILE is not defined.
00089 virtual DDS::ReturnCode_t begin_coherent_changes();
00090
00091 virtual DDS::ReturnCode_t end_coherent_changes();
00092
00093 #endif
00094 /// Takes a maximum wait duration. NOTE(review): presumably blocks until the writers' samples are acknowledged or max_wait elapses - confirm.
00095 virtual DDS::ReturnCode_t wait_for_acknowledgments(
00096 const DDS::Duration_t& max_wait);
00097 /// Returns a DDS::DomainParticipant_ptr. NOTE(review): presumably derived from participant_ - confirm.
00098 virtual DDS::DomainParticipant_ptr get_participant();
00099 /// Setter for the default data-writer QoS (see default_datawriter_qos_).
00100 virtual DDS::ReturnCode_t set_default_datawriter_qos(
00101 const DDS::DataWriterQos& qos);
00102 /// Getter for the default data-writer QoS; the result is written into `qos`.
00103 virtual DDS::ReturnCode_t get_default_datawriter_qos(
00104 DDS::DataWriterQos& qos);
00105 /// Writes into a_datawriter_qos (non-const reference) using a_topic_qos (const reference) as the source.
00106 virtual DDS::ReturnCode_t copy_from_topic_qos(
00107 DDS::DataWriterQos& a_datawriter_qos,
00108 const DDS::TopicQos& a_topic_qos);
00109 /// Entity enable operation; reports the outcome as a DDS::ReturnCode_t.
00110 virtual DDS::ReturnCode_t enable();
00111 /// Hands out pi_lock_ by non-const reference, so code outside this class can acquire the same mutex.
00112 ACE_Recursive_Thread_Mutex& get_pi_lock() { return pi_lock_; }
00113
00114 /// Const, non-virtual helper (its definition is outside this header).
00115 /// NOTE(review): presumably reports whether this publisher no longer
00116 /// holds any data writer, i.e. whether it is safe to delete - confirm
00117 /// against the .cpp before relying on it.
00118 bool is_clean() const;
00119
00120 /// Non-virtual notification taking a topic name and a writer implementation.
00121 /// NOTE(review): presumably called when a writer created by this publisher
00122 /// is enabled, registering it in datawriter_map_ / publication_map_ - confirm.
00123 DDS::ReturnCode_t writer_enabled(const char* topic_name,
00124 DataWriterImpl* impl);
00125
00126 /// Returns a PublisherListener for the given status kind.
00127 ///
00128 /// NOTE(review): presumably returns listener_ when `kind` is contained in
00129 /// listener_mask_ and otherwise defers to the participant - confirm in the
00130 /// .cpp. Whether the caller owns the returned reference is not visible
00131 /// in this header.
00132 ///
00133 DDS::PublisherListener_ptr listener_for(::DDS::StatusKind kind);
00134 /// Liveliness helpers, all non-virtual and defined outside this header.
00135 DDS::ReturnCode_t assert_liveliness_by_participant();
00136 ACE_Time_Value liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
00137 bool participant_liveliness_activity_after(const ACE_Time_Value& tv);
00138 /// Vector of PublicationId values; the argument type of get_publication_ids().
00139 typedef OPENDDS_VECTOR(PublicationId) PublicationIdVec;
00140
00141 /// Takes the vector by non-const reference. NOTE(review): presumably adds the ids of this publisher's writers - confirm.
00142 void get_publication_ids(PublicationIdVec& pubs);
00143 /// NOTE(review): presumably true while suspend_depth_count_ is greater than zero - confirm.
00144 bool is_suspended() const;
00145 /// Returns the parent entity as an EntityImpl*. NOTE(review): presumably participant_ - confirm.
00146 virtual EntityImpl* parent() const;
00147 static bool validate_datawriter_qos(const DDS::DataWriterQos& qos, // static: callable without a PublisherImpl instance
00148 const DDS::DataWriterQos& default_qos,
00149 DDS::Topic_ptr a_topic,
00150 DDS::DataWriterQos& dw_qos);
00151 private:
00152 typedef OPENDDS_MULTIMAP(OPENDDS_STRING, DataWriterImpl*) DataWriterMap; // string key (presumably the topic name) -> writer
00153 /// PublicationId -> DataWriterImpl*, ordered by GUID_tKeyLessThan.
00154 typedef OPENDDS_MAP_CMP(PublicationId, DataWriterImpl*, GUID_tKeyLessThan)
00155 PublicationMap;
00156
00157 /// RepoId -> DataWriterQos, ordered by GUID_tKeyLessThan; not used elsewhere in this header.
00158 typedef OPENDDS_MAP_CMP(RepoId, DDS::DataWriterQos, GUID_tKeyLessThan) DwIdToQosMap;
00159 /// Instance handle. NOTE(review): presumably the value returned by get_instance_handle().
00160 DDS::InstanceHandle_t handle_;
00161
00162 /// Publisher QoS (see set_qos() / get_qos()).
00163 DDS::PublisherQos qos_;
00164 /// Default QoS for data writers (see set/get_default_datawriter_qos()).
00165 DDS::DataWriterQos default_datawriter_qos_;
00166
00167
00168 /// Status mask that accompanies listener_ (see set_listener()).
00169 DDS::StatusMask listener_mask_;
00170 /// Listener reference held in a _var smart reference.
00171 DDS::PublisherListener_var listener_;
00172 /// Writers keyed by string (multimap, so one key can map to several writers).
00173 DataWriterMap datawriter_map_;
00174
00175 /// Writers keyed by PublicationId.
00176 PublicationMap publication_map_;
00177 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00178 /// NOTE(review): presumably the nesting depth of begin_/end_coherent_changes() - confirm.
00179 std::size_t change_depth_;
00180 #endif
00181 /// Domain id. NOTE(review): how it is initialised is not visible here (the constructor takes no DomainId_t).
00182 DDS::DomainId_t domain_id_;
00183 /// Raw pointer to a DomainParticipantImpl (the constructor takes one); lifetime management is not visible here.
00184 DomainParticipantImpl* participant_;
00185 /// Signed 16-bit CORBA counter. NOTE(review): presumably the nesting depth of suspend_publications() - confirm.
00186 CORBA::Short suspend_depth_count_;
00187
00188 /// Sequence number state; its use is not visible in this header.
00189 SequenceNumber sequence_number_;
00190 /// Time stamp. NOTE(review): presumably the start of the current aggregation period - confirm.
00191 ACE_Time_Value aggregation_period_start_;
00192 /// Lock types: a recursive thread mutex and the ACE_Reverse_Lock adapter over it.
00193 typedef ACE_Recursive_Thread_Mutex lock_type;
00194 typedef ACE_Reverse_Lock<lock_type> reverse_lock_type;
00195 /// pi_lock_ is `mutable` so const member functions may lock it; it is declared (and therefore constructed) before reverse_pi_lock_.
00196 mutable lock_type pi_lock_;
00197 reverse_lock_type reverse_pi_lock_; // NOTE(review): presumably wraps pi_lock_ - confirm in the constructor
00198
00199 /// Raw pointer to a Monitor (only forward-declared here); ownership is not visible in this header.
00200 Monitor* monitor_;
00201
00202
00203 /// RepoId of this publisher. NOTE(review): presumably the `id` constructor argument - confirm.
00204 RepoId publisher_id_;
00205 };
00206
00207 }
00208 }
00209
00210 #endif