Architecting and implementing a serverless application with streaming sensor data: Part 2

Ingestion layer design and implementation in the ACME Industries example application

In Part 1, I introduced the ACME Industries application, which streams sensor data from a running process at a facility. The facility runs short, 10-minute processes. I described the application functionality, provided an architecture overview, and walked through the deployment.

In this post, I provide the detailed architecture and implementation of the ingestion layer, which includes the AWS IoT Core and Amazon Kinesis Data Streams services.

The setup instructions for the example application are provided in the Readme.md file.

Please note that this project uses services that are not covered by the AWS Free Tier, and will incur cost.

AWS IoT Core supports publish-subscribe capabilities for large numbers of client applications. In ACME Industries, there are two frontend applications: Admin and Operator. To connect to the backend, publish messages to an AWS IoT Core topic, or subscribe to one, the applications use the AWS IoT Device SDK, which supports the MQTT standard for IoT messaging.

The Admin frontend runs on localhost and has a built-in simulator that generates sensor data. There can be any number of Operator applications running on different devices. The Operator application subscribes to an AWS IoT Core topic to receive the streaming data published by the Admin application. Both the Admin and Operator applications also subscribe to the backend to receive running process status updates, the percentage of process completion, and various statistics computed from the sensor data.

Frontend implementation

The production facility in ACME Industries runs short, 10-minute processes, one process at a time. A user starts a process by clicking the “LAUNCH FACILITY” button on the dashboard.

The built-in simulator is implemented in the Home.vue component of the Admin application. It generates a data point for each sensor instance every second. The data points are generated randomly between the min and max values of a specific sensor type:

    const simulateSensorData = (min, max) => {
      return Math.random() * (max - min) + min;
    };

Next, it creates a message with the generated data point as a JSON object. The message also contains the name of the sensor, the sensor ID, the process ID, the facility ID, and the current second of the running process:

    const sensormessage = {
      uuid: uuidv4(),
      event: this.event,
      deviceTimestamp: Date.now(),
      second: this.currentSecond,
      name: sensor.name,
      sensorId: sensor.id,
      processId: this.processId,
      facilityId: facilityId,
      sensorData: sensorData,
    };

I will explain the role of the “event” value in the message object later in this series of blog posts.

The IoT.vue component receives sensor data messages from the simulator and publishes them to the AWS IoT Core “sensordata-publish” topic using an MQTT client. The connection logic in the IoT.vue component is implemented using the AWS IoT Device SDK.

The MQTT client is created in the IoT.vue component after the user has logged in:

    const authcredentials = this.$store.getters.authCredentials;
    mqttClient = AWSIoTData.device({
      region: AWS.config.region,
      host: this.$store.getters.appConfiguration.iotHost,
      clientId: "sensordata-" + Math.floor(Math.random() * 100000 + 1),
      maximumReconnectTimeMs: 8000,
      debug: false,
      protocol: "wss",
      accessKeyId: authcredentials.accessKeyId,
      secretKey: authcredentials.secretAccessKey,
      sessionToken: authcredentials.sessionToken,
    });

The Home.vue component emits a “sensorpublish” event:

    bus.$emit("sensorpublish", sensormessage);

The IoT.vue component subscribes to this event and, after it receives the message, publishes it to the “sensordata-publish” topic:

    bus.$on("sensorpublish", async (data) => {
      mqttClient.publish(topics.sensorpublish, JSON.stringify(data));
    });

AWS IoT Core supports MQTT Quality of Service (QoS) levels 0 and 1. The Admin application uses QoS level 0, which is the default. With QoS level 0, the publisher makes only one attempt to send the message, without waiting for confirmation that the message was received (“at most once” delivery). With QoS level 1, the message is resent until a PUBACK acknowledgement is received (“at least once” delivery). QoS level 1 can be used for workloads requiring higher reliability.
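Switching to QoS 1 is a small change to the publish call, since the SDK accepts mqtt.js-style options. The helper below and the stub client are illustrative assumptions, not part of the application:

```javascript
// Publish with QoS 1 for critical messages and QoS 0 otherwise. The options
// object follows the mqtt.js convention used by the AWS IoT Device SDK.
function publishWithQos(client, topic, payload, critical) {
  const options = { qos: critical ? 1 : 0 };
  client.publish(topic, JSON.stringify(payload), options);
  return options;
}

// Stand-in client that records what would be sent over MQTT:
const sent = [];
const stubClient = {
  publish: (topic, message, options) => sent.push({ topic, message, options }),
};

publishWithQos(stubClient, "sensordata-publish", { sensorData: 42.5 }, true);
```

Note that QoS 1 trades extra network traffic and possible duplicate deliveries for the acknowledgement, so downstream consumers should tolerate duplicates (the `uuid` field in the sensor message helps with that).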

Backend implementation

In ACME Industries, messages with sensor data are published to the “sensordata-publish” topic in AWS IoT Core. They are then routed to Kinesis Data Streams using a rule configured for this topic:

    # IoT topic rule to route sensor data from IoT Core to Kinesis Data Streams
    IotSensorTopicRule:
      Type: AWS::IoT::TopicRule
      Properties:
        RuleName: "sensordataIngest"
        TopicRulePayload:
          RuleDisabled: false
          Sql: "SELECT * FROM 'sensordata-publish'"
          Actions:
            - Kinesis:
                StreamName: "sensordata"
                PartitionKey: "${timestamp()}"
                RoleArn: !GetAtt IoTKinesisSensorRole.Arn

    IoTKinesisSensorRole:
      Type: AWS::IAM::Role
      Properties:
        AssumeRolePolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - iot.amazonaws.com
              Action:
                - "sts:AssumeRole"
        Path: /
        Policies:
          - PolicyName: IoTKinesisSensorPutPolicy
            PolicyDocument:
              Version: "2012-10-17"
              Statement:
                - Effect: Allow
                  Action: "kinesis:PutRecord"
                  Resource: !GetAtt KinesisSensorStream.Arn

All of the backend resources for the ACME Industries application are defined using the AWS Serverless Application Model (AWS SAM). The IotSensorTopicRule resource is configured in the template.yaml file in the 1-iot-kinesis-ddb subfolder. The IoTKinesisSensorRole is assigned to the IotSensorTopicRule to give it permission to write to the Kinesis “sensordata” stream.

Kinesis Data Streams under the hood

In Amazon Kinesis Data Streams, each data stream is composed of one or more shards that act as base throughput units. Shards provide predefined write and read capacity, which makes it easy to design and scale a streaming pipeline. Each shard supports up to 1,000 record writes per second, up to a maximum total of 1MB of data per second. Read capacity is up to 2MB per second per shard, or five read transactions per second. All consuming applications share the read capacity of a shard. Note that the enhanced fan-out feature allows you to scale the number of consumers, with a dedicated 2MB per second of read capacity for each consumer.
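These per-shard write limits translate directly into a rough shard-count estimate for a planned workload. The sizing function below is an illustrative sketch under the limits just stated; it ignores the read side and any headroom you would provision in practice:

```javascript
// Estimate how many shards a write workload needs, based on the per-shard
// limits of 1,000 records per second and 1MB per second. Whichever dimension
// demands more shards is the binding one.
function requiredShards(recordsPerSecond, bytesPerSecond) {
  const byRecords = Math.ceil(recordsPerSecond / 1000);
  const byBytes = Math.ceil(bytesPerSecond / (1024 * 1024));
  return Math.max(byRecords, byBytes, 1);
}

// 5,000 records/sec of 216-byte messages: record count is the binding limit,
// since 5,000 * 216 bytes is only about 1.03MB/sec.
const shards = requiredShards(5000, 5000 * 216);
```

For small payloads like the 216-byte sensor messages in this application, the record-count limit is almost always reached long before the 1MB volume limit.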

In the ACME Industries application, each message routed by the IotSensorTopicRule is written to Kinesis Data Streams using the PutRecord Kinesis API method, which requires the following three parameters:

  • The data record to write to the stream – the message with sensor data.
  • The stream name – “sensordata”.
  • A partition key – in the SAM template, it is configured to be the current timestamp. However, since the stream in this application is configured with a single shard, all messages are routed to the same shard and the partition key has no effect. The ACME Industries application is currently implemented with just one facility. In real production, there could be multiple facilities, each running its own processes. In that case, multiple shards can be configured. The partition key can then be based on the facility ID, for example, so that messages from a specific facility are always routed to the same shard:

The diagram above shows how a partition key value can be mapped to a particular shard. If ACME Industries had more than one facility, the facility ID could be used as the partition key value. Kinesis would calculate the hash of the facility ID using an MD5 hash function, and that hashed value would map a record with that facility ID to a particular shard. Kinesis then writes the record to that shard.

Using the facility ID as a partition key is an example of an application-specific, or use case-based, choice of partition key. This can be a useful choice for efficient processing by downstream services.

If the downstream services of the application don’t require data stored in a shard to have high affinity, then using a random partition key could be a better option. Messages are then sent to different shards at random, which allows you to achieve higher overall throughput because messages are evenly distributed across all the shards in the stream. A universally unique identifier (UUID), for example, could be used as the partition key in this case.

Another option is a time-based partition key. In this case, messages with the same timestamp have the same hash and are sent to the same shard.

Shard capacity optimization

Each shard can ingest data at a rate of 1MB per second or 1,000 records per second, whichever limit is reached first. If a message is greater than 1MB, it must be broken into smaller pieces to avoid an error. If a shard receives 1,000 messages per second, then, on average, each message payload must be 1KB or less.

The combination of the two limits – 1MB per second or 1,000 messages per second – allows for different capacity profiles for a shard:

  • Data payload size varies, so the number of messages that can be packed into 1MB changes from second to second.
  • Data payload size doesn’t change much, and messages use the full 1MB per second capacity.
  • Data payloads are very small, but there is a large number of them, consuming all 1,000 messages per second while using far less than 1MB of total capacity.

In ACME Industries, the average message size is 216 bytes. If a facility were producing 1,000 messages per second, the workload would only be using a little over 20% of the 1MB shard capacity limit. Since a PUT Payload Unit is counted in 25KB payload “chunks”, having messages that are much smaller than 25KB is less cost-efficient. Therefore, if additional latency is acceptable for an application, packing messages together and sending them less frequently results in lower cost. For example, sending one message per second with a 216-byte payload for 10 seconds results in 10 PUT Payload Units. If higher latency is acceptable, the application can instead aggregate the outgoing messages and send them every 10 seconds. The aggregated message carries 10 x 216 = 2,160 bytes of payload, which is still less than 25KB and therefore counts as just 1 PUT Payload Unit.
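The arithmetic above can be checked with a short calculation. The key point is the 25KB rounding applied per record; this sketch assumes 25KB = 25 x 1,024 bytes, and the conclusion is the same either way for these payload sizes:

```javascript
// Each record is billed as ceil(payloadBytes / 25KB) PUT Payload Units.
const PUT_UNIT_BYTES = 25 * 1024;

function putPayloadUnits(payloadBytes) {
  return Math.ceil(payloadBytes / PUT_UNIT_BYTES);
}

// Ten 216-byte messages sent individually: each rounds up to a full unit.
const individualUnits = Array(10)
  .fill(216)
  .reduce((total, bytes) => total + putPayloadUnits(bytes), 0);

// The same data aggregated into a single message: still well under 25KB.
const aggregatedUnits = putPayloadUnits(10 * 216);
```

Here `individualUnits` is 10 while `aggregatedUnits` is 1, so aggregation cuts the billed PUT Payload Units by a factor of ten for this payload size.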

The tradeoff between latency and cost when optimizing the capacity of a shard depends on the overall application requirements, or on a particular use case. Since ACME Industries is a low-latency application, messages are sent as soon as they are available.

Conclusion

In this post, the focus was on the ingestion layer in the ACME Industries application. I explained how the Admin frontend connects to AWS IoT Core to send data payloads generated by the built-in simulator. I also explained how messages are routed to Kinesis Data Streams by an IoT topic rule. I discussed options for routing messages to shards using partition keys. Finally, I discussed the tradeoff between cost and latency when optimizing the capacity of a shard.

In Part 3, I explain how Amazon Kinesis Data Firehose is used to implement the cumulative daily statistics feature in the ACME Industries application. I will also cover how the statistics of a completed process are calculated, stored, and accessed by the frontend.