Introduction to Stream Processing in FME

Liz Sanderson

Introduction

FME’s stream processing capabilities allow you to leverage unbounded stream datasets without writing a single line of code. This tutorial will guide you through three different stream processing scenarios. In each scenario, we will walk through how to build the FME workflows and how to deploy the stream workspace to FME Flow or FME Flow-Hosted.

Each scenario will highlight specific functionality in FME for authoring stream-processing workspaces. 

Requirements

Access to FME Form and FME Flow

This tutorial assumes you have basic knowledge of creating FME workspaces and using FME Flow.

Unfortunately, we cannot provide public access to our Kafka queue due to the cost of running the stream continuously. Instead, replace the Creator and KafkaConnector in the provided workspaces with the contents of MimicStream.fmwt to imitate a stream data source.

Step-by-Step Instructions

Part 1: How to Work with High-Volume Streams in FME

In this scenario, we’re using a simulated vehicle fleet. Each vehicle reports its location to a Kafka message queue every few seconds. There are a variety of vehicle types like sedans, SUVs, and trucks, but we’re only interested in tracking the trucks. Storing records from every vehicle would be very expensive and require far more storage space than needed. Instead, the records should be filtered to store only truck data. We will also enhance the stream messages with spatial data.

This tutorial will show how to connect to a Kafka message queue, reduce memory usage by filtering, and convert incoming stream data into spatial data.

Streams Diagrams Rebranded - Filtering - Dicard Based upon Attributes (1).png

1. Connect to a Kafka Message Broker in Batch Mode

First, add a Creator transformer and connect a KafkaConnector to it. The single feature released from the Creator will initiate the KafkaConnector to run.

image1.png

Open the KafkaConnector’s parameters and enter the following connection details:

  • Credential Source: Embedded
  • User Name: <username>
  • Password: <password>
  • Broker Host Name: <host>
  • Broker Port: 9092
  • Topic: cov-fleet-location
  • Consumer Group ID: simulate-geofence

Confirm that the KafkaConnector’s request action is “Receive” and the receive mode is set to “Batch”. Set the batch size to 1000. The KafkaConnector will only receive 1000 messages from the queue.

image14.png

2. Run the KafkaConnector

Run the workspace. The KafkaConnector will receive a batch of 1000 features from the message queue. When the workspace finishes running, you can view the feature cache at the output port to inspect the data.

image17.png

3. Extract the JSON Attributes

View the feature cache of the KafkaConnector output. Double-click one of the _value attribute values to view the JSON-formatted message contents. The JSON payload includes the vehicle type, which we’ll use to filter the data.

First, we must extract the JSON attributes from the payload. Add a JSONFlattener after the KafkaConnector, and connect the Output port to the JSONFlattener’s input.

image18.png

Open the JSONFlattener parameters and set the following: 

  • JSON Document: _value
    • select the dropdown menu > Attribute Value > _value

For Attributes to Expose, click the ellipsis button (...) and enter the following attributes in the pop-up window:

  • Attributes to Expose:
    • vehicle_id
    • timestamp
    • vehicle_category
    • vehicle_type
    • geo

image20.png

If you’re working with JSON that has many attributes, save a sample JSON payload as a .json file. Use the Import button in the ellipses pop-up to read the JSON file; FME will parse it and load the selected attributes.

Once you’ve filled in the attributes, click OK and OK again to save the parameter settings. Run the workspace to the JSONFlattener and inspect the output feature cache. The JSON attributes have now been extracted into separate attributes on each feature.
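For readers who like to see the logic spelled out, the following Python sketch approximates what this step does to each feature. It is only an illustration, not how the JSONFlattener is implemented: the sample values are invented, the payload is assumed to be a flat JSON object, and the helper name flatten is hypothetical.

```python
import json

# Hypothetical sample payload: the field names come from this tutorial,
# the values are invented for illustration.
sample_value = json.dumps({
    "vehicle_id": "V-001",
    "timestamp": "2022-08-15T13:20:44Z",
    "vehicle_category": "commercial",
    "vehicle_type": "trucks",
    "geo": "POINT (-123.1207 49.2827)",
})

# The attributes listed under Attributes to Expose.
EXPOSED = ["vehicle_id", "timestamp", "vehicle_category", "vehicle_type", "geo"]


def flatten(feature: dict) -> dict:
    """Copy the exposed keys of the JSON in feature['_value'] onto the feature."""
    payload = json.loads(feature["_value"])
    flattened = dict(feature)  # keep the original attributes, including _value
    for name in EXPOSED:
        flattened[name] = payload.get(name)  # missing keys become None
    return flattened


feature = flatten({"_value": sample_value, "_topic": "cov-fleet-location"})
print(feature["vehicle_type"])  # prints "trucks"
```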

image5.png

4. Filter the Features with a Tester

Now that we’ve extracted the vehicle_type attribute along with other useful information about each message, we can filter the data to just trucks. To do this, connect a Tester to the JSONFlattener output. 

In the Tester parameters, set a clause to check if the vehicle_type = trucks. Click OK.

image21.png

Run the workspace to the Tester and watch how only truck features are routed through the Passed output port.
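Conceptually, the Tester acts as a per-feature predicate. The sketch below shows the same idea in Python as a generator, which handles one feature at a time and never holds the whole stream in memory. The function name keep_trucks and the sample features are assumptions made for illustration.

```python
from typing import Iterable, Iterator


def keep_trucks(stream: Iterable[dict]) -> Iterator[dict]:
    """Yield only the features whose vehicle_type is 'trucks' (the Passed port)."""
    for feature in stream:
        if feature.get("vehicle_type") == "trucks":
            yield feature
        # Anything else would leave through the Failed port and is discarded here.


# Hypothetical features, as they might look after the JSONFlattener.
sample = [
    {"vehicle_id": "V-001", "vehicle_type": "trucks"},
    {"vehicle_id": "V-002", "vehicle_type": "sedan"},
    {"vehicle_id": "V-003", "vehicle_type": "trucks"},
]

print([f["vehicle_id"] for f in keep_trucks(sample)])  # prints ['V-001', 'V-003']
```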

5. Enhance the Features with Spatial Data

Stream data is usually non-spatial, but messages often include location-related attributes, such as latitude/longitude coordinates, that can be used for spatial analysis.

First, we need to generate points for each of our vehicles. Add a GeometryReplacer, and set the encoding to OGC Well Known Text and Geometry Source to the geo attribute. Leave the remaining parameters at their default settings. 

Our features don’t have an associated coordinate system, so we must provide the coordinate system information. Add a CoordinateSystemSetter, and select LL84. When running the workspace up to this transformer, you should see the points appear in Vancouver in the Visual Preview.
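To make the two steps above concrete, here is a minimal Python sketch of turning a WKT point string into coordinates and tagging the result with a coordinate system. It assumes the geo attribute holds a simple 2D point such as "POINT (-123.1207 49.2827)"; the helper names and the output dictionary layout are hypothetical, and FME's own WKT parser handles far more geometry types than this.

```python
import re
from typing import Tuple

# Matches a simple 2D WKT point, for example "POINT (-123.1207 49.2827)".
_POINT = re.compile(
    r"^\s*POINT\s*\(\s*(-?\d+(?:\.\d+)?)\s+(-?\d+(?:\.\d+)?)\s*\)\s*$",
    re.IGNORECASE,
)


def parse_wkt_point(wkt: str) -> Tuple[float, float]:
    """Return (x, y) from a 2D WKT point; WKT lists x (longitude) first."""
    match = _POINT.match(wkt)
    if match is None:
        raise ValueError(f"not a 2D WKT point: {wkt!r}")
    return float(match.group(1)), float(match.group(2))


def to_point_feature(feature: dict) -> dict:
    """Attach a point geometry and a coordinate system label to the feature."""
    x, y = parse_wkt_point(feature["geo"])
    return {**feature, "geometry": (x, y), "coordinate_system": "LL84"}


point = to_point_feature({"vehicle_id": "V-001", "geo": "POINT (-123.1207 49.2827)"})
print(point["geometry"])  # prints (-123.1207, 49.2827)
```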

image25.png

6. Clean up the Attributes Before Writing

We’re just about ready to write the results, but there are several attributes that we don’t care to store in our output destination, such as the original JSON payload. 

Add an AttributeManager and connect it to the CoordinateSystemSetter. Remove the following five attributes and optionally reorder the remaining attributes:

  • _creation_instance
  • _value
  • _topic
  • _connector_id
  • timestamp

image7.png

7. (Optional) Write the Results to a Database

We’ve gone through the process of filtering and enhancing stream data, and now we’re ready to write the features to a database. 

Create a database writer of your choice, such as PostGIS or SQL Server Spatial. For the table definition, select Automatic. Once the writer feature type is added to the canvas, connect it to the AttributeManager. Set the table name to something unique. Confirm the Feature Operation is “Insert” and the Table Handling is “Create If Needed”. Click OK.

Select the writer feature type on the canvas, and click “Edit Parameters”. Expand the Advanced section, and note that the Features Per Transaction is 1000 by default. The writer will commit a bulk insert of 1000 records at a time. This value can be adjusted based on how frequently you want to write data to your database and the stream volumes. More information on this can be found in our article, "Writing to Databases When Running in Stream Mode."
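The effect of a transaction size on a never-ending stream can be sketched in a few lines of Python. This is an illustration only: it uses an in-memory SQLite database as a stand-in for PostGIS or SQL Server, and the table, column names, and write_stream function are hypothetical.

```python
import sqlite3


def write_stream(conn, rows, features_per_transaction=1000):
    """Insert rows, committing once every `features_per_transaction` rows.

    Returns (number of commits, number of rows still uncommitted).
    """
    commits = 0
    pending = 0
    for row in rows:
        conn.execute("INSERT INTO trucks (vehicle_id, geo) VALUES (?, ?)", row)
        pending += 1
        if pending == features_per_transaction:
            conn.commit()
            commits += 1
            pending = 0
    return commits, pending


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trucks (vehicle_id TEXT, geo TEXT)")

# 2500 invented rows stand in for features arriving from the stream.
rows = ((f"V-{i:04d}", "POINT (0 0)") for i in range(2500))
commits, pending = write_stream(conn, rows)
print(commits, pending)  # prints "2 500"
```

In this sketch, the last 500 rows stay uncommitted until another 500 arrive, which is why a smaller transaction size can make sense for a low-volume stream.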

image19.png

Your stream processing workspace is now writing filtered and spatially enhanced data to a database! The workspace is ready to be published and run continuously on FME Flow.

image16.png

Part 2: Run the Workspace Continuously on FME Flow

Now that we’ve built the workspace, we’ll need to deploy it to FME Flow to run continuously. To accomplish this, we will configure the KafkaConnector to run in Stream mode. 

Why can’t we just run the stream workspace in FME Form? It’s possible to run the workspace in stream mode within FME Form to verify that it functions properly, but this method wouldn’t be sufficient for a production environment. If FME Form closes, the workspace will stop running.

With FME Flow Streams, you can run a stream processing workspace for an indefinite amount of time. In this tutorial, we’ll demonstrate how to do this.

1. Set the KafkaConnector to Run in Stream Mode

We now want to publish the stream workspace to FME Flow and make sure the connector is set to Stream mode to run continuously. Open the KafkaConnector and set the receive behavior mode to Stream.

image8.png

Certain stream connectors like the WebSocketReceiver and JMSReceiver always run continuously, so you don’t need to set the mode.

2. Publish the Workspace to FME Flow

In the FME Form toolbar or the File menu, click Publish to FME Flow. Select your existing FME Flow web connection or create a new one by entering the server URL and your credentials. Click Continue.

Select the repository you’d like to publish the workspace to or create a new one. Click Continue. When prompted to upload the database connection, select the connection and click Continue.

In the next step, you’ll need to select the Apache Kafka Connector package to upload it to FME Flow. Many connectors are shipped as FME packages available through the FME Hub, which need to be uploaded to FME Flow via the publisher. Make sure the Kafka package is selected, and click Continue.

When registering the stream workspace to a service, select only the Job Submitter. Do not select Data Streaming; despite its name, that service is not related to stream processing. See the FME Flow documentation for more details on the Data Streaming service. Click Publish.

image10.png

3. Create an FME Flow Stream

Log in to your FME Flow. In the side menu, go to Streams > Create Stream. Enter a name for the stream, for example: Filter Vehicle Stream to PostGIS. Optionally add a description. 

Next, select the workspace that you just published. No published parameters need to be changed. Click OK.

image12.png

4. Assign an Engine and Run the Stream Workspace

The next page lets you manage your stream. Streams are started automatically when created, but the stream workspace job will not run until at least one FME Flow Engine has been assigned to the stream, allowing it to run continuously.

Under the Assigned Engines section, click Assign. In the Engines dropdown menu, select one of your FME Flow Engines. Note how the Engine will be removed from the queue it’s currently assigned to. Click OK.

image3.png

Within a few seconds, the stream job will start running, and you can see it under the Jobs section. Clicking the job will open the translation log. 

image24.png

To stop the stream workspace, go back to the stream page and click the Stop button at the top. You’ve successfully created and deployed a stream processing workspace on FME Flow!

Part 3: Time Windowing & Group By

When performing traditional batch processing in FME from a non-streaming data source (e.g., database or file), the data is finite with a defined beginning and end. You can read all of the data at once and perform group-based analyses such as sorting and aggregating. Grouping is often based on location or attributes.

However, when reading a data stream into FME, the data is infinite, and it will never finish loading. With this new data model, how can you perform group-based processing? This is where windowing comes into play. Windowing is the process of using time to break up a data stream into groups for filtering and analysis.

Streams Diagrams Rebranded - Filtering - Windowing.png

In this tutorial, we will use the TimeWindower transformer to group the vehicle fleet stream data by 30-second windows. If we used a longer time window, we’d reduce the data volume even further. Data in each window can then be thinned and snapped to a road network dataset.

1. Review the Workspace - Reading the Data

We are reading GPS stream data via the KafkaConnector and road network data via a File Geodatabase reader. 

image2.png

Using a Tester, the road network data is filtered to only include major routes, so we can snap vehicles to them. Attributes are tidied up using the AttributeKeeper, and the data is then assigned a coordinate system using the CoordinateSystemSetter.

For the GPS data, the configuration of the KafkaConnector and JSONFlattener is identical to the settings defined in Part 1. Note that, by default, the KafkaConnector is in Stream mode.

2. Window the Stream with a TimeWindower

We will now window the GPS data stream to reduce data volumes. Each vehicle reports its location every 5 seconds, but we only need one location per vehicle every 30 seconds.

Add a TimeWindower after the JSONFlattener and enter the following parameters:

  • Window Duration: 30
  • Time Units: Seconds
  • Time Window Starts: With First Feature
  • Data Has Timestamps: No
  • Window ID Attribute: _window_id
  • ID Type: Window Number

As features arrive at the TimeWindower, each one is assigned a window ID, and the ID changes with every 30-second period. That window ID can then be used downstream to group the data for processing. The diagram below shows how window IDs are assigned to features when the window duration is 5 minutes.

Streams Diagrams Rebranded  - Windowing .png
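The window numbering described above boils down to simple arithmetic on arrival time. The Python sketch below illustrates the idea for windows that start with the first feature. The class name is hypothetical, and numbering windows from 0 is an assumption of this sketch rather than a statement about the IDs FME produces.

```python
class ArrivalTimeWindower:
    """Assign window numbers from arrival time, starting with the first feature."""

    def __init__(self, duration_seconds: float):
        self.duration = duration_seconds
        self.start = None  # arrival time of the first feature

    def window_id(self, arrival_time: float) -> int:
        if self.start is None:
            self.start = arrival_time  # the first feature opens window 0
        # Whole window durations elapsed since the first feature arrived.
        return int((arrival_time - self.start) // self.duration)


windower = ArrivalTimeWindower(duration_seconds=30)
arrivals = [100.0, 105.0, 129.9, 130.0, 161.0]  # invented arrival times in seconds
print([windower.window_id(t) for t in arrivals])  # prints [0, 0, 0, 1, 2]
```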

3. Thinning the GPS Data

Using the Sorter and Sampler transformers, the GPS data can now be thinned by using the Window ID as a Group By. Once the features for each vehicle are sorted in chronological order, the Sampler can extract the last feature for each vehicle, which is its most recent location.

Add a Sorter transformer, enable Group Processing, and set the following:

  • Group By: _window_id
  • Complete Groups: When Group Changes (Advanced)
  • Sort by
    • path_id, Alphabetical, Ascending
    • timestamp, Alphabetical, Ascending

Now, the GPS points for each vehicle will be grouped together in the window with the most recent location at the end. 

Tip: This process may be challenging to visualize. To see how it works, run the KafkaConnector in Batch mode with ~5000 features and inspect the feature caches in the Sorter and Sampler.

The Sampler transformer can be used to get the last location for each vehicle. Enable Group Processing and set the following:

  • Group By: _window_id, path_id
  • Complete Groups: When Group Changes (Advanced)
  • Sampling rate (N): 1
  • Sampling Type: Last N Features
  • Randomize Sampling: No
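The Sorter and Sampler settings above can be approximated in Python as follows. The sketch relies on itertools.groupby, which starts a new group whenever the key changes between consecutive items, much like the "When Group Changes" option. The function name thin and the sample features are hypothetical.

```python
from itertools import groupby


def thin(stream):
    """Yield the most recent feature per vehicle for each consecutive window."""
    for _window_id, group in groupby(stream, key=lambda f: f["_window_id"]):
        # Sorter: order by vehicle, then by timestamp (alphabetical, ascending).
        ordered = sorted(group, key=lambda f: (f["path_id"], f["timestamp"]))
        # Sampler: keep the last feature seen for each vehicle.
        last = {}
        for feature in ordered:
            last[feature["path_id"]] = feature
        yield from last.values()


# Invented features; a window is released only once the window ID changes.
stream = [
    {"_window_id": 0, "path_id": "B", "timestamp": "2022-08-15T13:20:05"},
    {"_window_id": 0, "path_id": "A", "timestamp": "2022-08-15T13:20:10"},
    {"_window_id": 0, "path_id": "A", "timestamp": "2022-08-15T13:20:00"},
    {"_window_id": 1, "path_id": "A", "timestamp": "2022-08-15T13:20:35"},
]

for f in thin(stream):
    print(f["_window_id"], f["path_id"], f["timestamp"])
```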

4. Snap the Stream Data to the Road Network

We have now thinned the data, but some points aren’t well aligned with the road network due to minor GPS inaccuracies. To fix this, we’ll snap the vehicle points to the road network.

Add a GeometryReplacer to extract the geometry defined in the JSON text and create point geometries.

  • Geometry Encoding: OGC Well Known Text
  • Geometry Source: geometry
  • Remove Attributes: Yes

Add a Reprojector to reproject the points to the same coordinate system as the roads:

  • Source Coordinate System: EPSG:4326
  • Destination Coordinate System: NAD83.BC/Albers

Next, we need to add a NeighborFinder to snap the GPS points to the road network. Attach the road data to the Candidate port and the GPS data to the Base port. Then set the following:

  • Input: Bases and Candidates
  • Candidates First: Yes
  • Maximum Distance: 5
  • Attribute Accumulation: Merge Attributes
  • Accumulation Mode: Merge Candidates
  • Conflict Resolution: Use Base

Group By processing is not required here as we are simply assessing the GPS points individually against the road network.

Run the workspace, and features should exit from the MatchedBase port of the NeighborFinder.
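Geometrically, snapping with a maximum distance means finding the closest location on any road and accepting it only if it is near enough. The Python sketch below illustrates this for straight road segments in planar coordinates, assuming the units are metres. It is not the NeighborFinder's algorithm, and the function names and sample coordinates are invented.

```python
import math


def closest_point_on_segment(p, a, b):
    """Return the point on segment a-b that is closest to p."""
    ax, ay = a
    bx, by = b
    px, py = p
    dx, dy = bx - ax, by - ay
    length_sq = dx * dx + dy * dy
    if length_sq == 0:
        return a  # degenerate segment: both ends coincide
    # Project p onto the line, then clamp the result to the segment.
    t = ((px - ax) * dx + (py - ay) * dy) / length_sq
    t = max(0.0, min(1.0, t))
    return (ax + t * dx, ay + t * dy)


def snap(point, segments, max_distance=5.0):
    """Return the snapped location, or None if no road is within max_distance."""
    best, best_dist = None, max_distance
    for a, b in segments:
        candidate = closest_point_on_segment(point, a, b)
        dist = math.dist(point, candidate)
        if dist <= best_dist:
            best, best_dist = candidate, dist
    return best


roads = [((0, 0), (100, 0))]  # one invented road segment
print(snap((50, 3), roads))  # prints (50.0, 0.0)
print(snap((50, 8), roads))  # prints None, like an unmatched base feature
```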

5. Switch to Window Based Upon Timestamp

At the moment, data is being windowed based on when it arrives at the TimeWindower transformer. However, the data includes a timestamp that represents the actual event time. Let’s change the workflow to window using the event time instead. We have to account for the lag between when an event actually happened and when its feature arrives at the transformer, which we do by setting a tolerance.

Open the TimeWindower and set the following:

  • Data Has Timestamps: Yes
  • Timestamp Attribute: timestamp
  • Order Type: Chronological Order
  • Tolerance Interval: 10
  • Tolerance Time Units: Seconds

This tolerance setting effectively leaves the window open for 10 extra seconds, giving a chance for all data to arrive for processing before the window closes. The features that arrive after the window is closed are output via the OutOfSequence port. To learn more about this, please see our article on Windowing Data Streams.
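One way to picture the tolerance is as a watermark: a window counts as closed once an event has been seen whose time is at least the tolerance past the end of that window. The Python sketch below implements that interpretation. It is an assumption made for illustration, not a description of the TimeWindower's internals, and the class and method names are hypothetical.

```python
class EventTimeWindower:
    """Window by event time, tolerating late features up to a fixed interval."""

    def __init__(self, duration: float, tolerance: float):
        self.duration = duration
        self.tolerance = tolerance
        self.start = None           # event time of the first feature
        self.max_event_time = None  # latest event time seen so far

    def route(self, event_time: float):
        """Return ('windowed' or 'out_of_sequence', window number)."""
        if self.start is None:
            self.start = event_time
            self.max_event_time = event_time
        self.max_event_time = max(self.max_event_time, event_time)
        window_id = int((event_time - self.start) // self.duration)
        window_end = self.start + (window_id + 1) * self.duration
        # The window is closed once the stream has moved past its end
        # by at least the tolerance.
        closed = self.max_event_time >= window_end + self.tolerance
        return ("out_of_sequence" if closed else "windowed", window_id)


windower = EventTimeWindower(duration=30, tolerance=10)
for t in [0, 20, 35, 28, 41, 29]:  # invented event times in seconds
    print(t, windower.route(t))
```

With these invented times, the feature at 28 arrives late but is still placed in window 0, while the feature at 29 arrives after the stream has reached 41 and is routed out of sequence.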

Part 4: Triggering an Event with the WindowChanged Output

In Part 3, we joined the stream of data to another data source. In that example, the dataset was static, so reading it in only once when the workspace starts running is fine. However, if you need to join the stream to a dataset that is constantly changing, a different approach is required using the TimeWindower.

1. Review the Workspace

In this scenario, we are operating a ride-hailing app. We are reading a stream of GPS points that represent the drivers’ locations, and we want updates on app user locations to see if our drivers are close enough to them.

The driver stream data is extracted and then converted into point geometry so we can perform a spatial operation on it downstream. Using the TimeWindower, the data is windowed into 5-minute windows. 

Screen Shot 2022-08-15 at 1.20.44 PM.png

2. Reconfigure the TimeWindower

Once the drivers’ GPS data has been thinned and we have the last location of each vehicle during the 5-minute window, we want to perform a spatial operation against the current location of app users to identify drivers who are not within 1km of any user. The crucial part here is the “current” location. We are in stream mode and need to join the Kafka stream to the users’ locations. To do this, we use the WindowChanged port on the TimeWindower transformer.

Open the TimeWindower transformer and set the ID Type to Window Start Time. This is important because we need the start time of the window to query the database.

3. Prepare Features for Database Query

The Window Start Time is in seconds since the Unix Epoch, so we need to change this to be a format that PostgreSQL can accept. Add a DateTimeConverter, and connect it to the TimeWindower’s WindowChanged port. Set the following parameters:

  • Datetime Attributes: _window_id
  • Input Format: %s
  • Output Format: %Y-%m-%d %H:%M:%S

image13.png

The PostgreSQL query uses a time range to retrieve all users who have logged a location within the last 5 minutes. To do this, we need to pass start time and end time attributes into the query. The _window_id is the start time, and because the window is 5 minutes long, the end time is the start time plus 5 minutes. Connect a DateTimeCalculator to the DateTimeConverter and set the following:

  • Datetime: _window_id
  • Operator: Add
  • Minutes: 5
  • Result: _end_window

Now we also need to format the _end_window datetime as %Y-%m-%d %H:%M:%S. Add another DateTimeConverter after the DateTimeCalculator and use the following settings:

  • Datetime Attributes: _end_window
  • Input Format: Auto Detect
  • Output Format: %Y-%m-%d %H:%M:%S

image11.png
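The date arithmetic in this step can be checked with a short Python sketch that converts an epoch value into the start and end strings used by the query. Interpreting the epoch value as UTC is an assumption of this sketch, and the function name window_bounds and the sample value are hypothetical.

```python
from datetime import datetime, timedelta, timezone

FMT = "%Y-%m-%d %H:%M:%S"


def window_bounds(window_start_epoch: int, minutes: int = 5):
    """Return (start, end) strings for a window that begins at the given epoch."""
    # Assumption: the epoch value is interpreted as UTC.
    start = datetime.fromtimestamp(window_start_epoch, tz=timezone.utc)
    end = start + timedelta(minutes=minutes)
    return start.strftime(FMT), end.strftime(FMT)


print(window_bounds(1660521600))
# prints ('2022-08-15 00:00:00', '2022-08-15 00:05:00')
```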

4. Retrieve User Location for the Current Window

Next, we need to query the PostgreSQL user_location table. Connect the FeatureReader to the DateTimeConverter_2 and look at the settings. The key thing to understand is the WHERE Clause.

timestamp >= '@Value(_window_id)'
AND timestamp < '@Value(_end_window)'
ORDER BY device_id 

This will retrieve all users from the database whose event time fell within the current window.
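The same half-open time range (start inclusive, end exclusive) can be tried out in Python. The sketch below uses an in-memory SQLite table as a stand-in for the PostgreSQL user_location table and bound parameters in place of the @Value() substitution. The rows and the function name users_in_window are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_location (device_id TEXT, timestamp TEXT)")
conn.executemany(
    "INSERT INTO user_location VALUES (?, ?)",
    [
        ("u1", "2022-08-15 00:01:00"),
        ("u2", "2022-08-15 00:04:59"),
        ("u1", "2022-08-15 00:05:00"),  # exactly at the window end: excluded
        ("u3", "2022-08-14 23:59:59"),  # before the window start: excluded
    ],
)


def users_in_window(conn, window_start: str, window_end: str):
    """Return rows whose timestamp falls in [window_start, window_end)."""
    sql = (
        "SELECT device_id, timestamp FROM user_location "
        "WHERE timestamp >= ? AND timestamp < ? "
        "ORDER BY device_id"
    )
    return conn.execute(sql, (window_start, window_end)).fetchall()


rows = users_in_window(conn, "2022-08-15 00:00:00", "2022-08-15 00:05:00")
print(rows)  # prints [('u1', '2022-08-15 00:01:00'), ('u2', '2022-08-15 00:04:59')]
```

Because the end of the range is exclusive, a location logged exactly on a window boundary belongs to the next window and is never counted twice.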

Since each user’s location will have been logged multiple times within the window, we need to get the last known location for each user. The data returned from PostgreSQL is sorted by device_id, so we can use a Sampler to get a single location for each user. Connect a Sampler to the public.user_location port.

  • Group By: device_id
  • Complete Groups: When Group Changes (Advanced)
  • Sampling Rate: 1
  • Sampling Type: Last N Features

Lastly, make sure the FeatureReader's Output > Attribute and Geometry Handling has the Accumulation Mode set to "Merge Initiator and Result". This includes the _window_id attribute in the output features, which will be used in a downstream transformer's Group By settings.

5. Identify Drivers Who are Not Within 1km of Any User

Add a CoordinateSystemSetter after the Sampler_2 and set the Coordinate System to EPSG:4326. Next, let’s create a 1km buffer around all of the users. Add a Bufferer transformer and set the following:

  • Buffer Type: Area (2D)
  • Buffer Distance: 1
  • Buffer Distance Unit: Kilometers

Add a Reprojector after the Bufferer, and set the destination coordinate system to EPSG:26910. This way, we have matching coordinate systems between the two branches of data, which can then be passed into the SpatialFilter.

Lastly, we need to convert the window ID datetime back to the original format. Since we plan to Group By the _window_id attribute in the SpatialFilter, both the filter and candidate features must have matching window IDs.  Connect a DateTimeConverter after the Reprojector and set the following parameters:

  • Datetime Attributes: _window_id
  • Input Format: %Y-%m-%d %H:%M:%S
  • Output Format: %Es

Screen Shot 2022-09-02 at 11.32.01 AM.png

Now we are ready to join the locations of the app users with the drivers’ locations, so we can identify the drivers that are not within 1km of any user. Add a SpatialFilter, connect the stream of drivers to the Candidate port, and the users to the Filter port. Set the following values:

  • Group Processing: Enabled
  • Group By: _window_id
  • Complete Groups: When Group Changes (Advanced)
  • Filter Type: Filters First
  • Spatial Predicates to Test: "Filter OGC-Contains Candidate"

The features that exit from the Failed port are all of the drivers who are not within 1km of any user.

Screen Shot 2022-09-02 at 11.26.02 AM.png
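As a cross-check on the result, the same question can be answered with plain distance calculations. Note that the Python sketch below swaps in a different technique: instead of buffering users and testing containment in a projected coordinate system, it compares great-circle (haversine) distances on longitude/latitude pairs. The function names and coordinates are invented for illustration.

```python
import math

EARTH_RADIUS_KM = 6371.0088  # mean Earth radius


def haversine_km(a, b):
    """Great-circle distance in kilometres between two (lon, lat) points."""
    lon1, lat1 = map(math.radians, a)
    lon2, lat2 = map(math.radians, b)
    h = (
        math.sin((lat2 - lat1) / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2
    )
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(h))


def drivers_without_nearby_user(drivers, users, radius_km=1.0):
    """Return IDs of drivers with no user within radius_km (the Failed port)."""
    return [
        driver_id
        for driver_id, location in drivers.items()
        if not any(haversine_km(location, user) <= radius_km for user in users)
    ]


users = [(-123.1207, 49.2827)]  # invented user location
drivers = {
    "d1": (-123.1210, 49.2830),  # a few dozen metres from the user
    "d2": (-123.0000, 49.2500),  # several kilometres away
}
print(drivers_without_nearby_user(drivers, users))  # prints ['d2']
```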

Conclusion

You’ve now completed this tutorial series on getting started with stream processing in FME! To continue learning about different stream processing scenarios, authoring tips, considerations with databases, and more, be sure to check out our FME and Stream Processing landing page for more tutorials and articles.

Additional Resources

FME Form Tips for Working with Continuous Data Streams

Writing to Databases When Running in Stream Mode

Windowing Data Streams in FME

Filtering Unbounded Data Streams

Spatial Analysis on Unbounded Data Streams

Data Attribution

The data used here originates from open data made available by the City of Vancouver, British Columbia. It contains information licensed under the Open Government License - Vancouver.
