@@ -437,27 +437,28 @@ def test_multi_step_framework_processing_pipeline_same_source_dir(
     pipeline.create(role)
     definition = json.loads(pipeline.definition())

-    source_dir_1_s3_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
+    source_dir_1_tar_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
         pipeline_session,
         framework_processor_tf,
         default_bucket,
         pipeline_name,
         definition["Steps"][0],
-        SOURCE_DIR,
+        DATA_DIR + SOURCE_DIR,
         "script_1.py",
     )
-    source_dir_2_s3_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
+
+    source_dir_2_tar_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
         pipeline_session,
         framework_processor_sk,
         default_bucket,
         pipeline_name,
         definition["Steps"][1],
-        SOURCE_DIR,
+        DATA_DIR + SOURCE_DIR,
         "script_2.py",
     )

-    # the same local source_dirs should have the same s3 paths
-    assert source_dir_1_s3_uri == source_dir_2_s3_uri
+    # the tarred source dirs should have a different s3 uri since the entry_point code is different
+    assert source_dir_1_tar_uri != source_dir_2_tar_uri

     # verify different entry_point paths
     assert entry_point_1 != entry_point_2
@@ -528,31 +529,49 @@ def test_multi_step_framework_processing_pipeline_different_source_dir(
     pipeline.create(role)
     definition = json.loads(pipeline.definition())

-    source_dir_1_s3_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
+    source_dir_1_tar_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
         pipeline_session,
         framework_processor_tf,
         default_bucket,
         pipeline_name,
         definition["Steps"][0],
-        SOURCE_DIR_1,
+        DATA_DIR + SOURCE_DIR_1,
         "script_1.py",
     )
-    source_dir_2_s3_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
+
+    source_dir_2_tar_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
         pipeline_session,
         framework_processor_tf,
         default_bucket,
         pipeline_name,
         definition["Steps"][1],
-        SOURCE_DIR_2,
+        DATA_DIR + SOURCE_DIR_2,
         "script_2.py",
     )

-    # different local source_dirs should have different s3 paths
-    assert source_dir_1_s3_uri != source_dir_2_s3_uri
+    # the tarred source dirs should have a different s3 uri since the source_dirs and entry_point code are different
+    assert source_dir_1_tar_uri != source_dir_2_tar_uri

     # verify different entry_point paths
     assert entry_point_1 != entry_point_2

+    # define another step with the same source_dir and entry_point as the second step
+    source_dir_3_tar_uri, entry_point_3 = _verify_code_artifacts_of_framework_processing_step(
+        pipeline_session,
+        framework_processor_tf,
+        default_bucket,
+        pipeline_name,
+        definition["Steps"][1],
+        DATA_DIR + SOURCE_DIR_2,
+        "script_2.py",
+    )
+
+    # verify the same entry_point paths
+    assert entry_point_2 == entry_point_3
+
+    # the tarred source dirs should now be the same since the source_dirs and entry_point are the same
+    assert source_dir_2_tar_uri == source_dir_3_tar_uri
+
     execution = pipeline.start(parameters={})
     wait_pipeline_execution(execution=execution, delay=540, max_attempts=3)
@@ -975,13 +994,19 @@ def test_two_processing_job_depends_on(
         pass


+# Verifies that the processing step artifacts are created as expected.
+# Requires that source_dir and entry_point are exactly those passed to the processing step.
 def _verify_code_artifacts_of_framework_processing_step(
     pipeline_session, processor, bucket, pipeline_name, step_definition, source_dir, entry_point
 ):

-    source_dir_s3_uri = (
-        f"s3://{bucket}/{pipeline_name}" f"/code/{hash_files_or_dirs([f'{DATA_DIR}/{source_dir}'])}"
-    )
+    files_to_hash = []
+    if entry_point is not None:
+        files_to_hash.append(source_dir)
+        files_to_hash.append(entry_point)
+    file_hash = hash_files_or_dirs(files_to_hash)
+
+    source_dir_s3_uri = f"s3://{bucket}/{pipeline_name}/code/{file_hash}"

     # verify runproc.sh prefix is different from code artifact prefix
     runprocs = []
@@ -995,10 +1020,7 @@ def _verify_code_artifacts_of_framework_processing_step(
     # verify only one entrypoint generated per step
     assert len(runprocs) == 1

-    expected_source_dir_tar = (
-        f"{pipeline_name}"
-        f"/code/{hash_files_or_dirs([DATA_DIR + '/pipeline/test_source_dir'])}/sourcedir.tar.gz"
-    )
+    expected_source_dir_tar = f"{pipeline_name}/code/{file_hash}/sourcedir.tar.gz"

     step_script = processor._generate_framework_script(entry_point)
     expected_step_artifact = f"{pipeline_name}/code/{hash_object(step_script)}/runproc.sh"
@@ -1015,4 +1037,4 @@ def _verify_code_artifacts_of_framework_processing_step(
10151037 f"s3://{ bucket } /{ expected_step_artifact } " , pipeline_session
10161038 )
10171039 assert f"python { entry_point } " in step_runproc
1018- return source_dir , expected_step_artifact
1040+ return expected_source_dir_tar , expected_step_artifact
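Note on the verification logic above: the helper now derives the expected code-artifact prefix by hashing the source directory together with the entry point, so two steps that share a source_dir but run different scripts land under different .../code/<hash>/sourcedir.tar.gz URIs, while repeating the same inputs reproduces the same URI. The standalone sketch below illustrates that idea only; mock_hash_files_or_dirs is a simplified stand-in for sagemaker.workflow.utilities.hash_files_or_dirs (the real helper may compute its digest differently), and the bucket, pipeline, and path names are placeholders.

import hashlib
from pathlib import Path


def mock_hash_files_or_dirs(paths):
    # Simplified stand-in: fold the contents of every file under the given
    # files/directories into a single SHA-256 digest.
    digest = hashlib.sha256()
    for path in paths:
        p = Path(path)
        files = sorted(p.rglob("*")) if p.is_dir() else [p]
        for f in files:
            if f.is_file():
                digest.update(f.read_bytes())
    return digest.hexdigest()


def expected_sourcedir_tar_uri(bucket, pipeline_name, source_dir, entry_point):
    # Mirror the test helper: hash the source_dir plus the entry point script,
    # so changing only the script still changes the code prefix.
    files_to_hash = [source_dir]
    if entry_point is not None:
        # Simplification: resolve the entry point relative to source_dir.
        files_to_hash.append(str(Path(source_dir) / entry_point))
    file_hash = mock_hash_files_or_dirs(files_to_hash)
    return f"s3://{bucket}/{pipeline_name}/code/{file_hash}/sourcedir.tar.gz"


# Example with a hypothetical local layout: two scripts in the same source_dir
# produce different prefixes, while identical inputs reproduce the same URI.
# uri_1 = expected_sourcedir_tar_uri("my-bucket", "my-pipeline", "pipeline/test_source_dir", "script_1.py")
# uri_2 = expected_sourcedir_tar_uri("my-bucket", "my-pipeline", "pipeline/test_source_dir", "script_2.py")
# assert uri_1 != uri_2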