Architecting and implementing a serverless application with streaming sensor data: Part 4

In Part 3, I explain how the cumulative daily statistics of a running process are implemented in the ACME Industries application. I also cover how the statistics of a completed process are calculated, stored, and accessed by the frontend.

In this post, I explain how the latest minute statistics of streaming sensor data are implemented in the ACME Industries application. I show two different ways to implement this:

  • Using Amazon Kinesis Data Analytics for SQL Applications
  • Using the tumbling window feature of AWS Lambda

The setup instructions for the example application are provided in the Readme.md file.

Please note that this project uses services that are not covered by the AWS Free Tier and will incur costs.

Implementing the latest minute statistics of streaming sensor data with Amazon Kinesis Data Analytics for SQL Applications

Architecture overview

ACME Industries uses the following architecture to process and calculate latest minute statistics for each sensor of a running process: 

  1. Kinesis Data Analytics for SQL Applications is one of the consumers for the data stream.
  2. The Kinesis Data Analytics application, ‘sensor-stats’, continuously reads streaming data. The application executes a time-based windowed query in a 30-second tumbling window based on the ROWTIME column.
  3. The AWS Lambda function, SensorStatsFunction, is configured as an external destination for the ‘sensor-stats’ Kinesis Data Analytics application.
  4. The invoked SensorStatsFunction sorts out sensor stats records per facility and per running process, and publishes them to the application’s IoT topic to update the frontend.

Implementation details

The code that implements the ‘sensor-stats’ Kinesis Data Analytics application and SensorStatsFunction Lambda function is located in the 4-sensorapp-streaming-kda directory in the GitHub repository. 

The template.yaml file defines the ‘sensor-stats’ application with an aggregation query in a tumbling window with a 30-second interval:

				
  KinesisAnalyticsSensorApplication:
    Type: "AWS::KinesisAnalytics::Application"
    Properties:
      ApplicationName: "sensor-stats"
      ApplicationCode: |
        CREATE OR REPLACE STREAM "SENSORCALC_STREAM" (             
            "deviceTimestamp"  TIMESTAMP,
            "name" VARCHAR(16),              
            "facilityId" INTEGER,
            "processId" BIGINT,
            "sensorId" INTEGER NOT NULL, 
            "min_value" REAL,
            "max_value" REAL,
            "stddev_value" REAL);
            
        CREATE OR REPLACE PUMP "SENSORCALC_STREAM_PUMP" AS 
        INSERT INTO "SENSORCALC_STREAM" 
        SELECT STREAM STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '30' SECOND) AS "deviceTimestamp",              
            "name", 
            "facilityId",             
            "processId",             
            "sensorId",
            MIN("sensorData") AS "min_value",
            MAX("sensorData") AS "max_value",
            STDDEV_SAMP("sensorData") AS "stddev_value"                             
        FROM "SOURCE_SQL_STREAM_001"
        GROUP BY "facilityId", "processId", "sensorId", "name",
               STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '30' SECOND);
				
			

The ‘sensor-stats’ data analytics application consists of two in-application streams and a pump:

  • The Kinesis Data Streams ‘sensordata’ stream is the streaming source that is mapped to the in-application stream ‘SOURCE_SQL_STREAM_001’.  Data continuously flows from the streaming source into the in-application stream.
  • The ‘SENSORCALC_STREAM_PUMP’ is a continuous windowed query that inserts query results into the ‘SENSORCALC_STREAM’ in-application stream. The query processes each window in a non-overlapping manner (a tumbling window): its GROUP BY clause groups sensor data records from the ‘SOURCE_SQL_STREAM_001’ in-application stream into 30-second windows based on the ROWTIME column values. Every 30 seconds, the query emits one output record for each sensor of a running process, providing the min, max, and standard deviation for that interval. Note that, although the feature is called ‘latest minute stats’, the application is configured for the latest 30-second window. To see stats for the full latest minute, replace the 30-second interval with a 60-second one in the SELECT statement in the template.yaml file.
  • The ‘deviceTimestamp’ value in the ‘SENSORCALC_STREAM’ approximates the time when the query was processed.
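To make the tumbling-window grouping more concrete, here is a minimal JavaScript sketch of the same idea running over an in-memory array. It is an illustration only, not how Kinesis Data Analytics executes the query. The `arrivalMs` field (standing in for ROWTIME), the `windowStart` output field, and both function names are assumptions made for this example.

```javascript
// Sample standard deviation (n - 1 in the denominator), the same measure
// that STDDEV_SAMP computes. Returning null for fewer than two values is
// an assumption of this sketch.
const sampleStdDev = (values) => {
  if (values.length < 2) return null;
  const mean = values.reduce((sum, v) => sum + v, 0) / values.length;
  const squares = values.reduce((sum, v) => sum + (v - mean) ** 2, 0);
  return Math.sqrt(squares / (values.length - 1));
};

// Group records into non-overlapping windows and emit one row per
// (window, facilityId, processId, sensorId, name) combination.
const tumblingWindowStats = (records, windowMs = 30000) => {
  const groups = new Map();
  for (const r of records) {
    // Round the arrival time down to the start of its window, which is
    // what STEP(... BY INTERVAL '30' SECOND) does with ROWTIME.
    const windowStart = Math.floor(r.arrivalMs / windowMs) * windowMs;
    const key = [windowStart, r.facilityId, r.processId, r.sensorId, r.name].join("|");
    if (!groups.has(key)) {
      groups.set(key, {
        windowStart,
        facilityId: r.facilityId,
        processId: r.processId,
        sensorId: r.sensorId,
        name: r.name,
        values: [],
      });
    }
    groups.get(key).values.push(r.sensorData);
  }
  // Replace the collected values with the aggregates for each group.
  return [...groups.values()].map(({ values, ...group }) => ({
    ...group,
    min_value: Math.min(...values),
    max_value: Math.max(...values),
    stddev_value: sampleStdDev(values),
  }));
};

// Usage with made-up readings: two fall into the first window, one into the second.
const base = { facilityId: 1, processId: 7, sensorId: 0, name: "temperature_0" };
const rows = tumblingWindowStats([
  { ...base, arrivalMs: 1000, sensorData: 10 },
  { ...base, arrivalMs: 29000, sensorData: 20 },
  { ...base, arrivalMs: 31000, sensorData: 30 },
]);
console.log(rows);
```

Because every record belongs to exactly one window, no reading is counted twice, which is the property that distinguishes a tumbling window from a sliding one.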

Output configuration for 'sensor-stats' application

Output configuration for ‘sensor-stats’ application is provided in the template.yaml file:

				
  KinesisAnalyticsSensorApplicationOutput:
    Type: "AWS::KinesisAnalytics::ApplicationOutput"
    DependsOn: KinesisAnalyticsSensorApplication
    Properties:
      ApplicationName: !Ref KinesisAnalyticsSensorApplication
      Output:
        Name: "SENSORCALC_STREAM"
        LambdaOutput:
          ResourceARN: !GetAtt SensorStatsFunction.Arn
          RoleARN: !GetAtt KinesisAnalyticsSensorRole.Arn
        DestinationSchema:
          RecordFormatType: "JSON"
				
			
  • ‘SENSORCALC_STREAM’ is the in-application stream that we want to persist to an external destination.
  • The SensorStatsFunction Lambda function ARN is provided as the external destination of the application output.
  • The KinesisAnalyticsSensorRole ARN is provided so that Kinesis Data Analytics can assume the role and invoke the destination Lambda function on our behalf.
  • RecordFormatType (JSON, CSV) specifies the record format to use when writing to the external destination.

Configuring and implementing the SensorStatsFunction Lambda function for analytics application destination

The configuration for the SensorStatsFunction is provided in the template.yaml file:

				
  SensorStatsFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: SensorStatsFunction/
      Handler: app.handler
      Runtime: nodejs14.x
      Timeout: 30
      MemorySize: 128
      Environment:
        Variables:
          IOT_DATA_ENDPOINT: !Ref IoTdataEndpoint
          TOPIC: "latest-minutestats"
      Policies:
        - Statement:
            - Sid: PublishToIotPolicy
              Effect: Allow
              Action:
                - "iot:Publish"
              Resource: "*"
				
			
  • The ‘sensor-stats’ application invokes the SensorStatsFunction Lambda function and passes records from the ‘SENSORCALC_STREAM’.
  • The ‘IOT_DATA_ENDPOINT’ and ‘TOPIC’ environment variables provide the IoT data endpoint and the ‘latest-minutestats’ topic, which together form the final destination to which the SensorStatsFunction sends the results.

The SensorStatsFunction aggregates sensor stats per running process of a facility into a single record, then publishes it to the ‘latest-minutestats’ IoT topic to update the frontend:

				
const AWS = require("aws-sdk");
const iotdata = new AWS.IotData({ endpoint: process.env.IOT_DATA_ENDPOINT });

exports.handler = async (event) => {
  console.log(`Received sensor stats: ${event.records.length} messages`);
  console.log(JSON.stringify(event, null, 2));

  const results = await publishSensorStatsToIoT(event.records);
  console.log("results:", results);

  return { records: results };
};

const publishSensorStatsToIoT = async (records) => {
  let success = 0;
  let failure = 0;
  let processMap = {};

  records.forEach((record) => {
    const payload = Buffer.from(record.data, "base64").toString("utf8");
    console.log("Payload for sensor stats: ", payload);
    const payloadObject = JSON.parse(payload);

    // Sort out sensor stats messages per facility and per process for
    // each facility. Note that currently the simulator only assumes
    // one facility and one running process:
    if (!processMap[payloadObject.facilityId]) {
      processMap[payloadObject.facilityId] = {};
      processMap[payloadObject.facilityId][payloadObject.processId] = {};
      processMap[payloadObject.facilityId][payloadObject.processId][
        payloadObject.sensorId
      ] = {};
    } else if (!processMap[payloadObject.facilityId][payloadObject.processId]) {
      processMap[payloadObject.facilityId][payloadObject.processId] = {};
      processMap[payloadObject.facilityId][payloadObject.processId][
        payloadObject.sensorId
      ] = {};
    } else if (
      !processMap[payloadObject.facilityId][payloadObject.processId][
        payloadObject.sensorId
      ]
    ) {
      processMap[payloadObject.facilityId][payloadObject.processId][
        payloadObject.sensorId
      ] = {};
    }

    processMap[payloadObject.facilityId][payloadObject.processId][
      payloadObject.sensorId
    ].payload = payloadObject;

    // After messages are published, we must return
    // the recordId(s) and the result(s) back to KDA application:
    processMap[payloadObject.facilityId][payloadObject.processId][
      payloadObject.sensorId
    ].output = {
      recordId: record.recordId,
      result: "",
    };
  });

  let payloadObjectArray = [];
  let promises = [];

  // Publish sensor stats per running process to IoT topic:
  for (let facilityId in processMap) {
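    for (let processId in processMap[facilityId]) {
      // Start with an empty array for each process so that stats from one
      // process are not published again under the next one:
      payloadObjectArray = [];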
      for (const [sensorId, sensorDataInfo] of Object.entries(
        processMap[facilityId][processId]
      )) {
        payloadObjectArray.push(sensorDataInfo.payload);
      }

      // console.log("payloadObject:", payloadObject);
      const JSONpayload = {
        msg: "sensorstats",
        facilityId: facilityId,
        processId: `proc-${processId}`,
        sensorstats: JSON.stringify(payloadObjectArray),
      };

      // Wait for the publish call to finish so that the result reported back
      // to the analytics application reflects what actually happened:
      let result = "Ok";
      try {
        await iotdata
          .publish({
            topic: process.env.TOPIC,
            qos: 0,
            payload: JSON.stringify(JSONpayload),
          })
          .promise();
        success++;
      } catch (err) {
        failure++;
        result = "DeliveryFailed";
        console.error(err);
      }

      // Set the result ("Ok" or "DeliveryFailed") for each sensor data
      // record of the process:
      for (const sensorId of Object.keys(processMap[facilityId][processId])) {
        processMap[facilityId][processId][sensorId].output.result = result;
      }

      console.log(
        `Successfully delivered records ${success}, Failed delivered records ${failure}.`
      );
    }
  }

  let output = [];
  for (let facilityId in processMap) {
    for (let processId in processMap[facilityId]) {
      for (let sensorId in processMap[facilityId][processId]) {
        output.push(processMap[facilityId][processId][sensorId].output);
      }
    }
  }

  console.log("output:", output);
  return output;
};

				
			

Note that the invoked Lambda function must return an array of JSON objects containing every record ID it received from the analytics application, each paired with a result value (‘Ok’ or ‘DeliveryFailed’). If this is not implemented correctly, the application assumes a delivery failure and keeps re-sending the records.
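The response contract can be sketched separately from the publishing logic. The following example is a simplified illustration, not the code from the repository. The helper names (`decodeRecord`, `groupByProcess`, `buildResponse`), the `failedKeys` parameter, and the sample record IDs are hypothetical.

```javascript
// Decode the base64 "data" field of a record delivered by the analytics application.
const decodeRecord = (record) =>
  JSON.parse(Buffer.from(record.data, "base64").toString("utf8"));

// Hypothetical helper: group the received records by facility and process.
const groupByProcess = (records) => {
  const groups = new Map();
  for (const record of records) {
    const payload = decodeRecord(record);
    const key = `${payload.facilityId}/${payload.processId}`;
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push({ recordId: record.recordId, payload });
  }
  return groups;
};

// Build the response with exactly one entry per received recordId.
// `failedKeys` is a hypothetical set of group keys whose publish call failed.
const buildResponse = (records, failedKeys = new Set()) => {
  const output = [];
  for (const [key, entries] of groupByProcess(records)) {
    const result = failedKeys.has(key) ? "DeliveryFailed" : "Ok";
    for (const { recordId } of entries) {
      output.push({ recordId, result });
    }
  }
  return { records: output };
};

// Usage with two made-up records that belong to the same process:
const encode = (obj) => Buffer.from(JSON.stringify(obj)).toString("base64");
const sampleRecords = [
  { recordId: "rec-1", data: encode({ facilityId: 1, processId: 7, sensorId: 0 }) },
  { recordId: "rec-2", data: encode({ facilityId: 1, processId: 7, sensorId: 1 }) },
];
const response = buildResponse(sampleRecords);
console.log(response);
```

Keeping the record IDs alongside the decoded payloads from the start makes it hard to drop an ID by accident, which is the usual cause of repeated re-delivery.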

The Admin and Operator frontends both subscribe to the ‘latest-minutestats’ IoT topic and receive the updated latest minute sensor data stats of a running process.

IAM role for the data analytics application

The IAM role – ‘KinesisAnalyticsSensorRole’ – is attached to the ‘sensor-stats’ application to allow it to read records from the Kinesis Data Streams ‘sensordata’ stream and invoke the SensorStatsFunction Lambda function:

				
  KinesisAnalyticsSensorRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - kinesisanalytics.amazonaws.com
            Action:
              - "sts:AssumeRole"
      Path: /
      Policies:
        - PolicyName: ReadInputSensorKinesis
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "kinesis:DescribeStream"
                  - "kinesis:GetShardIterator"
                  - "kinesis:GetRecords"
                Resource: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/sensordata"
        - PolicyName: InvokeSensorStatsLambdaFunction
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                  - "lambda:GetFunctionConfiguration"
                Resource: !GetAtt SensorStatsFunction.Arn
				
			

The ReadInputSensorKinesis and InvokeSensorStatsLambdaFunction policies grant these permissions.

Implementing the latest minute statistics of streaming sensor data with AWS Lambda tumbling window function

Architecture overview

ACME Industries uses the following architecture that includes the tumbling window Lambda function to calculate latest minute sensor statistics of a running process: 

  • The tumbling window function is one of the consumers of Kinesis Data Streams.
  • It receives sensor data of a running process from Kinesis Data Streams every second.
  • The function aggregates sensor data per process during a 30-second window, then calculates sensor stats (min, max, and standard deviation) over the aggregated sensor data. 
  • It combines calculated sensor stats for each running process into a single payload and publishes it to an IoT topic to update the frontend.

Implementation details

The tumbling window feature of Lambda provides an alternative way to implement near real-time analytics, in addition to the Amazon Kinesis Data Analytics service. With this feature, a function invoked from a streaming source can pass state from one invocation to the next within a window.

ACME Industries uses this feature to aggregate sensor data per sensor of a running process on each invocation during a 30-second window. At the end of the window, it calculates sensor statistics using the aggregated data from the state object.
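The state-passing pattern can be illustrated with a small local simulation. This is a sketch under simplifying assumptions: `runWindow` is a hypothetical driver that stands in for the Lambda service, the records are plain numbers rather than Kinesis records, the handler is synchronous, and the `published` array stands in for publishing to an IoT topic.

```javascript
// Stand-in for the IoT topic: collects whatever the handler "publishes".
const published = [];

// Simplified handler: accumulate values in the state on every invocation
// and compute statistics only on the final invocation of the window.
const handler = (event) => {
  const previous = (event.state && event.state.values) || [];
  const values = previous.concat(event.Records);

  if (event.isFinalInvokeForWindow) {
    published.push({
      count: values.length,
      min: Math.min(...values),
      max: Math.max(...values),
    });
    return undefined; // the state is not needed after the window closes
  }
  // Returning the state hands it to the next invocation in the same window.
  return { state: { values } };
};

// Hypothetical local driver: feeds the state returned by one invocation
// into the next one and starts the window with an empty state.
const runWindow = (batches) => {
  let state = {};
  batches.forEach((batch, index) => {
    const result = handler({
      Records: batch,
      state,
      isFinalInvokeForWindow: index === batches.length - 1,
    });
    state = result ? result.state : {};
  });
};

runWindow([[3, 9], [4], [7, 1]]);
console.log(published); // [ { count: 5, min: 1, max: 9 } ]
```

The handler itself stays stateless between windows. Everything it needs arrives in the event, which is why the aggregated data has to fit within the size limit of the state object.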

The template.yaml file describes the tumbling window Lambda function – SensorStatsByLatestMinuteFunction:

				
  SensorStatsByLatestMinuteFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: sensorStatsByLatestMinute/
      Handler: app.handler
      Runtime: nodejs14.x
      Timeout: 300
      MemorySize: 256
      Environment:
        Variables:          
          IOT_DATA_ENDPOINT: !Ref IoTdataEndpoint
          TOPIC: "latest-minutestats"
      Policies:
        - Statement:
            - Sid: PublishToIotPolicy
              Effect: Allow
              Action:
                - "iot:Publish"
              Resource: "*"
      Events:
        Stream:
          Type: Kinesis
          Properties:
            Stream: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${KinesisSensorStreamName}"
            BatchSize: 1000
            StartingPosition: TRIM_HORIZON
            TumblingWindowInSeconds: 30
				
			

The tumbling window duration is specified in the TumblingWindowInSeconds attribute of the Events definition.

When a tumbling window is enabled, the Lambda function’s event payload contains several additional attributes:

  • state – an object in which aggregated data is stored between Lambda function invocations. The state object can contain up to 1 MB of data and is initially empty in a new window. Here is an example of the state object in the event payload:
				
					"state": {
    "1656765771320": {
      "0": {
        "sensorData": [
          75.65867806305917, 28.85717410712807, 7.862853766610045,
          75.39909061184122, 29.01159697707094, 93.40315875898159,
          56.52958345117889, 71.11442413234643, 37.564327993605914,
          6.412071893480054, 28.13930698035374, 68.17103246373301,
          25.073777450988487, 82.10792619144244, 28.272966638142492,
          43.39891863983387, 43.51220820711272, 16.148025582768888,
          93.8989727250565
        ],
        "name": "temperature_0"
      },
      "1": {
        "sensorData": [
          35.512093577877835, 92.30443351178394, 91.1013009317126,
          31.20491262528111, 5.574814343573187, 33.94246386030873,
          6.494679575043105, 71.11215754104599, 8.126400488748109,
          0.07655841090292714, 88.8128976264104, 87.25682260247095,
          83.33884324596261, 8.684308092071191, 51.80094439944403,
          24.897329381789124, 3.8911734219942318, 79.00718198212083,
          39.36909519579619
        ],
        "name": "temperature_1"
      },
      "2": {
        "sensorData": [
          30.784271169992937, 45.4010703443011, 42.96638448264816,
          52.817118625082045, 96.21402855527317, 2.930590439236158,
          72.09102178943726, 87.07955921671082, 62.868664604001204,
          83.21783269378844, 90.8021362093286, 36.40327662383835,
          54.20156864736951, 26.899868288525397, 6.237623237252254,
          47.51428556765154, 38.194251401905156, 45.07520152439093
        ],
        "name": "temperature_2"
      }
    }
  }
				
			
  • Window start and end – the beginning and ending timestamps of the current window:
				
					"window": {
    "start": "2022-07-02T12:44:00Z",
    "end": "2022-07-02T12:44:30Z"
  }
				
			
  • isFinalInvokeForWindow – Indicates if this is the last invocation for the current window. It can have values set to ‘true’ or ‘false’:
				
					"isFinalInvokeForWindow": false, 
or
"isFinalInvokeForWindow": true
				
			
  • isWindowTerminatedEarly – A window ends early if the state exceeds the maximum allowed size of 1 MB. It can have values set to ‘true’ or ‘false’:
				
					 "isWindowTerminatedEarly": false
				
			
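Because a window terminates early when the state grows beyond the limit, it can be useful to log roughly how much of that budget the state is using. The sketch below is an approximation only: it assumes that the size of the JSON-serialized state is a reasonable proxy for what Lambda measures, and it treats 1 MB as 1024 * 1024 bytes. The helper names are hypothetical.

```javascript
// Assumption: 1 MB is treated as 1024 * 1024 bytes in this sketch.
const MAX_STATE_BYTES = 1024 * 1024;

// Hypothetical helper: approximate the size of the state by serializing it.
const stateSizeInBytes = (state) =>
  Buffer.byteLength(JSON.stringify(state), "utf8");

// Report the approximate size and the fraction of the limit in use.
const stateUsage = (state) => {
  const bytes = stateSizeInBytes(state);
  return { bytes, fraction: bytes / MAX_STATE_BYTES };
};

// Usage with a shortened version of the state object shown above:
const sampleState = {
  "1656765771320": {
    "0": { sensorData: [75.65867806305917, 28.85717410712807], name: "temperature_0" },
  },
};
const usage = stateUsage(sampleState);
console.log(`State uses about ${usage.bytes} bytes`);
```

Storing raw readings, as the example state does, makes the state grow with every record, so a longer window or a higher sensor count brings it closer to the limit.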

Let’s dive deeper into the implementation details of the SensorStatsByLatestMinuteFunction:

  • The event handler in app.js receives an event payload every second and retrieves the sensor data records from it:

				
exports.handler = async (event) => {
  console.log(`Received sensor data: ${event.Records.length} messages`);
  let jsonRecords = getRecordsFromPayload(event.Records);

  // ... (the handler continues in the excerpts that follow)

const getRecordsFromPayload = (eventRecords) => {
  let jsonRecords = [];
  // Get records from event payload
  eventRecords.map((record) => {
    // Extract JSON record from base64 data
    const buffer = Buffer.from(record.kinesis.data, "base64").toString();
    const jsonRecord = JSON.parse(buffer);

    jsonRecords.push(jsonRecord);
  });
  return jsonRecords;
};
				
			
  • It then checks if the process is complete by looking for the ‘event’ value set to ‘complete’ by the simulator:
				
  // Check for the "complete" event:
  const completedProcessRecords = jsonRecords.filter(
    (record) => record.event === "complete"
  );
				
			
  • If there are completed processes, we check whether the event payload also contains sensor data records. Any such records are merged into the state so that we can determine whether enough data points remain to calculate statistics:
				
  // If the "complete" event was issued, then we need to check the current state,
  // and, if the total number of data points for each sensor is less than 15, then we
  // ignore those values, and return without publishing any new aggregate stats:
  if (completedProcessRecords.length !== 0) {
    console.log("Received 'complete' message!");
    // Check if there are any current data records passed from the data stream:
    const sensorDataRecords = jsonRecords.filter(
      (record) => record.event === "update"
    );

    // console.log("sensorDataRecords:", sensorDataRecords);

    // Retrieve existing state passed during tumbling window
    let state = event.state || {};
    jsonRecords.map(
      (record) => (processMap[record.processId] = record.facilityId)
    );

    if (sensorDataRecords.length !== 0) {
      getSensorDataByProcessId(state, sensorDataRecords);
    }

    console.log("Publish last sensor stats after complete event");
    await publishToIoT(state);

    console.log("Done publishing stats per last minute for the process");
    if (event.isFinalInvokeForWindow) {
      console.log("This is finalInvokeForWindow after complete event");
      // We don't need the state anymore, just return:
      return;
    } else {
      // We need to return the state object here since this is not
      // the final invoke window.
      // Remove state data for the completed processes:
      completedProcessRecords.map((record) => {
        delete state[record.processId];
      });
      return { state };
    }
  }
				
			
  • The getSensorDataByProcessId function combines sensor data records from the event payload with the data in the state object:
				
const getSensorDataByProcessId = (state, jsonRecords) => {
  // console.log("getSensorDataByProcessId: ", state);
  jsonRecords.map((record) => {
    // Add processId if not in state
    if (!state[record.processId]) {
      state[record.processId] = {};
    }

    if (!state[record.processId][record.sensorId]) {
      state[record.processId][record.sensorId] = {};
      state[record.processId][record.sensorId].sensorData = [];
      state[record.processId][record.sensorId].name = record.name;
    }
    state[record.processId][record.sensorId].sensorData.push(record.sensorData);
  });
};
				
			
  • We next call the publishToIoT function and pass it the state object. For each sensor, the function checks the number of data points: sensors with fewer than 15 data points are ignored; for the rest, the statistics are calculated, combined into a single payload, and published to the ‘latest-minutestats’ IoT topic:
				
const publishToIoT = async (processSensorData) => {
  let promises = [];
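  for (let processId in processSensorData) {
    // Collect the stats per process so that the sensors of one process
    // are not published again under the ID of the next one:
    let payloadObjectArray = [];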
    for (const [sensorId, sensorDataInfo] of Object.entries(
      processSensorData[processId]
    )) {
      // Ignore if sensor data size is less than 15:
      if (sensorDataInfo.sensorData.length < 15) {
        console.log(
          "sensorData size is < 15. Ignore sensor data: ",
          sensorDataInfo
        );
        continue;
      }
      //  console.log("sensorDataInfo: ", sensorDataInfo);
      const payloadObject = {
        name: sensorDataInfo.name,
        sensorId: sensorId,
        deviceTimestamp: Date.now(),
        min_value: Math.min(...sensorDataInfo.sensorData),
        max_value: Math.max(...sensorDataInfo.sensorData),
        stddev_value: getStandardDevitation(sensorDataInfo.sensorData),
      };
      payloadObjectArray.push(payloadObject);
    }
    const JSONpayload = {
      msg: "sensorstats",
      facilityId: processMap[processId],
      processId: `proc-${processId}`,
      sensorstats: JSON.stringify(payloadObjectArray),
    };

    let promise = iotdata
      .publish({
        topic: process.env.TOPIC,
        qos: 0,
        payload: JSON.stringify(JSONpayload),
      })
      .promise();
    promises.push(promise);
  }

  // Wait for all promises to be settled
  const results = await Promise.allSettled(promises);

  // Log out any rejected results
  results.map((result) =>
    result.status === "rejected" ? console.log(result) : null
  );
};
				
			
  • After we publish the last sensor stats following the ‘complete’ event, we check whether this is the final invoke for the tumbling window. If it is, we just return; otherwise, we remove the state data for the completed process and return the state object:
				
    console.log("Done publishing stats per last minute for the process");
    if (event.isFinalInvokeForWindow) {
      console.log("This is finalInvokeForWindow after the 'complete' event");
      // We don't need the state anymore, just return:
      return;
    } else {
      // We need to return the state object here since this is not
      // the final invoke window.
      // Remove state data for the completed processes:
      completedProcessRecords.map((record) => {
        delete state[record.processId];
      });
      return { state };
    }
				
			
  • If there was no ‘complete’ event in the payload records: 
    • We retrieve the existing state passed during the tumbling window.
    • We call getSensorDataByProcessId(state, jsonRecords) to combine the sensor data records in the event payload with the existing state.
    • We check whether this is the final invoke for the window; if it is, and the state object is not empty, we call publishToIoT(state).
    • We return the state object if this is not the final invoke for the window.
				
  // There was no "complete" event in the payload records:
  else {
    // Retrieve existing state passed during tumbling window
    let state = event.state || {};

    // Get sensor data of a process from event
    jsonRecords.map(
      (record) => (processMap[record.processId] = record.facilityId)
    );
    // console.log("Payload records: ", JSON.stringify(jsonRecords, null, 2));

    getSensorDataByProcessId(state, jsonRecords);

    // Since tumbling window is configured, publish to IoT endpoint
    // on the final invoke window:
    if (event.isFinalInvokeForWindow) {
      // Make sure the state is not empty before publishing to IoT.
      // This is the use case when the 'complete' event has been already processed
      // during the window invocation that wasn't final.
      if (Object.entries(state).length === 0) {
        console.log("isFinalInvokeForWindow but the state is empty.");
        return;
      }

      //  console.log("Final invoke state: ", JSON.stringify(state, null, 2));
      await publishToIoT(state);
    } else {
      //  console.log("Returning state: ", JSON.stringify(state, null, 2));
      return { state };
    }
  }
				
			
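The publishToIoT function shown earlier calls a getStandardDevitation helper that is not listed in this post. The sketch below shows one way the per-sensor calculation could look. It assumes the sample standard deviation (n - 1 in the denominator), chosen here to match STDDEV_SAMP in the SQL version; the helper in the repository may be implemented differently. The names `sampleStandardDeviation`, `summarizeSensor`, and `MIN_DATA_POINTS` are hypothetical.

```javascript
// Threshold used in the walkthrough above: sensors with fewer data points are skipped.
const MIN_DATA_POINTS = 15;

// Sample standard deviation. Returning null for fewer than two values
// is an assumption of this sketch.
const sampleStandardDeviation = (values) => {
  if (values.length < 2) return null;
  const mean = values.reduce((sum, v) => sum + v, 0) / values.length;
  const squares = values.reduce((sum, v) => sum + (v - mean) ** 2, 0);
  return Math.sqrt(squares / (values.length - 1));
};

// Build the stats object for one sensor, or return null when the sensor
// does not have enough data points in the current window.
const summarizeSensor = (sensorId, sensorDataInfo) => {
  const { name, sensorData } = sensorDataInfo;
  if (sensorData.length < MIN_DATA_POINTS) return null;
  return {
    name,
    sensorId,
    min_value: Math.min(...sensorData),
    max_value: Math.max(...sensorData),
    stddev_value: sampleStandardDeviation(sensorData),
  };
};

// Usage: 15 readings are summarized, 14 readings are skipped.
const readings = Array.from({ length: 15 }, (_, i) => i + 1);
const stats = summarizeSensor("0", { name: "temperature_0", sensorData: readings });
const skipped = summarizeSensor("1", {
  name: "temperature_1",
  sensorData: readings.slice(0, 14),
});
console.log(stats, skipped);
```

Separating the calculation from the publish call also makes it possible to unit test the statistics without an IoT endpoint.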

Debugging 'SensorStatsByLatestMinuteFunction' locally

AWS SAM allows developers to step through Lambda functions locally without actually deploying them in the cloud. To be able to test and debug Lambda functions locally in the IDE of your choice you need to install an AWS Toolkit plugin for that IDE. For those who are using VS Code, here is a link to the documentation page that contains instructions on how to install the toolkit plugin: https://docs.aws.amazon.com/toolkit-for-vscode/latest/userguide/setup-toolkit.html.

There are three distinct use cases for which I wanted to test (or, more precisely, debug) the ‘SensorStatsByLatestMinuteFunction’ tumbling window Lambda function:

  1. Function is invoked with ‘isFinalInvokeForWindow’ attribute in the event payload set to ‘false’.
  2. Function is invoked with the ‘event’ value in the message from the simulator set to ‘complete’.
  3. Function is invoked with ‘isFinalInvokeForWindow’ attribute in the event payload set to ‘true’.

The test harnesses and test events for each use case can be found in this GitHub repository.

In VS Code the test folder looks like this:

I will explain how to step through the first use case; the other two are very similar:

  1. In the testHaressNoFinalInvoke.js file, replace the IOT_DATA_ENDPOINT value with your account’s device data endpoint. You can find your account’s device data endpoint on the Settings page of the AWS IoT console, or by running the following CLI command: aws iot describe-endpoint --endpoint-type iot:Data-ATS
  2. In the same file, set a breakpoint (F9) where the function handler gets called.
  3. Press the F5 key to start Debugging.
  4. When the program breaks at the breakpoint, press F11 to step into the function we are debugging. Notice that for each test harness, there is a sample event object that is used to test the code.
  5. Once you are inside the function, use the debugger the same way as you would usually use to view variables, collections, etc.
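For readers who want to write their own test events, the following sketch shows how a tumbling-window event could be assembled by hand. It is not one of the harness files from the repository. The helper names (`toKinesisRecord`, `buildTestEvent`) and the sample message values are hypothetical, and the commented-out lines at the end assume that the function under test is exported from ./app.

```javascript
// Encode a simulator-style message the way the handler expects to find it:
// as base64 text in the kinesis.data field of a record.
const toKinesisRecord = (message) => ({
  kinesis: { data: Buffer.from(JSON.stringify(message)).toString("base64") },
});

// Hypothetical builder for a tumbling-window test event.
const buildTestEvent = (messages, options = {}) => {
  const { isFinalInvokeForWindow = false, state = {} } = options;
  return {
    Records: messages.map(toKinesisRecord),
    state,
    window: { start: "2022-07-02T12:44:00Z", end: "2022-07-02T12:44:30Z" },
    isFinalInvokeForWindow,
    isWindowTerminatedEarly: false,
  };
};

// A single made-up sensor reading for the first use case (not the final invoke):
const testEvent = buildTestEvent([
  {
    event: "update",
    facilityId: 1,
    processId: 1656765771320,
    sensorId: 0,
    name: "temperature_0",
    sensorData: 75.66,
  },
]);
console.log(JSON.stringify(testEvent, null, 2));

// In a harness, the event would then be passed to the function under test.
// The path and the endpoint placeholder below are assumptions:
//   process.env.IOT_DATA_ENDPOINT = "<your device data endpoint>";
//   const { handler } = require("./app");
//   handler(testEvent).then((result) => console.log(result));
```

Passing `{ isFinalInvokeForWindow: true }` as the second argument, or changing the `event` value to "complete", produces events for the other two use cases.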

In the Readme file of this series, I provide an additional section on local debugging of the other Lambda functions in this project.

Kinesis Data Analytics vs. tumbling window Lambda function

Since both services provide a similar feature, how do we choose which one to use in our application?

For use cases where streaming data requires intermediate results to be stored for additional processing before records are persisted to an output destination, Kinesis Data Analytics would be a better choice because it can be configured with multiple in-application streams. For example, if a data analytics application needs to first dynamically calculate a column value based on other column values of a record, and then perform a query that includes the newly calculated values, the application would need to store the intermediate results in an in-application stream, and then perform additional processing using a windowed query on that stream.

Another use case for Kinesis Data Analytics is when the processed records need to be persisted to an S3 bucket. In that case, the data analytics application can be configured with a Kinesis Data Firehose delivery stream as its external destination.

For use cases where streaming data processing doesn’t require storing intermediate results, and the external destination is an IoT endpoint or a DynamoDB table, a tumbling window Lambda function would be a better choice. In the ACME Industries application, since we don’t store any intermediate results and don’t need to persist the processed records, the tumbling window Lambda function is the preferred choice.

Conclusion

In this post, I explain how the ‘Stats by latest minute’ feature is implemented in the ACME Industries sample application. I show two different ways of implementing this feature:

  1. Using Amazon Kinesis Data Analytics for SQL Applications.
  2. Using the tumbling window feature of AWS Lambda.

This post explains the architecture and implementation details of each approach.
I also show how to locally debug the tumbling window Lambda function without deploying it in the cloud.
Finally, I explain which approach would be a better choice for some of the use cases.

In Part 5, I explain how command and control is implemented in the ACME Industries application.