// lib/sync-sf-stack.ts
const detectSentimentFn = createFn(this, 'detectSentimentFn')
detectSentimentFn.addToRolePolicy(
  new iam.PolicyStatement({
    effect: iam.Effect.ALLOW,
    actions: ['comprehend:DetectSentiment'],
    resources: ['*'],
  })
)
const detectSentiment = new tasks.LambdaInvoke(this, 'detectSentiment', {
  lambdaFunction: detectSentimentFn,
  resultPath: '$.sentimentResult',
})
We create an aws-lambda-nodejs function via a helper named createFn. We then add a policy statement to its role that permits it to detect sentiment using Comprehend. Finally, we create a new LambdaInvoke task that accepts this function and stores the result in a key named sentimentResult.
We use resultPath to store each step's results on the final result object, which makes it easier to pass data between steps and perform custom processing. The output of a LambdaInvoke task is placed in Payload under the key that we specify. For example, if we set the result path to $.data, then the output of that step will be available under:
{
  "data": {
    "Payload": "result-here"
  }
}
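As a rough mental model of resultPath, the sketch below merges a step's result into the state under a given key. applyResultPath is a hypothetical helper written for illustration, not part of CDK or Step Functions, and it only handles simple dotted paths such as $.data.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json }
type JsonObject = { [key: string]: Json }

// Hypothetical helper: copy `input` and place `result` at a simple dotted
// path such as "$.data". Real Step Functions paths support more syntax.
function applyResultPath(input: JsonObject, resultPath: string, result: Json): JsonObject {
  const keys = resultPath
    .replace(/^\$\.?/, '')
    .split('.')
    .filter((k) => k.length > 0)
  const leaf = keys.pop()
  if (leaf === undefined) {
    throw new Error('this sketch only handles paths like "$.key"')
  }
  const output: JsonObject = { ...input }
  let cursor: JsonObject = output
  for (const key of keys) {
    const existing = cursor[key]
    // Reuse a copy of an existing nested object, otherwise start a new one.
    const child: JsonObject =
      typeof existing === 'object' && existing !== null && !Array.isArray(existing)
        ? { ...existing }
        : {}
    cursor[key] = child
    cursor = child
  }
  cursor[leaf] = result
  return output
}

// The original input is preserved and the step result is added beside it.
const stateAfterStep = applyResultPath(
  { message: 'Great service!' },
  '$.data',
  { Payload: 'result-here' }
)
console.log(JSON.stringify(stateAfterStep))
// {"message":"Great service!","data":{"Payload":"result-here"}}
```

Because the input is kept alongside the result, later steps can still read the original message.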
// functions/detectSentimentFn.ts
import { Comprehend } from 'aws-sdk'

const cp = new Comprehend({ apiVersion: '2017-11-27' })

type Event = {
  message: string
}

export const handler = async (event: Event) => {
  const data = await cp
    .detectSentiment({
      LanguageCode: 'en',
      Text: event.message,
    })
    .promise()
  return data
}
The handler calls detectSentiment on the feedback that we receive from our endpoint. We will look at how to send feedback from the created API in the final section.
// lib/sync-sf-stack.ts
const generateReferenceNumber = new tasks.LambdaInvoke(
  this,
  'generateReferenceNumber',
  {
    lambdaFunction: createFn(this, 'generateReferenceNumberFn'),
    resultPath: '$.ticketId',
  }
)
Next, we create another LambdaInvoke task that generates a reference ID for the feedback and stores it in a key named ticketId. The function simply does this:
// functions/generateReferenceNumberFn.ts
import { ulid } from 'ulid'
export const handler = async () => {
  return ulid()
}
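For intuition about what ulid() returns: a ULID is 26 characters of Crockford Base32, a 10-character timestamp followed by 16 random characters, so IDs sort by creation time. The following is a simplified sketch of that layout under those assumptions, not the ulid library's implementation; encodeTime and sketchUlid are hypothetical names.

```typescript
// Crockford Base32 alphabet (no I, L, O, U).
const CROCKFORD = '0123456789ABCDEFGHJKMNPQRSTVWXYZ'

// Encode a millisecond timestamp as 10 Base32 characters, most significant first.
// Uses division instead of bit operations because timestamps exceed 32 bits.
function encodeTime(ms: number): string {
  let remaining = Math.floor(ms)
  let out = ''
  for (let i = 0; i < 10; i++) {
    out = CROCKFORD.charAt(remaining % 32) + out
    remaining = Math.floor(remaining / 32)
  }
  return out
}

// Append 16 random characters. Math.random is not cryptographically secure,
// so this is for illustration only.
function sketchUlid(now: number = Date.now()): string {
  let random = ''
  for (let i = 0; i < 16; i++) {
    random += CROCKFORD.charAt(Math.floor(Math.random() * 32))
  }
  return encodeTime(now) + random
}

console.log(sketchUlid().length) // 26
```

Since the timestamp comes first, IDs created at a later millisecond compare greater as plain strings, which is convenient for a ticket reference.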
// lib/sync-sf-stack.ts
const formData = new ddb.Table(this, 'formData', {
  partitionKey: { name: 'formId', type: ddb.AttributeType.STRING },
  billingMode: ddb.BillingMode.PAY_PER_REQUEST,
  removalPolicy: cdk.RemovalPolicy.DESTROY,
})
const saveCustomerMessage = new tasks.DynamoPutItem(
  this,
  'saveCustomerMessage',
  {
    table: formData,
    item: {
      formId: tasks.DynamoAttributeValue.fromString(
        sfn.JsonPath.stringAt('$.ticketId.Payload')
      ),
      customerMessage: tasks.DynamoAttributeValue.fromString(
        sfn.JsonPath.stringAt('$.message')
      ),
      sentiment: tasks.DynamoAttributeValue.fromString(
        sfn.JsonPath.stringAt('$.sentimentResult.Payload.Sentiment')
      ),
    },
    resultPath: '$.formDataRecord',
  }
)
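To picture which values the three JSON paths above point at, here is a minimal sketch that resolves them against an example state object. resolveStringAt is a hypothetical helper and the sample values (including the ticket ID) are made up; in CDK, sfn.JsonPath.stringAt only emits a reference that Step Functions resolves at run time.

```typescript
// Hypothetical helper: resolve a simple dotted path such as
// "$.ticketId.Payload" against a plain object and require a string result.
function resolveStringAt(state: unknown, path: string): string {
  const keys = path
    .replace(/^\$\.?/, '')
    .split('.')
    .filter((k) => k.length > 0)
  let current: unknown = state
  for (const key of keys) {
    if (typeof current !== 'object' || current === null) {
      throw new Error(`cannot read "${key}" while resolving ${path}`)
    }
    current = (current as Record<string, unknown>)[key]
  }
  if (typeof current !== 'string') {
    throw new Error(`${path} does not resolve to a string`)
  }
  return current
}

// Example state after the first two steps (values are illustrative).
const stateBeforeSave = {
  message: 'The checkout page keeps crashing',
  sentimentResult: { Payload: { Sentiment: 'NEGATIVE' } },
  ticketId: { Payload: 'EXAMPLE-TICKET-ID' },
}

// Mirrors the `item` of the DynamoPutItem task above.
const item = {
  formId: resolveStringAt(stateBeforeSave, '$.ticketId.Payload'),
  customerMessage: resolveStringAt(stateBeforeSave, '$.message'),
  sentiment: resolveStringAt(stateBeforeSave, '$.sentimentResult.Payload.Sentiment'),
}
console.log(item)
```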
The saveCustomerMessage step is a DynamoPutItem task, as Step Functions can interact with DynamoDB directly without the need for a Lambda function. It saves a record in the table we specify with the following values:
- formId, which is taken from the output stored in ticketId.
- customerMessage, which comes directly from the frontend.
- sentiment, the result obtained from the very first step.
sfn.JsonPath extracts the value currently present in the output of our Step Function workflow.
// lib/sync-sf-stack.ts
const notifyOfNegativeSentimentFn = createFn(
  this,
  'notifyOfNegativeSentimentFn'
)
  .addEnvironment('SENDER', process.env.SENDER)
  .addEnvironment('RECEIVER', process.env.RECEIVER)
notifyOfNegativeSentimentFn.addToRolePolicy(
  new iam.PolicyStatement({
    effect: iam.Effect.ALLOW,
    actions: ['ses:SendEmail'],
    resources: ['*'],
  })
)
const checkSentiment = new sfn.Choice(this, 'checkSentiment')
  .when(
    sfn.Condition.stringEquals(
      '$.sentimentResult.Payload.Sentiment',
      'NEGATIVE'
    ),
    new tasks.LambdaInvoke(this, 'notifyOfNegativeSentiment', {
      lambdaFunction: notifyOfNegativeSentimentFn,
      resultPath: '$.notifyViaEmail',
    })
  )
  .otherwise(new sfn.Succeed(this, 'positiveSentiment'))
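The branching in checkSentiment can be read as an ordinary if/else. The sketch below expresses the same decision as a plain function; the State type and the returned state names are written for illustration, and the list of sentiment values assumes the four labels Comprehend reports.

```typescript
type Sentiment = 'POSITIVE' | 'NEGATIVE' | 'NEUTRAL' | 'MIXED'

interface State {
  message: string
  sentimentResult: { Payload: { Sentiment: Sentiment } }
}

type NextState = 'notifyOfNegativeSentiment' | 'positiveSentiment'

// Equivalent of .when(stringEquals(..., 'NEGATIVE'), ...).otherwise(...)
function nextStateFor(state: State): NextState {
  if (state.sentimentResult.Payload.Sentiment === 'NEGATIVE') {
    return 'notifyOfNegativeSentiment'
  }
  return 'positiveSentiment'
}

const next = nextStateFor({
  message: 'The checkout page keeps crashing',
  sentimentResult: { Payload: { Sentiment: 'NEGATIVE' } },
})
console.log(next) // notifyOfNegativeSentiment
```

Note that NEUTRAL and MIXED results also fall through to the otherwise branch, so only NEGATIVE feedback triggers an email.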
The checkSentiment step is a Choice state that checks for a specific condition and performs operations when that condition matches. Think of this as a branching (if/else) statement.
Here we check whether the sentiment is NEGATIVE. If so, we invoke a Lambda function that sends an email via AWS SES to the intended recipient. If the sentiment is not NEGATIVE, we do nothing and succeed. The two branches are expressed with when and otherwise.
The Lambda function reads two environment variables, SENDER and RECEIVER. These addresses currently need to be verified in SES so that you can send and receive messages. The function uses SESV2 from aws-sdk to send an email.
// functions/notifyOfNegativeSentimentFn.ts
import { SESV2 } from 'aws-sdk'

const ses = new SESV2({ region: process.env.AWS_REGION })

export const handler = async (event: any) => {
  // The second line of the template literal stays unindented so the email body has no leading spaces.
  const Data = `Sentiment analysis: ${event.sentimentResult.Payload.Sentiment}
Feedback from customer: ${event.message}`
  await ses
    .sendEmail({
      FromEmailAddress: process.env.SENDER,
      Destination: {
        ToAddresses: [process.env.RECEIVER],
      },
      Content: {
        Simple: {
          Subject: { Charset: 'UTF-8', Data: 'Feedback form submission' },
          Body: { Text: { Charset: 'UTF-8', Data } },
        },
      },
    })
    .promise()
  return {
    body: 'Feedback submitted successfully!',
  }
}
The email, containing the sentiment and the customer's feedback, is sent to RECEIVER.
// lib/sync-sf-stack.ts
const definition = detectSentiment
  .next(generateReferenceNumber)
  .next(saveCustomerMessage)
  .next(checkSentiment)
this.sentimentAnalysis = new sfn.StateMachine(this, 'sentimentAnalysis', {
  definition,
  stateMachineType: sfn.StateMachineType.EXPRESS,
  timeout: cdk.Duration.seconds(30),
  logs: {
    destination: new logs.LogGroup(this, 'sentimentAnalysisLogs', {
      retention: logs.RetentionDays.ONE_WEEK,
    }),
  },
})
formData.grantWriteData(this.sentimentAnalysis)
The definition here maps the entire workflow of the Step Function, as we saw before. We pass this definition to a StateMachine that we named sentimentAnalysis and set the type of the State Machine to EXPRESS, which is the type that supports synchronous execution.
// bin/sync-sf.ts
const app = new cdk.App()
const env = { region: process.env.CDK_REGION || 'us-east-1' }
const sfn = new SyncSfStack(app, 'SyncSfStack', { env })
new ApiStack(app, 'ApiGwStack', { env, sfn: sfn.sentimentAnalysis })
We instantiate SyncSfStack, which includes the State Machine, and pass the State Machine as props to the ApiStack, which contains the API Gateway resource.
// lib/api-gw-stack.ts
const apiLogs = new logs.LogGroup(this, 'myApiLogs', {
  removalPolicy: cdk.RemovalPolicy.DESTROY,
  retention: logs.RetentionDays.ONE_WEEK,
})
const api = new apiGw.RestApi(this, 'myApi', {
  endpointTypes: [apiGw.EndpointType.REGIONAL],
  deployOptions: {
    stageName: 'dev',
    loggingLevel: apiGw.MethodLoggingLevel.ERROR,
    accessLogDestination: new apiGw.LogGroupLogDestination(apiLogs),
  },
  defaultCorsPreflightOptions: {
    allowOrigins: ['*'],
  },
})
We use a REGIONAL endpoint for this example, but you can use EDGE as well. We also configured CORS to enable access from anywhere, and logging so that we can view the access logs via CloudWatch.
Next, we add a POST method that accepts a feedback message from the customer.
// lib/api-gw-stack.ts
api.root.addMethod('POST', sfIntegration, {
  operationName: 'Submit Feedback Form',
  requestValidatorOptions: { validateRequestBody: true },
  requestModels: {
    'application/json': new apiGw.Model(this, 'feedbackFormPayload', {
      restApi: api,
      schema: {
        schema: apiGw.JsonSchemaVersion.DRAFT4,
        title: 'Feedback Form Payload',
        type: apiGw.JsonSchemaType.OBJECT,
        required: ['message'],
        properties: {
          message: {
            type: apiGw.JsonSchemaType.STRING,
            minLength: 1,
          },
        },
      },
    }),
  },
  methodResponses: [{ statusCode: '200' }],
})
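To make the effect of the request model concrete, the following sketch applies the same three rules (body is an object, message is required, message is a non-empty string) in plain TypeScript. validateFeedbackPayload is a hypothetical function for illustration; API Gateway performs this check itself from the JSON schema above.

```typescript
type ValidationResult =
  | { valid: true; message: string }
  | { valid: false; reason: string }

// Hypothetical mirror of the 'Feedback Form Payload' schema.
function validateFeedbackPayload(body: unknown): ValidationResult {
  if (typeof body !== 'object' || body === null || Array.isArray(body)) {
    return { valid: false, reason: 'body must be a JSON object' }
  }
  const message = (body as Record<string, unknown>).message
  if (message === undefined) {
    return { valid: false, reason: 'message is required' }
  }
  if (typeof message !== 'string') {
    return { valid: false, reason: 'message must be a string' }
  }
  if (message.length < 1) {
    return { valid: false, reason: 'message must not be empty' }
  }
  return { valid: true, message }
}

console.log(validateFeedbackPayload({ message: 'Loved the new dashboard' }))
console.log(validateFeedbackPayload({ message: '' }))
console.log(validateFeedbackPayload({}))
```

A message consisting only of whitespace still satisfies minLength: 1, so the schema checks the shape of the payload rather than its content.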
We add a POST method that accepts message in the body. The body is validated against the requestModels we specified, which makes sure the data is well-formed before it reaches our Step Function.
We also pass sfIntegration to our method. This is the direct integration from API Gateway to Step Functions.
// lib/api-gw-stack.ts
const sfIntegrationRole = new iam.Role(this, 'asyncApiApigRole', {
  assumedBy: new iam.ServicePrincipal('apigateway.amazonaws.com'),
})
sfIntegrationRole.addToPolicy(
  new iam.PolicyStatement({
    resources: [props.sfn.stateMachineArn],
    actions: ['states:StartSyncExecution'],
  })
)
// Step Functions integration
const sfIntegration = new apiGw.AwsIntegration({
  service: 'states',
  action: 'StartSyncExecution',
  options: {
    credentialsRole: sfIntegrationRole,
    passthroughBehavior: apiGw.PassthroughBehavior.NEVER,
    requestParameters: {
      'integration.request.header.Content-Type': `'application/json'`,
    },
    requestTemplates: {
      'application/json': JSON.stringify({
        input: `$util.escapeJavaScript($input.json('$'))`,
        stateMachineArn: props.sfn.stateMachineArn,
      }),
    },
    integrationResponses: [
      {
        statusCode: '200',
        responseTemplates: {
          'application/json': `
            #set($output = $input.path('$.output'))
            #set($root = $util.parseJson($output))
            {
              "ticketId": "$root.ticketId.Payload",
              "message": "Feedback submitted successfully!"
            }`,
        },
      },
    ],
  },
})
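The two mapping templates above can be hard to read as VTL, so here is a rough TypeScript equivalent of what they produce. Both function names and the sample ARN are hypothetical; the sketch assumes, as the templates do, that input is sent as a JSON string and that the execution result carries its output as a JSON string.

```typescript
// Request side: the body sent to StartSyncExecution. `input` is the
// original request body serialised as a string, not a nested object.
function toStartSyncExecutionBody(requestBody: unknown, stateMachineArn: string): string {
  return JSON.stringify({
    input: JSON.stringify(requestBody),
    stateMachineArn,
  })
}

// Response side: parse the `output` string and pick out ticketId.Payload,
// like the #set / $util.parseJson lines in the response template.
function toApiResponse(executionResult: { output: string }): {
  ticketId: string
  message: string
} {
  const root = JSON.parse(executionResult.output) as {
    ticketId: { Payload: string }
  }
  return {
    ticketId: root.ticketId.Payload,
    message: 'Feedback submitted successfully!',
  }
}

// Illustrative values only.
const exampleArn =
  'arn:aws:states:us-east-1:123456789012:stateMachine:sentimentAnalysis'
console.log(toStartSyncExecutionBody({ message: 'Loved it' }, exampleArn))
console.log(
  toApiResponse({
    output: JSON.stringify({ ticketId: { Payload: 'EXAMPLE-TICKET-ID' } }),
  })
)
```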
First, we create a role that API Gateway can assume and that allows StartSyncExecution. The State Machine is passed as props to our ApiStack, and we scope the permission specifically to that resource's ARN.
Next, we create an AwsIntegration where we specify the service name. states is the service for Step Functions, just like sqs is for Simple Queue Service. Then we configure the action to be StartSyncExecution, matching the permission we granted.
Let's go through the options one by one.
We set credentialsRole to allow this integration to execute the Step Function. The passthroughBehavior makes sure that any request whose Content-Type has no matching request template gets rejected. The requestParameters are parameters we would like to send when we make the API call to our Step Function. The only thing we set here is the Content-Type, as Step Functions accepts JSON.
The request body is built with VTL in requestTemplates. We need to send two values in our JSON body:
- input, the escaped request body; $ here means the entire request body.
- stateMachineArn, which we obtain via props from the State Machine stack.
Finally, we map the response with integrationResponses. This also uses VTL to parse the response returned by the Step Function, from which we return the ticketId along with a success message.
To deploy the stacks, run:
yarn cdk deploy --all
To remove them afterwards, run:
yarn cdk destroy --all