00001
00002
00003
00004
00005
00006
00007
00008 #include "MonitorFactoryImpl.h"
00009 #include "monitorC.h"
00010 #include "monitorTypeSupportImpl.h"
00011 #include "SPMonitorImpl.h"
00012 #include "DPMonitorImpl.h"
00013 #include "TopicMonitorImpl.h"
00014 #include "PublisherMonitorImpl.h"
00015 #include "SubscriberMonitorImpl.h"
00016 #include "DWMonitorImpl.h"
00017 #include "DWPeriodicMonitorImpl.h"
00018 #include "DRMonitorImpl.h"
00019 #include "DRPeriodicMonitorImpl.h"
00020 #include "TransportMonitorImpl.h"
00021 #include "dds/DCPS/Service_Participant.h"
00022 #include "dds/DCPS/DomainParticipantImpl.h"
00023 #include <dds/DdsDcpsInfrastructureC.h>
00024 #include <dds/DdsDcpsPublicationC.h>
00025 #include <dds/DCPS/Marked_Default_Qos.h>
00026 #include <dds/DCPS/transport/framework/TransportRegistry.h>
00027
00028 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00029
00030 namespace OpenDDS {
00031 namespace DCPS {
00032
00033 MonitorFactoryImpl::MonitorFactoryImpl()
00034 {
00035 }
00036
00037 MonitorFactoryImpl::~MonitorFactoryImpl()
00038 {
00039 }
00040
00041 OpenDDS::DCPS::Monitor*
00042 MonitorFactoryImpl::create_sp_monitor(Service_Participant* sp)
00043 {
00044 return new SPMonitorImpl(this, sp);
00045 }
00046
00047 Monitor*
00048 MonitorFactoryImpl::create_dp_monitor(DomainParticipantImpl* dp)
00049 {
00050 if (dp->get_domain_id() == MONITOR_DOMAIN_ID) {
00051 return 0;
00052 }
00053 return new DPMonitorImpl(dp, this->dp_writer_);
00054 }
00055
00056 ServiceParticipantReportDataWriter_ptr
00057 MonitorFactoryImpl::get_sp_writer()
00058 {
00059 return ServiceParticipantReportDataWriter::_duplicate(this->sp_writer_);
00060 }
00061
00062 OpenDDS::DCPS::Monitor*
00063 MonitorFactoryImpl::create_topic_monitor(TopicImpl* topic)
00064 {
00065 return new TopicMonitorImpl(topic, this->topic_writer_);
00066 }
00067
00068 OpenDDS::DCPS::Monitor*
00069 MonitorFactoryImpl::create_publisher_monitor(PublisherImpl* pub)
00070 {
00071 return new PublisherMonitorImpl(pub, this->pub_writer_);
00072 }
00073
00074 OpenDDS::DCPS::Monitor*
00075 MonitorFactoryImpl::create_subscriber_monitor(SubscriberImpl* sub)
00076 {
00077 return new SubscriberMonitorImpl(sub, this->sub_writer_);
00078 }
00079
00080 OpenDDS::DCPS::Monitor*
00081 MonitorFactoryImpl::create_data_writer_monitor(DataWriterImpl* dw)
00082 {
00083 return new DWMonitorImpl(dw, this->dw_writer_);
00084 }
00085
00086 OpenDDS::DCPS::Monitor*
00087 MonitorFactoryImpl::create_data_writer_periodic_monitor(DataWriterImpl* dw)
00088 {
00089 return new DWPeriodicMonitorImpl(dw, this->dw_per_writer_);
00090 }
00091
00092 OpenDDS::DCPS::Monitor*
00093 MonitorFactoryImpl::create_data_reader_monitor(DataReaderImpl* dr)
00094 {
00095 return new DRMonitorImpl(dr, this->dr_writer_);
00096 }
00097
00098 OpenDDS::DCPS::Monitor*
00099 MonitorFactoryImpl::create_data_reader_periodic_monitor(DataReaderImpl* dr)
00100 {
00101 return new DRPeriodicMonitorImpl(dr, this->dr_per_writer_);
00102 }
00103
00104 OpenDDS::DCPS::Monitor*
00105 MonitorFactoryImpl::create_transport_monitor(TransportImpl* transport)
00106 {
00107 return new TransportMonitorImpl(transport, this->transport_writer_);
00108 }
00109
00110 DDS::DataWriter_ptr
00111 MonitorFactoryImpl::create_data_writer(DDS::DomainParticipant_ptr participant,
00112 DDS::Publisher_ptr publisher,
00113 const char* type_name,
00114 const char* topic_name,
00115 const DDS::DataWriterQos& dw_qos)
00116 {
00117 DDS::Topic_var topic =
00118 participant->create_topic(topic_name,
00119 type_name,
00120 TOPIC_QOS_DEFAULT,
00121 DDS::TopicListener::_nil(),
00122 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00123 if (CORBA::is_nil(topic)) {
00124 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::create_data_writer(): Failed to create topic, name = %C\n", topic_name));
00125 }
00126 DDS::DataWriter_var writer =
00127 publisher->create_datawriter(topic.in(),
00128 dw_qos,
00129 DDS::DataWriterListener::_nil(),
00130 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00131 if (CORBA::is_nil(writer)) {
00132 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::create_data_writer(): Failed to create data writer\n"));
00133 }
00134
00135 return writer._retn();
00136 }
00137
00138 void
00139 MonitorFactoryImpl::initialize()
00140 {
00141 DDS::DomainParticipantFactory_var dpf =
00142 TheServiceParticipant->get_domain_participant_factory();
00143 DDS::DomainParticipant_var participant =
00144 dpf->create_participant(MONITOR_DOMAIN_ID,
00145 PARTICIPANT_QOS_DEFAULT,
00146 DDS::DomainParticipantListener::_nil(),
00147 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00148 if (CORBA::is_nil(participant.in())) {
00149 ACE_DEBUG((LM_DEBUG,
00150 ACE_TEXT("ERROR: %N:%l: MonitorFactoryImpl::initialize() -")
00151 ACE_TEXT(" create_participant failed!\n")));
00152 }
00153
00154 DDS::Publisher_var publisher =
00155 participant->create_publisher(PUBLISHER_QOS_DEFAULT,
00156 DDS::PublisherListener::_nil(),
00157 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00158
00159 static const std::string config_name = TransportRegistry::DEFAULT_INST_PREFIX
00160 + std::string("MonitorBITTransportConfig");
00161 OpenDDS::DCPS::TransportConfig_rch config =
00162 TheTransportRegistry->get_config (config_name);
00163 if (config.is_nil ())
00164 {
00165 config = TransportRegistry::instance()->create_config(config_name);
00166
00167 std::string inst_name = TransportRegistry::DEFAULT_INST_PREFIX
00168 + std::string("FederationBITTCPTransportInst");
00169 TransportInst_rch inst =
00170 TransportRegistry::instance()->create_inst(inst_name, "tcp");
00171 config->instances_.push_back(inst);
00172 }
00173
00174 TransportRegistry::instance()->bind_config(config, publisher.in());
00175
00176 DDS::DataWriter_var writer;
00177 DDS::DataWriterQos dw_qos;
00178 publisher->get_default_datawriter_qos(dw_qos);
00179 dw_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00180
00181 OpenDDS::DCPS::ServiceParticipantReportTypeSupport_var sp_ts =
00182 new OpenDDS::DCPS::ServiceParticipantReportTypeSupportImpl();
00183 ::DDS::ReturnCode_t ret = sp_ts->register_type(participant.in(), "");
00184 if (DDS::RETCODE_OK == ret) {
00185 CORBA::String_var sp_type_name = sp_ts->get_type_name();
00186 writer = create_data_writer(participant.in(),
00187 publisher.in(),
00188 sp_type_name.in(),
00189 SERVICE_PARTICIPANT_MONITOR_TOPIC,
00190 dw_qos);
00191 this->sp_writer_ =
00192 OpenDDS::DCPS::ServiceParticipantReportDataWriter::_narrow(writer.in());
00193 if (CORBA::is_nil(this->sp_writer_)) {
00194 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow sp_writer\n"));
00195 }
00196 } else {
00197 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register sp_ts\n"));
00198 }
00199
00200 OpenDDS::DCPS::DomainParticipantReportTypeSupport_var dp_ts =
00201 new OpenDDS::DCPS::DomainParticipantReportTypeSupportImpl();
00202 ret = dp_ts->register_type(participant.in(), "");
00203 if (DDS::RETCODE_OK == ret) {
00204 CORBA::String_var dp_type_name = dp_ts->get_type_name();
00205 writer = create_data_writer(participant.in(),
00206 publisher.in(),
00207 dp_type_name.in(),
00208 DOMAIN_PARTICIPANT_MONITOR_TOPIC,
00209 dw_qos);
00210 this->dp_writer_ =
00211 OpenDDS::DCPS::DomainParticipantReportDataWriter::_narrow(writer.in());
00212 if (CORBA::is_nil(this->dp_writer_)) {
00213 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow dp_writer\n"));
00214 }
00215 } else {
00216 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register dp_ts\n"));
00217 }
00218
00219 OpenDDS::DCPS::TopicReportTypeSupport_var topic_ts =
00220 new OpenDDS::DCPS::TopicReportTypeSupportImpl();
00221 ret = topic_ts->register_type(participant.in(), "");
00222 if (DDS::RETCODE_OK == ret) {
00223 CORBA::String_var topic_type_name = topic_ts->get_type_name();
00224 writer = create_data_writer(participant.in(),
00225 publisher.in(),
00226 topic_type_name.in(),
00227 TOPIC_MONITOR_TOPIC,
00228 dw_qos);
00229 this->topic_writer_ =
00230 OpenDDS::DCPS::TopicReportDataWriter::_narrow(writer.in());
00231 if (CORBA::is_nil(this->topic_writer_)) {
00232 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow topic_writer\n"));
00233 }
00234 } else {
00235 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register topic_ts\n"));
00236 }
00237
00238 OpenDDS::DCPS::PublisherReportTypeSupport_var pub_ts =
00239 new OpenDDS::DCPS::PublisherReportTypeSupportImpl();
00240 ret = pub_ts->register_type(participant.in(), "");
00241 if (DDS::RETCODE_OK == ret) {
00242 CORBA::String_var pub_type_name = pub_ts->get_type_name();
00243 writer = create_data_writer(participant.in(),
00244 publisher.in(),
00245 pub_type_name.in(),
00246 PUBLISHER_MONITOR_TOPIC,
00247 dw_qos);
00248 this->pub_writer_ =
00249 OpenDDS::DCPS::PublisherReportDataWriter::_narrow(writer.in());
00250 if (CORBA::is_nil(this->pub_writer_)) {
00251 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow pub_writer\n"));
00252 }
00253 } else {
00254 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register pub_ts\n"));
00255 }
00256
00257 OpenDDS::DCPS::SubscriberReportTypeSupport_var sub_ts =
00258 new OpenDDS::DCPS::SubscriberReportTypeSupportImpl();
00259 ret = sub_ts->register_type(participant.in(), "");
00260 if (DDS::RETCODE_OK == ret) {
00261 CORBA::String_var sub_type_name = sub_ts->get_type_name();
00262 writer = create_data_writer(participant.in(),
00263 publisher.in(),
00264 sub_type_name.in(),
00265 SUBSCRIBER_MONITOR_TOPIC,
00266 dw_qos);
00267 this->sub_writer_ =
00268 OpenDDS::DCPS::SubscriberReportDataWriter::_narrow(writer.in());
00269 if (CORBA::is_nil(this->sub_writer_)) {
00270 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow sub_writer\n"));
00271 }
00272 } else {
00273 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register sub_ts\n"));
00274 }
00275
00276 OpenDDS::DCPS::DataWriterReportTypeSupport_var dw_ts =
00277 new OpenDDS::DCPS::DataWriterReportTypeSupportImpl();
00278 ret = dw_ts->register_type(participant.in(), "");
00279 if (DDS::RETCODE_OK == ret) {
00280 CORBA::String_var dw_type_name = dw_ts->get_type_name();
00281 writer = create_data_writer(participant.in(),
00282 publisher.in(),
00283 dw_type_name.in(),
00284 DATA_WRITER_MONITOR_TOPIC,
00285 dw_qos);
00286 this->dw_writer_ =
00287 OpenDDS::DCPS::DataWriterReportDataWriter::_narrow(writer.in());
00288 if (CORBA::is_nil(this->dw_writer_)) {
00289 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow dw_writer\n"));
00290 }
00291 } else {
00292 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register sp_ts\n"));
00293 }
00294
00295 OpenDDS::DCPS::DataWriterPeriodicReportTypeSupport_var dw_per_ts =
00296 new OpenDDS::DCPS::DataWriterPeriodicReportTypeSupportImpl();
00297 ret = dw_per_ts->register_type(participant.in(), "");
00298 if (DDS::RETCODE_OK == ret) {
00299 CORBA::String_var dw_per_type_name = dw_per_ts->get_type_name();
00300 writer = create_data_writer(participant.in(),
00301 publisher.in(),
00302 dw_per_type_name.in(),
00303 DATA_WRITER_PERIODIC_MONITOR_TOPIC,
00304 dw_qos);
00305 this->dw_per_writer_ =
00306 OpenDDS::DCPS::DataWriterPeriodicReportDataWriter::_narrow(writer.in());
00307 if (CORBA::is_nil(this->dw_per_writer_)) {
00308 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow dw_per_writer\n"));
00309 }
00310 } else {
00311 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register dw_per_ts\n"));
00312 }
00313
00314 OpenDDS::DCPS::DataReaderReportTypeSupport_var dr_ts =
00315 new OpenDDS::DCPS::DataReaderReportTypeSupportImpl();
00316 ret = dr_ts->register_type(participant.in(), "");
00317 if (DDS::RETCODE_OK == ret) {
00318 CORBA::String_var dr_type_name = dr_ts->get_type_name();
00319 writer = create_data_writer(participant.in(),
00320 publisher.in(),
00321 dr_type_name.in(),
00322 DATA_READER_MONITOR_TOPIC,
00323 dw_qos);
00324 this->dr_writer_ =
00325 OpenDDS::DCPS::DataReaderReportDataWriter::_narrow(writer.in());
00326 if (CORBA::is_nil(this->dr_writer_)) {
00327 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow dr_writer\n"));
00328 }
00329 } else {
00330 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register dr_ts\n"));
00331 }
00332
00333 OpenDDS::DCPS::DataReaderPeriodicReportTypeSupport_var dr_per_ts =
00334 new OpenDDS::DCPS::DataReaderPeriodicReportTypeSupportImpl();
00335 ret = dr_per_ts->register_type(participant.in(), "");
00336 if (DDS::RETCODE_OK == ret) {
00337 CORBA::String_var dr_per_type_name = dr_per_ts->get_type_name();
00338 writer = create_data_writer(participant.in(),
00339 publisher.in(),
00340 dr_per_type_name.in(),
00341 DATA_READER_PERIODIC_MONITOR_TOPIC,
00342 dw_qos);
00343 this->dr_per_writer_ =
00344 OpenDDS::DCPS::DataReaderPeriodicReportDataWriter::_narrow(writer.in());
00345 if (CORBA::is_nil(this->dr_per_writer_)) {
00346 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow dr_per_writer\n"));
00347 }
00348 } else {
00349 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register dr_per_ts\n"));
00350 }
00351
00352 OpenDDS::DCPS::TransportReportTypeSupport_var transport_ts =
00353 new OpenDDS::DCPS::TransportReportTypeSupportImpl();
00354 ret = transport_ts->register_type(participant.in(), "");
00355 if (DDS::RETCODE_OK == ret) {
00356 CORBA::String_var transport_type_name = transport_ts->get_type_name();
00357 writer = create_data_writer(participant.in(),
00358 publisher.in(),
00359 transport_type_name.in(),
00360 TRANSPORT_MONITOR_TOPIC,
00361 dw_qos);
00362 this->transport_writer_ =
00363 OpenDDS::DCPS::TransportReportDataWriter::_narrow(writer.in());
00364 if (CORBA::is_nil(this->transport_writer_)) {
00365 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to narrow transport_writer\n"));
00366 }
00367 } else {
00368 ACE_DEBUG((LM_DEBUG, "MonitorFactoryImpl::initialize(): Failed to register transport_ts\n"));
00369 }
00370 }
00371
00372 int
00373 MonitorFactoryImpl::service_initialize()
00374 {
00375 return ACE_Service_Config::process_directive(ace_svc_desc_MonitorFactoryImpl);
00376 }
00377
00378 }
00379 }
00380
00381 using namespace OpenDDS::DCPS;
00382
00383 ACE_FACTORY_DEFINE (OpenDDS_monitor, MonitorFactoryImpl)
00384 ACE_STATIC_SVC_DEFINE (MonitorFactoryImpl,
00385 ACE_TEXT ("OpenDDS_Monitor"),
00386 ACE_SVC_OBJ_T,
00387 &ACE_SVC_NAME (MonitorFactoryImpl),
00388 ACE_Service_Type::DELETE_THIS |
00389 ACE_Service_Type::DELETE_OBJ,
00390 0)
00391
00392 OPENDDS_END_VERSIONED_NAMESPACE_DECL