In Part 2, I provided a detailed architecture and implementation of the ingestion layer, which includes the AWS IoT Core and Amazon Kinesis Data Streams services.
In this post, I explain how cumulative daily statistics of a running process are implemented in the ACME Industries application. I also cover how statistics of a completed process are calculated, stored, and accessed by the frontend.
The setup instructions for the example application are provided in the Readme.md file. Please note that this project uses services that are not covered by the AWS Free Tier and will incur costs.
Architecture overview: processing cumulative sensor data of a running process in ACME Industries application
ACME Industries uses the following architecture to process and calculate cumulative statistics for each sensor of a running process:
- Sensor data records are routed by an IoT Core rule to the ‘sensordata’ stream in the Amazon Kinesis Data Streams service.
- The Amazon Kinesis Data Firehose service is one of the consumers of the Kinesis Data Stream. It receives records from the ‘sensordata’ stream as they arrive.
- Kinesis Data Firehose invokes a record transformer Lambda function to append a newline character to each record, making the records easier for downstream services to process.
- Batches of records are delivered by Kinesis Data Firehose to an S3 delivery bucket and stored as objects.
- Each time a new object is written to the intermediary S3 bucket, this event triggers the S3 sensor data processor Lambda function. This function extracts records of running processes, then merges them, per running process, with the data in the runtime process data S3 bucket.
- When the object with the appended process records is saved to the S3 bucket, this event triggers the S3 daily data stats Lambda function. This function loads the object from the S3 bucket, calculates the latest stats (min, max, median, and standard deviation) of a running process, and then stores them in the DynamoDB table (‘sensordata-table’).
- When a new statistics record of cumulative sensor data for a running process is written to the DynamoDB table, this event triggers the Lambda function that publishes the new stats of the running process to an IoT topic (‘process-dailystats’).
- The Admin and Operator frontends both subscribe to the ‘process-dailystats’ IoT topic and receive updated cumulative sensor data stats of a running process.
Implementation details
In Part 2, I explain how AWS IoT Core routes records to the ‘sensordata’ stream in the Kinesis Data Streams service. Kinesis Data Firehose is configured as a consumer of the ‘sensordata’ stream.
Note that in ACME Industries, updating the sensor data statistics of a running process does not happen in real time. There may be up to one minute between updates. Kinesis Data Firehose provides a few important functions for this process:
- It buffers incoming data based on the BufferingHints configured in the template.yaml file of the project. The BufferingHints option has two parameters: SizeInMBs and IntervalInSeconds. In ACME Industries, SizeInMBs is set to 1 MB and IntervalInSeconds is set to 60 seconds, which means that the application buffers incoming data until 1 MB of records accumulates or 60 seconds elapse, whichever comes first. Both parameters can be adjusted: SizeInMBs can be set between 1 MB and 128 MB, and IntervalInSeconds between 60 and 900 seconds. Note that these options are hints, so Kinesis Data Firehose can adjust them dynamically if data delivery falls behind data writing in the stream.
- Kinesis Data Firehose can invoke a Lambda function to transform incoming records. In ACME Industries, the invoked Lambda function appends a newline character to each record, making the records easier for downstream services to process.
- Kinesis Data Firehose supports CloudWatch logging. If logging is enabled, you must set values for the LogGroupName and LogStreamName parameters.
- Kinesis Data Firehose can also compress data before delivery, which reduces S3 storage costs in the ACME Industries application. If required, the service can also encrypt the data before delivery.
The configuration of BufferingHints, Lambda data transformation function, CloudWatch logging, and data compression is done in the template.yaml file.
This AWS SAM template snippet shows some of the settings:
DeliveryStream:
  Type: AWS::KinesisFirehose::DeliveryStream
  DependsOn:
    - DeliveryStreamPolicy
  Properties:
    DeliveryStreamName: "sensordata-data-firehose"
    DeliveryStreamType: "KinesisStreamAsSource"
    KinesisStreamSourceConfiguration:
      KinesisStreamARN: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${KinesisStreamName}"
      RoleARN: !GetAtt DeliveryStreamRole.Arn
    ExtendedS3DestinationConfiguration:
      BucketARN: !GetAtt DeliveryBucket.Arn
      BufferingHints:
        SizeInMBs: 1
        IntervalInSeconds: 60
      CloudWatchLoggingOptions:
        Enabled: true
        LogGroupName: "/aws/kinesisfirehose/sensordata-firehose"
        LogStreamName: "S3Delivery"
      CompressionFormat: "GZIP"
      EncryptionConfiguration:
        NoEncryptionConfig: "NoEncryption"
      Prefix: ""
      ErrorOutputPrefix: "myPrefix/error=!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}"
      RoleARN: !GetAtt DeliveryStreamRole.Arn
      ProcessingConfiguration:
        Enabled: true
        Processors:
          - Type: "Lambda"
            Parameters:
              - ParameterName: "LambdaArn"
                ParameterValue: !GetAtt FirehoseProcessFunction.Arn
The ProcessingConfiguration property in the template above specifies the data processor Lambda function – FirehoseProcessFunction – which transforms incoming records. By default, Kinesis Data Firehose buffers incoming data up to 3 MB and then invokes the specified Lambda function with each buffered batch. In ACME Industries, the FirehoseProcessFunction Lambda function only appends a newline character to each record, but in more complex use cases the data processor Lambda function can perform various data transformations before records are delivered to the destination.
All transformed records must be returned by the Lambda function with the following attributes:
- recordId – the record ID passed by Kinesis Data Firehose when it invokes the Lambda data transformer. The transformed record must contain the same record ID so that Kinesis Data Firehose can map the transformed record to the original one. Any mismatch between the IDs is treated as a data transformation failure.
- result – the possible values are “Ok”, “Dropped”, and “ProcessingFailed”. “Dropped” means that the record was intentionally removed by the processing Lambda function. Records with a “ProcessingFailed” result are considered unsuccessfully processed by Kinesis Data Firehose and are delivered to the S3 bucket in a folder called “processing-failed”.
- data – the transformed data payload must be base64 encoded, and the modified record cannot exceed the 1 MB per-record limit.
You can see the implementation of the transformer Lambda function in the GitHub repository of the project.
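For illustration, here is a minimal sketch of such a transformer for a Node.js runtime (the actual implementation in the repository may differ):

exports.handler = async (event) => {
  const records = event.records.map((record) => {
    // Record payloads arrive base64 encoded
    const payload = Buffer.from(record.data, "base64").toString("utf8");
    return {
      recordId: record.recordId, // must match the incoming record ID
      result: "Ok",              // "Ok" | "Dropped" | "ProcessingFailed"
      data: Buffer.from(payload + "\n").toString("base64"),
    };
  });
  return { records };
};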
How Kinesis Data Firehose output is processed in ACME Industries
Kinesis Data Firehose delivers batches of buffered records to the S3 delivery bucket, where they are stored as objects. Each time an object is delivered, it triggers a put event that invokes the S3 data processor function – S3SensorProcessorFunction. This function extracts records of running processes, then merges them, per running process, with the corresponding object in the runtime process data S3 bucket – “sensordata-runtimeprocess-bucket”. In this bucket, ACME Industries uses the facility ID in the folder path and the process ID as the object key.
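Here is a hypothetical example of the “sensordata-runtimeprocess-bucket” structure (the facility and process IDs are illustrative):

sensordata-runtimeprocess-bucket/
  facility-1/
    process-0017
    process-0018
  facility-2/
    process-0042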
Note that merging S3 objects with a Lambda function is not a scalable pattern and should not be used in a production-level implementation. AWS offers services such as AWS Glue, Amazon Athena, and Amazon Redshift Spectrum that are specifically designed to process and query large data sets. The ACME Industries application uses small data sets in this implementation as a first step to demonstrate how to build an end-to-end application using a subset of serverless services. In the next series of posts, I plan to address the scalability topic.
When the object with the appended process records is saved to the S3 bucket, this event triggers the S3 daily data stats Lambda function – S3DailyDataProcessStatsFunction. This function loads the object from the S3 bucket, calculates the latest stats (min, max, median, and standard deviation) for each sensor of a running process, and then stores them in the DynamoDB table (‘sensordata-table’).
The cumulative statistics of a running process are stored as a compressed JSON string in the “stats” attribute.
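A hypothetical uncompressed value of the “stats” attribute might look like this (the sensor names and values are illustrative):

{
  "sensor-1": { "min": 11.2, "max": 84.7, "median": 42.1, "stdDev": 6.3 },
  "sensor-2": { "min": 0.4, "max": 9.9, "median": 5.1, "stdDev": 1.8 }
}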
When a new statistics record of cumulative sensor data for a running process is written to the DynamoDB table, this event invokes the Lambda function – PublishRunningProcessStats – which publishes the new stats of the running process to the ‘process-dailystats’ IoT topic.
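Here is a minimal sketch of such a publisher, assuming the Node.js AWS SDK v2 and an IOT_ENDPOINT environment variable holding the account-specific IoT data endpoint (both are assumptions; the repository code may differ):

const AWS = require("aws-sdk");

// IOT_ENDPOINT is an assumed environment variable
const iotData = new AWS.IotData({ endpoint: process.env.IOT_ENDPOINT });

exports.handler = async (event) => {
  // Publish each newly inserted DynamoDB stream record to the IoT topic
  for (const record of event.Records) {
    if (record.eventName !== "INSERT") continue;
    await iotData
      .publish({
        topic: "process-dailystats",
        qos: 0,
        payload: JSON.stringify(record.dynamodb.NewImage),
      })
      .promise();
  }
};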
The Admin and Operator frontends both subscribe to the ‘process-dailystats’ IoT topic and receive updated cumulative sensor data stats of a running process.
Architecture overview: processing sensor data of a completed process in ACME Industries application
Processing sensor data of a completed process is similar to the processing of a running process described in the previous section. In this section, I focus mostly on the architectural aspects that are specific to processing and viewing sensor data statistics of a completed process:
- For completed processes, the routing of sensor data records by IoT Core, their ingestion by Kinesis Data Streams, their processing by Kinesis Data Firehose, and their delivery to the intermediary S3 bucket are exactly the same as for running processes.
- When a new object is written to the intermediary S3 bucket, this event triggers the S3 sensor data processor Lambda function. This function extracts records of completed processes.
- For each completed process ID in the list, the Lambda function gets the object with the process ID key from the cumulative daily data S3 bucket (RuntimeProcessBucket in the template.yaml file), and then puts the object to the historical data S3 bucket (HistoryBucket in the template.yaml file).
- As the object with completed process data is written to the historical S3 bucket, this event triggers the S3CompletedProcessStatsFunction Lambda function. This function loads the object from the S3 bucket, calculates the stats (min, max, median, and standard deviation) of the completed process, and then stores them in the DynamoDB table (‘sensordata-table’).
- When a new statistics record of sensor data for a completed process is written into the DynamoDB table, this event triggers the Lambda function which publishes the facility ID and the completed process ID to an IoT topic (‘completed-processinfo’).
- The Admin and Operator frontends both subscribe to the ‘completed-processinfo’ IoT topic. Each subscriber then updates its list of completed process IDs.
- Users can view statistics of a completed process by selecting the process ID from the dropdown list of the ‘Completed Process Stats‘ widget.
Implementation details
When a process completes, the data simulator generates a message with the ‘event’ key set to ‘complete’. This message is delivered to the intermediary S3 bucket by Kinesis Data Firehose in exactly the same way as the records of a running process described above. The invoked S3SensorProcessorFunction looks for records with the ‘event’ field set to ‘complete’:
const checkForCompletedProcessRecords = (currentRecords) => {
  // Keep only the records that mark a process as completed
  const completedProcessRecords = currentRecords.filter(
    (record) => record.event === "complete"
  );
  return completedProcessRecords;
};
For each completed process record, the function uses the completed process ID to get the object with the process ID key from the runtime process data S3 bucket (RuntimeProcessBucket in the template.yaml file). The function then puts the object to the S3 ‘sensordata-history-bucket-sak’ (HistoryBucket in the template.yaml file), using the facility ID in the folder path and the process ID as the object key, similar to the way objects are stored in the RuntimeProcessBucket.
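A minimal sketch of that get-and-put step, assuming the Node.js AWS SDK v2 and bucket names passed in through RUNTIME_BUCKET and HISTORY_BUCKET environment variables (all assumptions):

const AWS = require("aws-sdk");
const s3 = new AWS.S3();

const moveCompletedProcess = async (facilityId, processId) => {
  // Facility ID as folder path, process ID as object key
  const key = `${facilityId}/${processId}`;
  const obj = await s3
    .getObject({ Bucket: process.env.RUNTIME_BUCKET, Key: key })
    .promise();
  await s3
    .putObject({ Bucket: process.env.HISTORY_BUCKET, Key: key, Body: obj.Body })
    .promise();
};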
Note that getting and putting S3 objects with a Lambda function is not a scalable pattern and should be avoided in a production-level implementation. The AWS Glue service, for example, is designed to perform ETL jobs on S3 objects. The ACME Industries application uses small data sets in this implementation as a first step to demonstrate how to build an end-to-end application using a subset of serverless services. In the next series of posts, I plan to address the scalability issue.
As the object with the completed process data is saved to the S3 bucket, this event triggers the S3 completed process stats Lambda function – S3CompletedProcessStatsFunction. This function gets the object from the S3 bucket, calculates sensor data stats (min, max, median, and standard deviation) of the completed process, and then stores them in the DynamoDB table (‘sensordata-table’).
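For illustration, the stats calculation could be sketched as follows (plain Node.js; the actual implementation in the repository may differ):

// Compute min, max, median, and standard deviation for an array of numbers
const calculateStats = (values) => {
  const sorted = [...values].sort((a, b) => a - b);
  const n = sorted.length;
  const mean = sorted.reduce((sum, v) => sum + v, 0) / n;
  const median =
    n % 2 === 1 ? sorted[(n - 1) / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2;
  const variance = sorted.reduce((sum, v) => sum + (v - mean) ** 2, 0) / n;
  return {
    min: sorted[0],
    max: sorted[n - 1],
    median,
    stdDev: Math.sqrt(variance),
  };
};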
When a new sensor data statistics record of a completed process is written into the DynamoDB table, this event invokes the Lambda function – PublishCompletedProcessStats – which publishes a message with the process ID and facility ID of a completed process to the ‘completed-processinfo’ IoT topic.
The Admin and Operator frontends both subscribe to the ‘completed-processinfo’ IoT topic and update their lists of completed processes.
The API to get the completed process list and completed process statistics
When a user is redirected to the dashboard page in either the Admin or the Operator frontend, an API Gateway endpoint is called that invokes the getCompletedProcesses function. This function uses the scan API of the AWS.DynamoDB.DocumentClient class to return an array of all completed process IDs. Note that using the query API instead of scan is more efficient and would be a better option for a production-level implementation.
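Here is a minimal sketch of such a handler, assuming the ‘sensordata-table’ items carry a ‘processId’ attribute (the attribute name is an assumption):

const AWS = require("aws-sdk");
const docClient = new AWS.DynamoDB.DocumentClient();

exports.handler = async () => {
  // Scan the whole table: acceptable for small data sets, inefficient at scale
  const result = await docClient
    .scan({
      TableName: "sensordata-table",
      ProjectionExpression: "processId", // assumed attribute name
    })
    .promise();
  return {
    statusCode: 200,
    body: JSON.stringify(result.Items.map((item) => item.processId)),
  };
};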
When a user selects a completed process ID from the dropdown list, an API Gateway endpoint is called that invokes the getProcessStats function. This function uses the query API of the AWS.DynamoDB.DocumentClient class to return the sensor data statistics of the selected process.
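A minimal sketch of the query, assuming ‘processId’ is the table’s partition key (an assumption):

const AWS = require("aws-sdk");
const docClient = new AWS.DynamoDB.DocumentClient();

const getProcessStats = async (processId) => {
  // Query returns only the items for the selected process
  const result = await docClient
    .query({
      TableName: "sensordata-table",
      KeyConditionExpression: "processId = :pid",
      ExpressionAttributeValues: { ":pid": processId },
    })
    .promise();
  return result.Items;
};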
Conclusion
In this post, I provide an architecture overview and implementation details of how statistics for cumulative sensor data of a running process are implemented. Kinesis Data Firehose is used to buffer and process records of sensor data and deliver them to an S3 bucket. AWS Lambda functions are used to perform calculations and to write records per running process to a DynamoDB table. The written records are then published to an IoT topic, which allows the frontend subscribers to the topic to receive updates.
I also provide an architecture overview and implementation details of how statistics of a completed process are calculated and stored. In addition, I explain how the frontends access the list of completed processes, as well as the statistics of a selected process, by calling an endpoint of the Amazon API Gateway service.
In Part 4, I explain how the latest minute statistics of streaming sensor data are implemented in the ACME Industries application. I show two different ways to implement this:
- Using Amazon Kinesis Data Analytics for SQL Applications
- Using the tumbling window feature of AWS Lambda