Enriching Unbounded Data Streams

Liz Sanderson

Introduction

When working with data streams, enriching data with more information is just as important as filtering out unnecessary data. Adding supplementary data before writing to disk storage can make your workflows more efficient and reduce the number of pre-processing steps.

The following workspaces demonstrate how you can enrich incoming data streams with attribute and location data before the data is committed to disk.

[Figure: stream diagram for the Enrich Data - Join workflow]

Examples

Join Depending Upon Attribute Value

Read the incoming data, join it to a database, test where the incoming data matches the database, and discard any unnecessary data.
 

Scenario

The province is tracking the air quality of locations around the Greater Vancouver Area with sensors that collect data on nitrogen dioxide (NO2), ozone (O3), and atmospheric particulate matter (PM2.5). A separate SQLite database (AQHI values.sqlite) contains air quality health index information on the ranges and units that define the levels of health risk for each chemical/particulate. When the stream data arrives in the workspace, it is joined to the database based on a common attribute value and cleaned up before being pushed out.
 

Workflow

The workspace starts by connecting to the message broker; in this case, we are using an AWS IoT connector. The JSON-formatted payload is then flattened into individual attributes. Each message/feature is joined to the SQLite database on a one-to-many (1:M) relationship based on the atmospheric chemical/particle type (i.e., o3, no2, pm25). There are four rows per particle type in the SQLite database that define the Low, Moderate, High, and Very High health risk levels. Because the join is 1:M, each sensor reading is paired with all four rows for its particle type, and the reading value is then compared with the health index range defined by the Minimum and Maximum database columns of each row.

The Tester passes only features whose reading value falls within the corresponding index range and whose risk level is High or Very High, since we’re only concerned about being warned when the air quality is poor. Afterward, the attributes are cleaned up and sent to the output destination.
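For readers who prefer to see the logic as code, the join-then-test steps above can be sketched in Python. This is only an illustration and not how the FME workspace itself is built: the table name aqhi, its column names, the numeric ranges, and the half-open range test (minimum <= value < maximum) are all assumptions made for the example.

```python
import sqlite3

# Risk levels we want to be warned about (from the scenario).
ALERT_LEVELS = {"High", "Very High"}


def build_lookup(conn):
    """Create a hypothetical stand-in for the AQHI lookup table.

    The table name, column names, and ranges are illustrative only;
    they are not the real index values.
    """
    conn.execute(
        "CREATE TABLE aqhi (particle TEXT, risk TEXT, minimum REAL, maximum REAL)"
    )
    conn.executemany(
        "INSERT INTO aqhi VALUES (?, ?, ?, ?)",
        [
            ("o3", "Low", 0.0, 50.0),
            ("o3", "Moderate", 50.0, 100.0),
            ("o3", "High", 100.0, 150.0),
            ("o3", "Very High", 150.0, 1_000_000.0),
        ],
    )


def join_reading(conn, reading):
    """1:M join: pair one reading with every risk row for its particle type."""
    rows = conn.execute(
        "SELECT risk, minimum, maximum FROM aqhi WHERE particle = ?",
        (reading["particle"],),
    ).fetchall()
    return [
        {**reading, "risk": risk, "minimum": lo, "maximum": hi}
        for risk, lo, hi in rows
    ]


def passes_test(feature):
    """Keep a joined feature only if the value is in range AND the risk is poor."""
    in_range = feature["minimum"] <= feature["value"] < feature["maximum"]
    return in_range and feature["risk"] in ALERT_LEVELS


def enrich(conn, reading):
    """Join, test, then drop the helper range attributes (the clean-up step)."""
    kept = [f for f in join_reading(conn, reading) if passes_test(f)]
    return [
        {k: v for k, v in f.items() if k not in ("minimum", "maximum")}
        for f in kept
    ]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    build_lookup(conn)
    print(enrich(conn, {"particle": "o3", "value": 120.0}))
```

A reading that lands in the Low or Moderate range produces an empty list, which corresponds to the feature being discarded by the Tester.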

For more information, and to see this workflow in action, see our video on Stream Processing: Join Data Depending on Attribute Values.
 

Enrich Based Upon Location

Scenario

This scenario builds upon the previous example by using the locations of weather stations to make requests to a weather web service. The API will provide live forecast information for each location. After retrieving the weather information, the enriched stream data is then written into a database to power dashboards for decision-makers.
 

Workflow

The workspace starts by connecting to the message broker; in this case, we are using an AWS IoT connector. The JSON-formatted payload is then flattened into individual attributes. Each message/feature is joined to the SQLite database on a one-to-many (1:M) relationship based on the atmospheric chemical/particle type (i.e., o3, no2, pm25).
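As a rough illustration of what flattening a JSON payload into individual attributes means, here is a minimal Python sketch. The message layout (sensor_id, location, reading) and the dot-separated attribute names are hypothetical; the actual payload and the naming FME produces may differ.

```python
import json


def flatten(payload, parent_key="", sep="."):
    """Flatten nested dicts and lists into one level of attribute names.

    Nested keys are joined with `sep`; list items use their index as the key.
    """
    if isinstance(payload, dict):
        items = payload.items()
    elif isinstance(payload, list):
        items = ((str(i), v) for i, v in enumerate(payload))
    else:
        # A bare scalar has nothing to flatten.
        return {parent_key: payload}

    flat = {}
    for key, value in items:
        name = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, (dict, list)):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


# Hypothetical sensor message; the field names are assumptions for illustration.
MESSAGE = """
{
  "sensor_id": "s-01",
  "location": {"lat": 49.28, "lon": -123.12},
  "reading": {"particle": "o3", "value": 42.5}
}
"""

if __name__ == "__main__":
    for name, value in flatten(json.loads(MESSAGE)).items():
        print(name, "=", value)
```

Each flattened name then behaves like an attribute on the feature, so a value such as reading.particle can serve as the join key against the lookup table.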

This workspace expands upon the previous example by making calls to the Norwegian Meteorological Institute’s weather API for each incoming feature based on the sensor’s latitude and longitude coordinates. Local weather data is gathered and extracted from the API response to enhance the sensor data with information including temperature, wind speed, and 6-hour weather predictions. From there, the attributes are cleaned up in the AttributeManager and pushed to the output destination.
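The per-feature weather lookup described above could look roughly like the following Python sketch. It assumes the Locationforecast 2.0 "compact" endpoint of the Norwegian Meteorological Institute and a response in which forecasts sit under properties.timeseries; the article does not say which endpoint or fields the workspace uses, so treat the URL, the field names, and the output attribute names as assumptions. The User-Agent string is a placeholder to replace with your own contact details.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request, urlopen

# Assumed endpoint; the workspace in the article may call a different one.
BASE_URL = "https://api.met.no/weatherapi/locationforecast/2.0/compact"


def build_url(lat, lon):
    """Build the request URL from a sensor's coordinates (rounded to 4 decimals)."""
    return f"{BASE_URL}?{urlencode({'lat': round(lat, 4), 'lon': round(lon, 4)})}"


def fetch_forecast(lat, lon, user_agent="example-app/0.1 you@example.com"):
    """Call the API and return the parsed JSON response.

    The service expects an identifying User-Agent header; the default here
    is only a placeholder.
    """
    request = Request(build_url(lat, lon), headers={"User-Agent": user_agent})
    with urlopen(request, timeout=10) as response:
        return json.load(response)


def extract_weather(response):
    """Pull a few values from the first forecast step (assumed response layout)."""
    data = response["properties"]["timeseries"][0]["data"]
    details = data["instant"]["details"]
    summary = data.get("next_6_hours", {}).get("summary", {})
    return {
        "air_temperature": details.get("air_temperature"),
        "wind_speed": details.get("wind_speed"),
        "forecast_6h": summary.get("symbol_code"),
    }


def enrich_with_weather(feature, response):
    """Merge the extracted weather values into the sensor feature."""
    return {**feature, **extract_weather(response)}
```

Keeping the URL building and response parsing separate from the network call makes the enrichment step easy to check against a saved response before pointing it at the live stream.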

For more information, and to see this workflow in action, see our video on Stream Processing: Join Data Depending on Attribute Values.
