Filtering Unbounded Data Streams

Liz Sanderson

FME Version

  • FME 2021.0

Introduction

As sensors and devices become more ubiquitous, the volumes of data we have to handle will continue to increase. In some situations, storing all of this data (usually in a data lake) will be a requirement for downstream analysis. However, it is often the case that storing all the data coming from a stream is either not required for analysis or the costs to do so would be prohibitively expensive.

The primary goal of filtering unbounded data streams is therefore to reduce data volumes in memory before committing data to disk.

There are several ways in which data can be filtered: attribute values, location, or time.

 

Examples

Filter on Attribute Values

Read the attribute values on the incoming data, apply a filter condition, and then discard data that doesn’t meet the condition.

Streams Diagrams - Filtering - Discard Based upon Attributes

Scenario

A city is monitoring the location of all of its operational vehicles (e.g. SUVs, refuse vehicles, maintenance vans) so it can better understand vehicle movement. This particular study looks at trucks only, so the city wishes to identify vehicles of type truck and commit those records to a database.
 

Workflow

The workspace connects to the message broker; in this case, we are using Kafka. The JSON-formatted payload, which represents the location of a vehicle, is then flattened into individual attributes. The vehicle type attribute on each message/feature is then tested to see whether the message originated from a truck. From there, the attributes are cleaned up in the AttributeManager and the features are pushed to the output destination.
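Outside FME, the same attribute filter can be sketched in plain Python. This is only an illustration: the payload field names (`vehicle_id`, `vehicle_type`) are assumptions, not the city's actual schema.

```python
import json

def flatten_and_filter(raw_messages, wanted_type="truck"):
    """Parse JSON payloads and keep only messages for the wanted vehicle type."""
    kept = []
    for raw in raw_messages:
        msg = json.loads(raw)  # "flatten" the payload into a flat dict of attributes
        if msg.get("vehicle_type") == wanted_type:
            kept.append(msg)   # everything else is discarded before it reaches storage
    return kept

stream = [
    '{"vehicle_id": "V1", "vehicle_type": "truck", "lat": 49.28, "lon": -123.12}',
    '{"vehicle_id": "V2", "vehicle_type": "suv",   "lat": 49.26, "lon": -123.10}',
]
trucks = flatten_and_filter(stream)
# only the V1 (truck) message survives the filter
```

In a real deployment the `stream` list would be replaced by an ongoing consumer loop reading from the broker, but the filter condition itself is unchanged.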

 

Filter on Location

Compare the location of incoming messages to a geofence and discard the message if it doesn’t fall within the geofence.

Streams Diagrams - Filtering - Discard Based Upon Location

Scenario

A city is discussing adding tolls for vehicles to gain access to the downtown core. They wish to analyze how much time their vehicle fleet spends inside the downtown core so they can assess the impact on budgets.
 

Workflow

The workspace connects to the message broker; in this case, we are using Kafka. The JSON-formatted payload, which represents the location of a vehicle, is then flattened into individual attributes, and the attribute containing the WKT geometry is set as the geometry of the feature. Using the Clipper transformer, each vehicle is then compared to a geofence defined by the administrative boundary of the City of Vancouver (COV) to see whether or not it lies within the geofence. If the vehicle falls within the COV, the attributes are cleaned up in the AttributeManager and the features are pushed to the output destination.
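The spatial test at the heart of this workflow is a point-in-polygon check. As a minimal sketch (not the Clipper's actual implementation), the standard ray-casting algorithm decides whether a vehicle location falls inside a geofence; the rectangle below is an illustrative stand-in for the real COV boundary.

```python
def point_in_polygon(x, y, polygon):
    """Ray-casting test: is the point (x, y) inside the polygon?

    `polygon` is a list of (x, y) vertices. Counts how many polygon edges a
    horizontal ray from the point crosses; an odd count means "inside".
    """
    inside = False
    j = len(polygon) - 1
    for i in range(len(polygon)):
        xi, yi = polygon[i]
        xj, yj = polygon[j]
        # does edge (j -> i) straddle the point's y, and lie to its right?
        if (yi > y) != (yj > y) and x < (xj - xi) * (y - yi) / (yj - yi) + xi:
            inside = not inside
        j = i
    return inside

# simplified rectangular "downtown core" geofence (illustrative coordinates only)
geofence = [(-123.15, 49.26), (-123.10, 49.26), (-123.10, 49.30), (-123.15, 49.30)]

point_in_polygon(-123.12, 49.28, geofence)  # inside the geofence -> True
point_in_polygon(-123.20, 49.28, geofence)  # outside the geofence -> False
```

Messages whose coordinates return False would be discarded in memory, exactly as in the FME workflow.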

 

Filter Data Within a Period of Time

Grouping data in batch processing is simple: you can read all of the data into memory and then run a query across it. With unbounded data streams, you never stop reading the data, so to perform stateful operations on data with the same key (e.g. aggregations or joins) you have to simulate a grouping. To do this, a technique called windowing is used.

Windowing uses time to group features; stateful operations can then be performed on the data within each window.
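As a minimal sketch, a fixed (tumbling) window simply maps each event timestamp to a window index; all features sharing an index belong to the same window.

```python
WINDOW_SECONDS = 30

def window_id(timestamp):
    """Assign an epoch timestamp (in seconds) to a 30-second tumbling window."""
    return int(timestamp // WINDOW_SECONDS)

window_id(1000)  # -> 33  (window covering seconds 990-1019)
window_id(1019)  # -> 33  (same window)
window_id(1020)  # -> 34  (next window)
```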
Streams Diagrams - Filtering - Windowing

Scenario

An insurance company is trialing a new program where the driver pays for insurance based upon the distance they travel. They have 50,000 people in the trial and each participant has a sensor in their vehicle which tracks their location. The sensors report their location every five seconds. For this analysis a lower resolution is acceptable, so to save storage costs we will only record the location at a 30-second interval.
 

Workflow

The workspace connects to the message broker; in this case, we are using Kafka. The JSON-formatted payload, which represents the location of a vehicle, is then flattened into individual attributes.

The TimeWindower transformer then groups the messages into 30-second windows. All messages in each window are sorted by timestamp, and a Sampler with a Group By is used to keep only the last location for each vehicle in the window. This reduces data storage volumes by roughly 6x (one record every 30 seconds instead of one every 5).
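The window-then-sample step can be sketched in Python as follows. The message dicts and their `vehicle_id`/`timestamp` field names are illustrative assumptions, not the trial's real schema.

```python
from itertools import groupby

WINDOW = 30  # window length in seconds

def last_per_vehicle(messages):
    """Within each 30-second window, keep only the latest message per vehicle.

    `messages` is an iterable of dicts carrying `vehicle_id` and `timestamp`
    (epoch seconds); all other fields ride along untouched.
    """
    def key(m):
        # (window index, vehicle) plays the role of the TimeWindower + Group By
        return (int(m["timestamp"] // WINDOW), m["vehicle_id"])

    ordered = sorted(messages, key=lambda m: (key(m), m["timestamp"]))
    # one group per (window, vehicle); keep the last message in each group
    return [max(group, key=lambda m: m["timestamp"])
            for _, group in groupby(ordered, key=key)]

stream = [
    {"vehicle_id": "V1", "timestamp": 100, "lat": 49.1},
    {"vehicle_id": "V1", "timestamp": 105, "lat": 49.2},  # same window, later fix
    {"vehicle_id": "V1", "timestamp": 130, "lat": 49.3},  # next 30-second window
    {"vehicle_id": "V2", "timestamp": 101, "lat": 49.4},
]
sampled = last_per_vehicle(stream)
# three messages survive: V1 at t=105, V1 at t=130, V2 at t=101
```

With sensors reporting every 5 seconds, keeping one record per vehicle per 30-second window is what yields the roughly 6x reduction described above.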
