Collecting and storing real-time data with Kinesis Data Streams, Kinesis Agent, Lambda, and DynamoDB

In this post I will demonstrate how to set up an end-to-end application that collects, processes, and stores real-time purchase order data for a fictitious website, VarietyGifts.com. The purchase order data is generated on an EC2 instance using the original dataset from the University of California, Irvine:

https://archive.ics.uci.edu/ml/datasets/online+retail

In the first part of the article, I demonstrate the construction of the entire pipeline manually, and then, in the second part, I go over the CloudFormation template that automates the entire deployment process.

The source code and the CloudFormation template for this article can be downloaded from this repository: https://github.com/cloud21sak/PurchaseOrderSimulator

High-level solution overview

The end-to-end scenario described in this post uses Kinesis Agent to read the simulated purchase order records from a log file and push the data into a Kinesis data stream. A Lambda function is configured with the Kinesis data stream as its event source trigger; it reads the streaming data and stores it in a DynamoDB table.
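Each record flowing through the pipeline is a single row of the retail dataset. After Kinesis Agent converts it from CSV to JSON (the conversion is configured later in this post), a record looks roughly like the following; the field names come from the dataset, while the values shown here are purely illustrative:

    {
      "InvoiceNo": "536365",
      "StockCode": "85123A",
      "Description": "WHITE HANGING HEART T-LIGHT HOLDER",
      "Quantity": "6",
      "InvoiceDate": "12/1/2010 8:26",
      "UnitPrice": "2.55",
      "Customer": "17850",
      "Country": "United Kingdom"
    }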

The following diagram shows the end-to-end solution:

I will next go over the manual setup of each layer of the application.

Manual setup of the layers

  1. Log onto a running EC2 instance from a remote terminal. Note that the basic t2.micro instance type is sufficient here.
  2. Install Kinesis Agent:
    • sudo yum install -y aws-kinesis-agent
  3. Download the OrderRecordSimulator.zip package (it is available in the repository linked above), for example with wget:
    • wget https://raw.githubusercontent.com/cloud21sak/PurchaseOrderSimulator/main/OrderRecordSimulator.zip
  4. Execute:
    • unzip OrderRecordSimulator.zip
  5. You should now have two additional files: OnlineRetail.csv and OrderRecordSimulator.py
  6. Next, we need to make OrderRecordSimulator.py executable:
    • sudo chmod a+x OrderRecordSimulator.py
  7. Create a subfolder where the simulator will be writing generated purchase order log files:
    • sudo mkdir /var/log/varietygifts
  8. To keep things simple, and to make sure that both OrderRecordSimulator.py and Kinesis Agent have all the required permissions, I have assigned a role with the AdministratorAccess policy to the EC2 instance. In a real production environment, the granted permissions should obviously be more granular.
  9. From the Management Console, create a Kinesis data stream:
    • Stream name: VarietyGiftOrders
    • Number of open shards: 1
    • Use defaults for everything else
  10. Now that we have the data stream set up, we can configure Kinesis Agent to read the log files and push the data into the stream:
    • From the remote terminal, on the EC2 instance, go to the aws-kinesis folder: cd /etc/aws-kinesis
    • Edit the file agent.json: sudo nano agent.json
    • Replace the contents of the file with the following configuration:
				
					{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",
  "flows": [
    {
      "filePattern": "/var/log/varietygifts/*.log",
      "kinesisStream": "VarietyGiftOrders",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
        {
          "optionName": "CSVTOJSON",
          "customFieldNames": [
            "InvoiceNo",
            "StockCode",
            "Description",
            "Quantity",
            "InvoiceDate",
            "UnitPrice",
            "Customer",
            "Country"
          ]
        }
      ]
    }
  ]
}
				
			

11. Start the Kinesis Agent and enable it to start on boot: sudo service aws-kinesis-agent start, and sudo chkconfig aws-kinesis-agent on

12. In the Management Console, create a DynamoDB table named VarietyGiftOrders with partition key CustomerID (Number) and sort key OrderID (String), as shown below:

13. Finally, we need to set up a Lambda function that will read records from the Kinesis stream and store them in the VarietyGiftOrders table:

  • We need to create a role for the Lambda function with permissions to read from the Kinesis stream and write to the DynamoDB table.
  • In the Management Console, go to the IAM service and create a role, VarietyGiftOrdersRole, for the Lambda service with the AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies.
  • In the Management Console, create the Lambda function ProcessOrdersFunction
  • Choose runtime: Python 3.7
  • Under Permissions, choose to use an existing role, and select VarietyGiftOrdersRole:

  • Next, add a trigger to the Lambda function:
    • select Kinesis
    • select data stream VarietyGiftOrders:
  • Next, paste the Lambda code from processorderslambda.py into the “Code” area, and select the “Deploy” button (a rough sketch of this handler is shown after this list):
  • Go to the remote terminal, log in to the EC2 instance, and generate 500 records by running the following command:
    • sudo ./OrderRecordSimulator.py 500
  • Verify that the kinesis agent has processed the generated records:
    • tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

  • If the agent has been configured correctly, you should see this kind of message in the log file:
  • In the Management Console, go to the DynamoDB service, and in the VarietyGiftOrders table, under the Items tab you should see the 500 records that our Lambda function has written:
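The full source of processorderslambda.py is in the repository. As a rough sketch of what the handler does, it looks something like the code below. This is my reconstruction, not the repository code verbatim, and it assumes the function maps the Customer field to the table's CustomerID partition key and InvoiceNo to the OrderID sort key:

    import base64
    import json

    import boto3

    # The Lambda execution role (VarietyGiftOrdersRole) grants access to DynamoDB.
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('VarietyGiftOrders')


    def lambda_handler(event, context):
        # Kinesis delivers each record base64-encoded; decode it and parse the
        # JSON document that Kinesis Agent produced from the CSV line.
        for record in event['Records']:
            payload = base64.b64decode(record['kinesis']['data'])
            order = json.loads(payload)

            # Assumed key mapping: Customer -> CustomerID (Number),
            # InvoiceNo -> OrderID (String).
            table.put_item(
                Item={
                    'CustomerID': int(order['Customer']),
                    'OrderID': order['InvoiceNo'],
                    'StockCode': order['StockCode'],
                    'Description': order['Description'],
                    'Quantity': order['Quantity'],
                    'InvoiceDate': order['InvoiceDate'],
                    'UnitPrice': order['UnitPrice'],
                    'Country': order['Country'],
                }
            )

        return 'Processed {} records'.format(len(event['Records']))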

Automated deployment with AWS CloudFormation

The AWS CloudFormation template includes all the steps for this end-to-end solution. I will next go over the important sections of the template:

Parameters:

  • S3SrcBucketName: The name of the source bucket where the Lambda package gets uploaded.
  • S3SrcKeyPrefix: The prefix that can be used to separate deployment packages, for example, between production and development.
  • KeyName: The name of an existing EC2 KeyPair to enable SSH access to the instance.
  • InstanceType: The EC2 instance type (t2.small is the default in our case).
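For reference, a sketch of how these parameters might be declared in the template is shown below; the exact descriptions, defaults, and constraints are in the template in the repository (in particular, the prod/ default for S3SrcKeyPrefix is my assumption based on the bucket layout used later in this post):

    "Parameters": {
      "S3SrcBucketName": { "Type": "String" },
      "S3SrcKeyPrefix": { "Type": "String", "Default": "prod/" },
      "KeyName": { "Type": "AWS::EC2::KeyPair::KeyName" },
      "InstanceType": { "Type": "String", "Default": "t2.small" }
    }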

Resources:

  • LambdaPackageBucket: This bucket is used by CloudFormation to deploy the ProcessOrdersFunction Lambda function. Note that our template copies the Lambda package from the source bucket specified in the S3SrcBucketName parameter (more about this in the CopyPackagesFunction resource).
  • CopyPackages: Custom resource which CloudFormation uses during stack creation to copy ProcessOrdersFunction package into the LambdaPackageBucket. This custom resource has the following properties:
    • ServiceToken: Required property that associates the CopyPackagesFunction lambda function with the resource.
    • DestBucket: Name of the bucket where the lambda package will be deployed
    • SourceBucket: Name of the bucket where the lambda package is uploaded.
    • Prefix: Internal folder of the bucket to allow multiple deployment strategies (dev vs prod, for example)
    • Objects: Relative path of the package in the bucket
    • The template snippet below shows how this custom resource is defined:
				
					"CopyPackages": {
    "Type": "Custom::CopyPackages",
    "Properties": {
      "ServiceToken": { "Fn::GetAtt": ["CopyPackagesFunction", "Arn"] },
      "DestBucket": { "Ref": "LambdaPackageBucket" },
      "SourceBucket": { "Ref": "S3SrcBucketName" },
      "Prefix": { "Ref": "S3SrcKeyPrefix" },
      "Objects": ["functions/packages/processorderslambda.zip"]
    }
  }
				
			
  • CopyPackagesRole: The IAM role assigned to the CopyPackagesFunction Lambda function. In the template, the policy attached to this role has two statements: the first allows the function to read objects from the source bucket, and the second allows it to write to and delete objects from the destination bucket (LambdaPackageBucket). Note that the ARN of each bucket is constructed dynamically using the CloudFormation intrinsic function “Fn::Sub” to resolve the bucket names during stack creation.
  • The template snippet below shows how this role is defined:
				
					"CopyPackagesRole": {
    "Type": "AWS::IAM::Role",
    "Properties": {
      "AssumeRolePolicyDocument": {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Principal": { "Service": "lambda.amazonaws.com" },
            "Action": "sts:AssumeRole"
          }
        ]
      },
      "ManagedPolicyArns": [
        "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
      ],
      "Path": "/",
      "Policies": [
        {
          "PolicyName": "lambda-copier",
          "PolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
              {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [
                  {
                    "Fn::Sub": "arn:aws:s3:::${S3SrcBucketName}/${S3SrcKeyPrefix}*"
                  }
                ]
              },
              {
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:DeleteObject"],
                "Resource": [
                  {
                    "Fn::Sub": "arn:aws:s3:::${LambdaPackageBucket}/${S3SrcKeyPrefix}*"
                  }
                ]
              }
            ]
          }
        }
      ]
    }
  }
				
			
  • CopyPackagesFunction: The Lambda function that copies the processorderslambda.zip package from the source to the destination bucket during stack creation. Since this is a simple, boilerplate-type function, it is implemented inline within the template. For better readability, I show the template snippet of this function in YAML:
				
					CopyPackagesFunction: 
    Type: 'AWS::Lambda::Function' 
    Properties: 
        Description: Copies objects from a source S3 bucket to a destination 
        Handler: index.handler 
        Runtime: python3.7 
        Role: !GetAtt 
            - CopyPackagesRole 
            - Arn 
        Timeout: 240 
        Code:
          ZipFile: |
            import json 
            import logging 
            import threading 
            import boto3 
            import cfnresponse 
            def copy_objects(source_bucket, dest_bucket, prefix, objects): 
                s3 = boto3.client('s3') 
                for o in objects: 
                     key = prefix + o 
                     copy_source = { 'Bucket': source_bucket, 'Key': key } 
                     print(('copy_source: %s' % copy_source)) 
                     print(('dest_bucket = %s'%dest_bucket)) 
                     print(('key = %s' %key)) 
                     s3.copy_object(CopySource=copy_source, Bucket=dest_bucket, Key=key) 
            def delete_objects(bucket, prefix, objects): 
                s3 = boto3.client('s3') 
                objects = {'Objects': [{'Key': prefix + o} for o in objects]} 
                s3.delete_objects(Bucket=bucket, Delete=objects) 
            def timeout(event, context): 
                logging.error('Execution is about to time out, sending failure response to CloudFormation') 
                cfnresponse.send(event, context, cfnresponse.FAILED, {}, None) 
            def handler(event, context): 
                # make sure we send a failure to CloudFormation if the function 
                # is going to timeout 
                timer = threading.Timer((context.get_remaining_time_in_millis() / 1000.00) - 0.5, timeout, args=[event, context]) 
                timer.start() 
                print(('Received event: %s' % json.dumps(event))) 
                status = cfnresponse.SUCCESS 
                try: 
                    source_bucket = event['ResourceProperties']['SourceBucket'] 
                    dest_bucket = event['ResourceProperties']['DestBucket'] 
                    prefix = event['ResourceProperties']['Prefix'] 
                    objects = event['ResourceProperties']['Objects'] 
                    if event['RequestType'] == 'Delete': 
                         delete_objects(dest_bucket, prefix, objects) 
                    else: 
                        copy_objects(source_bucket, dest_bucket, prefix, objects) 
                except Exception as e: 
                    logging.error('Exception: %s' % e, exc_info=True) 
                    status = cfnresponse.FAILED 
                finally: 
                    timer.cancel() 
                    cfnresponse.send(event, context, status, {}, None) 
				
			
  • VarietyGiftOrdersRole: The IAM role assigned to the Lambda function ProcessOrdersFunction. Since this function reads data from Kinesis Data Streams and writes into the DynamoDB table, we assign the AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess managed policies under the ManagedPolicyArns property. Note that in a real production setting these policies would need to be made more granular:
				
					 "VarietyGiftOrdersRole": {
    "Type": "AWS::IAM::Role",
    "Properties": {
      "AssumeRolePolicyDocument": {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Principal": { "Service": "lambda.amazonaws.com" },
            "Action": "sts:AssumeRole"
          }
        ]
      },
      "ManagedPolicyArns": [
        "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        "arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess",
        "arn:aws:iam::aws:policy/AmazonKinesisReadOnlyAccess"
      ]
    }
  }
				
			
  • EventSourceMapping: This resource creates a mapping between an event source (VarietyGiftOrders stream) and an AWS Lambda function (ProcessOrdersFunction). Lambda reads items from the event source and triggers the function:
				
					"EventSourceMapping": {
    "Type": "AWS::Lambda::EventSourceMapping",
    "Properties": {
      "EventSourceArn": {
        "Fn::Join": [
          "",
          [
            "arn:aws:kinesis:",
            { "Ref": "AWS::Region" },
            ":",
            { "Ref": "AWS::AccountId" },
            ":stream/",
            { "Ref": "VarietyGiftOrdersStream" }
          ]
        ]
      },
      "FunctionName": { "Fn::GetAtt": ["ProcessOrdersFunction", "Arn"] },
      "StartingPosition": "LATEST"
    }
  }
				
			
  • ProcessOrdersFunction: CloudFormation calls the Lambda service to deploy this function, whose deployment package is located in the LambdaPackageBucket S3 bucket. The Code property specifies the bucket and the key path where the processorderslambda.zip package is located:
				
					"ProcessOrdersFunction": {
    "DependsOn": "CopyPackages",
    "Type": "AWS::Lambda::Function",
    "Properties": {
      "Description": "Example",
      "Handler": "processorderslambda.lambda_handler",
      "Runtime": "python3.7",
      "Role": { "Fn::GetAtt": ["VarietyGiftOrdersRole", "Arn"] },
      "Timeout": 300,
      "Code": {
        "S3Bucket": { "Ref": "LambdaPackageBucket" },
        "S3Key": {
          "Fn::Sub": "${S3SrcKeyPrefix}functions/packages/processorderslambda.zip"
        }
      }
    }
  }
				
			
  • VarietyGiftOrdersStream: Kinesis Data Stream resource:
				
					"VarietyGiftOrdersStream": {
    "Type": "AWS::Kinesis::Stream",
    "Properties": { "Name": "VarietyGiftOrders", "ShardCount": 1 }
  }
				
			
  • DDBTable: DynamoDB table resource. Note that here the CustomerID and OrderID fields must be defined under both the AttributeDefinitions and KeySchema properties:

				
					"DDBTable": {
    "Type": "AWS::DynamoDB::Table",
    "Properties": {
      "TableName": "VarietyGiftOrders",
      "AttributeDefinitions": [
        { "AttributeName": "CustomerID", "AttributeType": "N" },
        { "AttributeName": "OrderID", "AttributeType": "S" }
      ],
      "KeySchema": [
        { "AttributeName": "CustomerID", "KeyType": "HASH" },
        { "AttributeName": "OrderID", "KeyType": "RANGE" }
      ],
      "ProvisionedThroughput": {
        "ReadCapacityUnits": 5,
        "WriteCapacityUnits": 5
      }
    }
  }
				
			
  • ec2AdminRole: The IAM role with administrative access policy assigned to the EC2 instance.
  • InstanceAdminProfile: The IAM instance profile resource that is used to pass the ec2AdminRole to the EC2 instance (VarietyGiftsServer). This resource is referenced from the IamInstanceProfile property in the VarietyGiftsServer resource template:
				
					"InstanceAdminProfile": {
    "Type": "AWS::IAM::InstanceProfile",
    "Properties": { "Path": "/", "Roles": [{ "Ref": "ec2AdminRole" }] }
  }
				
			
  • WebServerSecurityGroup: Here I just used a basic web server security group template.
  • VarietyGiftsServer: The EC2 resource that hosts our applications: OrderRecordSimulator.py and Kinesis Agent. A few issues need to be addressed as part of resource creation: the applications must be properly installed and configured before they are used, and CloudFormation must not set the resource status to CREATE_COMPLETE until all installation and configuration on the instance is complete. The following steps make sure the application and the Kinesis Agent are properly configured as part of VarietyGiftsServer resource creation:
    • We need to use AWS::CloudFormation::Init metadata type to include metadata on the EC2 instance for the cfn-init helper script.
    • The template AWS::CloudFormation::Init metadata has one configSet: kinesisagent_install with two config keys: install_cfn and install_agent. I will focus only on the install_agent key because the other one is not relevant here.
    • cfn-init will process configuration sections under the install_agent key in the following order:
      1. packages: downloads and installs aws-kinesis-agent package
      2. sources: downloads and unzips OrderRecordSimulator.zip
      3. files: this is where the original agent.json file is replaced with the configuration shown earlier. Note how we set the values for the “kinesisStream”, “filePattern”, and “customFieldNames” keys. Note also that Kinesis Agent lets us convert the original CSV records into JSON via the CSVTOJSON value of the “optionName” key.
      4. commands: here the /var/log/varietygifts subfolder, where the purchase order log files will be written, is created first, and then OrderRecordSimulator.py is made executable.
      5. services: here the aws-kinesis-agent service is started and enabled
				
					"VarietyGiftsServer": {
    "Type": "AWS::EC2::Instance",
    "Metadata": {
      "AWS::CloudFormation::Init": {
        "configSets": {
          "kinesisagent_install": ["install_cfn", "install_agent"]
        },
        "install_agent": {
          "packages": { "yum": { "aws-kinesis-agent": [] } },
          "commands": {
            "command1": {
              "command": "sudo mkdir /var/log/varietygifts",
              "cwd": "~"
            },
            "command2": {
              "command": "sudo chmod a+x OrderRecordSimulator.py",
              "cwd": "/"
            }
          },
          "sources": {
            "/": "https://raw.githubusercontent.com/cloud21sak/PurchaseOrderSimulator/main/OrderRecordSimulator.zip"
          },
          "files": {
            "/etc/aws-kinesis/agent.json": {
              "content": {
                "Fn::Join": [
                  "",
                  [
                    "{ \n",
                    "\"cloudwatch.emitMetrics\": ",
                    "true",
                    ",\n",
                    "\"kinesis.endpoint\": ",
                    "\"\"",
                    ",\n",
                    "\"firehose.endpoint\": ",
                    "\"\"",
                    ",\n",
                    "\"flows\": ",
                    "[",
                    "\n",
                    "{ \n",
                    "\"filePattern\": ",
                    "\"/var/log/varietygifts/*.log\",\n",
                    "\"kinesisStream\": ",
                    "\"VarietyGiftOrders\",\n",
                    "\"partitionKeyOption\": ",
                    "\"RANDOM\",\n",
                    "\"dataProcessingOptions\": [\n",
                    "{ \n",
                    "\"optionName\": ",
                    "\"CSVTOJSON\",\n",
                    "\"customFieldNames\": ",
                    "[\"InvoiceNo\", \"StockCode\", \"Description\", \"Quantity\", \"InvoiceDate\", \"UnitPrice\", \"Customer\", \"Country\"]\n",
                    "} ] } ] \n",
                    "} \n"
                  ]
                ]
              },
              "mode": "000777",
              "owner": "root",
              "group": "root"
            }
          },
          "services": {
            "sysvinit": {
              "aws-kinesis-agent": {
                "enabled": "true",
                "ensureRunning": "true"
              }
            }
          }
        }
      }
    }
  }
				
			

The template snippet below shows the UserData property and the CreationPolicy attribute configurations:

				
					"UserData": {
    "Fn::Base64": {
      "Fn::Join": [
        "",
        [
          "#!/bin/bash -xe\n",
          "yum update -y aws-cfn-bootstrap\n",
          "/opt/aws/bin/cfn-init -v ",
          " --stack ",
          { "Ref": "AWS::StackName" },
          " --resource VarietyGiftsServer ",
          " --configsets kinesisagent_install ",
          " --region ",
          { "Ref": "AWS::Region" },
          "\n",
          "/opt/aws/bin/cfn-signal -e $? ",
          " --stack ",
          { "Ref": "AWS::StackName" },
          " --resource VarietyGiftsServer ",
          " --region ",
          { "Ref": "AWS::Region" },
          "\n"
        ]
      ]
    }
  },
  "CreationPolicy": { "ResourceSignal": { "Timeout": "PT15M" } }
				
			
  • Running the yum update -y aws-cfn-bootstrap command ensures that we get the latest helper scripts
  • Next, cfn-init runs, reads metadata from AWS::CloudFormation::Init, and processes configuration sections described above
  • The CreationPolicy attribute associated with the EC2 instance tells CloudFormation to suspend stack creation until it receives a signal that the EC2 instance has been successfully configured. The timeout is set to 15 minutes, and the number of signals for CloudFormation to receive is left at the default value of 1.
  • After cfn-init completes, the cfn-signal helper script runs, CloudFormation receives the signal, and stack creation continues.

Create source bucket

Before launching the CloudFormation stack creation process, we need to create the source bucket and upload the processorderslambda.zip package that CloudFormation uses to deploy our Lambda function.

  • In AWS Management Console, go to the S3 console, and select “Create bucket”
  • Choose a unique bucket name for your bucket, and leave everything else as defaults
  • In the created bucket, create nested folders: prod/ -> functions/ -> packages/
  • Upload processorderslambda.zip package to the packages folder of your bucket as shown below:

Note that you can also perform all of the above steps in this section with AWS CLI commands.
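For example, a rough AWS CLI equivalent of the steps above would be the following (my-src-bucket is a placeholder; use your own unique bucket name):

    aws s3 mb s3://my-src-bucket
    aws s3 cp processorderslambda.zip s3://my-src-bucket/prod/functions/packages/processorderslambda.zip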

Create the stack

  • In AWS Management Console, go to the CloudFormation console.
  • Select “Create Stack with new resources”, and then select “Next”
  • Select “Upload a template file”, and upload VarietyGiftsStoreTemplate.json which you downloaded onto your local drive from the repository at the beginning of this article:
  • You can optionally view the template in the Designer, and validate it before starting the stack creation process.
  • Select the “Next” button
  • Enter the CloudFormation template parameters:
    • Stack name: varietygifts-stack
    • KeyName: the name of your existing EC2 instance KeyPair for SSH access
    • S3SrcBucketName: the bucket name you created to upload processorderslambda.zip package
    • All other parameter values can be left as defaults
  • Choose “Next” twice
  • At the bottom of the last page, select the checkbox, and choose “Create stack”
  • You can now watch the stack creation process in the “Events” tab of the CloudFormation console:
  • In my case, it took a little over a minute to create the stack:
  • Notice how CloudFormation keeps the EC2 instance in the CREATE_IN_PROGRESS status until it receives the SUCCESS signal from the cfn-signal helper script indicating that all installations and configurations have been completed on the EC2 instance. Only after receiving the SUCCESS signal does it set the status to CREATE_COMPLETE, at which point the varietygifts-stack creation is complete.
  • Note that you can also create the stack with AWS CLI commands.
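For reference, a roughly equivalent CLI invocation is shown below (the key pair and bucket names are placeholders; the CAPABILITY_IAM flag is required because the template creates IAM roles):

    aws cloudformation create-stack \
      --stack-name varietygifts-stack \
      --template-body file://VarietyGiftsStoreTemplate.json \
      --parameters ParameterKey=KeyName,ParameterValue=my-keypair \
                   ParameterKey=S3SrcBucketName,ParameterValue=my-src-bucket \
      --capabilities CAPABILITY_IAM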

Running the application

  • Log onto VarietyGiftsServer EC2 instance from your terminal.
  • Go to the root folder, and you should see our two files: OrderRecordSimulator.py and OnlineRetail.csv:
  • Run the simulator with 1,000 records this time:
    • sudo ./OrderRecordSimulator.py 1000
  • Check the aws-kinesis-agent.log file:
    • tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
  • In the DynamoDB console, in the VarietyGiftOrders table, you should see the records that the ProcessOrdersFunction lambda function has written:
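If you prefer the command line, a quick (if blunt) way to confirm the item count is a scan with the COUNT selector; this is fine at this scale but not something to run against a large table:

    aws dynamodb scan --table-name VarietyGiftOrders --select COUNT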

Performance monitoring and logs

In a production environment, it is always important to monitor the performance metrics of various resources. For example, we can monitor the performance metrics of our DynamoDB table under the “Metrics” tab:

Here, we can see that we provisioned 5 write capacity units but at the peak consumed more than 10.
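The same numbers can also be pulled from CloudWatch with the CLI; for example, a sketch of a query for the table's consumed write capacity (the time window below is a placeholder):

    aws cloudwatch get-metric-statistics \
      --namespace AWS/DynamoDB \
      --metric-name ConsumedWriteCapacityUnits \
      --dimensions Name=TableName,Value=VarietyGiftOrders \
      --start-time 2021-06-01T00:00:00Z \
      --end-time 2021-06-01T01:00:00Z \
      --period 60 \
      --statistics Sum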

Similarly, we can view performance metrics for Kinesis Data Streams or Lambda. This topic deserves a separate article, which I plan to post in the near future. However, I think it would be a good exercise to generate 5,000 or 10,000 records, see whether any bottlenecks appear, and explore how they can be addressed by changing the allocated capacity.

In addition, our Lambda function is set up to log information to the CloudWatch Logs service. This can help troubleshoot possible issues:
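If you have AWS CLI version 2 installed, you can also follow these logs from a terminal, assuming the default /aws/lambda/<function-name> log group naming:

    aws logs tail /aws/lambda/ProcessOrdersFunction --follow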

Summary

In this post, I described how to set up Kinesis Agent to read real-time log data and push it into Kinesis Data Streams, and how to have a Lambda function read the stream data and write it into a DynamoDB table.

A large part of the article was devoted to implementing the infrastructure for this application as code with CloudFormation. I covered all of the important parts of the template, and I hope it was helpful.

If you have questions or suggestions, please leave a comment below.