Desktop Tips for Working with Continuous Data Streams

Liz Sanderson
Liz Sanderson
  • Updated

FME Version

  • FME 2021.0

Introduction

Working with data streams in FME requires a different approach compared to normal batch workflows that you are likely familiar with.
 

Reading Data

For a workspace to run in stream mode, a transformer that supports stream mode must be present and configured correctly. The following transformers are supported.

  • AmazonSQSConnector - Mode must be set to Stream.
  • AWSIotConnector - Mode must be set to Stream.
  • AzureEventHubConnector - Mode must be set to Stream.
  • AzureQueuesStorageConnetor - Mode must be set to Stream.
  • AzureIoTConnector - Mode must be set to Stream.
  • IBMIoTConnector - Mode must be set to Stream.
  • GoogleIoTConnector - Mode must be set to Stream.
  • GoogleCloudPubSubConnector  - Mode must be set to Stream.
  • JMSReceiver - Can only run in stream mode.
  • KafkaConnector - Mode must be set to Stream.
  • MQTTConnector - Mode must be set to Stream.
  • RabbitMQConnector - Mode must be set to Stream.
  • TCPIPReceiver - Can only run in stream mode.
  • TweetStreamer  - Can only run in stream mode.
  • WebSocketReceiver - Can only run in stream mode.
 

In FME Form (formerly FME Desktop), if you run a workspace with one of these transformers configured to run in stream mode, the transformer will run indefinitely until you stop the translation. If a workspace is run as an FME Flow (formerly FME Server) Stream, the workspace must have one of these transformers configured to run in stream mode or it will fail.

In addition to a stream-enabled transformer, static data can be read using either a Reader or FeatureReader.

 

The Importance of Order

When running in stream mode, the order features enter and then progress through the workspace is critical.
 

Creation/Reader Order

If you are connecting to a stream and reading additional datasets, the order in which features enter the workspace needs to be understood. You normally need the stream data to be read after any other data. If you don’t do this, since a stream never finishes, the other data will never be read. 

Key Settings

  • The Creator transformer will emit features before a Reader does unless the "Create At End" is set to "Yes" on the Creator. This is important, as to initialize a stream transformer you often use a Creator.
  • You can avoid issues by using FeatureReaders instead of Readers to trigger reads.
  • If you have several Creators (and they have the same “Create at End” setting), you can right-click and select “Set Creator Runtime Order” to change their creation order.
creator runtme order.png

Junction Order

Order is also important after the initial read. If you are making copies of features for different parts of the stream, there might be a need to make sure one part of the workflow runs first. For example, ensuring features go into a transformer downstream that requires features to come in first—e.g. NeighbourFinder with Candidates First set to Yes.

To control this right-click on the connection where the features are duplicated and select “Set Creator Runtime Order”.

junction order.png


Process When Group Changes

Unless you are doing basic processing (filtering or event detection), the stream needs windowing into groups so you can process and summarize data. Using Group By in combination with the TimeWindower is how you achieve this. This article goes into detail on how to use the TimeWindower to break a stream into groups for processing. 


Group by Window ID

The TimeWindower assigns a different window ID to all features in each window. It is this window ID that can then be used to ensure the transformers with this set will process all features in that window as one group, and then flush when the next window arrives. When running in stream mode you need to make sure that the Group By Mode is set to "Process When Group Changes".

spatial filter parameters.png


Grouping By Multiple Keys

Grouping by the time window ID plus another key is supported, it just can be tricky as you need to ensure the features are sorted correctly to ensure delivery is in the correct order. The Sorter transformer can help do this. Reducing data volumes is a key workflow when working with streams and the Sorter + Sampler is needed for this.


Example

To explain we can walk through an example. Fifty thousand vehicles are reporting their location every five seconds. Rather than storing all of that data, we want to use FME to remove duplication and lower the resolution to 30 seconds.

  1. The TimeWindower transformer then groups the messages into 30-second windows.
timewindower.png
  1. Messages in each window are then sorted based on the vehicle ID and timestamp. This means the data is grouped by vehicle and then the vehicle's location is sorted with the first known location of the vehicle in the window first and the last known location of the vehicle last. Note we are also using the Window ID as the group by.
sorter.png
  1. Now the data is grouped by window ID and vehicle ID and sorted by time, a Sampler can be used to get the last location for each vehicle in the window. Note we are now grouping by window ID and vehicle ID.
 sampler.png

In this example, if you don’t use the Sorter the vehicle ID will change multiple times back and forth within a single window ID which would cause the Group By to produce undesirable results. With the right grouping, it is easy to get the first or last in a window. Because you are only storing the features in each window in memory then it is performant.
 

Writing Out File-Based Data

In most scenarios writing out to a database is likely the best option. More details on this here. There are some scenarios where you may need to write a stream out to a file, for example, writing out data such as CSV or Parquet to a data lake.

If you do need to write a stream out to a file you need to use a FeatureWriter. The FeatureWriter must be used in combination with a TimeWindower and the following need to be configured on the Feature Writer:-

  • Group Processing: Set the attribute to the Window ID (and a secondary attribute if required). Make sure the Group By Mode is set to “Process When Group Changes (Advanced)”.
  • Set the filename to be dynamically generated based upon the group by. For example Name_@Value(_window_id).
    • Note that a DateTimeConverter may be useful to convert a time-based window ID (which is stamped in seconds since epoch -- %s in DateTimeConvertor lingo) to a human-readable timestamp to use as part of the generated filename.  

Below is an example that writes out a CSV file. A new file is created every time a window changes and the files will be written out with the name Report_<WindowID>.csv.

File Writer Parameters

 

Additional Resources

Writing to Databases When Running in Stream Mode

Webinar: Introduction to Data Stream Processing

Webinar: Empowering Real-Time Decision Making with Data Streaming

Was this article helpful?

Comments

0 comments

Please sign in to leave a comment.