|
| 1 | +import os |
| 2 | +import sys |
| 3 | +import json |
| 4 | +import time |
| 5 | + |
| 6 | +import boto3 |
| 7 | +from botocore.exceptions import ClientError |
| 8 | + |
| 9 | + |
| 10 | +_endpoint_name = os.environ.get('EndpointName', 'no-endpoint-name') |
| 11 | +_statemachine_arn = os.environ.get('StateMachineArn', 'no-statemachine-arn') |
| 12 | +_bucket_key_for_input = os.environ.get('BucketKeyForInput', 'no-bucket-key-for-input') |
| 13 | +_bucket_key_for_output = os.environ.get('BucketKeyForOutput', 'no-bucket-key-for-output') |
| 14 | + |
| 15 | + |
| 16 | +def trigger_statemachine(client, execution_name, statemachine_arn, request): |
| 17 | + print('trigger_statemachine: request==>', request) |
| 18 | + |
| 19 | + if len(execution_name) > 80: |
| 20 | + execution_name = execution_name[:80] |
| 21 | + |
| 22 | + try: |
| 23 | + client.start_execution(stateMachineArn=statemachine_arn, |
| 24 | + name=execution_name, input=json.dumps(request)) |
| 25 | + except ClientError as e: |
| 26 | + print('Error : trigger_statemachine - {}'.format(e.response['Error']['Message'])) |
| 27 | + |
| 28 | + |
| 29 | +def get_request_template(): |
| 30 | + return { |
| 31 | + "PreprocessGlue": { |
| 32 | + "--S3_INPUT_FILE": "s3://bucket-name/input/xxx/input.csv", |
| 33 | + "--S3_TRAIN_KEY": "s3://bucket-name/output/xxx/train/", |
| 34 | + "--S3_VALIDATE_KEY": "s3://bucket-name/output/xxx/validate/" |
| 35 | + }, |
| 36 | + "TrainSageMaker": { |
| 37 | + "TrainingJobName": "projectprefix-xxx-input", |
| 38 | + "TrainData": "s3://bucket-name/output/xxx/train/", |
| 39 | + "ValidateData": "s3://bucket-name/output/xxx/validate/", |
| 40 | + "TrainOutput": "s3://bucket-name/output/xxx/model/projectprefix-xxx-input/" |
| 41 | + }, |
| 42 | + "ServeSageMaker": { |
| 43 | + "ModelName": "projectprefix-xxx-input", |
| 44 | + "EndpointConfigName": "projectprefix-xxx-input", |
| 45 | + "EndpointName": "churn-xgboost" |
| 46 | + } |
| 47 | + } |
| 48 | + |
| 49 | + |
| 50 | +def handle(event, context): |
| 51 | + print('event--->', event) |
| 52 | + |
| 53 | + client = boto3.client('stepfunctions') |
| 54 | + |
| 55 | + for record in event['Records']: |
| 56 | + bucket_name: str = record['s3']['bucket']['name'] |
| 57 | + input_file_key: str = record['s3']['object']['key'] |
| 58 | + print('etl_input_file_key => {}'.format(input_file_key)) |
| 59 | + |
| 60 | + if input_file_key.find('.') > -1: |
| 61 | + temp = input_file_key.replace(f'{_bucket_key_for_input}/', f'{_bucket_key_for_output}/', 1) |
| 62 | + output_dir_key = temp.split('.')[0] |
| 63 | + |
| 64 | + temp = output_dir_key.replace(f'{_bucket_key_for_output}/', '', 1) |
| 65 | + temp = temp.replace("/", "-") |
| 66 | + execution_name = f'{_endpoint_name}-{temp}' |
| 67 | + |
| 68 | + request = get_request_template() |
| 69 | + request['PreprocessGlue']['--S3_INPUT_FILE'] = f's3://{bucket_name}/{input_file_key}' |
| 70 | + request['PreprocessGlue']['--S3_TRAIN_KEY'] = f's3://{bucket_name}/{output_dir_key}/data/train/' |
| 71 | + request['PreprocessGlue']['--S3_VALIDATE_KEY'] = f's3://{bucket_name}/{output_dir_key}/data/validate/' |
| 72 | + |
| 73 | + request['TrainSageMaker']['TrainingJobName'] = execution_name |
| 74 | + request['TrainSageMaker']['TrainData'] = request['PreprocessGlue']['--S3_TRAIN_KEY'] |
| 75 | + request['TrainSageMaker']['ValidateData'] = request['PreprocessGlue']['--S3_VALIDATE_KEY'] |
| 76 | + request['TrainSageMaker']['TrainOutput'] = f's3://{bucket_name}/{output_dir_key}/model/' |
| 77 | + |
| 78 | + request['ServeSageMaker']['ModelName'] = execution_name |
| 79 | + request['ServeSageMaker']['EndpointConfigName'] = execution_name |
| 80 | + request['ServeSageMaker']['EndpointName'] = _endpoint_name |
| 81 | + |
| 82 | + trigger_statemachine(client, execution_name, _statemachine_arn, request) |
0 commit comments