{"id":1591,"date":"2022-12-20T13:33:00","date_gmt":"2022-12-20T13:33:00","guid":{"rendered":"https:\/\/thecloud21.com\/?p=1591"},"modified":"2023-02-20T18:20:15","modified_gmt":"2023-02-20T18:20:15","slug":"architecting-and-implementing-serverless-application-with-streaming-sensor-data-part-4","status":"publish","type":"post","link":"https:\/\/thecloud21.com\/?p=1591","title":{"rendered":"Architecting and implementing serverless application with streaming sensor data: Part 4"},"content":{"rendered":"\t\t<div data-elementor-type=\"wp-post\" data-elementor-id=\"1591\" class=\"elementor elementor-1591\" data-elementor-post-type=\"post\">\n\t\t\t\t\t\t<section class=\"elementor-section elementor-top-section elementor-element elementor-element-26647a7 elementor-section-boxed elementor-section-height-default elementor-section-height-default\" data-id=\"26647a7\" data-element_type=\"section\" data-e-type=\"section\">\n\t\t\t\t\t\t<div class=\"elementor-container elementor-column-gap-default\">\n\t\t\t\t\t<div class=\"elementor-column elementor-col-100 elementor-top-column elementor-element elementor-element-9830256\" data-id=\"9830256\" data-element_type=\"column\" data-e-type=\"column\">\n\t\t\t<div class=\"elementor-widget-wrap elementor-element-populated\">\n\t\t\t\t\t\t<div class=\"elementor-element elementor-element-b843575 elementor-widget elementor-widget-text-editor\" data-id=\"b843575\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">In\u00a0<a href=\"https:\/\/thecloud21.com\/?p=1010\" target=\"_blank\" rel=\"noopener\"><span style=\"color: #3366ff;\">Part 3<\/span><\/a> I explain how the cumulative daily statistics of a running process is implemented in ACME Industries application. I also cover how statistics of a completed process is calculated, stored, and accessed by the frontend.<\/span><\/p><p><span style=\"color: #000000;\">In this post I explain how the latest minute statistics of streaming sensor data is implemented in ACME Industries application. I show two different ways to implement this:<\/span><\/p><ul><li><span style=\"color: #000000;\">Using Amazon Kinesis Data Analytics for SQL Applications<\/span><\/li><li><span style=\"color: #000000;\">Using the tumbling window feature of AWS Lambda<\/span><\/li><\/ul><p><span style=\"color: #000000;\">The setup instructions for the example application are provided in the\u00a0<a href=\"https:\/\/github.com\/cloud21sak\/SensorStreamingApplication\/blob\/master\/README.md\" target=\"_blank\" rel=\"noopener\"><span style=\"color: #3366ff;\">Readme.md<\/span><\/a> file. <\/span><\/p><p><em><strong><span style=\"color: #000000;\">Please note that this project uses services that are not covered by the\u00a0<a href=\"https:\/\/aws.amazon.com\/free\/?all-free-tier.sort-by=item.additionalFields.SortRank&amp;all-free-tier.sort-order=asc&amp;awsf.Free%20Tier%20Types=*all&amp;awsf.Free%20Tier%20Categories=*all\" target=\"_blank\" rel=\"noopener\"><span style=\"color: #3366ff;\">AWS Free Tier<\/span><\/a>,\u00a0 and will incur cost.<\/span><\/strong><\/em><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-962b3a9 elementor-widget elementor-widget-heading\" data-id=\"962b3a9\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h2 class=\"elementor-heading-title elementor-size-default\">Implementing the latest minute statistics of streaming sensor data with Amazon Kinesis Data Analytics for SQL Applications<\/h2>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-0782351 elementor-widget elementor-widget-heading\" data-id=\"0782351\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h3 class=\"elementor-heading-title elementor-size-default\">Architecture overview<\/h3>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-f70d920 elementor-widget elementor-widget-text-editor\" data-id=\"f70d920\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">ACME Industries uses the following architecture to process and calculate latest minute statistics for each sensor of a running process:\u00a0<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-4405846 elementor-widget elementor-widget-image\" data-id=\"4405846\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"image.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<img fetchpriority=\"high\" decoding=\"async\" width=\"856\" height=\"321\" src=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteStatsKinesisAnalytics-1.png?fit=856%2C321&amp;ssl=1\" class=\"attachment-large size-large wp-image-1613\" alt=\"\" srcset=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteStatsKinesisAnalytics-1.png?w=856&amp;ssl=1 856w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteStatsKinesisAnalytics-1.png?resize=300%2C113&amp;ssl=1 300w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteStatsKinesisAnalytics-1.png?resize=768%2C288&amp;ssl=1 768w\" sizes=\"(max-width: 856px) 100vw, 856px\" \/>\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-c7eecb6 elementor-widget elementor-widget-text-editor\" data-id=\"c7eecb6\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ol><li><span style=\"color: #000000;\">Kinesis Data Analytics for SQL Applications is one of the consumers for the data stream.<\/span><\/li><li><span style=\"color: #000000;\">Kinesis data analytics application &#8211; &#8216;<strong>sensor-stats&#8217;<\/strong> &#8211; continuously reads streaming data. The application executes time-based windowed query in a 30 second tumbling window based on ROWTIME column.\u00a0<\/span><\/li><li><span style=\"color: #000000;\">The AWS Lambda function &#8211; <strong>SensorStatsFunction <\/strong>&#8211;\u00a0is configured to be an external destination for &#8216;sensor-stats&#8217; Kinesis data analytics application.<\/span><\/li><li><span style=\"color: #000000;\">The invoked SensorStatsFunction sorts out sensor stats records per facility and per running process, and publishes them to the application&#8217;s IoT topic to update the frontend.<\/span><\/li><\/ol>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-b02ca12 elementor-widget elementor-widget-heading\" data-id=\"b02ca12\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h3 class=\"elementor-heading-title elementor-size-default\">Implementation details<\/h3>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-bd2199a elementor-widget elementor-widget-text-editor\" data-id=\"bd2199a\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">The code that implements the &#8216;sensor-stats&#8217; Kinesis Data Analytics application and SensorStatsFunction Lambda function is located in the <span style=\"color: #3366ff;\"><a style=\"color: #3366ff;\" href=\"https:\/\/github.com\/cloud21sak\/SensorStreamingApplication\/tree\/master\/4-sensorapp-streaming-kda\" target=\"_blank\" rel=\"noopener\">4-sensorapp-streaming-kda<\/a><\/span> directory in the\u00a0<a href=\"https:\/\/github.com\/cloud21sak\/SensorStreamingApplication\" target=\"_blank\" rel=\"noopener\"><span style=\"color: #0000ff;\">GitHub<\/span><\/a> repository.\u00a0<\/span><\/p><p><span style=\"color: #000000;\">The template.yaml file defines the &#8216;sensor-stats&#8217; application with an aggregation query in a tumbling window with 30 second interval:<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-5bd1753 elementor-widget elementor-widget-code-highlight\" data-id=\"5bd1753\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-json line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-json\">\n\t\t\t\t\t<xmp>KinesisAnalyticsSensorApplication:\r\n    Type: \"AWS::KinesisAnalytics::Application\"\r\n    Properties:\r\n      ApplicationName: \"sensor-stats\"\r\n      ApplicationCode: |\r\n        CREATE OR REPLACE STREAM \"SENSORCALC_STREAM\" (             \r\n            \"deviceTimestamp\"  TIMESTAMP,\r\n            \"name\" VARCHAR(16),              \r\n            \"facilityId\" INTEGER,\r\n            \"processId\" BIGINT,\r\n            \"sensorId\" INTEGER NOT NULL, \r\n            \"min_value\" REAL,\r\n            \"max_value\" REAL,\r\n            \"stddev_value\" REAL);\r\n            \r\n        CREATE OR REPLACE PUMP \"SENSORCALC_STREAM_PUMP\" AS \r\n        INSERT INTO \"SENSORCALC_STREAM\" \r\n        SELECT STREAM STEP(\"SOURCE_SQL_STREAM_001\".ROWTIME BY INTERVAL '30' SECOND) AS \"deviceTimestamp\",              \r\n            \"name\", \r\n            \"facilityId\",             \r\n            \"processId\",             \r\n            \"sensorId\",\r\n            MIN(\"sensorData\") AS \"min_value\",\r\n            MAX(\"sensorData\") AS \"max_value\",\r\n            STDDEV_SAMP(\"sensorData\") AS \"stddev_value\"                             \r\n        FROM \"SOURCE_SQL_STREAM_001\"\r\n        GROUP BY \"facilityId\", \"processId\", \"sensorId\", \"name\",\r\n               STEP(\"SOURCE_SQL_STREAM_001\".ROWTIME BY INTERVAL '30' SECOND);<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-6593139 elementor-widget elementor-widget-text-editor\" data-id=\"6593139\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">The &#8216;sensor-stats&#8217; data analytics application consists of two in-application streams and a pump:<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-4c29b76 elementor-widget elementor-widget-image\" data-id=\"4c29b76\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"image.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<img decoding=\"async\" width=\"874\" height=\"496\" src=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/KinesisAnalyticsApplication-1.png?fit=874%2C496&amp;ssl=1\" class=\"attachment-large size-large wp-image-1654\" alt=\"\" srcset=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/KinesisAnalyticsApplication-1.png?w=874&amp;ssl=1 874w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/KinesisAnalyticsApplication-1.png?resize=300%2C170&amp;ssl=1 300w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/KinesisAnalyticsApplication-1.png?resize=768%2C436&amp;ssl=1 768w\" sizes=\"(max-width: 874px) 100vw, 874px\" \/>\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-cd83c3f elementor-widget elementor-widget-text-editor\" data-id=\"cd83c3f\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\">The Kinesis Data Streams &#8216;sensordata&#8217; stream is the streaming source that is mapped to the in-application stream <strong>&#8216;SOURCE_SQL_STREAM_001&#8217;<\/strong>.\u00a0 Data continuously flows from the streaming source into the in-application stream.<\/span><\/li><li><span style=\"color: #000000;\">The <strong>&#8216;SENSORCALC_STREAM_PUMP&#8217;<\/strong> is a continuous windowed query that inserts query results into the <strong>&#8216;SENSORCALC_STREAM&#8217;<\/strong> in-application stream. The windowed query processes each window in a non-overlapping manner (<em>tumbling window<\/em>); it uses the GROUP BY clause to group sensor data records from &#8216;SOURCE_SQL_STREAM_001&#8217; in-application stream in a 30-second window using the ROWTIME column values. The query emits one output record per each sensor of a running process every 30 seconds providing min, max, and standard deviation for the 30-second interval. <em><strong>Note that, although the feature is called &#8216;latest minute stats&#8217;, the application is configured for the latest 30-second window. If you want to\u00a0 see the full latest minute stats, you just need to replace the 30-second interval with the 60-second one in the Select statement <\/strong><\/em><\/span><em style=\"font-variant-ligatures: normal; font-variant-caps: normal; font-family: Roboto, sans-serif; font-size: 14px; font-weight: 400; color: #000000;\"><span style=\"font-size: 14px; font-weight: 600;\">in the template.yaml file.<\/span><\/em><\/li><li><span style=\"color: #000000;\">The <strong>&#8216;deviceTimestamp&#8217;<\/strong> value in the &#8216;SENSORCALC_STREAM&#8217; approximates the time when the query was processed.<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-9583f21 elementor-widget elementor-widget-heading\" data-id=\"9583f21\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h4 class=\"elementor-heading-title elementor-size-default\">Output configuration for 'sensor-stats' application<\/h4>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-ff6e412 elementor-widget elementor-widget-text-editor\" data-id=\"ff6e412\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">Output configuration for &#8216;sensor-stats&#8217; application is provided in the template.yaml file:<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-4b4ccde elementor-widget elementor-widget-code-highlight\" data-id=\"4b4ccde\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-json line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-json\">\n\t\t\t\t\t<xmp>KinesisAnalyticsSensorApplicationOutput:\r\n    Type: \"AWS::KinesisAnalytics::ApplicationOutput\"\r\n    DependsOn: KinesisAnalyticsSensorApplication\r\n    Properties:\r\n      ApplicationName: !Ref KinesisAnalyticsSensorApplication\r\n      Output:\r\n        Name: \"SENSORCALC_STREAM\"\r\n        LambdaOutput:\r\n          ResourceARN: !GetAtt SensorStatsFunction.Arn\r\n          RoleARN: !GetAtt KinesisAnalyticsSensorRole.Arn\r\n        DestinationSchema:\r\n          RecordFormatType: \"JSON\"<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-cd0bc38 elementor-widget elementor-widget-text-editor\" data-id=\"cd0bc38\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\">&#8216;SENSORCALC_STREAM&#8217; is the in-application stream that we want to persist to an external destination.<\/span><\/li><li><span style=\"color: #000000;\">SensorStatsFunction Lambda function ARN is provided as the external destination of the application output.<\/span><\/li><li><span style=\"color: #000000;\">KinesisAnalyticsSensorRole ARN is provided in order for Kinesis Data Analytics to assume the role to be able to invoke the Lambda destination function on our behalf.<\/span><\/li><li><span style=\"color: #000000;\">RecordFormatType (JSON, CSV) specifies the record format to use when writing to the external destination.<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-ba8ffd9 elementor-widget elementor-widget-heading\" data-id=\"ba8ffd9\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h4 class=\"elementor-heading-title elementor-size-default\">Configuring and implementing the SensorStatsFunction Lambda function for analytics application destination<\/h4>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-e40dc25 elementor-widget elementor-widget-text-editor\" data-id=\"e40dc25\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">The configuration for the SensorStatsFunction is provided in the template.yaml file:<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-225df2d elementor-widget elementor-widget-code-highlight\" data-id=\"225df2d\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-json line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-json\">\n\t\t\t\t\t<xmp>SensorStatsFunction:\r\n    Type: AWS::Serverless::Function\r\n    Properties:\r\n      CodeUri: SensorStatsFunction\/\r\n      Handler: app.handler\r\n      Runtime: nodejs14.x\r\n      Timeout: 30\r\n      MemorySize: 128\r\n      Environment:\r\n        Variables:\r\n          IOT_DATA_ENDPOINT: !Ref IoTdataEndpoint\r\n          TOPIC: \"latest-minutestats\"\r\n      Policies:\r\n        - Statement:\r\n            - Sid: PublishToIotPolicy\r\n              Effect: Allow\r\n              Action:\r\n                - \"iot:Publish\"\r\n              Resource: \"*\"<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-6b80444 elementor-widget elementor-widget-text-editor\" data-id=\"6b80444\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\">The &#8216;sensor-stats&#8217; application invokes the SensorStatsFunction Lambda function and passes records from the &#8216;SENSORCALC_STREAM&#8217;.<\/span><\/li><li><span style=\"color: #000000;\">The <strong>&#8216;IOT_DATA_ENDPOINT&#8217;<\/strong> variable with topic <strong>&#8216;latest-minutestats&#8217;<\/strong> is an IoT endpoint provided as a final destination for the SensorStatsFunction to send the results.<\/span><\/li><\/ul><p><span style=\"color: #000000;\">The SensorStatsFunction aggregates sensor stats per running process of a facility into a single record, then publishes it to the &#8216;latest-minutestats&#8217; IoT topic to update the frontend:<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-e89fe39 elementor-widget elementor-widget-code-highlight\" data-id=\"e89fe39\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-javascript line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-javascript\">\n\t\t\t\t\t<xmp>const AWS = require(\"aws-sdk\");\r\nconst iotdata = new AWS.IotData({ endpoint: process.env.IOT_DATA_ENDPOINT });\r\n\r\nexports.handler = async (event) => {\r\n  console.log(`Received sensor stats: ${event.records.length} messages`);\r\n  console.log(JSON.stringify(event, null, 2));\r\n\r\n  const results = await publishSensorStatsToIoT(event.records);\r\n  console.log(\"results:\", results);\r\n\r\n  return { records: results };\r\n};\r\n\r\nconst publishSensorStatsToIoT = async (records) => {\r\n  let success = 0;\r\n  let failure = 0;\r\n  let processMap = {};\r\n\r\n  records.map((record) => {\r\n    const payload = Buffer.from(record.data, \"base64\").toString(\"ascii\");\r\n    console.log(\"Payload for sensor stats: \", payload);\r\n    const payloadObject = JSON.parse(payload);\r\n\r\n    \/\/ Sort out sensor stats messages per facility and per process for\r\n    \/\/ each facility. Note that currently the simulator only assumes\r\n    \/\/ one facility and one running process:\r\n    if (!processMap[payloadObject.facilityId]) {\r\n      processMap[payloadObject.facilityId] = {};\r\n      processMap[payloadObject.facilityId][payloadObject.processId] = {};\r\n      processMap[payloadObject.facilityId][payloadObject.processId][\r\n        payloadObject.sensorId\r\n      ] = {};\r\n    } else if (!processMap[payloadObject.facilityId][payloadObject.processId]) {\r\n      processMap[payloadObject.facilityId][payloadObject.processId] = {};\r\n      processMap[payloadObject.facilityId][payloadObject.processId][\r\n        payloadObject.sensorId\r\n      ] = {};\r\n    } else if (\r\n      !processMap[payloadObject.facilityId][payloadObject.processId][\r\n        payloadObject.sensorId\r\n      ]\r\n    ) {\r\n      processMap[payloadObject.facilityId][payloadObject.processId][\r\n        payloadObject.sensorId\r\n      ] = {};\r\n    }\r\n\r\n    processMap[payloadObject.facilityId][payloadObject.processId][\r\n      payloadObject.sensorId\r\n    ].payload = payloadObject;\r\n\r\n    \/\/ After messages are published, we must return\r\n    \/\/ the recordId(s) and the result(s) back to KDA application:\r\n    processMap[payloadObject.facilityId][payloadObject.processId][\r\n      payloadObject.sensorId\r\n    ].output = {\r\n      recordId: record.recordId,\r\n      result: \"\",\r\n    };\r\n  });\r\n\r\n  let payloadObjectArray = [];\r\n  let promises = [];\r\n\r\n  \/\/ Publish sensor stats per running process to IoT topic:\r\n  for (let facilityId in processMap) {\r\n    for (let processId in processMap[facilityId]) {\r\n      for (const [sensorId, sensorDataInfo] of Object.entries(\r\n        processMap[facilityId][processId]\r\n      )) {\r\n        payloadObjectArray.push(sensorDataInfo.payload);\r\n      }\r\n\r\n      \/\/ console.log(\"payloadObject:\", payloadObject);\r\n      const JSONpayload = {\r\n        msg: \"sensorstats\",\r\n        facilityId: facilityId,\r\n        processId: `proc-${processId}`,\r\n        sensorstats: JSON.stringify(payloadObjectArray),\r\n      };\r\n\r\n      try {\r\n        let promise = iotdata\r\n          .publish({\r\n            topic: process.env.TOPIC,\r\n            qos: 0,\r\n            payload: JSON.stringify(JSONpayload),\r\n          })\r\n          .promise();\r\n        promises.push(promise);\r\n\r\n        success++;\r\n\r\n        \/\/ Set the result for each sensor data record of the process to \"Ok\":\r\n        for (const [sensorId, sensorDataInfo] of Object.entries(\r\n          processMap[facilityId][processId]\r\n        )) {\r\n          processMap[facilityId][processId][sensorId].output.result = \"Ok\";\r\n        }\r\n      } catch (err) {\r\n        failure++;\r\n        console.error(err);\r\n\r\n        \/\/ Set the result for each sensor data record of the process to \"DeliveryFailed\":\r\n        for (const [sensorId, sensorDataInfo] of Object.entries(\r\n          processMap[facilityId][processId]\r\n        )) {\r\n          processMap[facilityId][processId][sensorId].output.result =\r\n            \"DeliveryFailed\";\r\n        }\r\n      }\r\n\r\n      console.log(\r\n        `Successfully delivered records ${success}, Failed delivered records ${failure}.`\r\n      );\r\n\r\n      const results = await Promise.allSettled(promises);\r\n      results.map((result) =>\r\n        result.status === \"rejected\"\r\n          ? console.log(\"Promise rejected:\", result)\r\n          : null\r\n      );\r\n    }\r\n  }\r\n\r\n  let output = [];\r\n  for (let facilityId in processMap) {\r\n    for (let processId in processMap[facilityId]) {\r\n      for (let sensorId in processMap[facilityId][processId]) {\r\n        output.push(processMap[facilityId][processId][sensorId].output);\r\n      }\r\n    }\r\n  }\r\n\r\n  console.log(\"output:\", output);\r\n  return output;\r\n};\r\n<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-1d9828e elementor-widget elementor-widget-text-editor\" data-id=\"1d9828e\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><em><span style=\"color: #000000;\"><strong>Note that it is important for the invoked Lambda function to return an array of JSON objects that have the exact number of record IDs it received from the analytics application together with a result value (&#8216;Ok&#8217; or &#8216;DeliveryFailed&#8217;). If this is not implemented correctly, the application will assume delivery failure, and will keep re-sending records.<\/strong><\/span><\/em><\/p><p><span style=\"color: #000000;\">The Admin and Operator frontends both subscribe to the &#8216;latest-minutestats&#8217; IoT topic and receive updated latest minute sensor data stats of a running process:<\/span><em><span style=\"color: #000000;\"><strong><br \/><\/strong><\/span><\/em><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-46b24d4 elementor-widget elementor-widget-image\" data-id=\"46b24d4\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"image.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<img decoding=\"async\" width=\"1024\" height=\"325\" src=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/AdmintRuntimeTop.png?fit=1024%2C325&amp;ssl=1\" class=\"attachment-large size-large wp-image-1796\" alt=\"\" srcset=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/AdmintRuntimeTop.png?w=1353&amp;ssl=1 1353w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/AdmintRuntimeTop.png?resize=300%2C95&amp;ssl=1 300w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/AdmintRuntimeTop.png?resize=1024%2C325&amp;ssl=1 1024w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/AdmintRuntimeTop.png?resize=768%2C244&amp;ssl=1 768w\" sizes=\"(max-width: 1024px) 100vw, 1024px\" \/>\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-7d2ac7a elementor-widget elementor-widget-image\" data-id=\"7d2ac7a\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"image.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<img loading=\"lazy\" decoding=\"async\" width=\"766\" height=\"349\" src=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/RuntimeLastMinuteStatsHighlighted.png?fit=766%2C349&amp;ssl=1\" class=\"attachment-large size-large wp-image-1797\" alt=\"\" srcset=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/RuntimeLastMinuteStatsHighlighted.png?w=766&amp;ssl=1 766w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/RuntimeLastMinuteStatsHighlighted.png?resize=300%2C137&amp;ssl=1 300w\" sizes=\"(max-width: 766px) 100vw, 766px\" \/>\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-c619241 elementor-widget elementor-widget-image\" data-id=\"c619241\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"image.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<img loading=\"lazy\" decoding=\"async\" width=\"791\" height=\"362\" src=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/RuntimeDailyAndCompletedStats-3.png?fit=791%2C362&amp;ssl=1\" class=\"attachment-large size-large wp-image-1802\" alt=\"\" srcset=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/RuntimeDailyAndCompletedStats-3.png?w=791&amp;ssl=1 791w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/RuntimeDailyAndCompletedStats-3.png?resize=300%2C137&amp;ssl=1 300w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/RuntimeDailyAndCompletedStats-3.png?resize=768%2C351&amp;ssl=1 768w\" sizes=\"(max-width: 791px) 100vw, 791px\" \/>\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-d1542b1 elementor-widget elementor-widget-heading\" data-id=\"d1542b1\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h4 class=\"elementor-heading-title elementor-size-default\">IAM role for the data analytics application<\/h4>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-bbbb2b0 elementor-widget elementor-widget-text-editor\" data-id=\"bbbb2b0\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">The IAM role &#8211; <strong>&#8216;KinesisAnalyticsSensorRole&#8217;<\/strong> &#8211; is attached to the &#8216;sensor-stats&#8217; application to allow it to read records from the Kinesis Data Streams &#8216;sensordata&#8217; stream and invoke the SensorStatsFunction Lambda function:<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-9cd6ea5 elementor-widget elementor-widget-code-highlight\" data-id=\"9cd6ea5\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-json line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-json\">\n\t\t\t\t\t<xmp> KinesisAnalyticsSensorRole:\r\n    Type: \"AWS::IAM::Role\"\r\n    Properties:\r\n      AssumeRolePolicyDocument:\r\n        Version: \"2012-10-17\"\r\n        Statement:\r\n          - Effect: Allow\r\n            Principal:\r\n              Service:\r\n                - kinesisanalytics.amazonaws.com\r\n            Action:\r\n              - \"sts:AssumeRole\"\r\n      Path: \/\r\n      Policies:\r\n        - PolicyName: ReadInputSensorKinesis\r\n          PolicyDocument:\r\n            Version: \"2012-10-17\"\r\n            Statement:\r\n              - Effect: Allow\r\n                Action:\r\n                  - \"kinesis:DescribeStream\"\r\n                  - \"kinesis:GetShardIterator\"\r\n                  - \"kinesis:GetRecords\"\r\n                Resource: !Sub \"arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream\/sensordata\"\r\n        - PolicyName: InvokeSensorStatsLambdaFunction\r\n          PolicyDocument:\r\n            Version: \"2012-10-17\"\r\n            Statement:\r\n              - Effect: Allow\r\n                Action:\r\n                  - \"lambda:InvokeFunction\"\r\n                  - \"lambda:GetFunctionConfiguration\"\r\n                Resource: !GetAtt SensorStatsFunction.Arn<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-776f67c elementor-widget elementor-widget-text-editor\" data-id=\"776f67c\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">The <strong>ReadInputSensorKinesis<\/strong> and the <strong>InvokeSensorStatsLambdaFunction<\/strong> policies grant such permissions.<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-f3c7fe6 elementor-widget elementor-widget-heading\" data-id=\"f3c7fe6\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h2 class=\"elementor-heading-title elementor-size-default\">Implementing the latest minute statistics of streaming sensor data with AWS Lambda tumbling window function<\/h2>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-d6af76a elementor-widget elementor-widget-heading\" data-id=\"d6af76a\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h3 class=\"elementor-heading-title elementor-size-default\">Architecture overview<\/h3>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-8127816 elementor-widget elementor-widget-text-editor\" data-id=\"8127816\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">ACME Industries uses the following architecture that includes the tumbling window Lambda function to calculate latest minute sensor statistics of a running process:\u00a0<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-ff34382 elementor-widget elementor-widget-image\" data-id=\"ff34382\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"image.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<img loading=\"lazy\" decoding=\"async\" width=\"859\" height=\"316\" src=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteTumblingWindow.png?fit=859%2C316&amp;ssl=1\" class=\"attachment-large size-large wp-image-1839\" alt=\"\" srcset=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteTumblingWindow.png?w=859&amp;ssl=1 859w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteTumblingWindow.png?resize=300%2C110&amp;ssl=1 300w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteTumblingWindow.png?resize=768%2C283&amp;ssl=1 768w\" sizes=\"(max-width: 859px) 100vw, 859px\" \/>\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-27f8d8c elementor-widget elementor-widget-text-editor\" data-id=\"27f8d8c\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\">The tumbling window function is one of the consumers of Kinesis Data Streams.<\/span><\/li><li><span style=\"color: #000000;\">It receives sensor data of a running process from Kinesis Data Streams every second.<\/span><\/li><li><span style=\"color: #000000;\">The function aggregates sensor data per process during a 30-second window, then calculates sensor stats (min, max, and standard deviation) over the aggregated sensor data.\u00a0<\/span><\/li><li><span style=\"color: #000000;\">It combines calculated sensor stats for each running process into a single payload and publishes it to an IoT topic to update the frontend.<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-5d0c643 elementor-widget elementor-widget-heading\" data-id=\"5d0c643\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h3 class=\"elementor-heading-title elementor-size-default\">Implementation details<\/h3>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-984b770 elementor-widget elementor-widget-text-editor\" data-id=\"984b770\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">The\u00a0<span style=\"color: #3366ff;\"><a style=\"color: #3366ff;\" href=\"https:\/\/aws.amazon.com\/blogs\/compute\/using-aws-lambda-for-streaming-analytics\/\" target=\"_blank\" rel=\"noopener\">tumbling window<\/a><\/span> feature of Lambda provides an alternative way to implement near real-time analytics in addition to Amazon Kinesis Data Analytics service. This feature allows the function invocations from streaming source to pass state between invocations.<\/span><\/p><p><span style=\"color: #000000;\">ACME Industries uses this feature to aggregate sensor data per sensor of a running process on each invocation during a 30-second window. At the end of the window, it calculates sensor statistics using the aggregated data from the state object.<\/span><\/p><p><span style=\"color: #000000;\">The\u00a0<span style=\"color: #3366ff;\"><a style=\"color: #3366ff;\" href=\"https:\/\/github.com\/cloud21sak\/SensorStreamingApplication\/tree\/master\/2-sensorapp-streaming-kds\" target=\"_blank\" rel=\"noopener\">template.yaml<\/a><\/span> file describes the tumbling window Lambda function &#8211; <strong>S<\/strong><strong>ensorStatsByLatestMinuteFunction<\/strong>:<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-7da270e elementor-widget elementor-widget-code-highlight\" data-id=\"7da270e\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-json line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-json\">\n\t\t\t\t\t<xmp>SensorStatsByLatestMinuteFunction:\r\n    Type: AWS::Serverless::Function\r\n    Properties:\r\n      CodeUri: sensorStatsByLatestMinute\/\r\n      Handler: app.handler\r\n      Runtime: nodejs14.x\r\n      Timeout: 300\r\n      MemorySize: 256\r\n      Environment:\r\n        Variables:          \r\n          IOT_DATA_ENDPOINT: !Ref IoTdataEndpoint\r\n          TOPIC: \"latest-minutestats\"\r\n      Policies:\r\n        - Statement:\r\n            - Sid: PublishToIotPolicy\r\n              Effect: Allow\r\n              Action:\r\n                - \"iot:Publish\"\r\n              Resource: \"*\"\r\n      Events:\r\n        Stream:\r\n          Type: Kinesis\r\n          Properties:\r\n            Stream: !Sub \"arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream\/${KinesisSensorStreamName}\"\r\n            BatchSize: 1000\r\n            StartingPosition: TRIM_HORIZON\r\n            TumblingWindowInSeconds: 30<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-8f1d49d elementor-widget elementor-widget-text-editor\" data-id=\"8f1d49d\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">The tumbling window duration is specified in the <strong>TumblingWindowInSeconds<\/strong> attribute of the Events definition.<\/span><\/p><p><span style=\"color: #000000;\">When tumbling window is enabled, the Lambda function&#8217;s event payload contains several new attributes:<\/span><\/p><ul><li><span style=\"color: #000000;\"><strong>State<\/strong> &#8211; this is an object where aggregated data is stored between Lambda function invocations. The state object can contain up to 1MB of data. It is initially empty in a new window. Here is an example of the state object in the event payload:<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-748e94b elementor-widget elementor-widget-code-highlight\" data-id=\"748e94b\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-json \">\n\t\t\t\t<code readonly=\"true\" class=\"language-json\">\n\t\t\t\t\t<xmp>\"state\": {\r\n    \"1656765771320\": {\r\n      \"0\": {\r\n        \"sensorData\": [\r\n          75.65867806305917, 28.85717410712807, 7.862853766610045,\r\n          75.39909061184122, 29.01159697707094, 93.40315875898159,\r\n          56.52958345117889, 71.11442413234643, 37.564327993605914,\r\n          6.412071893480054, 28.13930698035374, 68.17103246373301,\r\n          25.073777450988487, 82.10792619144244, 28.272966638142492,\r\n          43.39891863983387, 43.51220820711272, 16.148025582768888,\r\n          93.8989727250565\r\n        ],\r\n        \"name\": \"temperature_0\"\r\n      },\r\n      \"1\": {\r\n        \"sensorData\": [\r\n          35.512093577877835, 92.30443351178394, 91.1013009317126,\r\n          31.20491262528111, 5.574814343573187, 33.94246386030873,\r\n          6.494679575043105, 71.11215754104599, 8.126400488748109,\r\n          0.07655841090292714, 88.8128976264104, 87.25682260247095,\r\n          83.33884324596261, 8.684308092071191, 51.80094439944403,\r\n          24.897329381789124, 3.8911734219942318, 79.00718198212083,\r\n          39.36909519579619\r\n        ],\r\n        \"name\": \"temperature_1\"\r\n      },\r\n      \"2\": {\r\n        \"sensorData\": [\r\n          30.784271169992937, 45.4010703443011, 42.96638448264816,\r\n          52.817118625082045, 96.21402855527317, 2.930590439236158,\r\n          72.09102178943726, 87.07955921671082, 62.868664604001204,\r\n          83.21783269378844, 90.8021362093286, 36.40327662383835,\r\n          54.20156864736951, 26.899868288525397, 6.237623237252254,\r\n          47.51428556765154, 38.194251401905156, 45.07520152439093\r\n        ],\r\n        \"name\": \"temperature_2\"\r\n      }\r\n    }\r\n  }<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-dfbdfde elementor-widget elementor-widget-text-editor\" data-id=\"dfbdfde\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\"><strong>Window start and end<\/strong> &#8211; the beginning and ending timestamps of the current window:<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-d595247 elementor-widget elementor-widget-code-highlight\" data-id=\"d595247\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-json \">\n\t\t\t\t<code readonly=\"true\" class=\"language-json\">\n\t\t\t\t\t<xmp>\"window\": {\r\n    \"start\": \"2022-07-02T12:44:00Z\",\r\n    \"end\": \"2022-07-02T12:44:30Z\"\r\n  }<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-5f2eda0 elementor-widget elementor-widget-text-editor\" data-id=\"5f2eda0\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\"><strong>isFinalInvokeForWindow<\/strong> &#8211; Indicates if this is the last invocation for the current window. It can have values set to &#8216;true&#8217; or &#8216;false&#8217;:<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-9a33d6f elementor-widget elementor-widget-code-highlight\" data-id=\"9a33d6f\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-javascript \">\n\t\t\t\t<code readonly=\"true\" class=\"language-javascript\">\n\t\t\t\t\t<xmp>\"isFinalInvokeForWindow\": false, \nor\n\"isFinalInvokeForWindow\": true<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-662b0eb elementor-widget elementor-widget-text-editor\" data-id=\"662b0eb\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\"><strong>isWindowTerminatedEarly &#8211;<\/strong>\u00a0A window ends early if the state exceeds the maximum allowed size of 1 MB. It can have values set to &#8216;true&#8217; or &#8216;false&#8217;:<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-d1b603f elementor-widget elementor-widget-code-highlight\" data-id=\"d1b603f\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-javascript \">\n\t\t\t\t<code readonly=\"true\" class=\"language-javascript\">\n\t\t\t\t\t<xmp> \"isWindowTerminatedEarly\": false<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-abe296d elementor-widget elementor-widget-text-editor\" data-id=\"abe296d\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">Let&#8217;s dive deeper into the implementation details of the SensorStatsByLatestMinuteFunction:<\/span><\/p><ul style=\"font-variant-ligatures: normal; font-variant-caps: normal; font-family: Roboto, sans-serif; font-size: 14px; font-style: normal; font-weight: 400;\"><li style=\"font-size: 14px;\"><p style=\"font-size: 14px;\"><span style=\"font-size: 14px; color: #000000;\">The event handler in app.js receives event payload every second and retrieves sensor data records from the payload:<\/span><\/p><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-1219261 elementor-widget elementor-widget-code-highlight\" data-id=\"1219261\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-javascript \">\n\t\t\t\t<code readonly=\"true\" class=\"language-javascript\">\n\t\t\t\t\t<xmp>exports.handler = async (event) => {\r\n  console.log(`Received sensor data: ${event.Records.length} messages`);\r\n  let jsonRecords = getRecordsFromPayload(event.Records);\r\n  \r\n  -----------------\r\n\r\nconst getRecordsFromPayload = (eventRecords) => {\r\n  let jsonRecords = [];\r\n  \/\/ Get records from event payload\r\n  eventRecords.map((record) => {\r\n    \/\/ Extract JSON record from base64 data\r\n    const buffer = Buffer.from(record.kinesis.data, \"base64\").toString();\r\n    const jsonRecord = JSON.parse(buffer);\r\n\r\n    jsonRecords.push(jsonRecord);\r\n  });\r\n  return jsonRecords;\r\n};<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-a37c8b2 elementor-widget elementor-widget-text-editor\" data-id=\"a37c8b2\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\">It then checks if the process is complete by looking for the &#8216;event&#8217; value set to &#8216;complete&#8217; by the simulator:<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-cd258bf elementor-widget elementor-widget-code-highlight\" data-id=\"cd258bf\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-javascript \">\n\t\t\t\t<code readonly=\"true\" class=\"language-javascript\">\n\t\t\t\t\t<xmp>\/\/ Check for the \"complete\" event:\r\n  const completedProcessRecords = jsonRecords.filter(\r\n    (record) => record.event === \"complete\"\r\n  );<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-4aeadb0 elementor-widget elementor-widget-text-editor\" data-id=\"4aeadb0\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\">If there are completed processes, then we check if there are any sensor data records in the event payload in order to determine if the number of remaining sensor data records is sufficient to calculate statistics over them:<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-3a52130 elementor-widget elementor-widget-code-highlight\" data-id=\"3a52130\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-javascript line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-javascript\">\n\t\t\t\t\t<xmp>\/\/ If the \"complete\" event was issued, then we need to check the current state,\r\n\/\/ and, if the total number of data points for each sensor is less than 15, then we\r\n\/\/ ignore those values, and return without publishing any new aggregate stats:\r\n  if (completedProcessRecords.length !== 0) {\r\n    console.log(\"Received 'complete' message!\");\r\n    \/\/ Check if there are any current data records passed from the data stream:\r\n    const sensorDataRecords = jsonRecords.filter(\r\n      (record) => record.event === \"update\"\r\n    );\r\n\r\n    \/\/ console.log(\"sensorDataRecords:\", sensorDataRecords);\r\n\r\n    \/\/ Retrieve existing state passed during tumbling window\r\n    let state = event.state || {};\r\n    jsonRecords.map(\r\n      (record) => (processMap[record.processId] = record.facilityId)\r\n    );\r\n\r\n    if (sensorDataRecords.length !== 0) {\r\n      getSensorDataByProcessId(state, sensorDataRecords);\r\n    }\r\n\r\n    console.log(\"Publish last sensor stats after complete event\");\r\n    await publishToIoT(state);\r\n\r\n    console.log(\"Done publishing stats per last minute for the process\");\r\n    if (event.isFinalInvokeForWindow) {\r\n      console.log(\"This is finalInvokeForWindow after complete event\");\r\n      \/\/ We don't need the state anymore, just return:\r\n      return;\r\n    } else {\r\n      \/\/ We need to return the state object here since this is not\r\n      \/\/ the final invoke window.\r\n      \/\/ Remove state data for the completed processes:\r\n      completedProcessRecords.map((record) => {\r\n        delete state[record.processId];\r\n      });\r\n      return { state };\r\n    }\r\n  }<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-0aff51f elementor-widget elementor-widget-text-editor\" data-id=\"0aff51f\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\">The <strong>getSensorDataByProcessId<\/strong> function combines sensor data records from the event payload with the data in the state object:<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-554e0b0 elementor-widget elementor-widget-code-highlight\" data-id=\"554e0b0\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-javascript line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-javascript\">\n\t\t\t\t\t<xmp>const getSensorDataByProcessId = (state, jsonRecords) => {\r\n\/\/ console.log(\"getSensorDataByProcessId: \", state);\r\n  jsonRecords.map((record) => {\r\n    \/\/ Add processId if not in state\r\n    if (!state[record.processId]) {\r\n      state[record.processId] = {};\r\n    }\r\n\r\n    if (!state[record.processId][record.sensorId]) {\r\n      state[record.processId][record.sensorId] = {};\r\n      state[record.processId][record.sensorId].sensorData = [];\r\n      state[record.processId][record.sensorId].name = record.name;\r\n    }\r\n    state[record.processId][record.sensorId].sensorData.push(record.sensorData);\r\n  });\r\n};<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-8ef23c9 elementor-widget elementor-widget-text-editor\" data-id=\"8ef23c9\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\">We next call the <strong>publishToIoT <\/strong>method, and pass the state object. The method checks if there are less than 15 data points of sensor data per sensor, then it is ignored, otherwise, the statistics are calculated, combined into a single payload, and published to the IoT topic &#8216;latest-minutestats&#8217;:<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-06bd502 elementor-widget elementor-widget-code-highlight\" data-id=\"06bd502\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-javascript line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-javascript\">\n\t\t\t\t\t<xmp>const publishToIoT = async (processSensorData) => {\r\n  let promises = [];\r\n  let payloadObjectArray = [];\r\n  for (let processId in processSensorData) {\r\n    for (const [sensorId, sensorDataInfo] of Object.entries(\r\n      processSensorData[processId]\r\n    )) {\r\n      \/\/ Ignore if sensor data size is less than 15:\r\n      if (sensorDataInfo.sensorData.length < 15) {\r\n        console.log(\r\n          \"sensorData size is < 15. Ignore sensor data: \",\r\n          sensorDataInfo\r\n        );\r\n        continue;\r\n      }\r\n      \/\/  console.log(\"sensorDataInfo: \", sensorDataInfo);\r\n      const payloadObject = {\r\n        name: sensorDataInfo.name,\r\n        sensorId: sensorId,\r\n        deviceTimestamp: Date.now(),\r\n        min_value: Math.min(...sensorDataInfo.sensorData),\r\n        max_value: Math.max(...sensorDataInfo.sensorData),\r\n        stddev_value: getStandardDevitation(sensorDataInfo.sensorData),\r\n      };\r\n      payloadObjectArray.push(payloadObject);\r\n    }\r\n    const JSONpayload = {\r\n      msg: \"sensorstats\",\r\n      facilityId: processMap[processId],\r\n      processId: `proc-${processId}`,\r\n      sensorstats: JSON.stringify(payloadObjectArray),\r\n    };\r\n\r\n    let promise = iotdata\r\n      .publish({\r\n        topic: process.env.TOPIC,\r\n        qos: 0,\r\n        payload: JSON.stringify(JSONpayload),\r\n      })\r\n      .promise();\r\n    promises.push(promise);\r\n  }\r\n\r\n  \/\/ Wait for all promises to be settled\r\n  const results = await Promise.allSettled(promises);\r\n\r\n  \/\/ Log out any rejected results\r\n  results.map((result) =>\r\n    result.status === \"rejected\" ? console.log(result) : null\r\n  );\r\n};<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-b294f91 elementor-widget elementor-widget-text-editor\" data-id=\"b294f91\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\">When we publish the last sensor stats after the &#8216;complete&#8217; event, we then check if this is the final\u00a0 invoke for the tumbling window; if it is, we just return, otherwise, we remove the state sensor data for the completed process, and return the state object:<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-523a588 elementor-widget elementor-widget-code-highlight\" data-id=\"523a588\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-javascript line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-javascript\">\n\t\t\t\t\t<xmp>console.log(\"Done publishing stats per last minute for the process\");\r\n    if (event.isFinalInvokeForWindow) {\r\n      console.log(\"This is finalInvokeForWindow after the 'complete' event\");\r\n      \/\/ We don't need the state anymore, just return:\r\n      return;\r\n    } else {\r\n      \/\/ We need to return the state object here since this is not\r\n      \/\/ the final invoke window.\r\n      \/\/ Remove state data for the completed processes:\r\n      completedProcessRecords.map((record) => {\r\n        delete state[record.processId];\r\n      });\r\n      return { state };\r\n    }<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-34e4dc5 elementor-widget elementor-widget-text-editor\" data-id=\"34e4dc5\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ul><li><span style=\"color: #000000;\">If there was no &#8216;complete&#8217; event in the payload records:\u00a0<\/span><\/li><\/ul>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-390ff1e elementor-widget elementor-widget-text-editor\" data-id=\"390ff1e\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ol><li style=\"list-style-type: none;\"><ul><li><span style=\"color: #000000;\">We retrieve existing state passed during tumbling window.<\/span><\/li><li><span style=\"color: #000000;\">Call getSensorDataByProcessId(state, jsonRecords) to combine sensor data records in the event payload with the existing state.<\/span><\/li><li><span style=\"color: #000000;\">Check if this is final invoke for window; if it is, and the state object is not empty, we call publishToIoT(state).<\/span><\/li><li><span style=\"color: #000000;\">We return the state object if this is not the final invoke for window.<\/span><\/li><\/ul><\/li><\/ol>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-b05bf98 elementor-widget elementor-widget-code-highlight\" data-id=\"b05bf98\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-default copy-to-clipboard \">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-javascript line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-javascript\">\n\t\t\t\t\t<xmp>\/\/ There was no \"complete\" event in the payload records:\r\n  else {\r\n    \/\/ Retrieve existing state passed during tumbling window\r\n    let state = event.state || {};\r\n\r\n    \/\/ Get sensor data of a process from event\r\n    jsonRecords.map(\r\n      (record) => (processMap[record.processId] = record.facilityId)\r\n    );\r\n    \/\/ console.log(\"Payload records: \", JSON.stringify(jsonRecords, null, 2));\r\n\r\n    getSensorDataByProcessId(state, jsonRecords);\r\n\r\n    \/\/ Since tumbling window is configured, publish to IoT endpoint\r\n    \/\/ on the final invoke window:\r\n    if (event.isFinalInvokeForWindow) {\r\n      \/\/ Make sure the state is not empty before publishing to IoT.\r\n      \/\/ This is the use case when the 'complete' event has been already processed\r\n      \/\/ during the window invocation that wasn't final.\r\n      if (Object.entries(state).length === 0) {\r\n        console.log(\"isFinalInvokeForWindow but the state is empty.\");\r\n        return;\r\n      }\r\n\r\n      \/\/  console.log(\"Final invoke state: \", JSON.stringify(state, null, 2));\r\n      await publishToIoT(state);\r\n    } else {\r\n      \/\/  console.log(\"Returning state: \", JSON.stringify(state, null, 2));\r\n      return { state };\r\n    }\r\n  }<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-5863e36 elementor-widget elementor-widget-heading\" data-id=\"5863e36\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h4 class=\"elementor-heading-title elementor-size-default\">Debugging 'SensorStatsByLatestMinuteFunction' locally<\/h4>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-9f16148 elementor-widget elementor-widget-text-editor\" data-id=\"9f16148\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">AWS SAM allows developers to step through Lambda functions locally without actually deploying them in the cloud. <\/span><span style=\"color: #000000;\">To be able to test and debug Lambda functions locally in the IDE of your choice you need to install an\u00a0<\/span><span style=\"color: #3366ff;\"><a style=\"color: #3366ff;\" href=\"https:\/\/docs.aws.amazon.com\/serverless-application-model\/latest\/developerguide\/serverless-sam-cli-using-debugging.html\" target=\"_blank\" rel=\"noopener\">AWS Toolkit plugin<\/a><\/span><span style=\"color: #000000;\"> for that IDE. For those who are using VS Code, here is a link to the documentation page that contains instructions on how to install the toolkit plugin: <span style=\"color: #3366ff;\"><a href=\"https:\/\/docs.aws.amazon.com\/toolkit-for-vscode\/latest\/userguide\/setup-toolkit.html\" target=\"_blank\" rel=\"noopener\"><span style=\"color: #3366ff;\">https:\/\/docs.aws.amazon.com\/toolkit-for-vscode\/latest\/userguide\/setup-toolkit.html<\/span><\/a><span style=\"color: #000000;\">.<\/span><\/span><\/span><\/p><p><span style=\"color: #000000;\"><span style=\"color: #3366ff;\"><span style=\"color: #000000;\">There are three distinct use cases for which I wanted to test (debug to be precise) &#8216;SensorStatsByLatestMinuteFunction&#8217; tumbling window Lambda function:<\/span><\/span><\/span><\/p><ol><li><span style=\"color: #000000;\">Function is invoked with &#8216;isFinalInvokeForWindow&#8217; attribute in the event payload set to &#8216;false&#8217;.<\/span><\/li><li><span style=\"color: #000000;\">Function is invoked with the &#8216;event&#8217; value in the message from the simulator set to &#8216;complete&#8217;.<\/span><\/li><li><span style=\"color: #000000;\">Function is invoked with &#8216;isFinalInvokeForWindow&#8217; attribute in the event payload set to &#8216;true&#8217;.<\/span><\/li><\/ol><p><span style=\"color: #000000;\">The test harnesses and test events for each use case can be found in this\u00a0<span style=\"color: #3366ff;\"><a style=\"color: #3366ff;\" href=\"https:\/\/github.com\/cloud21sak\/SensorStreamingApplication\/tree\/master\/2-sensorapp-streaming-kds\/sensorStatsByLatestMinute\/TestFolder\" target=\"_blank\" rel=\"noopener\">GitHub repository<\/a><\/span>.<\/span><\/p><p><span style=\"color: #000000;\">In VS Code the test folder looks like this:<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-1e19cb6 elementor-widget elementor-widget-image\" data-id=\"1e19cb6\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"image.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<img loading=\"lazy\" decoding=\"async\" width=\"990\" height=\"457\" src=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteStatsDebugging.png?fit=990%2C457&amp;ssl=1\" class=\"attachment-large size-large wp-image-2026\" alt=\"\" srcset=\"https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteStatsDebugging.png?w=990&amp;ssl=1 990w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteStatsDebugging.png?resize=300%2C138&amp;ssl=1 300w, https:\/\/i0.wp.com\/thecloud21.com\/wp-content\/uploads\/2022\/10\/LatestMinuteStatsDebugging.png?resize=768%2C355&amp;ssl=1 768w\" sizes=\"(max-width: 990px) 100vw, 990px\" \/>\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-11f4b04 elementor-widget elementor-widget-text-editor\" data-id=\"11f4b04\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">I will explain how to step through the first use case; the other two are very similar:<\/span><\/p><ol><li><span style=\"color: #000000;\">In the <strong>testHaressNoFinalInvoke.js<\/strong> make sure to replace the IOT_DATA_ENDPOINT value with your account&#8217;s device data endpoint. You can find your account&#8217;s device data endpoint in the <strong>Settings<\/strong> page of your AWS IoT Console, or use the following CLI command:\u00a0<em><strong>aws iot describe-endpoint &#8211;endpoint-type iot:Data-ATS<\/strong><\/em><\/span><\/li><li><span style=\"color: #000000;\">In the <strong>testHaressNoFinalInvoke.js<\/strong> set a breakpoint (F9) where the function handler gets called.<\/span><\/li><li><span style=\"color: #000000;\">Press the F5 key to start Debugging.<\/span><\/li><li><span style=\"color: #000000;\">When the program breaks at the breakpoint, press F11 to step into the function we are debugging. Notice that for each test harness, there is a sample event object that is used to test the code.<\/span><\/li><li><span style=\"color: #000000;\">Once you are inside the function, use the debugger the same way as you would usually use to view variables, collections, etc.<\/span><\/li><\/ol><p><span style=\"color: #000000;\">In the <a href=\"https:\/\/github.com\/cloud21sak\/SensorStreamingApplication\/blob\/master\/README.md\" target=\"_blank\" rel=\"noopener\"><span style=\"color: #3366ff;\">Readme<\/span><\/a> file of this series, I provide additional section on the local debugging of other Lambda functions in this project.<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-404f885 elementor-widget elementor-widget-heading\" data-id=\"404f885\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h2 class=\"elementor-heading-title elementor-size-default\">Kinesis Data Analytics vs. tumbling window Lambda function<\/h2>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-6e2d049 elementor-widget elementor-widget-text-editor\" data-id=\"6e2d049\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">Since both services provide similar feature, how do we choose which service to use in our application?<\/span><\/p><p><span style=\"color: #000000;\">For use cases where streaming data requires storing of intermediate results in order to perform additional processing before persisting records to an output destination, the Kinesis Data Analytics would be a better choice because it can be configured with multiple in-application streams. For example, if a data analytics application needs to first dynamically calculate a column value based on other column values of a record, and then perform a query that includes the newly calculated values, the application would need to store the intermediate results in an in-application stream, and then perform additional processing using windowed query in that stream.<\/span><\/p><p><span style=\"color: #000000;\">Another use case for using Kinesis Data Analytics service would be if user needs to persist the processed records to an S3 bucket. In such case the data analytics application can be configured to have its external destination to be a Kinesis Firehose delivery stream.<\/span><\/p><p><span style=\"color: #000000;\">For use cases where streaming data processing doesn&#8217;t require storing of intermediate results, and the external destination is an IoT endpoint, or a DynamoDB table, then using tumbling window Lambda would be a better choice. In ACME Industries application, since we don&#8217;t store any intermediate results, and we don&#8217;t need to persist the processed records, using the tumbling window Lambda would be the preferred choice.<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-bacc655 elementor-widget elementor-widget-heading\" data-id=\"bacc655\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h2 class=\"elementor-heading-title elementor-size-default\">Conclusion<\/h2>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-770014c elementor-widget elementor-widget-text-editor\" data-id=\"770014c\" data-element_type=\"widget\" data-e-type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<p><span style=\"color: #000000;\">In this post, I explain how the &#8216;Stats by latest minute&#8217; feature is implemented in ACME Industries sample application. I show two different ways of implementing this feature:<\/span><\/p><ol><li><span style=\"color: #000000;\">Using Amazon Kinesis Data Analytics for SQL Applications.<\/span><\/li><li><span style=\"color: #000000;\">Using the tumbling window feature of AWS Lambda.<\/span><\/li><\/ol><p><span style=\"color: #000000;\">This post explains the architecture and implementation details of each approach.<br \/><\/span><span style=\"color: #000000;\">I also show how to locally debug the tumbling window Lambda function without deploying it in the cloud.<br \/><\/span><span style=\"color: #000000;\">Finally, I explain which approach would be a better choice for some of the use cases.<\/span><\/p><p><span style=\"color: #000000;\">In\u00a0<span style=\"color: #3366ff;\"><a style=\"color: #3366ff;\" href=\"https:\/\/thecloud21.com\/?p=2069\" target=\"_blank\" rel=\"noopener\">Part 5<\/a><\/span> I explain how the command and control is implemented in ACME Industries application.<\/span><\/p>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t\t<\/div>\n\t\t<\/div>\n\t\t\t\t\t<\/div>\n\t\t<\/section>\n\t\t\t\t<\/div>\n\t\t","protected":false},"excerpt":{"rendered":"<p>In&nbsp;Part 3 I explain how the cumulative daily statistics of a running process is implemented in ACME Industries application. I also cover how statistics of a completed process is calculated, stored, and accessed by the frontend. In this post I explain how the latest minute statistics of streaming sensor data is implemented in ACME Industries [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"nf_dc_page":"","om_disable_all_campaigns":false,"_monsterinsights_skip_tracking":false,"_monsterinsights_sitenote_active":false,"_monsterinsights_sitenote_note":"","_monsterinsights_sitenote_category":0,"ocean_post_layout":"","ocean_both_sidebars_style":"","ocean_both_sidebars_content_width":0,"ocean_both_sidebars_sidebars_width":0,"ocean_sidebar":"0","ocean_second_sidebar":"0","ocean_disable_margins":"enable","ocean_add_body_class":"","ocean_shortcode_before_top_bar":"","ocean_shortcode_after_top_bar":"","ocean_shortcode_before_header":"","ocean_shortcode_after_header":"","ocean_has_shortcode":"","ocean_shortcode_after_title":"","ocean_shortcode_before_footer_widgets":"","ocean_shortcode_after_footer_widgets":"","ocean_shortcode_before_footer_bottom":"","ocean_shortcode_after_footer_bottom":"","ocean_display_top_bar":"default","ocean_display_header":"default","ocean_header_style":"","ocean_center_header_left_menu":"0","ocean_custom_header_template":"0","ocean_custom_logo":0,"ocean_custom_retina_logo":0,"ocean_custom_logo_max_width":0,"ocean_custom_logo_tablet_max_width":0,"ocean_custom_logo_mobile_max_width":0,"ocean_custom_logo_max_height":0,"ocean_custom_logo_tablet_max_height":0,"ocean_custom_logo_mobile_max_height":0,"ocean_header_custom_menu":"0","ocean_menu_typo_font_family":"0","ocean_menu_typo_font_subset":"","ocean_menu_typo_font_size":0,"ocean_menu_typo_font_size_tablet":0,"ocean_menu_typo_font_size_mobile":0,"ocean_menu_typo_font_size_unit":"px","ocean_menu_typo_font_weight":"","ocean_menu_typo_font_weight_tablet":"","ocean_menu_typo_font_weight_mobile":"","ocean_menu_typo_transform":"","ocean_menu_typo_transform_tablet":"","ocean_menu_typo_transform_mobile":"","ocean_menu_typo_line_height":0,"ocean_menu_typo_line_height_tablet":0,"ocean_menu_typo_line_height_mobile":0,"ocean_menu_typo_line_height_unit":"","ocean_menu_typo_spacing":0,"ocean_menu_typo_spacing_tablet":0,"ocean_menu_typo_spacing_mobile":0,"ocean_menu_typo_spacing_unit":"","ocean_menu_link_color":"","ocean_menu_link_color_hover":"","ocean_menu_link_color_active":"","ocean_menu_link_background":"","ocean_menu_link_hover_background":"","ocean_menu_link_active_background":"","ocean_menu_social_links_bg":"","ocean_menu_social_hover_links_bg":"","ocean_menu_social_links_color":"","ocean_menu_social_hover_links_color":"","ocean_disable_title":"default","ocean_disable_heading":"default","ocean_post_title":"","ocean_post_subheading":"","ocean_post_title_style":"","ocean_post_title_background_color":"","ocean_post_title_background":0,"ocean_post_title_bg_image_position":"","ocean_post_title_bg_image_attachment":"","ocean_post_title_bg_image_repeat":"","ocean_post_title_bg_image_size":"","ocean_post_title_height":0,"ocean_post_title_bg_overlay":0.5,"ocean_post_title_bg_overlay_color":"","ocean_disable_breadcrumbs":"default","ocean_breadcrumbs_color":"","ocean_breadcrumbs_separator_color":"","ocean_breadcrumbs_links_color":"","ocean_breadcrumbs_links_hover_color":"","ocean_display_footer_widgets":"default","ocean_display_footer_bottom":"default","ocean_custom_footer_template":"0","_jetpack_memberships_contains_paid_content":false,"ocean_post_oembed":"","ocean_post_self_hosted_media":"","ocean_post_video_embed":"","ocean_link_format":"","ocean_link_format_target":"self","ocean_quote_format":"","ocean_quote_format_link":"post","ocean_gallery_link_images":"off","ocean_gallery_id":[],"footnotes":""},"categories":[19,1],"tags":[33,34,32,20],"class_list":["post-1591","post","type-post","status-publish","format-standard","hentry","category-serverless","category-solutionarchitecture","tag-cloudformation","tag-aws-sam","tag-kinesis-analytics","tag-lambda","entry"],"aioseo_notices":[],"jetpack_featured_media_url":"","jetpack_sharing_enabled":true,"_links":{"self":[{"href":"https:\/\/thecloud21.com\/index.php?rest_route=\/wp\/v2\/posts\/1591","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/thecloud21.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/thecloud21.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/thecloud21.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/thecloud21.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1591"}],"version-history":[{"count":6,"href":"https:\/\/thecloud21.com\/index.php?rest_route=\/wp\/v2\/posts\/1591\/revisions"}],"predecessor-version":[{"id":4185,"href":"https:\/\/thecloud21.com\/index.php?rest_route=\/wp\/v2\/posts\/1591\/revisions\/4185"}],"wp:attachment":[{"href":"https:\/\/thecloud21.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1591"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/thecloud21.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1591"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/thecloud21.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1591"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}