|
10 | 10 | import weakref |
11 | 11 |
|
12 | 12 | from .util import find_library, load_kernel |
13 | | -from .xtables import (XT_INV_PROTO, NFPROTO_IPV4, XTablesError, xtables, |
| 13 | +from .xtables import (XT_INV_PROTO, NFPROTO_IPV4, NFPROTO_IPV6, XTablesError, xtables, |
14 | 14 | xt_align, xt_counters, xt_entry_target, xt_entry_match) |
15 | 15 |
|
16 | 16 | __all__ = ["Table", "Chain", "Rule", "Match", "Target", "Policy", "IPTCError"] |
@@ -1389,6 +1389,123 @@ def _get_mask(self): |
1389 | 1389 | mask = property(_get_mask) |
1390 | 1390 | """This is the raw mask buffer as iptables uses it when removing rules.""" |
1391 | 1391 |
|
| 1392 | + @classmethod |
| 1393 | + def from_dict(cls, rule_d): |
| 1394 | + """Generate a Rule(6) object from the input dictionary.""" |
| 1395 | + # Sanity check |
| 1396 | + assert(isinstance(rule_d, dict)) |
| 1397 | + # Basic rule attributes |
| 1398 | + rule_attr = ('src', 'dst', 'protocol', 'in-interface', 'out-interface', 'fragment') |
| 1399 | + iptc_rule = cls() |
| 1400 | + # Avoid issues with matches that require basic parameters to be configured first |
| 1401 | + for name in rule_attr: |
| 1402 | + if name in rule_d: |
| 1403 | + _iptc_setrule(iptc_rule, name, rule_d[name]) |
| 1404 | + for name, value in rule_d.items(): |
| 1405 | + try: |
| 1406 | + if name in rule_attr: |
| 1407 | + #_iptc_setrule(iptc_rule, name, value) |
| 1408 | + continue |
| 1409 | + elif name == 'target': |
| 1410 | + _iptc_settarget(iptc_rule, value) |
| 1411 | + else: |
| 1412 | + _iptc_setmatch(iptc_rule, name, value) |
| 1413 | + except Exception as e: |
| 1414 | + #print('Ignoring unsupported field <{}:{}>'.format(name, value)) |
| 1415 | + continue |
| 1416 | + return iptc_rule |
| 1417 | + |
| 1418 | + def to_dict(self): |
| 1419 | + """Generate a dictionary representation of the Rule(6) object.""" |
| 1420 | + d = {} |
| 1421 | + if self.nfproto==NFPROTO_IPV4 and self.src != '0.0.0.0/0.0.0.0': |
| 1422 | + d['src'] = self.src |
| 1423 | + elif self.nfproto==NFPROTO_IPV6 and self.src != '::/0': |
| 1424 | + d['src'] = self.src |
| 1425 | + if self.nfproto==NFPROTO_IPV4 and self.dst != '0.0.0.0/0.0.0.0': |
| 1426 | + d['dst'] = self.dst |
| 1427 | + elif self.nfproto==NFPROTO_IPV6 and self.dst != '::/0': |
| 1428 | + d['dst'] = self.dst |
| 1429 | + if self.protocol != 'ip': |
| 1430 | + d['protocol'] = self.protocol |
| 1431 | + if self.in_interface is not None: |
| 1432 | + d['in-interface'] = self.in_interface |
| 1433 | + if self.out_interface is not None: |
| 1434 | + d['out-interface'] = self.out_interface |
| 1435 | + if self.nfproto==NFPROTO_IPV4 and self.fragment: |
| 1436 | + d['fragment'] = self.fragment |
| 1437 | + for m in self.matches: |
| 1438 | + if m.name not in d: |
| 1439 | + d[m.name] = m.get_all_parameters() |
| 1440 | + elif isinstance(d[m.name], list): |
| 1441 | + d[m.name].append(m.get_all_parameters()) |
| 1442 | + else: |
| 1443 | + d[m.name] = [d[m.name], m.get_all_parameters()] |
| 1444 | + if self.target and self.target.name and len(self.target.get_all_parameters()): |
| 1445 | + name = self.target.name.replace('-', '_') |
| 1446 | + d['target'] = {name:self.target.get_all_parameters()} |
| 1447 | + elif self.target and self.target.name: |
| 1448 | + d['target'] = self.target.name |
| 1449 | + # Return a filtered dictionary |
| 1450 | + return _filter_empty_field(d) |
| 1451 | + |
| 1452 | +# Helper functions for dictionary operations over Rule(6) objects |
| 1453 | +def _iptc_setattr(object, name, value): |
| 1454 | + # Translate attribute name |
| 1455 | + name = name.replace('-', '_') |
| 1456 | + setattr(object, name, value) |
| 1457 | + |
| 1458 | +def _iptc_setattr_d(object, value_d): |
| 1459 | + for name, value in value_d.items(): |
| 1460 | + _iptc_setattr(object, name, value) |
| 1461 | + |
| 1462 | +def _iptc_setrule(iptc_rule, name, value): |
| 1463 | + _iptc_setattr(iptc_rule, name, value) |
| 1464 | + |
| 1465 | +def _iptc_setmatch(iptc_rule, name, value): |
| 1466 | + # Iterate list/tuple recursively |
| 1467 | + if isinstance(value, list) or isinstance(value, tuple): |
| 1468 | + for inner_value in value: |
| 1469 | + _iptc_setmatch(iptc_rule, name, inner_value) |
| 1470 | + # Assign dictionary value |
| 1471 | + elif isinstance(value, dict): |
| 1472 | + iptc_match = iptc_rule.create_match(name) |
| 1473 | + _iptc_setattr_d(iptc_match, value) |
| 1474 | + # Assign value directly |
| 1475 | + else: |
| 1476 | + iptc_match = iptc_rule.create_match(name) |
| 1477 | + _iptc_setattr(iptc_match, name, value) |
| 1478 | + |
| 1479 | +def _iptc_settarget(iptc_rule, value): |
| 1480 | + # Target is dictionary - Use only 1 pair key/value |
| 1481 | + if isinstance(value, dict): |
| 1482 | + for k, v in value.items(): |
| 1483 | + iptc_target = iptc_rule.create_target(k) |
| 1484 | + _iptc_setattr_d(iptc_target, v) |
| 1485 | + return |
| 1486 | + # Simple target |
| 1487 | + else: |
| 1488 | + iptc_target = iptc_rule.create_target(value) |
| 1489 | + |
| 1490 | +def _filter_empty_field(data_d): |
| 1491 | + """ |
| 1492 | + Remove empty lists from dictionary values |
| 1493 | + Before: {'target': {'CHECKSUM': {'checksum-fill': []}}} |
| 1494 | + After: {'target': {'CHECKSUM': {'checksum-fill': ''}}} |
| 1495 | + Before: {'tcp': {'dport': ['22']}}} |
| 1496 | + After: {'tcp': {'dport': '22'}}} |
| 1497 | + """ |
| 1498 | + for k, v in data_d.items(): |
| 1499 | + if isinstance(v, dict): |
| 1500 | + data_d[k] = _filter_empty_field(v) |
| 1501 | + elif isinstance(v, list) and len(v) != 0: |
| 1502 | + v = [_filter_empty_field(_v) if isinstance(_v, dict) else _v for _v in v ] |
| 1503 | + if isinstance(v, list) and len(v) == 1: |
| 1504 | + data_d[k] = v.pop() |
| 1505 | + elif isinstance(v, list) and len(v) == 0: |
| 1506 | + data_d[k] = '' |
| 1507 | + return data_d |
| 1508 | + |
1392 | 1509 |
|
1393 | 1510 | class Chain(object): |
1394 | 1511 | """Rules are contained by chains. |
|
0 commit comments