CoreDX DDS Python CookBook¶
DomainParticipant¶
To access the domain module:
import dds.domain
To create a DomainParticipant with default QoS settings and no listener:
dp = dds.domain.DomainParticipant( 1 )
To create a DomainParticipant with specific QoS settings, and no listener:
dp_qos = dds.domain.DomainParticipantFactory.get_default_participant_qos()
# adjust qos settings as required ...
dp = dds.domain.DomainParticipant( 1, dp_qos )
To create a DomainParticipant with specific QoS settings, and a listener:
dp_listener = MyDomainParticipantListener()
dp_qos = dds.domain.DomainParticipantFactory.get_default_participant_qos()
dp = dds.domain.DomainParticipant( 1, dp_qos, dp_listener, dds.core.StatusMask.ALL_STATUS )
To create a DomainParticipant with default QoS and a listener, using keyword arguments:
dp_listener = MyDomainParticipantListener()
# NOTE: the keyword name 'mask' is an assumption; a positional argument cannot follow a keyword argument in Python
dp = dds.domain.DomainParticipant( 1, listener=dp_listener, mask=dds.core.StatusMask.ALL_STATUS )
To destroy a DomainParticipant and clean up all its internal resources:
dp.delete()
Topics¶
Standard Topic¶
To create a topic using type ‘A.Foo’ (generated), with default QoS, and no listener:
my_topic = dds.topic.Topic(A.Foo, "MyTopic", "MyType", dp)
To create a topic using type ‘A.Foo’ (generated), with provided QoS, and a listener:
topic_listener = MyTopicListener()
topic_qos = dp.get_default_topic_qos()
my_topic = dds.topic.Topic(A.Foo, "MyTopic", "MyType", dp, topic_qos, topic_listener, dds.core.StatusMask.ALL_STATUS )
To destroy a Topic and clean up all its internal resources:
my_topic.delete()
Content Filtered Topic¶
Not yet supported.
Publications¶
Publisher¶
To create a Publisher with default QoS settings and no listener:
pub = dds.pub.Publisher( dp )
To create a Publisher with specific QoS settings and a listener:
pub_qos = dp.get_default_publisher_qos( )
# adjust qos settings as required ...
pub_listener = MyPublisherListener()
pub = dds.pub.Publisher( dp, pub_qos, pub_listener, dds.core.StatusMask.ALL_STATUS )
To destroy a Publisher and clean up all its internal resources:
pub.delete()
DataWriter¶
To create a DataWriter in Publisher ‘pub’, associated with topic ‘my_topic’, with default QoS and no listener:
dw = dds.pub.DataWriter( pub, my_topic )
To create a DataWriter associated with topic ‘my_topic’, with specific QoS and a listener:
dw_qos = pub.get_default_datawriter_qos()
# adjust qos settings as required ...
dw_listener = MyDataWriterListener()
dw = dds.pub.DataWriter( pub, my_topic, dw_qos, dw_listener, dds.core.StatusMask.ALL_STATUS )
To destroy a DataWriter and clean up all its internal resources:
dw.delete()
Writing Data¶
To use a DataWriter to publish a data sample:
my_data = A.Foo()
my_data.x = 10 # initialize values in the data sample
dw.write( my_data )
To use a DataWriter to publish a data sample, supplying a pre-registered instance handle to improve performance:
handle = dw.register_instance( my_data )
dw.write( my_data, handle )
To use a DataWriter to publish a data sample, with a specific source timestamp:
ts = dds.core.Time(120,0)
dw.write( my_data, source_timestamp=ts )
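To stamp a sample with the current wall-clock time rather than a fixed value, the time has to be split into two integers first. The sketch below assumes that the two arguments of dds.core.Time are seconds and nanoseconds, as in the DDS specification's Time_t; the cookbook does not state this, so verify it against the API reference. `split_nanoseconds` is a hypothetical helper name.

```python
import time

NANOS_PER_SECOND = 1_000_000_000


def split_nanoseconds(total_ns):
    """Split an integer nanosecond count into (seconds, nanoseconds)."""
    return divmod(total_ns, NANOS_PER_SECOND)


# Current time as a (seconds, nanoseconds) pair.
sec, nanosec = split_nanoseconds(time.time_ns())

# Assumed usage with the binding (not verified here):
#   ts = dds.core.Time(sec, nanosec)
#   dw.write( my_data, source_timestamp=ts )
print(split_nanoseconds(120 * NANOS_PER_SECOND))  # → (120, 0)
```

Working from integer nanoseconds avoids the rounding error that comes with splitting a floating-point timestamp.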
Subscriptions¶
Subscriber¶
To create a Subscriber with default QoS settings and no listener:
sub = dds.sub.Subscriber( dp )
To create a Subscriber with specific QoS settings and a listener:
sub_qos = dp.get_default_subscriber_qos( )
# adjust qos settings as required ...
sub_listener = MySubscriberListener()
sub = dds.sub.Subscriber( dp, sub_qos, sub_listener, dds.core.StatusMask.ALL_STATUS )
To destroy a Subscriber and clean up all its internal resources:
sub.delete()
DataReader¶
To create a DataReader in Subscriber ‘sub’, associated with topic ‘my_topic’, with default QoS and no listener:
dr = dds.sub.DataReader( sub, my_topic )
To create a DataReader associated with topic ‘my_topic’, with specific QoS and a listener:
dr_qos = sub.get_default_datareader_qos()
# adjust qos settings as required ...
dr_listener = MyDataReaderListener()
dr = dds.sub.DataReader( sub, my_topic, dr_qos, dr_listener, dds.core.StatusMask.ALL_STATUS )
To destroy a DataReader and clean up all its internal resources:
dr.delete()
Reading Data¶
To read all samples available in the DataReader:
samples = dr.read( dds.core.LENGTH_UNLIMITED,
                   dds.core.SampleState.ANY_SAMPLE_STATE,
                   dds.core.ViewState.ANY_VIEW_STATE,
                   dds.core.InstanceState.ANY_INSTANCE_STATE )
The read() operation returns a list of dds.core.Sample objects. Each Sample holds both the meta-data and the data.
To access the meta-data, use samples[index].info, which is a dds.core.SampleInfo.
To access the data, use samples[index].data, which is of the type associated with the topic (A.Foo, in this case).
The take() operations behave like their read() counterparts, but also remove the returned samples from the DataReader's cache.
To return the samples to the DataReader (required whether they were obtained by read() or take()):
dr.return_loan( samples )
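Not every Sample carries data: in DDS, a sample whose info.valid_data flag is false conveys only meta-data, such as an instance state change. The sketch below filters a sample list down to its usable payloads. `valid_payloads` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins that mimic only the `.info.valid_data` and `.data` attributes described above, so the example runs without a DDS runtime.

```python
from types import SimpleNamespace


def valid_payloads(samples):
    """Yield the data of each sample whose info.valid_data flag is set."""
    for sample in samples:
        if sample.info.valid_data:
            yield sample.data


# Stand-in samples mimicking the shape of dds.core.Sample.
fake_samples = [
    SimpleNamespace(info=SimpleNamespace(valid_data=True), data="a"),
    SimpleNamespace(info=SimpleNamespace(valid_data=False), data=None),
    SimpleNamespace(info=SimpleNamespace(valid_data=True), data="b"),
]
print(list(valid_payloads(fake_samples)))  # → ['a', 'b']
```

Since the helper is a generator, it should be fully consumed before the samples are handed back with return_loan().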
To use a dds.cond.ReadCondition in the read operation to select the samples:
rc = dr.create_readcondition( dds.core.SampleState.ANY_SAMPLE_STATE,
                              dds.core.ViewState.ANY_VIEW_STATE,
                              dds.core.InstanceState.ANY_INSTANCE_STATE )
samples = dr.read_w_condition( dds.core.LENGTH_UNLIMITED, rc )
To read a single sample at a time, iterating through all samples:
while True:
    try:
        sample = dr.read_next_sample( )
        # process sample...
    except dds.core.NoData:
        # no more samples are available
        break
To read an instance at a time, iterating through all instances:
previous_handle = dds.core.InstanceHandle.NIL
while True:
    try:
        samples = dr.read_next_instance( dds.core.LENGTH_UNLIMITED,
                                         previous_handle,
                                         dds.core.SampleState.ANY_SAMPLE_STATE,
                                         dds.core.ViewState.ANY_VIEW_STATE,
                                         dds.core.InstanceState.ANY_INSTANCE_STATE )
        # handle samples
        # save previous_handle for next read call
        previous_handle = samples[0].info.instance_handle
        dr.return_loan(samples)
    except dds.core.NoData:
        break
To read a particular instance:
try:
    samples = dr.read_instance( dds.core.LENGTH_UNLIMITED,
                                known_handle,
                                dds.core.SampleState.ANY_SAMPLE_STATE,
                                dds.core.ViewState.ANY_VIEW_STATE,
                                dds.core.InstanceState.ANY_INSTANCE_STATE )
    # process samples
    dr.return_loan(samples)
except dds.core.NoData:
    pass
Examples¶
Example IDL¶
struct StringMsg
{
    string msg;
};
Example Publisher¶
import time
import dds.domain
from hello import StringMsg
# Use default QoS, and no listener
dp = dds.domain.DomainParticipant( 0 )
topic = dds.topic.Topic( StringMsg, "helloTopic", "StringMsg", dp )
pub = dds.pub.Publisher( dp )
dw = dds.pub.DataWriter( pub, topic )
try:
    while True:
        data = StringMsg()
        data.msg = "Hello from Python!"
        print('Write msg...')
        dw.write( data )
        time.sleep(1.0)
except KeyboardInterrupt:
    pass  # Ctrl-C ends the publishing loop
finally:
    # clean up the participant resources
    dp.delete()
Example Subscriber¶
import dds.domain
from hello import StringMsg
# Use default QoS, and no listener
dp = dds.domain.DomainParticipant( 0 )
topic = dds.topic.Topic( StringMsg, "helloTopic", "StringMsg", dp )
sub = dds.sub.Subscriber( dp )
dr = dds.sub.DataReader( sub, topic )
rc = dr.create_readcondition( dds.core.SampleState.ANY_SAMPLE_STATE,
                              dds.core.ViewState.ANY_VIEW_STATE,
                              dds.core.InstanceState.ANY_INSTANCE_STATE )
ws = dds.cond.WaitSet()
ws.attach_condition(rc)
t = dds.core.Duration.infinite()
try:
    while True:
        print('waiting for data...')
        ws.wait(t)
        # drain the reader; the inner loop ends when take() raises dds.core.Error
        while True:
            try:
                samples = dr.take( )
            except dds.core.Error:
                break
            for s in samples:
                if s.info.valid_data:
                    print("received: {}".format(s.data.msg))
            # return the loaned samples to the DataReader
            dr.return_loan( samples )
except KeyboardInterrupt:
    pass  # Ctrl-C ends the receive loop
finally:
    # clean up the participant resources
    dp.delete()