Skip to content

Commit 466b1b2

Browse files
committed
xdd: Implement option to load object dictionary from XDD file.
Signed-off-by: Taras Zaporozhets <zaporozhets.taras@gmail.com>
1 parent be9c56d commit 466b1b2

File tree

8 files changed

+1884
-32
lines changed

8 files changed

+1884
-32
lines changed

canopen/objectdictionary/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def import_od(
7676
source: Union[str, TextIO, None],
7777
node_id: Optional[int] = None,
7878
) -> ObjectDictionary:
79-
"""Parse an EDS, DCF, or EPF file.
79+
"""Parse an EDS, DCF, EPF or XDD file.
8080
8181
:param source:
8282
The path to object dictionary file, a file like object, or an EPF XML tree.
@@ -106,9 +106,12 @@ def import_od(
106106
elif suffix == ".epf":
107107
from canopen.objectdictionary import epf
108108
return epf.import_epf(source)
109+
elif suffix == ".xdd":
110+
from canopen.objectdictionary import xdd
111+
return xdd.import_xdd(source, node_id)
109112
else:
110113
doc_type = suffix[1:]
111-
allowed = ", ".join(["eds", "dcf", "epf"])
114+
allowed = ", ".join(["eds", "dcf", "epf", "xdd"])
112115
raise ValueError(
113116
f"Cannot import from the {doc_type!r} format; "
114117
f"supported formats: {allowed}"

canopen/objectdictionary/eds.py

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from canopen import objectdictionary
1010
from canopen.objectdictionary import ObjectDictionary, datatypes
1111
from canopen.sdo import SdoClient
12+
from canopen.utils import signed_int_from_hex, calc_bit_length
1213

1314
if TYPE_CHECKING:
1415
import canopen.network
@@ -201,30 +202,6 @@ def import_from_node(node_id: int, network: canopen.network.Network):
201202
network.unsubscribe(0x580 + node_id)
202203
return od
203204

204-
205-
def _calc_bit_length(data_type):
206-
if data_type == datatypes.INTEGER8:
207-
return 8
208-
elif data_type == datatypes.INTEGER16:
209-
return 16
210-
elif data_type == datatypes.INTEGER32:
211-
return 32
212-
elif data_type == datatypes.INTEGER64:
213-
return 64
214-
else:
215-
raise ValueError(f"Invalid data_type '{data_type}', expecting a signed integer data_type.")
216-
217-
218-
def _signed_int_from_hex(hex_str, bit_length):
219-
number = int(hex_str, 0)
220-
max_value = (1 << (bit_length - 1)) - 1
221-
222-
if number > max_value:
223-
return number - (1 << bit_length)
224-
else:
225-
return number
226-
227-
228205
def _convert_variable(node_id, var_type, value):
229206
if var_type in (datatypes.OCTET_STRING, datatypes.DOMAIN):
230207
return bytes.fromhex(value)
@@ -288,7 +265,7 @@ def build_variable(eds, section, node_id, index, subindex=0):
288265
try:
289266
min_string = eds.get(section, "LowLimit")
290267
if var.data_type in datatypes.SIGNED_TYPES:
291-
var.min = _signed_int_from_hex(min_string, _calc_bit_length(var.data_type))
268+
var.min = signed_int_from_hex(min_string, calc_bit_length(var.data_type))
292269
else:
293270
var.min = int(min_string, 0)
294271
except ValueError:
@@ -297,7 +274,7 @@ def build_variable(eds, section, node_id, index, subindex=0):
297274
try:
298275
max_string = eds.get(section, "HighLimit")
299276
if var.data_type in datatypes.SIGNED_TYPES:
300-
var.max = _signed_int_from_hex(max_string, _calc_bit_length(var.data_type))
277+
var.max = signed_int_from_hex(max_string, calc_bit_length(var.data_type))
301278
else:
302279
var.max = int(max_string, 0)
303280
except ValueError:

canopen/objectdictionary/xdd.py

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
import logging
2+
3+
import re
4+
import xml.etree.ElementTree as etree
5+
from configparser import NoOptionError
6+
from typing import TYPE_CHECKING
7+
8+
from canopen import objectdictionary
9+
from canopen.objectdictionary import ObjectDictionary
10+
from canopen.utils import signed_int_from_hex, calc_bit_length
11+
12+
if TYPE_CHECKING:
13+
import canopen.network
14+
15+
logger = logging.getLogger(__name__)
16+
17+
# Object type. Don't confuse with Data type
18+
VAR = 7
19+
ARR = 8
20+
RECORD = 9
21+
22+
23+
def import_xdd(xdd, node_id):
24+
od = ObjectDictionary()
25+
if etree.iselement(xdd):
26+
root = xdd
27+
else:
28+
root = etree.parse(xdd).getroot()
29+
30+
if node_id is None:
31+
device_commissioning = root.find('.//{*}DeviceCommissioning')
32+
if device_commissioning is not None:
33+
if node_id := device_commissioning.get('nodeID', None):
34+
try:
35+
od.node_id = int(node_id, 0)
36+
except (ValueError, TypeError):
37+
pass
38+
else:
39+
od.node_id = node_id
40+
41+
_add_device_information_to_od(od, root)
42+
_add_object_list_to_od(od, root)
43+
_add_dummy_objects_to_od(od, root)
44+
45+
46+
return od
47+
48+
def _add_device_information_to_od(od, root):
49+
device_identity = root.find('.//{*}DeviceIdentity')
50+
if device_identity is not None:
51+
for src_prop, dst_prop, f in [
52+
("vendorName", "vendor_name", lambda val: str(val)),
53+
("vendorID", "vendor_number", lambda val: int(val, 0)),
54+
("productName", "product_name", lambda val: str(val)),
55+
("productID", "product_number", lambda val: int(val, 0)),
56+
]:
57+
val = device_identity.find(f'{{*}}{src_prop}')
58+
if val is not None and val.text:
59+
try:
60+
setattr(od.device_information, dst_prop, f(val.text))
61+
except NoOptionError:
62+
pass
63+
64+
general_features = root.find('.//{*}CANopenGeneralFeatures')
65+
if general_features is not None:
66+
for src_prop, dst_prop, f in [
67+
("granularity", "granularity", lambda val: int(val, 0)),
68+
("nrOfRxPDO", "nr_of_RXPDO", lambda val: int(val, 0)),
69+
("nrOfTxPDO", "nr_of_TXPDO", lambda val: int(val, 0)),
70+
("bootUpSlave", "simple_boot_up_slave", lambda val: bool(val)),
71+
]:
72+
if val := general_features.get(src_prop, None):
73+
try:
74+
setattr(od.device_information, dst_prop, f(val))
75+
except NoOptionError:
76+
pass
77+
78+
baud_rate = root.find('.//{*}PhysicalLayer/{*}baudRate')
79+
for baud in baud_rate:
80+
try:
81+
rate = int(baud.get("value").replace(' Kbps', ''), 10) * 1000
82+
od.device_information.allowed_baudrates.add(rate)
83+
except (ValueError, TypeError):
84+
pass
85+
86+
if default_baud := baud_rate.get('defaultValue', None):
87+
try:
88+
od.bitrate = int(default_baud.replace(' Kbps', ''), 10) * 1000
89+
except (ValueError, TypeError):
90+
pass
91+
92+
def _add_object_list_to_od(od: ObjectDictionary, root):
93+
# Process all CANopen objects in the file
94+
for obj in root.findall('.//{*}CANopenObjectList/{*}CANopenObject'):
95+
name = obj.get('name', '')
96+
index = int(obj.get('index', '0'), 16)
97+
object_type = int(obj.get('objectType', '0'))
98+
sub_number = obj.get('subNumber')
99+
100+
# Simple variable
101+
if object_type == VAR:
102+
unique_id_ref = obj.get('uniqueIDRef', None)
103+
parameters = root.find(f'.//{{*}}parameter[@uniqueID="{unique_id_ref}"]')
104+
105+
var = _build_variable(parameters, od.node_id, name, index)
106+
_set_parameters_from_xdd_canopen_object(od.node_id, var, obj)
107+
od.add_object(var)
108+
109+
# Array
110+
elif object_type == ARR and sub_number:
111+
array = objectdictionary.ODArray(name, index)
112+
for sub_obj in obj:
113+
sub_name = sub_obj.get('name', '')
114+
sub_index = int(sub_obj.get('subIndex'), 16)
115+
sub_unique_id = sub_obj.get('uniqueIDRef', None)
116+
sub_parameters = root.find(f'.//{{*}}parameter[@uniqueID="{sub_unique_id}"]')
117+
118+
sub_var = _build_variable(sub_parameters, od.node_id, sub_name, index, sub_index)
119+
_set_parameters_from_xdd_canopen_object(od.node_id, sub_var, sub_obj)
120+
array.add_member(sub_var)
121+
od.add_object(array)
122+
123+
# Record/Struct
124+
elif object_type == RECORD and sub_number:
125+
record = objectdictionary.ODRecord(name, index)
126+
for sub_obj in obj:
127+
sub_name = sub_obj.get('name', '')
128+
sub_index = int(sub_obj.get('subIndex'))
129+
sub_unique_id = sub_obj.get('uniqueIDRef', None)
130+
sub_parameters = root.find(f'.//{{*}}parameter[@uniqueID="{sub_unique_id}"]')
131+
sub_var = _build_variable(sub_parameters, od.node_id, sub_name, index, sub_index)
132+
_set_parameters_from_xdd_canopen_object(od.node_id, sub_var, sub_obj)
133+
record.add_member(sub_var)
134+
od.add_object(record)
135+
136+
def _add_dummy_objects_to_od(od: ObjectDictionary, root):
137+
dummy_section = root.find('.//{*}ApplicationLayers/{*}dummyUsage')
138+
for dummy in dummy_section:
139+
p = dummy.get('entry').split('=')
140+
key = p[0]
141+
value = int(p[1], 10)
142+
index = int(key.replace('Dummy', ''), 10)
143+
if value == 1:
144+
var = objectdictionary.ODVariable(key, index, 0)
145+
var.data_type = index
146+
var.access_type = "const"
147+
od.add_object(var)
148+
149+
def _set_parameters_from_xdd_canopen_object(node_id, dst, src):
150+
# PDO mapping of the object, optional, string
151+
# Valid values:
152+
# * no – not mappable
153+
# * default – mapped by default
154+
# * optional – optionally mapped
155+
# * TPDO – may be mapped into TPDO only
156+
# * RPDO – may be mapped into RPDO only
157+
pdo_mapping = src.get('PDOmapping', 'no')
158+
dst.pdo_mappable = pdo_mapping != 'no'
159+
160+
# Name of the object, optional, string
161+
if var_name := src.get('name', None):
162+
dst.name = var_name
163+
164+
# CANopen data type (two hex digits), optional
165+
# data_type matches canopen library, no conversion needed
166+
if var_data_type := src.get('dataType', None):
167+
try:
168+
dst.data_type = int(var_data_type, 16)
169+
except (ValueError, TypeError):
170+
pass
171+
172+
# Access type of the object; valid values, optional, string
173+
# * const – read access only; the value is not changing
174+
# * ro – read access only
175+
# * wo – write access only
176+
# * rw – both read and write access
177+
# strings match with access_type in canopen library, no conversion needed
178+
if access_type := src.get('accessType', None):
179+
dst.access_type = access_type
180+
181+
# Low limit of the parameter value, optional, string
182+
if min_value := src.get('lowLimit', None):
183+
try:
184+
dst.min = _convert_variable(node_id, dst.data_type, min_value)
185+
except (ValueError, TypeError):
186+
pass
187+
188+
# High limit of the parameter value, optional, string
189+
if max_value := src.get('highLimit', None):
190+
try:
191+
dst.max = _convert_variable(node_id, dst.data_type, max_value)
192+
except (ValueError, TypeError):
193+
pass
194+
195+
# Default value of the object, optional, string
196+
if default_value := src.get('defaultValue', None):
197+
try:
198+
dst.default_raw = default_value
199+
if '$NODEID' in dst.default_raw:
200+
dst.relative = True
201+
dst.default = _convert_variable(node_id, dst.data_type, dst.default_raw)
202+
except (ValueError, TypeError):
203+
pass
204+
205+
def _build_variable(par_tree, node_id, name, index, subindex=0):
206+
var = objectdictionary.ODVariable(name, index, subindex)
207+
# Set default parameters
208+
var.default_raw = None
209+
var.access_type = 'ro'
210+
if par_tree is None:
211+
return
212+
213+
var.description = par_tree.get('description', '')
214+
215+
# Extract data type
216+
data_types = {
217+
'BOOL': objectdictionary.BOOLEAN,
218+
'SINT': objectdictionary.INTEGER8,
219+
'INT': objectdictionary.INTEGER16,
220+
'DINT': objectdictionary.INTEGER32,
221+
'LINT': objectdictionary.INTEGER64,
222+
'USINT': objectdictionary.UNSIGNED8,
223+
'UINT': objectdictionary.UNSIGNED16,
224+
'UDINT': objectdictionary.UNSIGNED32,
225+
'ULINT': objectdictionary.UNSIGNED32,
226+
'REAL': objectdictionary.REAL32,
227+
'LREAL': objectdictionary.REAL64,
228+
'STRING': objectdictionary.VISIBLE_STRING,
229+
'BITSTRING': objectdictionary.DOMAIN,
230+
'WSTRING': objectdictionary.UNICODE_STRING
231+
}
232+
233+
#print(f'par_tree={etree.tostring(par_tree, encoding="unicode")}')
234+
for k, v in data_types.items():
235+
if par_tree.find(f'{{*}}{k}') is not None:
236+
var.data_type = v
237+
238+
# Extract access type
239+
if access_type_str := par_tree.get('access', None):
240+
# Defines which operations are valid for the parameter:
241+
# * const – read access only; the value is not changing
242+
# * read – read access only (default value)
243+
# * write – write access only
244+
# * readWrite – both read and write access
245+
# * readWriteInput – both read and write access, but represents process input data
246+
# * readWriteOutput – both read and write access, but represents process output data
247+
# * noAccess – access denied
248+
access_types = {
249+
'const': 'const',
250+
'read': 'ro',
251+
'write': 'wo',
252+
'readWrite': 'rw',
253+
'readWriteInput': 'rw',
254+
'readWriteOutput': 'rw',
255+
'noAccess': 'const',
256+
}
257+
var.access_type = access_types.get(access_type_str)
258+
259+
# Extract default value
260+
default_value = par_tree.find('{*}defaultValue')
261+
if default_value is not None:
262+
try:
263+
var.default_raw = default_value.get('value')
264+
if '$NODEID' in var.default_raw:
265+
var.relative = True
266+
var.default = _convert_variable(node_id, var.data_type, var.default_raw)
267+
except (ValueError, TypeError):
268+
pass
269+
270+
# Extract allowed values range
271+
min_value = par_tree.find('{*}allowedValues/{*}range/{*}minValue')
272+
if min_value is not None:
273+
try:
274+
var.min = _convert_variable(node_id, var.data_type, min_value.get('value'))
275+
except (ValueError, TypeError):
276+
pass
277+
278+
max_value = par_tree.find('{*}allowedValues/{*}range/{*}maxValue')
279+
if max_value is not None:
280+
try:
281+
var.max = _convert_variable(node_id, var.data_type, max_value.get('value'))
282+
except (ValueError, TypeError):
283+
pass
284+
return var
285+
286+
def _convert_variable(node_id, var_type, value):
287+
if var_type in (objectdictionary.OCTET_STRING, objectdictionary.DOMAIN):
288+
return bytes.fromhex(value)
289+
elif var_type in (objectdictionary.VISIBLE_STRING, objectdictionary.UNICODE_STRING):
290+
return value
291+
elif var_type in objectdictionary.FLOAT_TYPES:
292+
return float(value)
293+
else:
294+
# COB-ID can contain '$NODEID+' so replace this with node_id before converting
295+
value = value.replace(" ", "").upper()
296+
if '$NODEID' in value:
297+
if node_id is None:
298+
logger.warn("Cannot convert value with $NODEID, skipping conversion")
299+
return None
300+
else:
301+
return int(re.sub(r'\+?\$NODEID\+?', '', value), 0) + node_id
302+
else:
303+
if var_type in objectdictionary.SIGNED_TYPES:
304+
return signed_int_from_hex(value, calc_bit_length(var_type))
305+
else:
306+
return int(value, 0)

0 commit comments

Comments
 (0)