OpenDDS  Snapshot(2023/04/28-20:55)
MonitorFactoryImpl.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include "MonitorFactoryImpl.h"
7 
8 #include "SPMonitorImpl.h"
9 #include "DPMonitorImpl.h"
10 #include "TopicMonitorImpl.h"
11 #include "PublisherMonitorImpl.h"
12 #include "SubscriberMonitorImpl.h"
13 #include "DWMonitorImpl.h"
14 #include "DWPeriodicMonitorImpl.h"
15 #include "DRMonitorImpl.h"
16 #include "DRPeriodicMonitorImpl.h"
17 #include "TransportMonitorImpl.h"
18 
19 #include "monitorC.h"
20 #include "monitorTypeSupportImpl.h"
21 
25 #include <dds/DCPS/DCPS_Utils.h>
27 
28 #include <dds/DdsDcpsInfrastructureC.h>
29 #include <dds/DdsDcpsPublicationC.h>
30 
32 
33 namespace OpenDDS {
34 namespace DCPS {
35 
37 {
38 }
39 
41 {
42  deinitialize();
43 }
44 
47 {
48  return new SPMonitorImpl(this, sp);
49 }
50 
51 Monitor*
53 {
54  if (dp->get_domain_id() == MONITOR_DOMAIN_ID) {
55  return 0;
56  }
57  return new DPMonitorImpl(dp, this->dp_writer_);
58 }
59 
60 ServiceParticipantReportDataWriter_ptr
62 {
63  return ServiceParticipantReportDataWriter::_duplicate(this->sp_writer_);
64 }
65 
68 {
69  return new TopicMonitorImpl(topic, this->topic_writer_);
70 }
71 
74 {
75  return new PublisherMonitorImpl(pub, this->pub_writer_);
76 }
77 
80 {
81  return new SubscriberMonitorImpl(sub, this->sub_writer_);
82 }
83 
86 {
87  return new DWMonitorImpl(dw, this->dw_writer_);
88 }
89 
92 {
93  return new DWPeriodicMonitorImpl(dw, this->dw_per_writer_);
94 }
95 
98 {
99  return new DRMonitorImpl(dr, this->dr_writer_);
100 }
101 
104 {
105  return new DRPeriodicMonitorImpl(dr, this->dr_per_writer_);
106 }
107 
110 {
111  return new TransportMonitorImpl(transport, this->transport_writer_);
112 }
113 
114 DDS::DataWriter_ptr
115 MonitorFactoryImpl::create_data_writer(DDS::DomainParticipant_ptr participant,
116  DDS::Publisher_ptr publisher,
117  const char* type_name,
118  const char* topic_name,
119  const DDS::DataWriterQos& dw_qos)
120 {
121  DDS::Topic_var topic =
122  participant->create_topic(topic_name,
123  type_name,
125  DDS::TopicListener::_nil(),
127  if (CORBA::is_nil(topic)) {
128  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::create_data_writer(): Failed to create topic, name = %C\n", topic_name));
129  }
130  DDS::DataWriter_var writer =
131  publisher->create_datawriter(topic.in(),
132  dw_qos,
133  DDS::DataWriterListener::_nil(),
135  if (CORBA::is_nil(writer)) {
136  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::create_data_writer(): Failed to create data writer\n"));
137  }
138 
139  return writer._retn();
140 }
141 
142 void
144 {
145  DDS::DomainParticipantFactory_var dpf =
146  TheServiceParticipant->get_domain_participant_factory();
147  DDS::DomainParticipant_var participant =
148  dpf->create_participant(MONITOR_DOMAIN_ID,
150  DDS::DomainParticipantListener::_nil(),
152  participant_ = participant;
153  if (CORBA::is_nil(participant.in())) {
155  ACE_TEXT("ERROR: %N:%l: MonitorFactoryImpl::initialize() -")
156  ACE_TEXT(" create_participant failed!\n")));
157  }
158 
159  DDS::Publisher_var publisher =
160  participant->create_publisher(PUBLISHER_QOS_DEFAULT,
161  DDS::PublisherListener::_nil(),
163 
164  static const std::string config_name = TransportRegistry::DEFAULT_INST_PREFIX
165  + std::string("MonitorBITTransportConfig");
167  TheTransportRegistry->get_config (config_name);
168  if (config.is_nil ())
169  {
170  config = TransportRegistry::instance()->create_config(config_name);
171 
172  std::string inst_name = TransportRegistry::DEFAULT_INST_PREFIX
173  + std::string("FederationBITTCPTransportInst");
174  TransportInst_rch inst =
175  TransportRegistry::instance()->create_inst(inst_name, "tcp");
176  config->instances_.push_back(inst);
177  }
178 
179  TransportRegistry::instance()->bind_config(config, publisher.in());
180 
181  DDS::DataWriter_var writer;
182  DDS::DataWriterQos dw_qos;
183  publisher->get_default_datawriter_qos(dw_qos);
185 
186  OpenDDS::DCPS::ServiceParticipantReportTypeSupport_var sp_ts =
187  new OpenDDS::DCPS::ServiceParticipantReportTypeSupportImpl();
188  ::DDS::ReturnCode_t ret = sp_ts->register_type(participant.in(), "");
189  if (DDS::RETCODE_OK == ret) {
190  CORBA::String_var sp_type_name = sp_ts->get_type_name();
191  writer = create_data_writer(participant.in(),
192  publisher.in(),
193  sp_type_name.in(),
195  dw_qos);
196  this->sp_writer_ =
197  OpenDDS::DCPS::ServiceParticipantReportDataWriter::_narrow(writer.in());
198  if (CORBA::is_nil(this->sp_writer_)) {
199  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow sp_writer\n"));
200  }
201  } else {
202  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register sp_ts\n"));
203  }
204 
205  OpenDDS::DCPS::DomainParticipantReportTypeSupport_var dp_ts =
206  new OpenDDS::DCPS::DomainParticipantReportTypeSupportImpl();
207  ret = dp_ts->register_type(participant.in(), "");
208  if (DDS::RETCODE_OK == ret) {
209  CORBA::String_var dp_type_name = dp_ts->get_type_name();
210  writer = create_data_writer(participant.in(),
211  publisher.in(),
212  dp_type_name.in(),
214  dw_qos);
215  this->dp_writer_ =
216  OpenDDS::DCPS::DomainParticipantReportDataWriter::_narrow(writer.in());
217  if (CORBA::is_nil(this->dp_writer_)) {
218  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow dp_writer\n"));
219  }
220  } else {
221  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register dp_ts\n"));
222  }
223 
224  OpenDDS::DCPS::TopicReportTypeSupport_var topic_ts =
225  new OpenDDS::DCPS::TopicReportTypeSupportImpl();
226  ret = topic_ts->register_type(participant.in(), "");
227  if (DDS::RETCODE_OK == ret) {
228  CORBA::String_var topic_type_name = topic_ts->get_type_name();
229  writer = create_data_writer(participant.in(),
230  publisher.in(),
231  topic_type_name.in(),
233  dw_qos);
234  this->topic_writer_ =
235  OpenDDS::DCPS::TopicReportDataWriter::_narrow(writer.in());
236  if (CORBA::is_nil(this->topic_writer_)) {
237  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow topic_writer\n"));
238  }
239  } else {
240  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register topic_ts\n"));
241  }
242 
243  OpenDDS::DCPS::PublisherReportTypeSupport_var pub_ts =
244  new OpenDDS::DCPS::PublisherReportTypeSupportImpl();
245  ret = pub_ts->register_type(participant.in(), "");
246  if (DDS::RETCODE_OK == ret) {
247  CORBA::String_var pub_type_name = pub_ts->get_type_name();
248  writer = create_data_writer(participant.in(),
249  publisher.in(),
250  pub_type_name.in(),
252  dw_qos);
253  this->pub_writer_ =
254  OpenDDS::DCPS::PublisherReportDataWriter::_narrow(writer.in());
255  if (CORBA::is_nil(this->pub_writer_)) {
256  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow pub_writer\n"));
257  }
258  } else {
259  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register pub_ts\n"));
260  }
261 
262  OpenDDS::DCPS::SubscriberReportTypeSupport_var sub_ts =
263  new OpenDDS::DCPS::SubscriberReportTypeSupportImpl();
264  ret = sub_ts->register_type(participant.in(), "");
265  if (DDS::RETCODE_OK == ret) {
266  CORBA::String_var sub_type_name = sub_ts->get_type_name();
267  writer = create_data_writer(participant.in(),
268  publisher.in(),
269  sub_type_name.in(),
271  dw_qos);
272  this->sub_writer_ =
273  OpenDDS::DCPS::SubscriberReportDataWriter::_narrow(writer.in());
274  if (CORBA::is_nil(this->sub_writer_)) {
275  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow sub_writer\n"));
276  }
277  } else {
278  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register sub_ts\n"));
279  }
280 
281  OpenDDS::DCPS::DataWriterReportTypeSupport_var dw_ts =
282  new OpenDDS::DCPS::DataWriterReportTypeSupportImpl();
283  ret = dw_ts->register_type(participant.in(), "");
284  if (DDS::RETCODE_OK == ret) {
285  CORBA::String_var dw_type_name = dw_ts->get_type_name();
286  writer = create_data_writer(participant.in(),
287  publisher.in(),
288  dw_type_name.in(),
290  dw_qos);
291  this->dw_writer_ =
292  OpenDDS::DCPS::DataWriterReportDataWriter::_narrow(writer.in());
293  if (CORBA::is_nil(this->dw_writer_)) {
294  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow dw_writer\n"));
295  }
296  } else {
297  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register sp_ts\n"));
298  }
299 
300  OpenDDS::DCPS::DataWriterPeriodicReportTypeSupport_var dw_per_ts =
301  new OpenDDS::DCPS::DataWriterPeriodicReportTypeSupportImpl();
302  ret = dw_per_ts->register_type(participant.in(), "");
303  if (DDS::RETCODE_OK == ret) {
304  CORBA::String_var dw_per_type_name = dw_per_ts->get_type_name();
305  writer = create_data_writer(participant.in(),
306  publisher.in(),
307  dw_per_type_name.in(),
309  dw_qos);
310  this->dw_per_writer_ =
311  OpenDDS::DCPS::DataWriterPeriodicReportDataWriter::_narrow(writer.in());
312  if (CORBA::is_nil(this->dw_per_writer_)) {
313  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow dw_per_writer\n"));
314  }
315  } else {
316  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register dw_per_ts\n"));
317  }
318 
319  OpenDDS::DCPS::DataReaderReportTypeSupport_var dr_ts =
320  new OpenDDS::DCPS::DataReaderReportTypeSupportImpl();
321  ret = dr_ts->register_type(participant.in(), "");
322  if (DDS::RETCODE_OK == ret) {
323  CORBA::String_var dr_type_name = dr_ts->get_type_name();
324  writer = create_data_writer(participant.in(),
325  publisher.in(),
326  dr_type_name.in(),
328  dw_qos);
329  this->dr_writer_ =
330  OpenDDS::DCPS::DataReaderReportDataWriter::_narrow(writer.in());
331  if (CORBA::is_nil(this->dr_writer_)) {
332  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow dr_writer\n"));
333  }
334  } else {
335  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register dr_ts\n"));
336  }
337 
338  OpenDDS::DCPS::DataReaderPeriodicReportTypeSupport_var dr_per_ts =
339  new OpenDDS::DCPS::DataReaderPeriodicReportTypeSupportImpl();
340  ret = dr_per_ts->register_type(participant.in(), "");
341  if (DDS::RETCODE_OK == ret) {
342  CORBA::String_var dr_per_type_name = dr_per_ts->get_type_name();
343  writer = create_data_writer(participant.in(),
344  publisher.in(),
345  dr_per_type_name.in(),
347  dw_qos);
348  this->dr_per_writer_ =
349  OpenDDS::DCPS::DataReaderPeriodicReportDataWriter::_narrow(writer.in());
350  if (CORBA::is_nil(this->dr_per_writer_)) {
351  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow dr_per_writer\n"));
352  }
353  } else {
354  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register dr_per_ts\n"));
355  }
356 
357  OpenDDS::DCPS::TransportReportTypeSupport_var transport_ts =
358  new OpenDDS::DCPS::TransportReportTypeSupportImpl();
359  ret = transport_ts->register_type(participant.in(), "");
360  if (DDS::RETCODE_OK == ret) {
361  CORBA::String_var transport_type_name = transport_ts->get_type_name();
362  writer = create_data_writer(participant.in(),
363  publisher.in(),
364  transport_type_name.in(),
366  dw_qos);
367  this->transport_writer_ =
368  OpenDDS::DCPS::TransportReportDataWriter::_narrow(writer.in());
369  if (CORBA::is_nil(this->transport_writer_)) {
370  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow transport_writer\n"));
371  }
372  } else {
373  ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register transport_ts\n"));
374  }
375 }
376 
378 {
379  if (participant_) {
380  DDS::ReturnCode_t tmp = participant_->delete_contained_entities();
381  if (tmp && log_level >= LogLevel::Error) {
382  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MonitorFactoryImpl::deinitialize: "
383  "delete_contained_entities returned %C\n",
384  retcode_to_string(tmp)));
385  }
386 
387  DDS::DomainParticipantFactory_var dpf =
388  TheServiceParticipant->get_domain_participant_factory();
389  tmp = dpf->delete_participant(participant_);
390  if (tmp && log_level >= LogLevel::Error) {
391  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MonitorFactoryImpl::deinitialize: "
392  "delete_participant returned %C\n",
393  retcode_to_string(tmp)));
394  }
395 
396  participant_ = 0;
397  }
398 }
399 
400 int
402 {
403  return ACE_Service_Config::process_directive(ace_svc_desc_MonitorFactoryImpl);
404 }
405 
406 } // namespace DCPS
407 } // namespace OpenDDS
408 
409 using namespace OpenDDS::DCPS;
410 
411 ACE_FACTORY_DEFINE (OpenDDS_monitor, MonitorFactoryImpl)
413  ACE_TEXT ("OpenDDS_Monitor"),
418  0)
419 
#define TheTransportRegistry
#define PARTICIPANT_QOS_DEFAULT
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
virtual Monitor * create_data_reader_periodic_monitor(DataReaderImpl *dr)
Factory function to create a data reader periodic monitor object.
#define ACE_ERROR(X)
PublisherReportDataWriter_var pub_writer_
const string SERVICE_PARTICIPANT_MONITOR_TOPIC
Definition: monitor.idl:15
const string PUBLISHER_MONITOR_TOPIC
Definition: monitor.idl:18
virtual Monitor * create_sp_monitor(Service_Participant *sp)
Factory function to create a service participant monitor object.
DataReaderPeriodicReportDataWriter_var dr_per_writer_
virtual Monitor * create_publisher_monitor(PublisherImpl *publisher)
Factory function to create a publisher monitor object.
ACE_FACTORY_DEFINE(OpenDDS_InfoRepoDiscovery, IRDiscoveryLoader)
virtual Monitor * create_subscriber_monitor(SubscriberImpl *subscriber)
Factory function to create a subscriber monitor object.
DurabilityQosPolicy durability
virtual Monitor * create_topic_monitor(TopicImpl *topic)
Factory function to create a topic monitor object.
& ACE_SVC_NAME(TAO_AV_TCP_Factory)
static int process_directive(const ACE_TCHAR directive[])
DomainParticipantReportDataWriter_var dp_writer_
const string DATA_READER_PERIODIC_MONITOR_TOPIC
Definition: monitor.idl:23
virtual Monitor * create_data_writer_monitor(DataWriterImpl *dw)
Factory function to create a data writer monitor object.
const string DATA_WRITER_PERIODIC_MONITOR_TOPIC
Definition: monitor.idl:21
ServiceParticipantReportDataWriter_var sp_writer_
ACE_STATIC_SVC_DEFINE(ACE_Logging_Strategy, ACE_TEXT("Logging_Strategy"), ACE_Service_Type::SERVICE_OBJECT, &ACE_SVC_NAME(ACE_Logging_Strategy), ACE_Service_Type::DELETE_THIS|ACE_Service_Type::DELETE_OBJ, 0) extern "C" int _get_dll_unload_policy()
ACE_SVC_OBJ_T
DDS::DataWriter_ptr create_data_writer(DDS::DomainParticipant_ptr participant, DDS::Publisher_ptr publisher, const char *type_name, const char *topic_name, const DDS::DataWriterQos &dw_qos)
void bind_config(const OPENDDS_STRING &name, DDS::Entity_ptr entity)
DDS::DomainParticipant_var participant_
const DDS::StatusMask DEFAULT_STATUS_MASK
DataReaderReportDataWriter_var dr_writer_
Implements the OpenDDS::DCPS::Publisher interfaces.
Definition: PublisherImpl.h:38
Implements the DDS::Topic interface.
Definition: TopicImpl.h:37
ServiceParticipantReportDataWriter_ptr get_sp_writer()
TransportReportDataWriter_var transport_writer_
const string DATA_READER_MONITOR_TOPIC
Definition: monitor.idl:22
LM_DEBUG
DataWriterPeriodicReportDataWriter_var dw_per_writer_
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
const string SUBSCRIBER_MONITOR_TOPIC
Definition: monitor.idl:19
DurabilityQosPolicyKind kind
virtual Monitor * create_transport_monitor(TransportImpl *transport)
Factory function to create a transport monitor object.
const long MONITOR_DOMAIN_ID
Definition: monitor.idl:13
Implements the DDS::DataReader interface.
#define PUBLISHER_QOS_DEFAULT
DataWriterReportDataWriter_var dw_writer_
TransportInst_rch create_inst(const OPENDDS_STRING &name, const OPENDDS_STRING &transport_type)
TransportConfig_rch create_config(const OPENDDS_STRING &name)
TopicReportDataWriter_var topic_writer_
ACE_TEXT("TCP_Factory")
const string TRANSPORT_MONITOR_TOPIC
Definition: monitor.idl:24
const string DATA_WRITER_MONITOR_TOPIC
Definition: monitor.idl:20
virtual Monitor * create_dp_monitor(DomainParticipantImpl *dp)
Factory function to create a domain participant monitor object.
OpenDDS_Dcps_Export LogLevel log_level
Full implementation of the MonitorFactoryImpl.
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const string TOPIC_MONITOR_TOPIC
Definition: monitor.idl:17
static const char DEFAULT_INST_PREFIX[]
static TransportRegistry * instance()
Return a singleton instance of this class.
const string DOMAIN_PARTICIPANT_MONITOR_TOPIC
Definition: monitor.idl:16
const ReturnCode_t RETCODE_OK
const character_type * in(void) const
virtual Monitor * create_data_reader_monitor(DataReaderImpl *dr)
Factory function to create a data reader monitor object.
#define TheServiceParticipant
LM_ERROR
SubscriberReportDataWriter_var sub_writer_
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual Monitor * create_data_writer_periodic_monitor(DataWriterImpl *dw)
Factory function to create a data writer periodic monitor object.
Boolean is_nil(T x)
#define TOPIC_QOS_DEFAULT
virtual void initialize()
Initialize the monitor (required to report data)