Introduction
Working with data streams in FME Form (formerly FME Desktop) requires a different approach from the batch workflows you are likely familiar with. Data streams are unbounded, as discussed in FME and Stream Processing, so a workspace that processes a data stream will never complete unless it is terminated. Because the data never ends, extra care is needed to make sure features are processed and released when you expect them to be.
Reading Data
For a workspace to run in stream mode, a transformer that supports stream mode must be present and configured correctly. The following transformers are supported.
- AmazonSQSConnector - Mode must be set to Stream.
- AWSIoTConnector - Mode must be set to Stream.
- AzureEventHubConnector - Mode must be set to Stream.
- AzureQueueStorageConnector - Mode must be set to Stream.
- AzureIoTConnector - Mode must be set to Stream.
- IBMIoTConnector - Mode must be set to Stream.
- IBMMQConnector - Mode must be set to Stream.
- GoogleIoTConnector - Mode must be set to Stream.
- GoogleCloudPubSubConnector - Mode must be set to Stream.
- JMSReceiver - Can only run in stream mode.
- KafkaConnector - Mode must be set to Stream.
- MQTTConnector - Mode must be set to Stream.
- RabbitMQConnector - Mode must be set to Stream.
- TCPIPReceiver - Can only run in stream mode.
- TweetStreamer - Can only run in stream mode.
- WebSocketReceiver - Can only run in stream mode.
In FME Form, if you run a workspace with one of these transformers configured to run in stream mode, the transformer will run indefinitely until you stop the translation. FME Flow (formerly FME Server) Streams are designed to use a workspace configured this way, so that data is continuously processed without interruption.
A workspace should only contain one transformer set to receive messages in stream mode. While it's possible to add multiple, the workspace won't run as expected, and data will not be received. Create a separate workspace to stream data from another source or consider using topics if the provider you are connecting to supports them (for example, Kafka).
The Importance of Order
When running in stream mode, the order in which features enter and then progress through the workspace is critical.
Creation/Reader Order
If you are connecting to a stream and reading additional datasets, the order in which features enter the workspace needs to be understood. You normally need the stream data to be read after any other data. If you don’t do this, the other data will never be read, since a stream never finishes.
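The effect of read order can be illustrated outside FME with a minimal Python sketch. It is only an analogy, not FME code: `reference_data` and `stream` are hypothetical stand-ins for a bounded dataset and an unbounded stream, and `chain` stands in for the order in which sources are read.

```python
from itertools import chain, count, islice

def reference_data():
    """A bounded dataset, e.g. lookup records read from a file (hypothetical)."""
    yield from ["zone-A", "zone-B"]

def stream():
    """An unbounded source: it keeps producing messages and never finishes."""
    for n in count():
        yield f"message-{n}"

# Bounded data first, stream last: the reference data is fully read
# before the first stream message arrives.
good_order = chain(reference_data(), stream())
first_four = list(islice(good_order, 4))

# Stream first: the reference data is queued behind a source that never ends,
# so it does not appear no matter how many features are consumed.
bad_order = chain(stream(), reference_data())
first_thousand = list(islice(bad_order, 1000))
reference_seen = any(item.startswith("zone-") for item in first_thousand)
```

In the first ordering the two reference records come out ahead of the stream messages; in the second, `reference_seen` stays false however large the slice is.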
Key Settings
The Creator transformer will emit features before a Reader does unless "Create After Readers" is set to "Yes". This matters because a Creator is often used to initialize a stream transformer.
If you have multiple Creators, you can right-click a Creator and choose "Set Creator Runtime Order..." to change their creation order.
Junction Order
Order is also important after the initial read. If you are making copies of features for different parts of the workflow (i.e., splitting it), you may need to make sure one set of features is processed first. For example, candidate features must arrive first at a NeighborFinder that has "Candidates First" set to Yes.
To control this, right-click the connection where the features are duplicated and select "Set Connection Runtime Order".
Process When Group Changes
Unless you are doing basic processing (filtering or event detection), the stream needs to be windowed into groups so you can process and summarize data. Using the TimeWindower in combination with Group By is how you achieve this. For more details on breaking a stream into groups using the TimeWindower, please see Windowing Data Streams in FME.
Group by Window ID
The TimeWindower assigns a different window ID to the features in each window. This window ID can then be used in transformers that support group processing, like the Sampler shown below, to process each group and then flush features when the next window arrives. When running in stream mode, you need to make sure that Complete Groups is set to "When Group Changes (Advanced)".
Using "When Group Changes (Advanced)" introduces a potential problem: what happens when the stream isn't producing data for some time? There will be no new window ID to flush features being held in grouping transformers and this can lead to a delay in processing. To address this, most transformers that support stream mode can emit a feature after a period of inactivity. To learn more, please see Stream Heartbeat and the TimeWindower.
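To make the group-change behaviour concrete, here is a minimal Python sketch of the idea. It is not how FME implements it. The `window_id` scheme (the second at which a tumbling window starts) and the `GroupChangeBuffer` class are assumptions made for illustration, and the timestamps are small integers in seconds to keep the arithmetic easy to follow.

```python
def window_id(seconds, window_size=30):
    """Assumed ID scheme: the second at which the tumbling window starts."""
    return seconds - (seconds % window_size)

class GroupChangeBuffer:
    """Holds features for the current group and releases them when the key changes."""

    def __init__(self):
        self.current_key = None
        self.held = []

    def push(self, key, feature):
        """Return the previously held group if `key` differs from it, else an empty list."""
        released = []
        if self.held and key != self.current_key:
            released, self.held = self.held, []
        self.current_key = key
        self.held.append(feature)
        return released

buffer = GroupChangeBuffer()
released_batches = []
for ts in [0, 5, 29, 30, 45]:
    released = buffer.push(window_id(ts), ts)
    if released:
        released_batches.append(released)

# The stream goes quiet here: the second window is still being held.
held_before_heartbeat = list(buffer.held)

# A feature with a different key (here a hypothetical heartbeat) forces the flush.
flushed = buffer.push("heartbeat", None)
```

The first window is released only when a feature from the second window arrives, and the second window stays in the buffer until the heartbeat changes the group key.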
Grouping By Multiple Keys
Grouping by the time window ID plus another key is supported, but you need to ensure features are sorted so they arrive in the correct order. The Sorter transformer can do this. Reducing data volumes is a key workflow when working with streams, and this can be done using a Sampler with the Sorter.
Example
Let's look at an example. Fifty thousand vehicles are reporting their location every five seconds. Rather than storing all of that data, we want to use FME to remove duplication and lower the resolution to 30 seconds.
1. Use the TimeWindower transformer to group the messages into 30-second windows.
2. Messages in each window are then sorted based on the vehicle ID and timestamp.
This means that the last known location of each vehicle in the window will be sorted to the bottom.
3. Group by window ID and vehicle ID and sample.
With the data sorted, a Sampler can be used to keep only the last known location for each vehicle in the window. Note: we are now grouping by window ID and vehicle ID.
In this example, if you don’t use the Sorter, the vehicle ID will change multiple times back and forth within a single window ID, which would cause the Group By to produce undesirable results. With the right grouping, it's easy to get the first or last message in a window.
Be mindful when grouping by multiple keys. In this example, if both the vehicle and the window do not change, then features will be held by the Sampler until they do.
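The three steps of the example can be sketched in plain Python as an analogy for the TimeWindower, Sorter, and Sampler chain. The message layout (`vehicle`, `ts`) and the window ID scheme are assumptions for illustration. Note that `itertools.groupby`, like "When Group Changes (Advanced)", only groups consecutive items with the same key, which is why the sort is required.

```python
from itertools import groupby
from operator import itemgetter

WINDOW_SECONDS = 30

# Hypothetical messages; vehicles A and B are interleaved, as they would be in a stream.
messages = [
    {"vehicle": "A", "ts": 0},
    {"vehicle": "B", "ts": 0},
    {"vehicle": "A", "ts": 5},
    {"vehicle": "B", "ts": 5},
    {"vehicle": "A", "ts": 10},
    {"vehicle": "A", "ts": 30},
    {"vehicle": "A", "ts": 35},
]

def thin(messages):
    """Keep the last message per vehicle in each window."""
    # Step 1: stamp each message with its window ID (assumed: window start in seconds).
    stamped = [dict(m, window=m["ts"] - m["ts"] % WINDOW_SECONDS) for m in messages]
    # Step 2: sort so each vehicle's messages are consecutive and time-ordered.
    stamped.sort(key=itemgetter("window", "vehicle", "ts"))
    # Step 3: group by window and vehicle, keeping only the last message of each group.
    return [list(group)[-1]
            for _, group in groupby(stamped, key=itemgetter("window", "vehicle"))]

thinned = thin(messages)
```

For the sample data, `thinned` holds one message per vehicle per window: vehicle A at 10 s and vehicle B at 5 s in the first window, and vehicle A at 35 s in the second.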
Using "When Group Changes (Advanced)" With Multi-Input Transformers
Transformers with multiple input ports, like the SpatialFilter, Clipper, and NeighborFinder, support group processing. However, when Group Processing is used with Complete Groups set to "When Group Changes (Advanced)", the transformer will not output any features unless the Group By attribute is present on the features entering every input port. For example, if you wanted to group by window ID in a SpatialFilter, all Filter and Candidate features need to have a window ID.
If you are reading features on window change using the TimeWindower's WindowChanged port (for example, updated filter features to send to a SpatialFilter, as described in Windowing Data Streams), you can add the window ID by setting the Accumulation Mode to "Merge Initiator and Result" in the FeatureReader's Attribute and Geometry Handling settings.
Using the HTTPCaller in a Streaming Workspace
Sometimes, you need to send HTTP requests using the HTTPCaller transformer in a streaming workspace. For example, you may be detecting an event in your stream and then calling an API to trigger an external process.
There are several considerations when using the HTTPCaller in a streaming workspace.
Stream Velocity
Most streams quickly produce a high volume of data. HTTP requests are typically slower than the rate of data received from the stream. Only a subset of features should be sent to the HTTPCaller to avoid a blockage in your workflow that could cause excess features to build up or overwhelm a web service. This can be achieved using filtering or windowing techniques.
API Rate Limiting and Credits
Many web service providers impose a rate limit on HTTP requests and charge credits for use. Again, consider stream velocity and only send a subset of features to the HTTPCaller to avoid overwhelming a web service's API rate limit or consuming excess credits. When authoring a streaming workspace, consider temporarily using samplers or testers to limit HTTP requests.
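One way to picture "only send a subset" is a simple time-based throttle. The sketch below is plain Python and makes no HTTP requests; it only decides which features would be forwarded. The `ts` field and the `min_interval` parameter are hypothetical names used for illustration, and inside FME the same effect would come from filtering or windowing transformers rather than from code like this.

```python
def throttle(features, min_interval):
    """Yield a feature only if at least `min_interval` seconds have passed
    since the last feature that was yielded."""
    last_sent = None
    for feature in features:
        if last_sent is None or feature["ts"] - last_sent >= min_interval:
            last_sent = feature["ts"]
            yield feature

# Hypothetical events detected in the stream, with timestamps in seconds.
events = [{"ts": t} for t in (0, 1, 2, 10, 11, 25)]

# Forward at most one event every 10 seconds to the web service.
to_send = [e["ts"] for e in throttle(events, min_interval=10)]
```

Only the events at 0, 10, and 25 seconds pass through, so six incoming events become three outgoing requests.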
Concurrent Requests and 'Stuck' Features
Even if HTTP requests are only made for a smaller subset of features in your stream, it's recommended to set the maximum number of concurrent HTTP requests to 1 in the HTTPCaller. This setting is found under Advanced > Concurrent Requests.
If concurrent HTTP requests are not set to 1, then features can be held in the HTTPCaller. While HTTP requests are made as soon as features are received (this can be observed in the translation log), features will not be released until 1) all concurrent requests in a group of features have completed and 2) a new feature is received by the HTTPCaller. An example is shown below. Here, a stream is being windowed every 5 minutes, and then the data is thinned using a Sorter and Sampler with Complete Groups set to "When Group Changes (Advanced)" as described earlier in this article. The HTTPCaller's maximum number of concurrent HTTP requests is set to 25. Several of the requests in the group have finished, but you can see that no features are output. Features will not be released by the HTTPCaller until all requests have finished and the next group arrives.
Writing Stream Data to File
In most scenarios, writing to a database is likely the best option. For details, please see Writing to Databases When Running in Stream Mode. There are some scenarios where you may need to write a stream out to a file, for example, writing out data such as CSV or Parquet to a data lake.
If you need to write a stream out to a file, use a FeatureWriter. The FeatureWriter must be used in combination with a TimeWindower. Below is the configuration for the FeatureWriter:
- Group Processing: Set the Group By attribute to the window ID (and a secondary attribute if required). Make sure Complete Groups is set to "When Group Changes (Advanced)".
- File Name or Feature Type Name: Set the file name or feature type name (depending on the format) to be dynamically generated based on the Group By attribute, for example Name_@Value(_window_id).
- Note that a DateTimeConverter may be useful to convert a time-based window ID (which is stamped in seconds since epoch, %s in DateTimeConverter lingo) to a human-readable timestamp to use as part of the generated filename.
Below is an example that writes out a CSV file. A new file is created every time a window changes, and the files will be written out with the name Report_<WindowID>.csv.
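As a rough illustration of the epoch conversion mentioned in the configuration notes, the Python sketch below turns a window ID in seconds since epoch into a readable, sortable file name. In a workspace this would be done with a DateTimeConverter; the `report_filename` function, its parameters, and the timestamp layout are assumptions chosen for the example.

```python
from datetime import datetime, timezone

def report_filename(window_id, prefix="Report", extension="csv"):
    """Build a file name from a window ID given in seconds since epoch (UTC)."""
    stamp = datetime.fromtimestamp(window_id, tz=timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{prefix}_{stamp}.{extension}"

# A window ID of 0 corresponds to midnight on 1 January 1970 (UTC).
example_name = report_filename(0)
```

Using UTC and a fixed-width timestamp keeps the file names unambiguous and makes them sort in chronological order.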
Considerations for Heartbeat Features
Most transformers that support stream mode can emit a "heartbeat" feature after a period of inactivity, as discussed in Stream Heartbeat and the TimeWindower. When writing to file, the FeatureWriter must receive the heartbeat feature to trigger a group change, release held features, and perform the write. Note that this will also write the heartbeat feature to file. Since heartbeat features include the word "heartbeat" in their window_id, you can identify these when the window ID is included in the filename. Heartbeat files can then be removed manually or via another process.
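If the cleanup is handled by another process, it could be as simple as the Python sketch below. It assumes the window ID is part of the file name, so heartbeat files contain the word "heartbeat"; the file names shown are hypothetical, and the function only selects names without deleting anything.

```python
def heartbeat_files(filenames):
    """Return the file names that appear to come from heartbeat features."""
    return [name for name in filenames if "heartbeat" in name.lower()]

# Hypothetical output folder listing.
written = [
    "Report_1700000010.csv",
    "Report_heartbeat.csv",
    "Report_1700000040.csv",
]

to_remove = heartbeat_files(written)
```

The names could come from a directory listing such as `os.listdir`, and the selected files can then be reviewed or deleted.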
Additional Resources
Writing to Databases When Running in Stream Mode
Webinar: Introduction to Data Stream Processing
Webinar: Empowering Real-Time Decision Making with Data Streaming