Windowing Data Streams in FME

Liz Sanderson
Liz Sanderson
  • Updated

FME Version

  • FME 2021.0

Introduction

In FME, when processing data from a non-streaming data source (e.g. database or file), because the data is at rest and finite, you can load the data into memory and then sort and group the data before analyzing it. When reading a data stream into FME, you cannot do this since the data is infinite, and it will never finish loading. This is where windowing comes in.

Windowing means that you take the data stream and, using time, break the stream up into groups. Once in groups, data is ready for analysis.

Streams Diagrams - Filtering - Windowing (1).png

For example, if you are reading an unbounded stream from a sensor, and the data points are coming in at roughly one message a second, you might choose to window the data into one-minute intervals. A one-minute window means every minute, all of the features that arrived during that one minute get tagged as belonging to that window and released downstream for further processing. The longer you make the window, the more you can likely infer because there will be more data points in each window. However, a longer window means more lag between receiving the data and writing out results. In FME, the transformer that enables windowing is the TimeWindower.

Note, when working with a data stream, grouping the data for processing isn't always necessary. For example, when performing simple filtering and event detection, you don't need to window. For any stream processing where you wish to compute results across multiple features, windowing is necessary.
 

Workspace Examples

In the stream demos, many of the workspaces use the TimeWindower transformer. For a simple implementation, see the demo Filtering Unbounded Data Streams. For a more complex implementation, see the Spatial Analysis on Unbounded Data Streams.

 

Windowing Using Processing Time

Windowing based upon when the message arrives into the TimeWindower is the simplest way to window the data. If your data does not have an event time (e.g. timestamp) associated with it, this is your only option.

For the processing time to be accurate, messages must arrive into FME in event-time order. If they don't, they risk getting assigned to the wrong window with results not necessarily representing reality.

To window based on processing time, set “Data Has Timestamps” to be No. This will assign the feature to the window based upon when it enters the TimeWindower transformer.

timewindower data has timestamps.png

 

Windowing Using Event Time

Windowing based upon event time is supported when data has an existing recorded timestamp. When setting "Data Has Timestamps" to Yes, the time windows will be determined by the specified timestamp attribute. 

In many cases, there will be a delay between the time a sensor detects/submits data and when the data actually arrives in FME. When considering latency, a feature may arrive late and miss the time window it belonged to! The late feature will still be assigned to the earlier window ID, but this leads to unwanted behaviour with your group by settings. 

Streams Diagrams - Out of Order (1).png

Consider the following scenario...A stream workspace has a TimeWindower with 30-second windows. When a time window 'closes', the TimeWindower assigns the next window ID to the following set of incoming features. Transformers using the Group By Advanced mode (Process when the group changes) will process and release the features they're currently holding because a new time window ID was detected. However, a delayed stream feature arrives in the workspace that belonged to the previous window, so the TimeWindower assigns the previous window ID to this delayed feature. The downstream Group By Advanced transformers will detect a change in the time window ID from that single late feature, and they'll process/release their features too early! This issue can be handled with the Order Settings.

Order Settings.png

These new settings allow you to reorder features based on their timestamps to avoid compromising downstream transformers and the integrity of your data. The OutOfSequence port is where late features are sent to be written in a separate location like a database, added to the results through a different process, or discarded. 

The Order Settings have three order types: None, Chronological Order, and Reverse Chronological Order. Use None if the stream data is not ordered. For Chronological Order, features that arrive from a previous window and outside the Tolerance Interval are considered "out of sequence".

Tolerance intervals are grace periods for late features to be reordered and sent through the Output port. If the interval is zero, all late features will be considered out of sequence. For example (when using the settings in the screenshot above), a window 'closes' at the 5-minute mark and the TimeWindower holds onto incoming features for 30 seconds. In these 30 seconds, many incoming features will belong to the next time window. However, if a feature's timestamp indicates that it belonged to the previous window, it will jump the queue and be released through the Output port. After the tolerance interval ends, the next window's features will be released. If a feature for the previous window arrives now, it will be sent to OutOfSequence.

In the table below, the stream results have been output to a PostgreSQL database. Notice how the highlighted records are in a different order than how they arrived at the TimeWindower (based on the _original_order attribute). Features 99 and 110 belonged to window 1 but they arrived after features 97 and 98 which belonged to window 2. Since features 99 and 110 arrived within the tolerance interval, they were released first. Features 97 and 98 were held until the tolerance interval ended and window 2 features began to be released.

ReorderedFeatures.png

In summary, the Order Settings are meant to reorder features based on their timestamps to account for delays, and features that arrive outside the tolerance interval will be considered out of sequence. These settings ensure that all features in a window stick together as a group and do not prematurely trigger downstream Group By Advanced transformers.

 

Window Changed Port

When running in stream mode in FME, a workspace could run for months before restarting. Meaning if you are reading non-static or dynamic data into FME in addition to the stream, you will need to refresh this data. The WindowChanged port can trigger a data refresh when used in combination with a transformer that can read data (e.g. HTTPCaller, FeatureReader, SQLExecutor).

The WindowChanged port can also be used in any situation where you wish to perform an action before the data in the new window appears. For example, force the flushing of the previous window’s data.
 

The Spatial Analysis on Unbounded Data scenario is a good example. The incoming stream messages are geocoded and compared to a geofence. Rather than comparing the stream to a static geofence that loads only when the workspace starts, the WindowChanged port is connected to an HTTPCaller that retrieves the location of buses via an API. Every time a window closes, the feature leaving the WindowChanged port will trigger a data read, and the new geofences representing the real-time location of the buses can then be compared with the features that fell within the window.

 

How It Works

When a workspace runs with a TimeWindower configured, a new feature will exit from the WindowChanged port every time the window changes. For example, if you have the window duration set to 30 seconds, and during the first window, one hundred features enter the transformer. After the 30-second window ends and the window closes, the hundred original features leave via the Output port, and a single feature leaves via the WindowChanged port.

window changd.png

When the window changed event is used to trigger a data refresh, the data read can be either compared or merged with the features in the most recent window. 

 

Window Types

The ID of the window can be represented by Window Number, Window Start Time, or Window End Time.
 

Window Number

This is calculated based upon the number of windows that have passed since the initialization time of the first window. For example, if you set “At Timestamp = 2021-03-09 19:00:00 UTC” and a window of “30 seconds” on the TimeStamper. Then started FME running 10 minutes after the timestamp at 2021-03-09 19:10:00 UTC. The first three windows would be:

  • “2021-03-09 19:10:00 UTC” with a window ID of 20.
  • “2021-03-09 19:10:30 UTC" with a window ID of 21.
  • “2021-03-09 19:11:00 UTC” with a window ID of 22.

Note, the IDs do not start at zero because the window ID is calculated on the number of windows past the At Timestamp value that was set on the transformer. In the example above, this was 10 minutes earlier (or 20 windows) than when the first window is triggered when this workflow is run.

 

Window Start Time

The start time of an assigned window in seconds since the start of the Unix epoch, 00:00:00 UTC on 1 January 1970. For example, if you set “At Timestamp = 2021-03-09 19:00:00 UTC” and a window of “30 seconds” on the TimeStamper. Then started FME running 10 minutes after the timestamp at 2021-03-09 19:10:00 GMT. The first three windows would be:

  • “2021-03-09 19:10:00 UTC” with a window start time of 1615317000 seconds.
  • “2021-03-09 19:10:30 UTC" with a window start time of 1615317030 seconds
  • “2021-03-09 19:11:00 UTC” with a window start time of 1615317060 seconds.

 

Window End Time

The end time of an assigned window in seconds since the start of the Unix epoch, 00:00:00 UTC on 1 January 1970. For example, if you set “At Timestamp = 2021-03-09 19:00:00 UTC” and a window of “30 seconds” on the TimeStamper. Then started FME running 10 minutes after the timestamp at 2021-03-09 19:10:00 UTC. The first three windows would be:

  • “2021-03-09 19:10:00 UTC” with a window end time of 1615317030 seconds.
  • “2021-03-09 19:10:30 UTC" with a window end time of 1615317060 seconds
  • “2021-03-09 19:11:00 UTC” with a window end time of 1615317090 seconds.

 

Scenarios

For most scenarios, using the window number will be sufficient. This will give you a value that you can use in Group By parameters to process the unbounded stream in batches. You can also use the start/end times for exactly the same purpose. 

The start/end times also provide additional value if you are joining an external data source (e.g. SQLExecutor) whenever the window changes. Often, you may want to use the time in the query to retrieve the data. If this is the case you can use the window timestamp as a value. 

Using the window start time to query BigQuery for all of the data with timestamps in a 30-second window.

SELECT * FROM `city_of_vancouver_rush_hour`
WHERE timestamp >= (DATETIME "@Value(_window_start_id)")
AND timestamp < DATETIME_ADD(DATETIME "@Value(_window_start_id)", INTERVAL 30 second)
ORDER BY timestamp


 

Windowing Across Multiple FME Engines

On FME Flow, to handle higher throughput you can scale the number of FME Engines that are assigned to a stream. While you cannot aggregate across FME Engines (analysis is per FME Engine), a MapReduce processing pattern can be adopted which breaks the processing of the stream up producing an output that can then be further compared and analyzed. To enable this you use the window ID.

As already discussed above, when windowing in a workflow, you can ensure that the same window ID is used for the same time period on each FME Engine. The result is when you write a value out to the data store, the output for the same period of time on each FME Engine can be grouped together.

Streams Diagrams - Vert Multiple Engines Stream.png

Imagine a workspace that aggregates the stream data using the TimeWindower and then calculates statistics on each window for a one-minute period. When run, this FME workspace can process 1000 points a second but the data source is producing 3000 points a second. To handle processing, the stream needs to be scaled up to three FME Engines.

With three FME Engines running, over a sixty-second period, you are reducing the data volume from 180,000 messages a minute down to three values. You can then run further processing to compute the average across these three values and you will have an average value for all data across each 60 second period.

 

How It Works

Two things need to happen to enable this workflow.

  • Windows on each FME Engine need to start at the same time.
  • Windows on each FME Engine need to have the same ID as they increment.

To ensure the windows start at the same time you need to set “Time Window Starts” to At Timestamp and then set a timestamp that is in the past from when the workspace will run. As discussed above, if the same timestamp is used for the window start time on each FME Engine, the same window ID will be generated on each FME Engine for each comparable window.

TIme Windower At Tiemstamp.png

For example, if the window duration is “30 seconds” and the window start time is set to “2021-03-10 17:00:00 UTC”. If we start a stream, and two FME Engines are assigned to the stream and the first features on each FME Engine enter the TimeWindower at 2021-03-10 17:11:35, they will be assigned the window ID 24.

If two days later we add a third FME Engine to the stream and the first feature enters the TimeWindower at 2021-03-12 18:10:05, it will be assigned the window ID 5,901. This is because 5901 windows have elapsed since the first window at 2021-03-10 17:00:00. The other two FME Engines will also be on the same window ID.

 

Considerations

  • The system clocks need to be synchronized on the FME Engine hosts.
  • On initialization of the FME Engine, processing may begin part way through the first window, so the first window won't have a complete dataset. Results from the first window should therefore be discarded.

 

Additional Resources

Webinar: Introduction to Data Stream Processing

Webinar: Empowering Real-Time Decision Making with Data Streaming

 
 

Was this article helpful?

Comments

0 comments

Please sign in to leave a comment.