aws-stepfunctions: Distributed Map State Item Processor executionType doesn't recognize ProcessorType.EXPRESS #30194

Open
Commas929 opened this issue May 14, 2024 · 2 comments · May be fixed by #30301
Labels: @aws-cdk/aws-stepfunctions, bug, effort/small, p2

Describe the bug

I have the following CDK code, but when I deploy it and check the AWS console, the Step Functions execution type for the Map Run is Standard instead of Express.

The code:

    const qotOverrideDistributedMapState = new DistributedMap(this, 'Distributed Map State', {
      maxConcurrency: 1000,
      itemBatcher: new ItemBatcher({
        maxItemsPerBatch: 10
      }),
      itemReader: new S3CsvItemReader({
        bucket: qotOverrideBucket,
        key: JsonPath.stringAt('$.s3Key'),
        csvHeaders: CsvHeaders.useFirstRow()
      }),
      resultWriter: new ResultWriter({
        bucket: qotOverrideBucket,
        prefix: 'QoTOverrideProcessingResult',
      }),
      toleratedFailureCount: 0,
    }).itemProcessor(new LambdaInvoke(this, 'Invoke Lambda Processor', {
      lambdaFunction: qotOverrideProcessor,
    }), {
      mode: ProcessorMode.DISTRIBUTED,
      executionType: ProcessorType.EXPRESS,
    })

    const qotOverrideFileProcessor = new StateMachine(this, 'QoT Override File Processor', {
      role: Role.fromRoleArn(this, 'QoT-Override-File-Processor-SfnRole', qotOverrideSfnRole.roleArn, {
        mutable: false,
      }),
      definitionBody: DefinitionBody.fromChainable(qotOverrideDistributedMapState),
      stateMachineName: 'QoT-Override-File-Processor-Sfn',
      stateMachineType: StateMachineType.STANDARD,
      logs: {
        destination: new LogGroup(this, 'QoT-Override-File-Processor-LogGroup', {
          retention: RetentionDays.THREE_MONTHS,
          logGroupName: 'QoT-Override-File-Processor-Log',
        }),
        level: LogLevel.ERROR,
        includeExecutionData: true,
      },
    })

The generated State Machine definition copied from AWS console:

{
  "StartAt": "Distributed Map State",
  "States": {
    "Distributed Map State": {
      "Type": "Map",
      "End": true,
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "STANDARD"
        },
        "StartAt": "Invoke Lambda Processor",
        "States": {
          "Invoke Lambda Processor": {
            "End": true,
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ClientExecutionTimeoutException",
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 6,
                "BackoffRate": 2
              }
            ],
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:us-east-1:502523373620:function:QoTOverrideProcessor",
              "Payload.$": "$"
            }
          }
        }
      },
      "MaxConcurrency": 1000,
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": {
          "InputType": "CSV",
          "CSVHeaderLocation": "FIRST_ROW"
        },
        "Parameters": {
          "Bucket.$": "$.s3Bucket",
          "Key.$": "$.s3Key"
        }
      },
      "ItemBatcher": {
        "MaxItemsPerBatch": 10
      },
      "ResultWriter": {
        "Resource": "arn:aws:states:::s3:putObject",
        "Parameters": {
          "Bucket.$": "$.s3Bucket",
          "Prefix": "QoTOverrideProcessingResult"
        }
      }
    }
  }
}

Expected Behavior

The ItemProcessor execution type should be Express, matching executionType: ProcessorType.EXPRESS in the CDK code.

Current Behavior

The ItemProcessor execution type is Standard and the specified type in CDK is ignored.
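
The same check can be made against the rendered definition instead of the console. The following is a minimal, dependency-free sketch: the interface and function names are illustrative and not part of aws-cdk-lib, and the sample object is a trimmed copy of the definition reported above.

```typescript
// Only the parts of an Amazon States Language definition this check reads.
interface AslState {
  Type: string;
  ItemProcessor?: {
    ProcessorConfig?: { Mode?: string; ExecutionType?: string };
    States?: Record<string, AslState>;
  };
}

interface AslDefinition {
  States: Record<string, AslState>;
}

// Walk all states (including those nested inside an ItemProcessor) and record
// the ExecutionType of every Map state that runs in DISTRIBUTED mode.
function collectExecutionTypes(
  states: Record<string, AslState>,
  found: Record<string, string | undefined> = {},
): Record<string, string | undefined> {
  for (const [name, state] of Object.entries(states)) {
    const config = state.ItemProcessor?.ProcessorConfig;
    if (state.Type === 'Map' && config?.Mode === 'DISTRIBUTED') {
      found[name] = config.ExecutionType;
    }
    if (state.ItemProcessor?.States) {
      collectExecutionTypes(state.ItemProcessor.States, found);
    }
  }
  return found;
}

// Trimmed copy of the definition from the console.
const deployed: AslDefinition = {
  States: {
    'Distributed Map State': {
      Type: 'Map',
      ItemProcessor: {
        ProcessorConfig: { Mode: 'DISTRIBUTED', ExecutionType: 'STANDARD' },
        States: { 'Invoke Lambda Processor': { Type: 'Task' } },
      },
    },
  },
};

console.log(collectExecutionTypes(deployed.States));
```

For the definition in this report the helper returns STANDARD for "Distributed Map State", even though the CDK code asked for EXPRESS.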

Reproduction Steps

    const bucket = new Bucket(this, 'Bucket', {});
    const distributedMapState = new DistributedMap(this, 'Distributed Map State', {
      maxConcurrency: 1000,
      itemReader: new S3CsvItemReader({
        bucket: bucket,
        key: JsonPath.stringAt('$.s3Key'),
        csvHeaders: CsvHeaders.useFirstRow()
      }),
      toleratedFailureCount: 0,
    }).itemProcessor(new Pass(this, 'Pass State', {}), {
      mode: ProcessorMode.DISTRIBUTED,
      executionType: ProcessorType.EXPRESS,
    })

    const fileProcessor = new StateMachine(this, 'File Processor', {
      definitionBody: DefinitionBody.fromChainable(distributedMapState),
      stateMachineName: 'File-Processor-Sfn',
      stateMachineType: StateMachineType.STANDARD,
    })

Possible Solution

No response

Additional Information/Context

No response

CDK CLI Version

2.136.0

Framework Version

No response

Node.js Version

NodeJS-18

OS

macOS Sonoma 14.4.1

Language

TypeScript

Language Version

No response

Other information

No response

@Commas929 Commas929 added bug This issue is a bug. needs-triage This issue or PR still needs to be triaged. labels May 14, 2024
@github-actions github-actions bot added the @aws-cdk/aws-stepfunctions Related to AWS StepFunctions label May 14, 2024
@khushail khushail added investigating This issue is being investigated and/or work is in progress to resolve the issue. and removed needs-triage This issue or PR still needs to be triaged. labels May 14, 2024
@khushail khushail self-assigned this May 14, 2024
khushail (Contributor) commented May 16, 2024

Hey @Commas929, thanks for reporting this bug.

While investigating this issue, I checked the CDK docs and found that this PR introduced support for itemProcessor(), where executionType can be set to the required enum value:

public itemProcessor(processor: IChainable, config: ProcessorConfig = {}): DistributedMap {

However, this value was later overwritten by a change introduced in this PR:

rendered.ItemProcessor.ProcessorConfig.ExecutionType = this.mapExecutionType;
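
To make the effect of that assignment concrete, here is a simplified, dependency-free model. It is not the aws-cdk-lib source: the class, its props, and the STANDARD fallback are assumptions made for illustration (the fallback is inferred from the definition observed in the console), and only the final assignment mirrors the line quoted above.

```typescript
type ExecutionType = 'STANDARD' | 'EXPRESS';

interface ProcessorConfig {
  mode?: 'INLINE' | 'DISTRIBUTED';
  executionType?: ExecutionType;
}

// Hypothetical stand-in for DistributedMap, reduced to the two settings involved.
class DistributedMapModel {
  private processorConfig: ProcessorConfig = {};
  private readonly mapExecutionType: ExecutionType;

  // Assumption: an omitted mapExecutionType falls back to STANDARD.
  constructor(props: { mapExecutionType?: ExecutionType } = {}) {
    this.mapExecutionType = props.mapExecutionType ?? 'STANDARD';
  }

  itemProcessor(config: ProcessorConfig = {}): this {
    this.processorConfig = config;
    return this;
  }

  toStateJson() {
    const rendered = {
      Type: 'Map',
      ItemProcessor: {
        ProcessorConfig: {
          Mode: this.processorConfig.mode ?? 'DISTRIBUTED',
          ExecutionType: this.processorConfig.executionType,
        },
      },
    };
    // The value passed to itemProcessor() is replaced unconditionally.
    rendered.ItemProcessor.ProcessorConfig.ExecutionType = this.mapExecutionType;
    return rendered;
  }
}

// executionType: EXPRESS is ignored because mapExecutionType was not set.
const ignored = new DistributedMapModel()
  .itemProcessor({ mode: 'DISTRIBUTED', executionType: 'EXPRESS' })
  .toStateJson();

// mapExecutionType: EXPRESS wins even though executionType says STANDARD.
const honoured = new DistributedMapModel({ mapExecutionType: 'EXPRESS' })
  .itemProcessor({ mode: 'DISTRIBUTED', executionType: 'STANDARD' })
  .toStateJson();

console.log(ignored.ItemProcessor.ProcessorConfig.ExecutionType); // STANDARD
console.log(honoured.ItemProcessor.ProcessorConfig.ExecutionType); // EXPRESS
```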

So I tried rerunning the code with mapExecutionType set to EXPRESS, and it worked: that value was assigned to ExecutionType in the rendered definition. However, I feel this might not be the intended behavior.

Sharing the code:

    const distributedMapState = new DistributedMap(this, 'Distributed Map State', {
      maxConcurrency: 1000,
      itemReader: new S3CsvItemReader({
        bucket: bucket,
        key: JsonPath.stringAt('$.s3Key'),
        csvHeaders: CsvHeaders.useFirstRow()
      }),
      mapExecutionType: StateMachineType.EXPRESS,
      resultPath: JsonPath.DISCARD,
      toleratedFailureCount: 0,
    }).itemProcessor(new Pass(this, 'Pass State', {}), {
      mode: ProcessorMode.DISTRIBUTED,
      executionType: ProcessorType.STANDARD,
    })

    const fileProcessor = new StateMachine(this, 'File Processor', {
      definitionBody: DefinitionBody.fromChainable(distributedMapState),
      stateMachineName: 'File-Processor-Sfn',
      stateMachineType: StateMachineType.STANDARD,
    })

and the synthesized template:


"FileProcessor4197797F": {
   "Type": "AWS::StepFunctions::StateMachine",
   "Properties": {
    "DefinitionString": {
     "Fn::Join": [
      "",
      [
       "{\"StartAt\":\"Distributed Map State\",\"States\":{\"Distributed Map State\":{\"Type\":\"Map\",\"ResultPath\":null,\"End\":true,\"ItemProcessor\":{\"ProcessorConfig\":{\"Mode\":\"DISTRIBUTED\",\"ExecutionType\":\"EXPRESS\"},\"StartAt\":\"Pass State\",\"States\":{\"Pass State\":{\"Type\":\"Pass\",\"End\":true}}},\"MaxConcurrency\":1000,\"ItemReader\":{\"Resource\":\"arn:",
       {
        "Ref": "AWS::Partition"
       },
       ":states:::s3:getObject\",\"ReaderConfig\":{\"InputType\":\"CSV\",\"CSVHeaderLocation\":\"FIRST_ROW\"},\"Parameters\":{\"Bucket\":\"",
       {
        "Ref": "Bucket2001301D4930"
       },
       "\",\"Key.$\":\"$.s3Key\"}}}}}"
      ]
     ]
    },

You could try using escape hatches as a workaround. Marking the issue accordingly.
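
As a rough sketch of what such an escape hatch would have to compute: the synthesized DefinitionString above is an Fn::Join over literal JSON fragments and intrinsics, so an override needs to rewrite the literal fragments and leave the intrinsics alone. The helper below is hypothetical (the type and function names are not from aws-cdk-lib) and assumes the ExecutionType key and value sit inside a single literal fragment.

```typescript
// One element of the Fn::Join list: a literal JSON fragment or an intrinsic
// such as { Ref: 'AWS::Partition' }.
type JoinPart = string | Record<string, unknown>;

// Rewrite every "ExecutionType":"STANDARD" in the literal fragments to EXPRESS.
// Intrinsic objects are passed through unchanged.
function forceExpressExecution(parts: JoinPart[]): JoinPart[] {
  return parts.map((part) =>
    typeof part === 'string'
      ? part.replace(/"ExecutionType":"STANDARD"/g, '"ExecutionType":"EXPRESS"')
      : part,
  );
}

// Shortened fragments in the same shape as the synthesized template.
const parts: JoinPart[] = [
  '{"StartAt":"Distributed Map State","States":{"Distributed Map State":{"Type":"Map","ItemProcessor":{"ProcessorConfig":{"Mode":"DISTRIBUTED","ExecutionType":"STANDARD"}},"ItemReader":{"Resource":"arn:',
  { Ref: 'AWS::Partition' },
  ':states:::s3:getObject"}}}}',
];

const patched = forceExpressExecution(parts);
console.log(patched[0]);
```

The patched list could then be passed to the underlying CfnStateMachine with addPropertyOverride('DefinitionString', { 'Fn::Join': ['', patched] }). How the original parts are obtained from the construct is left out here, and note that the replacement affects every distributed Map state in the definition, not just one.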

@khushail khushail added p2 effort/small Small work item – less than a day of effort response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days. and removed investigating This issue is being investigated and/or work is in progress to resolve the issue. response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days. labels May 16, 2024
mazyu36 (Contributor) commented May 18, 2024

I encountered this problem before, and I’m working on it.

I think the executionType property in the itemProcessor config does not need to be set when using the DistributedMap construct.

The mapExecutionType in the DistributedMap construct is used to set the Processing Mode of the ItemProcessor, and its purpose overlaps with the executionType property in the ItemProcessor config.

In the current implementation, the mapExecutionType in the DistributedMap construct always overwrites the Processing Mode of the ItemProcessor, so the executionType property in the itemProcessor config has no effect.

The mapExecutionType in the DistributedMap construct may have been an unnecessary property, but removing it would be a breaking change, so that should be avoided.
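
For illustration only, one non-breaking way to reconcile the two overlapping properties would be a precedence rule in which an explicitly set mapExecutionType wins and executionType is used as a fallback. The helper below is hypothetical and is not necessarily what the linked PR implements; the STANDARD default is likewise an assumption.

```typescript
type ExecutionType = 'STANDARD' | 'EXPRESS';

// Hypothetical precedence: explicit mapExecutionType, then the itemProcessor
// config's executionType, then an assumed default of STANDARD.
function resolveExecutionType(
  mapExecutionType: ExecutionType | undefined,
  processorExecutionType: ExecutionType | undefined,
): ExecutionType {
  return mapExecutionType ?? processorExecutionType ?? 'STANDARD';
}

console.log(resolveExecutionType(undefined, 'EXPRESS')); // EXPRESS
console.log(resolveExecutionType('EXPRESS', 'STANDARD')); // EXPRESS
console.log(resolveExecutionType(undefined, undefined)); // STANDARD
```

Under this rule, existing code that sets only mapExecutionType keeps its behavior, while code that sets only executionType (as in the report) would get the value it asked for.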
