OpenDDS  Snapshot(2023/04/28-20:55)
TopicImpl.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "TopicImpl.h"
11 
12 #include "Qos_Helper.h"
14 #include "Definitions.h"
15 #include "Service_Participant.h"
16 #include "DomainParticipantImpl.h"
17 #include "MonitorFactory.h"
18 #include "DCPS_Utils.h"
20 
22 
23 namespace OpenDDS {
24 namespace DCPS {
25 
// Constructs a topic.
// The topic name, type name, type support and owning participant are forwarded
// to the TopicDescriptionImpl base. The QoS and listener mask are copied, the
// listener reference is duplicated, and id_ starts out as GUID_UNKNOWN.
// The body then asks the service participant's monitor factory for a topic
// monitor and stores it in monitor_.
// NOTE(review): listing lines 42-43 are not present in this view, so the body
// may do more than what is shown here.
26 TopicImpl::TopicImpl(const char* topic_name,
27  const char* type_name,
28  OpenDDS::DCPS::TypeSupport_ptr type_support,
29  const DDS::TopicQos & qos,
30  DDS::TopicListener_ptr a_listener,
31  const DDS::StatusMask & mask,
32  DomainParticipantImpl* participant)
33  : TopicDescriptionImpl(topic_name,
34  type_name,
35  type_support,
36  participant),
37  qos_(qos),
38  listener_mask_(mask),
39  listener_(DDS::TopicListener::_duplicate(a_listener)),
40  id_(GUID_UNKNOWN)
41 {
44  monitor_.reset(TheServiceParticipant->monitor_factory_->create_topic_monitor(this));
45 }
46 
// Empty function body.
// NOTE(review): the declarator on listing line 47 is not shown; an empty body
// directly after the constructor suggests the destructor - confirm against
// TopicImpl.h.
48 {
49 }
50 
// Body of TopicImpl::set_qos (the signature line is not part of this listing).
// Works on a local copy of qos_arg. When that copy is valid and consistent:
//  - returns RETCODE_OK immediately if it equals the current qos_;
//  - if it differs in a non-changeable policy and the topic is already
//    enabled, takes a branch whose statement is not shown here;
//  - otherwise stores it in qos_ and forwards it to discovery through
//    update_topic_qos, then returns RETCODE_OK.
// NOTE(review): listing lines 55-58, 66, 72, 75, 78, 81 and 88 are missing, so
// the exact error return codes and the arguments of the discovery call cannot
// be confirmed from this view.
52 {
53  DDS::TopicQos qos = qos_arg;
54 
59 
60  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
  // Nothing to do when the requested QoS matches the current one.
61  if (qos_ == qos)
62  return DDS::RETCODE_OK;
63 
64  // Policies that are not changeable may still be modified before enable.
65  if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
67 
68  } else {
69  qos_ = qos;
70 
71  Discovery_rch disco =
73  const bool status =
74  disco->update_topic_qos(this->id_, participant_->get_domain_id(),
76 
  // A false status is reported with the "failed on compatibility check" message.
77  if (!status) {
79  ACE_TEXT("(%P|%t) TopicImpl::set_qos, ")
80  ACE_TEXT("failed on compatibility check.\n")),
82  }
83  }
84 
85  return DDS::RETCODE_OK;
86 
87  } else {
89  }
90 }
91 
// Body of TopicImpl::get_qos (signature line not shown in this listing).
// Copies the stored topic QoS into the caller's `qos` and returns RETCODE_OK.
94 {
95  qos = qos_;
96  return DDS::RETCODE_OK;
97 }
98 
// Replaces the topic's listener and the status mask it applies to, then
// returns RETCODE_OK. The listener reference is duplicated before being stored.
// NOTE(review): the return-type line (99) and listing line 102 are not shown;
// line 102 presumably acquires a lock protecting the listener - confirm.
100 TopicImpl::set_listener(DDS::TopicListener_ptr a_listener, DDS::StatusMask mask)
101 {
103  listener_mask_ = mask;
104  //note: OK to duplicate a nil object ref
105  listener_ = DDS::TopicListener::_duplicate(a_listener);
106  return DDS::RETCODE_OK;
107 }
108 
// TopicImpl::get_listener (declarator line 110 not shown in this listing).
// Returns a duplicated reference to the stored listener.
// NOTE(review): listing line 112 is missing; presumably a lock guard - confirm.
109 DDS::TopicListener_ptr
111 {
113  return DDS::TopicListener::_duplicate(listener_.in());
114 }
115 
// Body of TopicImpl::get_inconsistent_topic_status (signature line not shown).
// Copies the stored inconsistent-topic status into `a_status` and returns
// RETCODE_OK.
// NOTE(review): listing lines 119-120 and 122 are missing, so any locking or
// status-flag handling around the copy cannot be confirmed from this view.
118 {
121  a_status = inconsistent_topic_status_;
123  return DDS::RETCODE_OK;
124 }
125 
// Body of TopicImpl::enable (signature line not shown in this listing).
// Returns RETCODE_OK right away when the topic is already enabled. When no id
// has been assigned yet (id_ == GUID_UNKNOWN) the topic is registered with the
// domain's discovery object through assert_topic; any status other than
// CREATED or FOUND makes the call return RETCODE_ERROR. On success the monitor,
// if present, is asked to report and the result of set_enabled() is returned.
128 {
129  //According to the spec:
130  // - Calling enable on an already enabled Entity returns OK and has no
131  // effect.
132  // - Calling enable on an Entity whose factory is not enabled will fail
133  // and return PRECONDITION_NOT_MET.
134 
135  if (this->is_enabled()) {
136  return DDS::RETCODE_OK;
137  }
138 
  // NOTE(review): the statement on listing line 140 is not shown; per the
  // comment above it presumably returns PRECONDITION_NOT_MET - confirm.
139  if (!this->participant_->is_enabled()) {
141  }
142 
143  if (id_ == GUID_UNKNOWN) {
144  const DDS::DomainId_t dom_id = participant_->get_domain_id();
145  Discovery_rch disco = TheServiceParticipant->get_discovery(dom_id);
  // id_ is passed as the first argument; presumably discovery fills it in - confirm.
  // The key flag defaults to false when there is no type support.
146  TopicStatus status = disco->assert_topic(id_,
147  dom_id,
148  participant_->get_id(),
149  topic_name_.c_str(),
150  type_name_.c_str(),
151  qos_,
152  type_support_ ? type_support_->has_dcps_key() : false,
153  this);
154  if (status != CREATED && status != FOUND) {
  // The failure is only logged at debug level 1 or higher.
155  if (DCPS_debug_level >= 1) {
157  ACE_TEXT("(%P|%t) ERROR: TopicImpl::enable, ")
158  ACE_TEXT("assert_topic failed with return value <%C>.\n"),
159  topicstatus_to_string(status)));
160  }
161  return DDS::RETCODE_ERROR;
162  }
163  }
164 
165  if (this->monitor_) {
166  monitor_->report();
167  }
168  return this->set_enabled();
169 }
170 
// TopicImpl::get_id (declarator line 172 not shown in this listing).
// Returns the stored id_ by value; it is GUID_UNKNOWN until one is assigned.
171 GUID_t
173 {
174  return id_;
175 }
176 
// Body of TopicImpl::get_instance_handle (signature line not shown).
// NOTE(review): the only statement (listing line 180) is missing from this
// view; it presumably returns the entity instance handle for id_ - confirm.
179 {
181 }
182 
// TopicImpl::type_name (declarator line 184 not shown in this listing).
// Returns the C string held by type_name_; the pointer refers to the member's
// storage, not to a copy.
183 const char*
185 {
186  return this->type_name_.c_str();
187 }
188 
// TopicImpl::topic_name (declarator line 190 not shown in this listing).
// Returns the C string held by topic_name_; the pointer refers to the member's
// storage, not to a copy.
189 const char*
191 {
192  return this->topic_name_.c_str();
193 }
194 
195 
// Unconditionally throws Transport::MiscProblem.
// NOTE(review): the declarator (listing line 197) is not shown; this is
// presumably an override of an operation that is not supported on topics -
// confirm the function name against TopicImpl.h.
196 void
198 {
199  throw Transport::MiscProblem();
200 }
201 
// TopicImpl::inconsistent_topic (declarator line 203 not shown in this listing).
// Picks the listener to notify about an inconsistent topic and invokes its
// on_inconsistent_topic callback with this topic and `status`.
// NOTE(review): listing lines 205-207, 209, 213, 221-222 and 229 are missing;
// the declarations of `g`, `status_guard` and `status`, and any status
// bookkeeping, are on those lines and cannot be confirmed from this view.
202 void
204 {
208 
210 
211  DDS::TopicListener_var listener;
212  {
  // Prefer the topic's own listener; fall back to the participant's when none
  // is set or when the mask does not include INCONSISTENT_TOPIC_STATUS.
214  listener = listener_;
215  if (!listener || !(listener_mask_ & DDS::INCONSISTENT_TOPIC_STATUS)) {
  // g is released before calling into the participant.
216  g.release();
217  listener = participant_->listener_for(DDS::INCONSISTENT_TOPIC_STATUS);
218  }
219  }
  // status_guard is released on both paths; on the listener path this happens
  // before the callback is invoked.
220  if (listener) {
223  status_guard.release();
224  listener->on_inconsistent_topic(this, status);
225  } else {
226  status_guard.release();
227  }
228 
230 }
231 
// Body of TopicImpl::check_data_representation (signature line not shown).
// Decides whether the data representation ids in `qos_ids` are acceptable for
// this topic's type:
//  - true when there is no type support, or the type allows no specific
//    representations (an empty list is treated as "allow all");
//  - false (with a notice) when `qos_ids` is empty;
//  - for a data writer only qos_ids[0] is compared against the allowed list;
//  - for a data reader any id in qos_ids matching any allowed id is enough.
// When nothing matches, a notice listing both sequences is logged and false is
// returned. Notices are only emitted when log_level >= LogLevel::Notice.
233 {
234  if (!type_support_) {
235  return true;
236  }
237  DDS::DataRepresentationIdSeq type_allowed_reprs;
238  type_support_->representations_allowed_by_type(type_allowed_reprs);
239  //default for blank annotation is to allow all types of data representation
240  if (type_allowed_reprs.length() == 0) {
241  return true;
242  }
243  if (qos_ids.length() == 0) {
244  if (log_level >= LogLevel::Notice) {
245  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: TopicImpl::check_data_representation: "
246  "representation qos is blank.\n"));
247  }
248  return false;
249  }
250  //Data Writer will only use the 1st QoS declared
251  if (is_data_writer) {
  // Safe to index element 0: the empty case returned above.
252  DDS::DataRepresentationId_t id = qos_ids[0];
253  for (CORBA::ULong j = 0; j < type_allowed_reprs.length(); ++j) {
254  if (id == type_allowed_reprs[j]) {
255  return true;
256  }
257  }
258  } else { // if data reader compare both lists for a compatible QoS
259  for (CORBA::ULong i = 0; i < qos_ids.length(); ++i) {
260  for (CORBA::ULong j = 0; j < type_allowed_reprs.length(); ++j) {
261  if (qos_ids[i] == type_allowed_reprs[j]) {
262  return true;
263  }
264  }
265  }
266  }
  // No compatible representation was found.
267  if (log_level >= LogLevel::Notice) {
268  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: TopicImpl::check_data_representation: "
269  "none of the data representation QoS: %C is allowed by the "
270  "topic type IDL annotations: %C\n", repr_seq_to_string(qos_ids, is_data_writer).c_str(), repr_seq_to_string(type_allowed_reprs).c_str()));
271  }
272  return false;
273 }
274 
275 } // namespace DCPS
276 } // namespace OpenDDS
277 
DDS::InconsistentTopicStatus inconsistent_topic_status_
Definition: TopicImpl.h:112
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DDS::StatusMask listener_mask_
Definition: TopicImpl.h:101
virtual DDS::TopicListener_ptr get_listener()
Definition: TopicImpl.cpp:110
#define ACE_ERROR(X)
const char * c_str(void) const
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
ACE_Thread_Mutex status_mutex_
Mutex to protect status info.
Definition: TopicImpl.h:109
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
DDS::TopicQos qos_
The topic qos.
Definition: TopicImpl.h:95
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
virtual DDS::ReturnCode_t get_qos(DDS::TopicQos &qos)
Definition: TopicImpl.cpp:93
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
ACE_CString type_name_
The datatype of the topic.
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::ReturnCode_t set_listener(DDS::TopicListener_ptr a_listener, DDS::StatusMask mask)
Definition: TopicImpl.cpp:100
sequence< DataRepresentationId_t > DataRepresentationIdSeq
DDS::DomainParticipantListener_ptr listener_for(DDS::StatusKind kind)
int release(void)
T * _duplicate(T *st)
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
ACE_CString topic_name_
The name of the topic.
virtual DDS::InstanceHandle_t get_instance_handle()
Definition: TopicImpl.cpp:178
DOMAINID_TYPE_NATIVE DomainId_t
bool check_data_representation(const DDS::DataRepresentationIdSeq &qos_ids, bool is_data_writer)
Definition: TopicImpl.cpp:232
void inconsistent_topic(int count)
Definition: TopicImpl.cpp:203
virtual DDS::ReturnCode_t set_qos(const DDS::TopicQos &qos)
Definition: TopicImpl.cpp:51
ACE_CDR::ULong ULong
TopicImpl(const char *topic_name, const char *type_name, OpenDDS::DCPS::TypeSupport_ptr type_support, const DDS::TopicQos &qos, DDS::TopicListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant)
Definition: TopicImpl.cpp:26
TransportConfig_rch transport_config() const
Definition: EntityImpl.cpp:129
OpenDDS::DCPS::TypeSupport_var type_support_
The type_support for this topic.
DDS::TopicListener_var listener_
The topic listener.
Definition: TopicImpl.h:103
LM_NOTICE
const char * topicstatus_to_string(TopicStatus value)
Definition: DCPS_Utils.cpp:70
The End User API.
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
Definition: TopicImpl.h:98
virtual DDS::ReturnCode_t get_inconsistent_topic_status(DDS::InconsistentTopicStatus &a_status)
Definition: TopicImpl.cpp:117
virtual DDS::ReturnCode_t enable()
Definition: TopicImpl.cpp:127
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
Definition: EntityImpl.cpp:142
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
DCPS::String repr_seq_to_string(const DDS::DataRepresentationIdSeq &id_seq, bool is_data_writer)
Definition: DCPS_Utils.cpp:495
unsigned long StatusMask
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS_Dcps_Export LogLevel log_level
DomainParticipantImpl * participant_
The participant that creates this topic.
Implements the DDS::TopicDescription interface.
const ReturnCode_t RETCODE_ERROR
const char * type_name() const
Definition: TopicImpl.cpp:184
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
unique_ptr< Monitor > monitor_
Pointer to the monitor object for this entity.
Definition: TopicImpl.h:115
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_UNSUPPORTED
const StatusKind INCONSISTENT_TOPIC_STATUS
#define ACE_ERROR_RETURN(X, Y)
short DataRepresentationId_t
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
#define TheServiceParticipant
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
GUID_t id_
The id given by discovery.
Definition: TopicImpl.h:106
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
const char * topic_name() const
Definition: TopicImpl.cpp:190
GUID_t get_id() const
Definition: TopicImpl.cpp:172