diff --git a/pyas2/admin.py b/pyas2/admin.py index f9f1e37..5e83ecd 100644 --- a/pyas2/admin.py +++ b/pyas2/admin.py @@ -70,6 +70,7 @@ class PartnerAdmin(admin.ModelAdmin): "signature_cert", "mdn", "mdn_mode", + "canonicalize_as_binary", ] list_filter = ("name", "as2_name") fieldsets = ( @@ -123,7 +124,12 @@ class PartnerAdmin(admin.ModelAdmin): "Advanced Settings", { "classes": ("collapse", "wide"), - "fields": ("keep_filename", "cmd_send", "cmd_receive"), + "fields": ( + "canonicalize_as_binary", + "keep_filename", + "cmd_send", + "cmd_receive", + ), }, ), ) diff --git a/pyas2/migrations/0004_partner_canonicalize_as_binary.py b/pyas2/migrations/0004_partner_canonicalize_as_binary.py new file mode 100644 index 0000000..eb30aa0 --- /dev/null +++ b/pyas2/migrations/0004_partner_canonicalize_as_binary.py @@ -0,0 +1,20 @@ +# Generated by Django 3.1.7 on 2022-07-25 09:00 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("pyas2", "0003_auto_20221208_1310"), + ] + + operations = [ + migrations.AddField( + model_name="partner", + name="canonicalize_as_binary", + field=models.BooleanField( + default=False, verbose_name="Force binary canonicalization" + ), + ), + ] diff --git a/pyas2/models.py b/pyas2/models.py index 7463cdf..13de8aa 100644 --- a/pyas2/models.py +++ b/pyas2/models.py @@ -229,6 +229,10 @@ class Partner(models.Model): blank=True, ) + canonicalize_as_binary = models.BooleanField( + verbose_name=_("Force binary canonicalization"), default=False + ) + confirmation_message = models.TextField( verbose_name=_("Confirmation Message"), null=True, @@ -280,6 +284,7 @@ def as2partner(self): "enc_alg": self.encryption, "mdn_mode": self.mdn_mode, "mdn_digest_alg": self.mdn_sign, + "canonicalize_as_binary": bool(self.canonicalize_as_binary), } if self.signature_cert: diff --git a/pyas2/tests/test_basic.py b/pyas2/tests/test_basic.py index 7cb259c..88e6628 100644 --- a/pyas2/tests/test_basic.py +++ b/pyas2/tests/test_basic.py @@ -2,22 +2,22 @@ from email.parser import HeaderParser from unittest import mock -from django.test import TestCase, Client +from django.test import Client, TestCase +from pyas2lib.as2 import Message as As2Message +from pyas2lib.utils import canonicalize from requests import Response from requests.exceptions import RequestException from pyas2.models import ( - PrivateKey, - PublicCertificate, + Mdn, + Message, Organization, Partner, - Message, - Mdn, + PrivateKey, + PublicCertificate, ) from pyas2.tests import TEST_DIR -from pyas2lib.as2 import Message as As2Message - class BasicServerClientTestCase(TestCase): """Test cases for the AS2 server and client. @@ -534,6 +534,68 @@ def testEncryptSignMessageAsyncSignMdn(self, mock_request): mock_request.side_effect = RequestException() out_message.mdn.send_async_mdn() + @mock.patch("requests.post") + def testForceBinaryCanonicalization(self, mock_request): + """Test Permutation 15: Sender sends text data using binary canonicalization + and requests a synchronous receipt.""" + + partner = Partner.objects.create( + name="AS2 Server", + as2_name="as2server", + target_url="http://localhost:8080/pyas2/as2receive", + signature="sha1", + signature_cert=self.server_crt, + encryption=None, + encryption_cert=None, + mdn=True, + mdn_mode="SYNC", + ) + + receiver = Partner.objects.get(as2_name="as2client") + receiver.canonicalize_as_binary = True + receiver.save() + + as2message = As2Message( + sender=self.organization.as2org, receiver=partner.as2partner + ) + + with mock.patch("pyas2lib.as2.canonicalize") as mock_canonicalize: + mock_canonicalize.side_effect = self.mock_canonicalize_function + + as2message.build( + self.payload, + filename="testmessage.edi", + subject=partner.subject, + content_type=partner.content_type, + ) + + in_message, _ = Message.objects.create_from_as2message( + as2message=as2message, payload=self.payload, direction="OUT", status="P" + ) + + mock_request.side_effect = SendMessageMock(self.client) + in_message.send_message( + as2message.headers, self.mock_message_content(as2message) + ) + + # Check if message was processed successfully + out_message = Message.objects.get( + message_id=in_message.message_id, direction="IN" + ) + + receiver.canonicalize_as_binary = False + receiver.save() + + self.assertEqual(out_message.status, "S") + self.assertTrue(out_message.signed) + self.assertEqual(in_message.status, "S") + self.assertIsNotNone(in_message.mdn) + + # Check if input and output files are the same + self.assertTrue( + self.compareFiles(in_message.payload.name, out_message.payload.name) + ) + @mock.patch("requests.post") def build_and_send(self, partner, mock_request): # Build and send the message to server @@ -565,6 +627,20 @@ def compareFiles(filename1, filename2): lineA == lineB for lineA, lineB in zip(a.readlines(), b.readlines()) ) + @staticmethod + def mock_message_content(message): + """Prevent binary content line feeds of being altered when sending.""" + message_bytes = message.payload.as_bytes() + boundary = b"--" + message.payload.get_boundary().encode("utf-8") + temp = message_bytes.split(boundary) + temp.pop(0) + return boundary + boundary.join(temp) + + @staticmethod + def mock_canonicalize_function(email_msg, canonicalize_as_binary=False): + """Explicitly ignore the canonicalize_as_binary argument and force it to True.""" + return canonicalize(email_msg, canonicalize_as_binary=True) + class SendMessageMock(object): def __init__(self, test_client):