Architecting and implementing serverless application with streaming sensor data: Part 5

Command and control in ACME Industries application

ACME Industries is a fictional industrial company which has one facility that runs short 10 minute processes as part of its overall production process. There are two types of frontend applications that allow employees to view data emitted by various sensors during running process:

  • Admin application – allows users to view sensor data, issue  commands, and configure sensor types.
  • Operator application – allows users to only view sensor data, and issue commands.

In Part 4 I explain how the latest minute statistics of streaming sensor data is implemented in ACME Industries application.

In this post I explain how command and control functionality is implemented in ACME Industries application using AWS IoT Core service.

Command and control is the operation of sending a message to a specific device (or a facility in the case of ACME Industries) requesting it to perform some action.

The setup instructions for the example application are provided in the Readme.md file.

Please note that this project uses services that are not covered by the AWS Free Tier, and will incur cost.

Architecture overview

  • Both the Admin and the Operator frontends use the MQTT protocol to communicate with the backend of ACME Industries application. The backend uses the Message Broker feature of the IoT Core service to process messages on behalf of the Admin and Operator frontends.
  • Both frontends establish an MQTT connection to the AWS IoT Core endpoint.
  • ACME Industries uses a number of MQTT topics in order to implement the command and control functionality by publishing and subscribing to these topics.
  • The UI of both Admin and Operator frontends is implemented in Vue.js framework. 
  • The data simulator is part of the Admin frontend. It receives a number of commands from both Admin and Operator applications which change the state of the simulated production process:

Implementation details

Admin frontend

The code that implements the Admin frontend is located in the AdminAppFrontend directory in the GitHub repository.

The Admin frontend uses the MQTT protocol to communicate with the backend of ACME Industries. It uses a number of IoT topics in order to implement the command and control functionality: 

  • The IoT topics to which the Admin frontend publishes messages:
  • The IoT topics to which the Admin frontend subscribes to receive messages:

The UI of both Admin and Operator frontends is implemented in Vue.js framework. The components that implement main functionality are Home.vue and IoT.vue. The IoT.vue component implements an MQTT client using the AWS IoT Device SDK. It does the following:

  • Establishes an MQTT connection to the AWS IoT Core endpoint:
				
					mqttClient = AWSIoTData.device({
        region: AWS.config.region,
        host: this.$store.getters.appConfiguration.iotHost, //can be queried using 'aws iot describe-endpoint --endpoint-type iot:Data-ATS' - doesn't work with just 'describe-endpoint'
        clientId: "sensordata-" + Math.floor(Math.random() * 100000 + 1),
        maximumReconnectTimeMs: 8000,
        debug: false,
        protocol: "wss",
        accessKeyId: authcredentials.accessKeyId,
        secretKey: authcredentials.secretAccessKey,
        sessionToken: authcredentials.sessionToken,
      });
				
			
  • Once connected, it subscribes to the topics we are interested in:
				
					// We are connected, subscribe to the topics we are interested in.
// Note that from the admin/simulator node, we don't subscribe to
// facility status updates. Only operator/view nodes subscribe to status:
    mqttClient.on("connect", function() {
      console.log("mqttClient connected");

      mqttClient.subscribe(topics.facilitycommand);
      mqttClient.subscribe(topics.sensorsubscribe);
      mqttClient.subscribe(topics.facilityconfigrequest);
      mqttClient.subscribe(topics.sensorInstanceInfoRequest);
      mqttClient.subscribe(topics.procdailystats);
      mqttClient.subscribe(topics.completedprocinfo);
      mqttClient.subscribe(topics.latestminutestats);
    });
				
			
  • It next starts listening to various events from the Home.vue component to publish messages to the IoT Core topics:
				
					// Publish message to IoT Core topic
    bus.$on("sensorpublish", async (data) => {
      //  console.log("Sensorpublish: ", data);
      mqttClient.publish(topics.sensorpublish, JSON.stringify(data));
    });

    // User issued command to sensor facility:
    bus.$on("facilitycommandissued", async (data) => {
      console.log("Commandpublish: ", data);
      mqttClient.publish(topics.facilitycommand, JSON.stringify(data));
    });

    // run-time status of a facility changed:
    bus.$on("facilitystatusupdate", async (data) => {
      console.log("Facility status publish: ", data);
      mqttClient.publish(topics.facilitystatus, JSON.stringify(data));
    });

    // User requested facility configuration:
    bus.$on("facilityconfigpublish", async (data) => {
      console.log("Facility configuration publish: ", data);
      mqttClient.publish(topics.facilityconfigupdate, JSON.stringify(data));
    });

    // User requested list of facility sensor instances:
    bus.$on("sensorInstanceInfoPublish", async (data) => {
      console.log("Sensor instance info to publish: ", data);
      mqttClient.publish(topics.sensorInstanceInfoUpdate, JSON.stringify(data));
    });

    bus.$on("updatepercentcomplete", async (data) => {
      mqttClient.publish(topics.percentcompleteupdate, JSON.stringify(data));
    });

    bus.$on("currentProcessIdPublish", async (data) => {
      mqttClient.publish(topics.currentProcessIdUpdate, JSON.stringify(data));
    });
				
			
  • Finally, it starts listening for AWS IoT Core MQTT messages, and when a message arrives, it determines the topic, and notifies the Home.vue component with an appropriate event:
				
					mqttClient.on("message", function(topic, payload) {
      const payloadEnvelope = JSON.parse(payload.toString());
      //  console.log("IoT::onMessage: ", topic, payloadEnvelope);
      if (topic === topics.facilitycommand) {
        bus.$emit("facilitycommandreceived", payloadEnvelope);
      } else if (topic === topics.facilityconfigrequest) {
        bus.$emit("facilityconfigrequest", payloadEnvelope);
      } else if (topic === topics.sensorInstanceInfoRequest) {
        bus.$emit("sensorInstanceInfoRequest", payloadEnvelope);
      } else if (topic === topics.procdailystats) {
        console.log("Received message for topic: ", topics.procdailystats);
        bus.$emit("procdailystats", payloadEnvelope);
      } else if (topic === topics.completedprocinfo) {
        console.log("Received message for topic: ", topics.completedprocinfo);
        bus.$emit("completedprocinfo", payloadEnvelope);
      } else if (topic === topics.latestminutestats) {
        console.log("Received message for topic: ", topics.latestminutestats);
        bus.$emit("latestminutestats", payloadEnvelope);
      } else {
        bus.$emit("message", payloadEnvelope);
      }
    });
				
			

Communication between the Home.vue and the IoT.vue components is implemented using Vue.js Event Bus

There are two different ways that can affect the status of a simulated process/facility in the Admin application:

  • By issuing commands from the UI by either Admin or Operator frontend.
  • By setting the value of the ‘event’ attribute in the message payload of a running process:
				
					{
    "uuid": "5cabc1f1-2914-4b66-bdf2-d3a83c7b36cf",
    "event": "update",
    "deviceTimestamp": 1654083170431,
    "second": 554,
    "name": "temperature_4",
    "sensorId": 4,
    "processId": 1654083155416,
    "facilityId": 1,
    "sensorData": 26.902948081006596
  }
				
			

Normally, the ‘event’ attribute is set to ‘update’ while a process is not complete. When a process completes, the ‘event’ attribute value is set to ‘complete’. Here is an example of the message payload of a completed process:

				
					{
 "deviceTimestamp": 1656765873333,
 "event": "complete",
 "facilityId": 1,
 "processId": 1654083155416,
 "second": 601,
 "uuid": "987179a5-d51a-474e-a18b-17b553ae897d"
}
				
			

Based on a use case, the backend of ACME Industries checks for the ‘complete’ value of the ‘event’ attribute in a message payload to determine if a current process has finished:

  • In Part 3, I explain how the backend uses the ‘complete’ event to copy sensor data to the historical S3 bucket, then performs final calculation of sensor data statistics, stores the results into a DynamoDB table, and publishes the facility ID and the completed process ID to an IoT topic ‘completed-processinfo’
  • In Part 4, I explain how the tumbling window Lambda function uses the ‘complete’ event to calculate the latest minute statistics of a completed process.

State transition of a process/facility

A running process can transition to a number of states throughout its lifecycle. Commands issued from the UI by either Admin or Operator frontend are published and received on IoT topic ‘facility-command’:

In addition to the commands issued from the UI, when the process completes, it is set to the ‘COMPLETING’ state to allow the backend to perform additional processing of the entire process data (saving into a historical bucket, perform final calculation of sensor data stats). Note that, while a process/facility is in the ‘COMPLETING’ state, the command buttons in the UI are disabled. After the backend finishes processing of a completed process data, it publishes a message on the ‘completed-processinfo’ IoT topic. The Admin application then sets the state of the process to ‘COMPLETE’.

Maintaining application state

In the Admin frontend, user can navigate between the Dashboard and Configure pages implemented in the Home.vue and the Configure.vue components correspondingly. In Vue.js, by default, as user navigates between components, they get destroyed and recreated each time.

In order to maintain the application state across components in both the Admin and Operator frontends, I use Vuex state management library for Vue.js. The code for state management is located in store.js in GitHub repository. 

Note that another option that Vue.js provides for state management is the <KeepAlive> built-in component. It allows to conditionally cache component instances when dynamically switching between components. This could be a more efficient option for a production-level implementation.

Operator frontend

The implementation of the Operator frontend has many similarities with the Admin frontend. In this section, I will mostly focus on the important differences of the Operator frontend functionality and implementation.

The code that implements the Operator frontend is located in the OperatorAppFrontend directory in the GitHub repository.

The Operator frontend uses the MQTT protocol to communicate with the backend of ACME Industries. It uses a number of IoT topics in order to implement the command and control functionality: 

  • The IoT topics to which the Operator frontend publishes messages:
  • The IoT topics to which the Operator frontend subscribes to receive messages:

By subscribing to various IoT topics, the Operator frontend continuously receives updates to the current status of a running process. It also receives updates of the facility configuration when user navigates back to the Dashboard from the Configure page.

In the Admin frontend, user can navigate away from the dashboard by either selecting the “CONFIGURE” , or “LOGOUT” buttons on the application bar, or just reloading the page, and that will put the facility offline. The offline state of the facility will depend on the state it was in before going offline:

When the Operator frontends receive one of the offline state notifications from the Admin, their command buttons in the UI get disabled and they stop receiving sensor data updates.

Message sequence examples for command and control

The diagrams below show sequences of published and received messages for command and control in ACME Industries frontends. The example use cases are: 

  1. From the Admin frontend, user issues the following commands:
    • start (LAUNCH FACILITY)
    • pause (PAUSE FACILITY)
    • resume (RESUME FACILITY)
  2. User issues the same sequence of commands from the Operator frontend.

Connect and subscribe (note that the order of connection and subscription between the Admin and Operator frontends doesn’t matter) :

  • Both Admin and Operator frontends connect to an IoT endpoint.
  • The Admin subscribes to the ‘facilitycommand’ (‘facility-command’ internally) IoT topic.
  • The Operator subscribes to the ‘facilitystatus’ (‘facility-status’) IoT topic.
  • Both Admin and Operator frontends subscribe to the ‘completedprocinfo’ (‘completed-processinfo’) IoT topic.

Publish commands and receive updates:

  1. The Admin publishes the ‘start’ (‘LAUNCH FACILITY’ in the UI) message to the ‘facilitycommand’ topic.
  2. The Admin receives response on the ‘facilitycommand’ topic from the MQTT broker. 
  3. The Admin launches a simulated process.
  4. The Admin publishes the ‘RUNNING’ status message on the ‘facilitystatus’ topic.
  5. The Operator receives an update message on the ‘facilitystatus’ topic with the status ‘RUNNING’.
  6. The Admin publishes the ‘pause’ (‘PAUSE FACILITY’ in the UI) message to the ‘facilitycommand’ topic.
  7. The Admin receives response on the ‘facilitycommand’ topic from the MQTT broker. 
  8. The Admin puts the process into the ‘PAUSED’ state.
  9. The Admin publishes the ‘PAUSED’ status message to the ‘facilitystatus’ topic.
  10. The Operator receives an update message on the ‘facilitystatus’ topic with the status ‘PAUSED’.
  11. The Admin publishes the ‘resume’ (‘RESUME FACILITY’ in the UI) message to the ‘facilitycommand’ topic.
  12. The Admin receives response on the ‘facilitycommand’ topic from the MQTT broker. 
  13. The Admin sets the process back into the ‘RUNNING’ state.
  14. When the process completes (after 10 minutes) the Admin sets the process state to ‘COMPLETING’.
  15. The Admin publishes the ‘COMPLETING’ status message to the ‘facilitystatus’ topic.
  16. The Operator receives an update message on the ‘facilitystatus’ topic with the status ‘COMPLETING’.
  17. After finishing processing the completed process data, the backend fans out a message on the ‘completedprocinfo’ topic to both the Admin and Operator frontends.
  18. The Admin and the Operator frontends set the status to ‘COMPLETE’.

The sequence diagram below provides details for the second use case when commands are initiated by the Operator frontend. Since the diagram is similar to the previous one, it should be easy to follow based on the details provided above:

Future work: standalone simulator

As I mentioned previously, the simulator currently is built into the Admin frontend. In my next series of articles where I plan to address the scalability issues, the simulator will need to be a standalone application running either on an EC2 instance in the cloud, or running on a local host. This will allow for a configuration with more than one facility and more than one process per facility. 

The diagram shows this architecture:

Conclusion

In this post I explained how command and control is implemented in ACME Industries application. I showed how both the Admin and Operator frontends connect to the IoT Core using MQTT protocol. I also provided detailed description of various IoT topics to which each of the frontends publish messages or subscribe to receive messages. Finally, I explained how various commands affect state transitions of a running process. 

In Part 6 I explain the authentication and authorization in ACME Industries using the Amazon Cognito service.