CoreDX DDS Python CookBook

DomainParticipant

To access the domain module:

import dds.domain

To create a DomainParticipant with default QoS settings and no listener:

dp = dds.domain.DomainParticipant( 1 )

To create a DomainParticipant with specific QoS settings, and no listener:

dp_qos = dds.domain.DomainParticipantFactory.get_default_participant_qos()
# adjust qos settings as required ...
dp = dds.domain.DomainParticipant( 1, dp_qos )

To create a DomainParticipant with specific QoS settings, and a listener:

dp_listener = MyDomainParticipantListener()
dp_qos = dds.domain.DomainParticipantFactory.get_default_participant_qos()
dp = dds.domain.DomainParticipant( 1, dp_qos, dp_listener, dds.core.StatusMask.ALL_STATUS )

To create a DomainParticipant with default QoS and a listener, using keyword arguments:

dp_listener = MyDomainParticipantListener()
# 'mask' is an assumed keyword name; a positional argument cannot follow a keyword argument
dp = dds.domain.DomainParticipant( 1, listener=dp_listener, mask=dds.core.StatusMask.ALL_STATUS )

To destroy a DomainParticipant and clean up all its internal resources:

dp.delete()
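
Because delete() has to be called explicitly, it can be convenient to tie the call to a 'with' block so that cleanup also happens when an exception is raised. The following is a minimal sketch, not part of the CoreDX API: 'deleting' is a hypothetical helper that relies only on the delete() method shown above.

```python
from contextlib import contextmanager


@contextmanager
def deleting(entity):
    """Yield entity, then call its delete() method even if the body raises."""
    try:
        yield entity
    finally:
        entity.delete()
```

With this helper, 'with deleting(dds.domain.DomainParticipant( 1 )) as dp:' would scope the participant to the block. The same pattern should apply to any other entity in this cookbook that offers delete().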

Topics

Standard Topic

To create a topic using type ‘A.Foo’ (generated), with default QoS, and no listener:

my_topic = dds.topic.Topic(A.Foo, "MyTopic", "MyType", dp)

To create a topic using type ‘A.Foo’ (generated), with provided QoS, and a listener:

topic_listener = MyTopicListener()
topic_qos = dp.get_default_topic_qos()
my_topic = dds.topic.Topic(A.Foo, "MyTopic", "MyType", dp, topic_qos, topic_listener, dds.core.StatusMask.ALL_STATUS )

To destroy a Topic and clean up all its internal resources:

my_topic.delete()

Content Filtered Topic

Not yet supported.

Publications

Publisher

To create a Publisher with default QoS settings and no listener:

pub = dds.pub.Publisher( dp )

To create a Publisher with specific QoS settings and a listener:

pub_qos = dp.get_default_publisher_qos( )
# adjust qos settings as required ...
pub_listener = MyPublisherListener()
pub = dds.pub.Publisher( dp, pub_qos, pub_listener, dds.core.StatusMask.ALL_STATUS )

To destroy a Publisher and clean up all its internal resources:

pub.delete()

DataWriter

To create a DataWriter in Publisher ‘pub’, associated with topic ‘my_topic’, with default QoS and no listener:

dw = dds.pub.DataWriter( pub, my_topic )

To create a DataWriter associated with topic ‘my_topic’, with specific QoS and a listener:

dw_qos = pub.get_default_datawriter_qos()
# adjust qos settings as required ...
dw_listener = MyDataWriterListener()
dw = dds.pub.DataWriter( pub, my_topic, dw_qos, dw_listener, dds.core.StatusMask.ALL_STATUS )

To destroy a DataWriter and clean up all its internal resources:

dw.delete()

Writing Data

To use a DataWriter to publish a data sample:

my_data = A.Foo()
my_data.x = 10  # initialize values in the data sample
dw.write( my_data )

To use a DataWriter to publish a data sample, providing its instance handle for performance improvements:

handle = dw.register_instance( my_data )
dw.write( my_data, handle )
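
When many instances are written, the handle for each one can be registered once and reused. A minimal sketch of that idea follows. 'HandleCache' and its 'key_of' argument are hypothetical names introduced here for illustration; the sketch uses only the register_instance() and write( data, handle ) calls shown above.

```python
class HandleCache:
    """Remember one instance handle per key so register_instance() runs once per instance."""

    def __init__(self, writer, key_of):
        self._writer = writer
        self._key_of = key_of   # caller-supplied function: sample -> hashable key
        self._handles = {}

    def write(self, sample):
        key = self._key_of(sample)
        if key not in self._handles:
            # first time this instance is seen: register it and keep the handle
            self._handles[key] = self._writer.register_instance(sample)
        self._writer.write(sample, self._handles[key])
```

For the A.Foo type, and assuming 'x' is its key field, this could be used as 'cache = HandleCache( dw, lambda d: d.x )' followed by 'cache.write( my_data )'.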

To use a DataWriter to publish a data sample, with a specific source timestamp:

ts = dds.core.Time(120,0)
dw.write( my_data, source_timestamp=ts )
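
The two arguments to dds.core.Time are presumably seconds and nanoseconds, matching the DDS Time_t structure; that is an assumption here, not something stated above. Under that assumption, a floating-point value such as the one returned by time.time() could be split as sketched below. 'split_timestamp' is a hypothetical helper.

```python
import time

NANOS_PER_SEC = 1_000_000_000


def split_timestamp(seconds):
    """Split float seconds into (whole seconds, remaining nanoseconds)."""
    total_ns = round(seconds * NANOS_PER_SEC)
    return divmod(total_ns, NANOS_PER_SEC)


sec, nanosec = split_timestamp(time.time())
# ts = dds.core.Time(sec, nanosec)   # assumes (seconds, nanoseconds) argument order
```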

Subscriptions

Subscriber

To create a Subscriber with default QoS settings and no listener:

sub = dds.sub.Subscriber( dp )

To create a Subscriber with specific QoS settings and a listener:

sub_qos = dp.get_default_subscriber_qos( )
# adjust qos settings as required ...
sub_listener = MySubscriberListener()
sub = dds.sub.Subscriber( dp, sub_qos, sub_listener, dds.core.StatusMask.ALL_STATUS )

To destroy a Subscriber and clean up all its internal resources:

sub.delete()

DataReader

To create a DataReader in Subscriber ‘sub’, associated with topic ‘my_topic’, with default QoS and no listener:

dr = dds.sub.DataReader( sub, my_topic )

To create a DataReader associated with topic ‘my_topic’, with specific QoS and a listener:

dr_qos = sub.get_default_datareader_qos()
# adjust qos settings as required ...
dr_listener =  MyDataReaderListener()
dr = dds.sub.DataReader( sub, my_topic, dr_qos, dr_listener, dds.core.StatusMask.ALL_STATUS )

To destroy a DataReader and clean up all its internal resources:

dr.delete()

Reading Data

To read all samples available in the DataReader:

samples = dr.read( dds.core.LENGTH_UNLIMITED,
                   dds.core.SampleState.ANY_SAMPLE_STATE,
                   dds.core.ViewState.ANY_VIEW_STATE,
                   dds.core.InstanceState.ANY_INSTANCE_STATE )

The read() operation returns a list of dds.core.Sample objects. Each Sample object holds both the meta-data and the data. To access the meta-data, use samples[index].info, which is a dds.core.SampleInfo. To access the data, use samples[index].data, which is of the type associated with the topic (A.Foo, in this case).

[The take() operations behave similarly to their read() counterparts, but result in the removal of the data from the cache.]

To return the samples to the DataReader (required regardless of whether obtained by read() or take()):

dr.return_loan( samples )
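
Since the loan has to be returned in every case, processing and return_loan() can be paired in a try/finally. The following is a minimal sketch: 'process_and_return' is a hypothetical helper, and it relies only on return_loan(), samples[index].info.valid_data and samples[index].data.

```python
def process_and_return(reader, samples, handler):
    """Call handler(data) for each sample carrying valid data, then return the loan.

    Returns the number of samples passed to handler. The loan is returned
    even if handler raises.
    """
    count = 0
    try:
        for s in samples:
            if s.info.valid_data:
                handler(s.data)
                count += 1
        return count
    finally:
        reader.return_loan(samples)
```

The handler runs before the loan is returned, so it should copy out anything it needs to keep.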

To use a dds.cond.ReadCondition in the read operation to select the samples:

rc = dr.create_readcondition( dds.core.SampleState.ANY_SAMPLE_STATE,
                              dds.core.ViewState.ANY_VIEW_STATE,
                              dds.core.InstanceState.ANY_INSTANCE_STATE )
samples = dr.read_w_condition( dds.core.LENGTH_UNLIMITED, rc )

To read a single sample at a time, iterating through all samples:

while True:
    try:
        sample = dr.read_next_sample( )
        # process sample...
    except dds.core.NoData:
        break
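
The same loop can be wrapped in a generator so that callers use an ordinary 'for' statement. This is a sketch only: 'iter_samples' is a hypothetical helper, and the exception class is passed in as a parameter so the function itself does not depend on the dds package.

```python
def iter_samples(reader, no_data_exc):
    """Yield samples from reader.read_next_sample() until no_data_exc is raised."""
    while True:
        try:
            sample = reader.read_next_sample()
        except no_data_exc:
            return  # no more samples: end the iteration
        yield sample
```

Usage would look like 'for sample in iter_samples( dr, dds.core.NoData ):'.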

To read an instance at a time, iterating through all instances:

previous_handle = dds.core.InstanceHandle.NIL
while True:
    try:
        samples = dr.read_next_instance( dds.core.LENGTH_UNLIMITED,
                                         previous_handle,
                                         dds.core.SampleState.ANY_SAMPLE_STATE,
                                         dds.core.ViewState.ANY_VIEW_STATE,
                                         dds.core.InstanceState.ANY_INSTANCE_STATE )
        # handle samples
        # save previous_handle for next read call
        previous_handle = samples[0].info.instance_handle
        dr.return_loan(samples)

    except dds.core.NoData:
        break

To read a particular instance:

# known_handle is an instance handle obtained earlier,
# for example from samples[index].info.instance_handle
try:
    samples = dr.read_instance( dds.core.LENGTH_UNLIMITED,
                                known_handle,
                                dds.core.SampleState.ANY_SAMPLE_STATE,
                                dds.core.ViewState.ANY_VIEW_STATE,
                                dds.core.InstanceState.ANY_INSTANCE_STATE )
    # process samples
    dr.return_loan(samples)

except dds.core.NoData:
    pass

Examples

Example IDL

struct StringMsg
{
   string msg;
};

Example Publisher

import time
import dds.domain
from  hello import StringMsg

# Use default QoS, and no listener
dp    = dds.domain.DomainParticipant( 0 )
topic = dds.topic.Topic( StringMsg, "helloTopic", "StringMsg", dp )
pub   = dds.pub.Publisher( dp )
dw    = dds.pub.DataWriter( pub, topic )

try:
    while True:
        data = StringMsg()
        data.msg = "Hello from Python!"
        print('Write msg...')
        dw.write( data )
        time.sleep(1.0)
except KeyboardInterrupt:
    pass  # Ctrl-C ends the loop
finally:
    # clean up the participant resources
    dp.delete()

Example Subscriber

import time
import dds.domain
from  hello import StringMsg

# Use default QoS, and no listener
dp    = dds.domain.DomainParticipant( 0 )
topic = dds.topic.Topic( StringMsg, "helloTopic", "StringMsg", dp )
sub   = dds.sub.Subscriber( dp )
dr    = dds.sub.DataReader( sub, topic )
rc    = dr.create_readcondition( dds.core.SampleState.ANY_SAMPLE_STATE,
                                 dds.core.ViewState.ANY_VIEW_STATE,
                                 dds.core.InstanceState.ANY_INSTANCE_STATE )
ws    = dds.cond.WaitSet()
ws.attach_condition(rc)

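try:
    while True:
        t = dds.core.Duration.infinite()
        print('waiting for data...')
        ws.wait(t)
        while True:
            try:
                samples = dr.take( )
            except dds.core.Error:
                break  # no more data (or another DDS error): go back to waiting
            for s in samples:
                if s.info.valid_data:
                    print("received: {}".format(s.data.msg))
            # samples obtained by take() must be returned as well
            dr.return_loan( samples )
except KeyboardInterrupt:
    pass  # Ctrl-C ends the loop
finally:
    # clean up the participant resources
    dp.delete()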