Detecting Incidents in Unbounded Data Streams

Liz Sanderson

Introduction

Rather than storing data from an unbounded stream, we can detect patterns in memory and then trigger an event when certain criteria are met. This means decision-makers can receive alerts when a noteworthy event happens without committing the stream data to a database.

Examples

Trigger An Event Depending Upon Location

[Diagram: Detect Incidents - Location]

Scenario

A city is monitoring the location of all of its operational vehicles (e.g. SUVs, refuse vehicles, maintenance vans). For insurance purposes, refuse vehicles cannot leave the city boundaries. If a refuse vehicle leaves the city for more than 10 minutes, then a notification should be pushed to a decision-maker.

Workflow

The workspace connects to the message broker; in this case, Kafka is being used. The JSON-formatted payload, which represents the location of the vehicle, is then flattened into individual attributes, and the attribute containing the WKT geometry is set as the geometry of the feature.
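
Outside of FME, this first stage can be sketched in a few lines of Python. This is a minimal illustration only: the topic name, broker address, and wkt_geometry payload field are assumed names, and the kafka-python and shapely libraries stand in for the Kafka connector and geometry handling.

```python
# Minimal sketch: consume vehicle locations from Kafka, flatten the JSON
# payload, and turn the WKT attribute into a geometry. Topic, broker, and
# field names below are illustrative assumptions, not from the article.
import json

from kafka import KafkaConsumer   # pip install kafka-python
from shapely import wkt           # pip install shapely

consumer = KafkaConsumer(
    "vehicle-locations",
    bootstrap_servers="broker:9092",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

for message in consumer:
    attributes = dict(message.value)                      # flattened payload
    geometry = wkt.loads(attributes.pop("wkt_geometry"))  # WKT -> geometry
    # (attributes, geometry) now flows on to the geofence comparison below
```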

The locations of the city vehicles are then compared to a geofence representing the city. All points that fall outside of the geofence are sent to a TimeWindower. The TimeWindower's purpose is to check whether a vehicle has been outside the city for at least 10 minutes within a 30-minute period; otherwise, no alert needs to be sent. To do this, for each vehicle in the 30-minute window we check its first and last known locations: if both are outside the city boundary and their timestamps are more than 10 minutes apart, an alert is triggered.
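
One way to approximate this check in Python is with a sliding 30-minute window per vehicle, assuming the city boundary is available as a Shapely polygon (all names here are illustrative):

```python
# Sketch of the window logic: per vehicle, keep the timestamps of points
# that fall outside the city geofence within the last 30 minutes, and
# alert when the first and last are more than 10 minutes apart.
from collections import defaultdict, deque
from datetime import timedelta

WINDOW = timedelta(minutes=30)
THRESHOLD = timedelta(minutes=10)

outside_points = defaultdict(deque)  # vehicle_id -> timestamps outside city

def check_vehicle(vehicle_id, point, timestamp, city_boundary):
    """Return True when an alert should be triggered for this vehicle."""
    window = outside_points[vehicle_id]
    if city_boundary.contains(point):
        window.clear()                 # vehicle is back inside: reset
        return False
    window.append(timestamp)
    while timestamp - window[0] > WINDOW:
        window.popleft()               # expire points older than 30 minutes
    return window[-1] - window[0] > THRESHOLD
```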

For more information and to see this workflow in action, see our video on Stream Processing: Event Detection Based on Location.

Trigger An Event Depending Upon Threshold

[Diagram: Detect Incidents - Threshold]

Scenario

The province is tracking the air quality of locations around the Greater Vancouver Area with sensors that collect data on nitrogen dioxide (NO2), ozone (O3), and atmospheric particulate matter (PM2.5). A separate SQLite database (AQHI values.sqlite) contains air quality health index information: the value ranges that define the levels of health risk for each chemical/particulate.

They wish to trigger an email notification when the value from a sensor records a High or Very High air quality health index. To account for sensor errors, the reading must occur twice within a 30-minute period before the alert is sent.

Workflow

The start of the workspace connects to the message broker; in this case, we are using an AWS IoT connector. The JSON-formatted payload is then flattened into individual attributes. Each message/feature is joined to the SQLite database on a one-to-many (1:M) relationship based on the atmospheric chemical/particle type (i.e., o3, no2, pm25). There are four rows per particle type in the SQLite database, defining the Low, Moderate, High, and Very High health risk levels. When joining on a 1:M relationship, the sensor reading values are compared with the health index ranges defined by the Maximum and Minimum database columns.
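
As a rough illustration, the same lookup can be expressed as a SQL range query against the SQLite database. The table and column names below, apart from Minimum and Maximum, are assumptions about the database layout:

```python
# Sketch of the AQHI lookup: find the risk level whose [Minimum, Maximum]
# range contains the sensor reading. Table/column names other than
# Minimum/Maximum are assumed for illustration.
import sqlite3

conn = sqlite3.connect("AQHI values.sqlite")

def classify(particle_type, reading):
    """Return 'Low', 'Moderate', 'High', or 'Very High' for a reading."""
    row = conn.execute(
        "SELECT risk_level FROM aqhi_ranges "
        "WHERE particle_type = ? AND ? BETWEEN Minimum AND Maximum",
        (particle_type, reading),
    ).fetchone()
    return row[0] if row else None
```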

Features whose reading value falls within the corresponding index range are passed through the Tester. Additionally, only particle concentrations detected as High or Very High pass the Tester, since we're only concerned about being warned when the air quality is poor. The passing features are sent to a TimeWindower that groups the data into 30-minute periods. If two readings from the same sensor are logged in the same 30-minute period, then an alert is triggered.
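
A sliding-window approximation of this Tester-plus-TimeWindower rule looks like the following sketch (all names are illustrative):

```python
# Sketch: discard readings that are not High/Very High (the Tester), then
# alert only when a sensor logs a second poor reading within 30 minutes
# (the TimeWindower).
from collections import defaultdict, deque
from datetime import timedelta

WINDOW = timedelta(minutes=30)
poor_readings = defaultdict(deque)   # sensor_id -> timestamps of poor readings

def should_alert(sensor_id, risk_level, timestamp):
    if risk_level not in ("High", "Very High"):
        return False                  # acceptable air quality: no alert
    window = poor_readings[sensor_id]
    window.append(timestamp)
    while timestamp - window[0] > WINDOW:
        window.popleft()              # keep only the last 30 minutes
    return len(window) >= 2           # second poor reading -> alert
```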

For more information and to see this workflow in action, see our video on Stream Processing: Event Detection Based on Threshold.

Trigger an Automation With a Stream Event

[Diagram: Detect Incidents - Trigger Automation]

Scenario

This example uses the same air quality tracking scenario as Trigger An Event Depending Upon Threshold.

The government wishes to trigger an FME Flow (formerly FME Server) Automation when a sensor records a High or Very High air quality health index value. The Automation runs a series of workspaces to process the event, such as writing to external databases and web applications, before finally sending an email alert to FME Flow users about the event.

Workflow

The workspace is nearly identical to the Trigger An Event Depending Upon Threshold workspace. Instead of integrating with an alert system, the workspace ends with an HTTPCaller that makes a REST API request to FME Flow to trigger an Automation. Refer to the FME Flow REST API documentation for the Automation trigger call.

You can send data to an Automation's Manual Trigger, similar to published parameters; these values are known as Output Attributes. The JSONTemplater translates each incoming stream feature into a custom JSON payload that is sent in the HTTPCaller's request body. To learn more about the Manual Trigger in Automations, please refer to the Manual Trigger article.
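
In code, the HTTPCaller step amounts to a single authenticated POST. The sketch below uses Python's requests library; the trigger URL, token, authentication scheme, and Output Attribute names are all placeholders that you would replace with the values from your own Manual Trigger and FME Flow instance:

```python
# Sketch of the HTTPCaller request: POST a JSON payload of Output Attributes
# to the Automation's Manual Trigger endpoint. The URL, token, auth scheme,
# and attribute names are placeholders, not real values.
import requests

TRIGGER_URL = "https://fmeflow.example.com/<manual-trigger-endpoint>"
API_TOKEN = "<fme-flow-api-token>"

def send_event(feature):
    payload = {                       # JSONTemplater equivalent
        "sensor_id": feature["sensor_id"],
        "particle_type": feature["particle_type"],
        "risk_level": feature["risk_level"],
    }
    response = requests.post(
        TRIGGER_URL,
        json=payload,                 # sent as the request body
        headers={"Authorization": f"Bearer {API_TOKEN}"},
        timeout=10,
    )
    response.raise_for_status()       # fail loudly if the trigger rejects it
```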

Automations are an excellent place to process low-volume events, such as events from a stream workspace. The Automation workspaces write the event to databases and web services. Handling the writing and web service requests in a separate workflow reduces potential points of failure and slowdown, takes advantage of the guaranteed-delivery capabilities of Automations, and keeps the stream processing workspace simple.
