Windowing Data Streams in FME

Liz Sanderson

Introduction

When FME processes data from a non-streaming source (e.g., a database or file), the data is at rest and finite, so you can load it into memory and then sort and group it before analysis. You cannot do this when reading a data stream into FME: the data is unbounded, so it never finishes loading. This is where windowing comes in.

Windowing means taking the data stream and, using time, breaking the stream up into groups. Once in groups, the data is ready for analysis.

Streams Diagrams Rebranded - Filtering - Windowing.png

For example, if you are reading an unbounded stream from a sensor and the data points arrive at roughly one message a second, you might choose to window the data into one-minute intervals. With a one-minute window, all of the features that arrive during each minute are tagged as belonging to that window and released downstream for further processing. The longer you make the window, the more you can infer, because each window contains more data points. However, a longer window means more lag between receiving the data and writing out results. In FME, the transformer that enables windowing is the TimeWindower.
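The idea of one-minute windows can be sketched outside FME in a few lines of Python. This is only an illustration of the grouping concept, not how the TimeWindower is implemented; the readings and the function names are made up for the example.

```python
from collections import defaultdict

WINDOW_SECONDS = 60  # one-minute windows, as in the sensor example


def window_start(arrival_seconds, size=WINDOW_SECONDS):
    """Return the start (in seconds) of the window an arrival time falls in."""
    return int(arrival_seconds // size) * size


def group_by_window(arrivals):
    """Group (arrival_seconds, value) readings by the window they fall in."""
    groups = defaultdict(list)
    for arrival_seconds, value in arrivals:
        groups[window_start(arrival_seconds)].append(value)
    return dict(groups)


# Hypothetical sensor readings: (seconds since the stream started, value)
readings = [(0.5, 20.1), (30.2, 20.4), (59.9, 20.3), (60.0, 21.0), (125.0, 21.7)]
windows = group_by_window(readings)
print(windows)
```

Each key is the start of a window, and each list holds the values that would be analyzed together.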

Note, when working with a data stream, grouping the data for processing isn't always necessary. For example, when performing simple filtering and event detection, you don't need to window. For any stream processing where you wish to compute results across multiple features, windowing is necessary.
 

TimeWindower Examples

In the stream demos, many of the workspace examples use the TimeWindower transformer. For a simple implementation, see the demo Filtering Unbounded Data Streams. For a more complex implementation, see the demo Spatial Analysis on Unbounded Data Streams.

This section explains different TimeWindower configurations with examples.

Windowing Using Processing Time

Windowing based on when the message arrives at the TimeWindower is the simplest way to window the data. If your data does not have an event time (e.g., timestamp) associated with it, this is your only option.

For the processing time to be accurate, messages must arrive in FME in event-time order. If they don't, they risk getting assigned to the wrong window, affecting results.

To window based on processing time, set “Data Has Timestamps” to No. Each feature is then assigned to a window based on when it enters the TimeWindower transformer.

 

Windowing Using Event Time

Windowing based on event time is supported when data has timestamps. When “Data Has Timestamps” is set to Yes, the time windows are determined by the specified timestamp attribute.

In many cases, there will be a delay between when a sensor detects or submits data and when the data arrives in FME. Because of this latency, a feature may arrive after the time window it belongs to has closed. The late feature will still be assigned the earlier window ID, but this can lead to unwanted behaviour in downstream transformers that rely on group-by settings.
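The difference between the two approaches can be illustrated with a small Python sketch. It assumes 30-second windows counted from time zero and a hypothetical feature with both an event time and an arrival time; it is not FME code.

```python
def window_id(seconds, window_seconds=30):
    """Return the index of the window that a time (in seconds) falls in."""
    return int(seconds // window_seconds)


# Hypothetical feature: measured at 29.0 s, but it reached the workspace at 31.5 s.
feature = {"event_time": 29.0, "arrival_time": 31.5}

by_processing_time = window_id(feature["arrival_time"])
by_event_time = window_id(feature["event_time"])

print(by_processing_time, by_event_time)
```

Under processing time the feature lands in the window that is open when it arrives; under event time it is tagged with the earlier window it was measured in, even though that window has already closed.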

Streams Diagrams Rebranded - Out of Order.png

Consider the following scenario. A stream workspace has a TimeWindower with 30-second windows. When a time window closes, the TimeWindower assigns the next window ID to the following set of incoming features. Transformers using the group processing setting "When Group Changes (Advanced)" process and release the features they are currently holding because a new time window ID was detected. Then a delayed stream feature that belonged to the previous window arrives in the workspace, so the TimeWindower assigns it the previous window ID. The downstream transformers using "When Group Changes (Advanced)" detect a change in the time window ID from that single late feature and release their features too early. This issue can be handled with the Order Settings.

The Order Settings allow you to reorder features based on their timestamps to avoid compromising downstream transformers and the integrity of your data. Late features are sent to the OutOfSequence port, from which they can be written to a separate location (such as a database), added to the results through a different process, or discarded.
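The premature release described in this scenario can be reproduced with a toy model in Python. The function below is a hypothetical stand-in for any transformer that releases its held features when the incoming window ID changes; it is a sketch of the behaviour, not FME's implementation.

```python
def release_when_group_changes(window_ids):
    """Release a batch each time the incoming window ID differs from the last one."""
    batches, current = [], []
    for window_id in window_ids:
        if current and window_id != current[-1]:
            batches.append(current)
            current = []
        current.append(window_id)
    if current:
        batches.append(current)  # flush whatever is still held when the sketch ends
    return batches


in_order = release_when_group_changes([1, 1, 1, 2, 2, 2])
with_late_feature = release_when_group_changes([1, 1, 2, 2, 1, 2])

print(in_order)
print(with_late_feature)
```

With the features in order, two complete batches are released. With one late feature from window 1, window 2 is split into two batches and the late feature forms a batch of its own.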

The Order Settings have three order types: None, Chronological Order, and Reverse Chronological Order. Use None if the stream data is not ordered. For Chronological Order, features that arrive from a previous window and outside the Tolerance Interval are considered "out of sequence".

Tolerance intervals are grace periods for late features to be reordered and sent through the Output port. If the interval is zero, all late features will be considered out of sequence. For example (when using the settings in the screenshot above), a window closes at the 5-minute mark, and the TimeWindower holds onto incoming features for 30 seconds. In these 30 seconds, many incoming features will belong to the next time window. However, if a feature's timestamp indicates that it belonged to the previous window, it will jump the queue and be released through the Output port. After the tolerance interval ends, the next window's features will be released. If a feature for the previous window arrives now, it will be sent to OutOfSequence.
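As a rough sketch of the rule described here, the Python function below decides which port a feature would leave through. It assumes 5-minute windows counted from time zero, a 30-second tolerance, and that a feature arriving exactly at the end of the tolerance interval is still accepted. It models only the classification; the holding and reordering the TimeWindower performs are left out.

```python
def route_feature(event_time, arrival_time, window_seconds=300, tolerance_seconds=30):
    """Return (window_id, port) for a feature, using a simplified tolerance rule."""
    window_id = int(event_time // window_seconds)
    window_close = (window_id + 1) * window_seconds
    # Assumption: arrival exactly at the end of the tolerance interval still counts.
    if arrival_time <= window_close + tolerance_seconds:
        return window_id, "Output"
    return window_id, "OutOfSequence"


# All times are seconds since the first window opened (hypothetical values).
print(route_feature(event_time=290, arrival_time=295))  # on time
print(route_feature(event_time=298, arrival_time=320))  # late, within tolerance
print(route_feature(event_time=299, arrival_time=340))  # late, after tolerance
print(route_feature(event_time=305, arrival_time=306))  # belongs to the next window
```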

In the table below, the stream results have been output to a PostgreSQL database. Notice how the highlighted records are in a different order than how they arrived at the TimeWindower (based on the _original_order attribute). Features 99 and 110 belonged to window 1, but they arrived after features 97 and 98, which belonged to window 2. Since features 99 and 110 arrived within the tolerance interval, they were released first. Features 97 and 98 were held until the tolerance interval ended and window 2 features began to be released.

ReorderedFeatures.png

In summary, the Order Settings are meant to reorder features based on their timestamps to account for delays, and features that arrive outside the tolerance interval will be considered out of sequence. These settings ensure that all features in a window stick together as a group and do not prematurely trigger downstream Group By Advanced transformers.

 

Window Changed Port

When running in stream mode in FME, a workspace could run for months before restarting. If you are reading non-static or dynamic data into FME in addition to the stream, you will need to refresh this data. The WindowChanged port can trigger a data refresh when used in combination with a transformer that can read data (e.g., HTTPCaller, FeatureReader, SQLExecutor).

The WindowChanged port can also be used in any situation where you want to perform an action before the data in the new window appears. For example, you can force the previous window’s data to be flushed.
 

The Spatial Analysis on Unbounded Data scenario is a good example. The incoming stream messages are geocoded and compared to a geofence. Rather than comparing the stream to a static geofence that loads only when the workspace starts, the WindowChanged port is connected to an HTTPCaller that retrieves the location of buses via an API. Every time a window closes, the feature leaving the WindowChanged port will trigger a data read, and the new geofences representing the real-time location of the buses can then be compared with the features that fell within the window.

 

How It Works

When a workspace runs with a TimeWindower configured, a new feature will exit from the WindowChanged port every time the window changes. For example, suppose the window duration is set to 30 seconds and one hundred features enter the transformer during the first window. When the 30-second window closes, the hundred original features leave via the Output port, and a single feature leaves via the WindowChanged port.



When the window changed event is used to trigger a data refresh, the data read can be either compared or merged with the features in the most recent window. 
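A minimal Python sketch of this pattern follows. The `fetch_reference` function is a hypothetical stand-in for whatever reads the external data (an HTTPCaller, FeatureReader, or SQLExecutor in a workspace), and the window IDs and values are made up.

```python
def process_stream(features, fetch_reference):
    """Pair each closed window's features with freshly fetched reference data."""
    results, held, current_id = [], [], None
    for window_id, value in features:
        if current_id is not None and window_id != current_id:
            # Window changed: refresh the reference data, then pair it
            # with the features of the window that just closed.
            results.append((current_id, fetch_reference(), held))
            held = []
        current_id = window_id
        held.append(value)
    return results


calls = []


def fetch_reference():
    """Hypothetical refresh; returns a label showing which fetch this was."""
    calls.append(1)
    return f"snapshot-{len(calls)}"


features = [(20, "a"), (20, "b"), (21, "c"), (22, "d")]
results = process_stream(features, fetch_reference)
print(results)
```

The reference data is fetched once per window change, not once per feature, and the features of the window still open at the end (22 here) stay held.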

 

Window Types

The ID of the window can be represented by Window Number, Window Start Time, or Window End Time.
 

Window Number

This is calculated based on the number of windows that have passed since the initialization time of the first window. For example, suppose you set “At Timestamp = 2021-03-09 19:00:00 UTC” and a window of “30 seconds” on the TimeWindower, then run the workspace 10 minutes after the timestamp, at 2021-03-09 19:10:00 UTC. The first three windows would be:

  • “2021-03-09 19:10:00 UTC” with a window ID of 20.
  • “2021-03-09 19:10:30 UTC” with a window ID of 21.
  • “2021-03-09 19:11:00 UTC” with a window ID of 22.

Note that the IDs do not start at zero, because the window ID counts the number of windows that have passed since the At Timestamp value set on the transformer. In the example above, that timestamp is 10 minutes (20 windows) before the first window triggered when the workspace runs.
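The arithmetic behind these IDs can be checked with a short Python sketch. It assumes the ID is simply the number of whole windows elapsed since the At Timestamp value, which matches the three values listed above; the function name is made up for the example.

```python
from datetime import datetime, timedelta, timezone


def window_number(moment, at_timestamp, window_seconds):
    """Count the whole windows elapsed between at_timestamp and moment."""
    elapsed = (moment - at_timestamp).total_seconds()
    return int(elapsed // window_seconds)


at_timestamp = datetime(2021, 3, 9, 19, 0, 0, tzinfo=timezone.utc)
first_window = datetime(2021, 3, 9, 19, 10, 0, tzinfo=timezone.utc)

# IDs of the first three 30-second windows after the workspace starts.
ids = [
    window_number(first_window + timedelta(seconds=30 * i), at_timestamp, 30)
    for i in range(3)
]
print(ids)
```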

 

Window Start Time

The start time of an assigned window in seconds since the start of the Unix epoch, 00:00:00 UTC on 1 January 1970. For example, suppose you set “At Timestamp = 2021-03-09 19:00:00 UTC” and a window of “30 seconds” on the TimeWindower, then run the workspace 10 minutes after the timestamp, at 2021-03-09 19:10:00 UTC. The first three windows would be:

  • “2021-03-09 19:10:00 UTC” with a window start time of 1615317000 seconds.
  • “2021-03-09 19:10:30 UTC” with a window start time of 1615317030 seconds.
  • “2021-03-09 19:11:00 UTC” with a window start time of 1615317060 seconds.

 

Window End Time

The end time of an assigned window in seconds since the start of the Unix epoch, 00:00:00 UTC on 1 January 1970. For example, suppose you set “At Timestamp = 2021-03-09 19:00:00 UTC” and a window of “30 seconds” on the TimeWindower, then run the workspace 10 minutes after the timestamp, at 2021-03-09 19:10:00 UTC. The first three windows would be:

  • “2021-03-09 19:10:00 UTC” with a window end time of 1615317030 seconds.
  • “2021-03-09 19:10:30 UTC” with a window end time of 1615317060 seconds.
  • “2021-03-09 19:11:00 UTC” with a window end time of 1615317090 seconds.
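The start and end values above can be reproduced with Python's standard library. This sketch assumes windows are aligned to the At Timestamp value and are half-open, so each window's end equals the next window's start; the function name is illustrative.

```python
from datetime import datetime, timezone


def window_bounds(moment, at_timestamp, window_seconds):
    """Return (start, end) of the window containing moment, in Unix epoch seconds."""
    anchor = int(at_timestamp.timestamp())
    elapsed = int(moment.timestamp()) - anchor
    start = anchor + (elapsed // window_seconds) * window_seconds
    return start, start + window_seconds


at_timestamp = datetime(2021, 3, 9, 19, 0, 0, tzinfo=timezone.utc)

first = window_bounds(datetime(2021, 3, 9, 19, 10, 0, tzinfo=timezone.utc), at_timestamp, 30)
# A feature arriving 45 seconds later falls in the second window.
second = window_bounds(datetime(2021, 3, 9, 19, 10, 45, tzinfo=timezone.utc), at_timestamp, 30)

print(first, second)
```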

 

Scenarios

For most scenarios, using the window number will be sufficient. This will give you a value that you can use in Group By parameters to process the unbounded stream in batches. You can also use the start/end times for the same purpose. 

The start/end times also provide additional value if you are joining an external data source (e.g. SQLExecutor) whenever the window changes. You may want to use the time in the query to retrieve the data. In this scenario, you can use the window timestamp as a value. 

For example, using the window start time to query BigQuery for all of the data with timestamps in a 30-second window:

SELECT * FROM `city_of_vancouver_rush_hour`
WHERE timestamp >= (DATETIME "@Value(_window_start_id)")
AND timestamp < DATETIME_ADD(DATETIME "@Value(_window_start_id)", INTERVAL 30 second)
ORDER BY timestamp

 

Stream Heartbeat and the TimeWindower

Using the group processing setting "When Group Changes (Advanced)" as described in the TimeWindower examples above introduces a potential problem: what happens during an inactive period when the stream isn't producing data? There will be no new window ID to release features being held in grouping transformers, and this can lead to a delay in processing. Features won't be released until the stream becomes active again.

To address this, most transformers that support stream mode can be configured to emit a special "Heartbeat" feature after a period of inactivity. When the TimeWindower receives this heartbeat feature, it outputs the feature with a new window ID, which releases the features held in downstream transformers that use "When Group Changes (Advanced)".

Take the KafkaConnector, for example. When the Action is "Receive" and the Mode is "Stream", a Heartbeat can be set. In this example, the KafkaConnector will emit a Heartbeat feature if no messages are received from the topic for over an hour.

In the example below, the Sorter is grouping on the window ID with Complete Groups set to "When Group Changes (Advanced)". As a result, features are held in the Sorter until the next window ID arrives.

When the stream becomes inactive, the KafkaConnector emits a heartbeat feature with a heartbeat ID. This feature will contain the next window ID when it exits the TimeWindower. When the feature arrives at the Sorter, held features are released due to the change in window ID. Now, the heartbeat feature with the new window ID is held in the Sorter, but this doesn't matter since it has fulfilled its purpose and doesn't contain stream data.

Heartbeat features must be filtered out before writing. This can be done with a Tester that allows only features without a heartbeat ID to pass.
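The sequence described above can be sketched in Python. The attribute names `window_id` and `heartbeat_id` are placeholders chosen for this example, not the attribute names FME uses, and the grouping function is a simplified stand-in for a Sorter set to "When Group Changes (Advanced)".

```python
def release_on_change(features):
    """Return (released_batches, still_held) for a group-change style transformer."""
    batches, held = [], []
    for feature in features:
        if held and feature["window_id"] != held[-1]["window_id"]:
            batches.append(held)
            held = []
        held.append(feature)
    return batches, held


def without_heartbeats(batch):
    """Keep only features that carry no heartbeat ID (the role of the Tester)."""
    return [f for f in batch if f.get("heartbeat_id") is None]


stream = [
    {"window_id": 7, "value": 1.2},
    {"window_id": 7, "value": 1.4},
    # The stream goes quiet; the heartbeat leaves the windower with the next window ID.
    {"window_id": 8, "heartbeat_id": "hb-1"},
]

released, still_held = release_on_change(stream)
to_write = [without_heartbeats(batch) for batch in released]
print(to_write, still_held)
```

The heartbeat flushes window 7 and is itself left held. If it is released later, the filter removes it before anything is written.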

 

Windowing Across Multiple FME Engines

On FME Flow, to handle higher throughput, you can scale the number of FME Engines assigned to a stream. While you cannot aggregate across FME Engines (analysis is per FME Engine), a MapReduce processing pattern can be adopted, which breaks the processing of the stream up, producing an output that can then be further compared and analyzed. To do this, you use the window ID.

As discussed earlier in this article, when windowing in a workflow, you can ensure that the same window ID is used for the same period on each FME Engine. The result is that when you write a value out to the data store, the output for the same period on each FME Engine can be grouped.

Streams Diagrams Rebranded - Vert Multiple Engines Stream.png

Imagine a workspace that aggregates the stream data into one-minute windows using the TimeWindower and then calculates statistics on each window. When run, this FME workspace can process 1000 points a second, but the data source is producing 3000 points a second. To keep up, the stream needs to be scaled up to three FME Engines.

With three FME Engines, each 60-second window reduces the data volume from 180,000 messages to three values, one summary statistic per FME Engine. You can then run further processing to compute the average across these three values, giving you an average value for all data in each 60-second period.
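The numbers work out as 3 engines × 1000 points per second × 60 seconds = 180,000 messages per window. The reduce step can be sketched in Python as below. One design choice here goes beyond the text: each engine emits a count and a total, not just an average, so the combined result stays correct when the engines process different numbers of points. The field names and values are illustrative.

```python
def summarize(window_id, values):
    """Per-engine 'map' step: reduce one window's values to a count and a total."""
    return {"window_id": window_id, "count": len(values), "total": sum(values)}


def combine(summaries):
    """'Reduce' step: overall mean for one window across all engines."""
    count = sum(s["count"] for s in summaries)
    total = sum(s["total"] for s in summaries)
    return total / count


# Hypothetical per-engine values for the same window ID.
engine_a = summarize(42, [10.0, 12.0])
engine_b = summarize(42, [20.0])
engine_c = summarize(42, [30.0, 30.0, 30.0])

overall = combine([engine_a, engine_b, engine_c])
messages_per_window = 3 * 1000 * 60
print(overall, messages_per_window)
```

When every engine handles the same number of points, averaging the three per-engine averages gives the same answer as this weighted combination.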

 

How It Works

Two things need to happen to enable this workflow.

  • Windows on each FME Engine need to start at the same time.
  • Windows on each FME Engine need to have the same ID when they are incremented.

To ensure the windows start at the same time, you need to set “Time Window Starts” to At Timestamp and then set a timestamp that is in the past from when the workspace will run. As discussed earlier, if the same timestamp is used for the window start time on each FME Engine, the same window ID will be generated on each FME Engine for each comparable window.



For example, suppose the window duration is “30 seconds” and the window start time is set to “2021-03-10 17:00:00 UTC”. If we start a stream with two FME Engines assigned to it, and the first features on each FME Engine enter the TimeWindower at 2021-03-10 17:11:35, they will be assigned the window ID 24.

If two days later we add a third FME Engine to the stream and the first feature enters the TimeWindower at 2021-03-12 18:10:05, it will be assigned the window ID 5901. This is because 5901 windows have elapsed since the first window at 2021-03-10 17:00:00. The other two FME Engines will also be on the same window ID.
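Why engines started at different times agree can be shown with a small Python sketch. It counts whole windows elapsed since the shared anchor timestamp and does not try to reproduce the exact ID the TimeWindower reports, which may be offset from this count depending on how the first window is numbered. The point is only that the result depends on the anchor and the current time, never on when an engine started.

```python
from datetime import datetime, timezone

# Shared "At Timestamp" value configured identically on every engine.
ANCHOR = datetime(2021, 3, 10, 17, 0, 0, tzinfo=timezone.utc)


def elapsed_windows(feature_time, window_seconds=30):
    """Whole windows elapsed since ANCHOR; no engine start time is involved."""
    return int((feature_time - ANCHOR).total_seconds() // window_seconds)


# Two engines, started days apart, each see a feature within the same 30-second window.
on_engine_1 = elapsed_windows(datetime(2021, 3, 12, 18, 10, 5, tzinfo=timezone.utc))
on_engine_3 = elapsed_windows(datetime(2021, 3, 12, 18, 10, 20, tzinfo=timezone.utc))

print(on_engine_1 == on_engine_3)
```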

 

Considerations

  • The system clocks need to be synchronized on the FME Engine hosts.
  • On initialization of the FME Engine, processing may begin partway through the first window, so the first window won't have a complete dataset. Results from the first window should therefore be discarded.

 

Additional Resources

Webinar: Introduction to Data Stream Processing

Webinar: Empowering Real-Time Decision Making with Data Streaming

 
 
