Streaming features from PostGIS to FME using WebSockets

Liz Sanderson
Liz Sanderson
  • Updated

FME Version

  • FME 2017.x

Introduction

For information on FME Server as a WebSocket server, see the WebSocket documentation.

Every time a feature (row) is changed in a table in PostGIS, Node.js receives information about the feature. In the javascript file, when Node.js receives a feature from PostGIS it uses the ws module to stream features to FME Server.

const WebSocket = require('ws');
var pg = require ('pg');
var request = require('request');
var pgConString = "postgres://username:password@server:port/database";

const ws = new WebSocket('ws://FMEserver:7078/websocket/',
  {
    headers: {
      "ws_op" : "open", "ws_stream_id" : "postgres"
    }
  });
console.log("pre open ready state " + ws.readyState);

ws.on('open', function() {
    ws.send('{ "ws_op" : "open", "ws_stream_id" : "postgres", "ws_msg" : "open" }');
});

pg.connect(pgConString, function(err, client) {
  if(err) {
    console.log(err);
  } else {
    console.log('HELLO');
  client.on('notification', function(msg) {
    console.log("msg is "+msg.payload);
    //db_body = JSON.parse(msg.payload);
      msgType = typeof msg.payload;
      sendmsg2 = ('{ "ws_op" : "send", "ws_stream_id" : "postgres", "ws_msg" : '+msg.payload+'}');
      ws.send(sendmsg2);

  });
}

  var query = client.query("LISTEN watchers2");
});

This is useful if large amounts of data is being edited. Instead of sending a notification to a topic every time and triggering a workspace for every feature, WebSockets allows the use of one workspace that will process features as they appear.

WebSockets can be set as a Publication in FME Server and each message published to a topic.

websocketpublication.png

WebSockets can also be received using the WebSocketReceiver transformer. The same parameters must be specified: WebSocket Server URL and Stream ID.

websocketreceiverparameters.png

The complete workspace is available here: websocket-receiver-complete.fmw

 

Step-by-step Instructions

Modify the postgis2ws.js file to match your database and FME server connection requirements.

In a command prompt window, navigate to the folder where the node executable is. In the command window type: node <file/path/if/different>postgis2ws.js

If nothing happens, that's good. Any errors in the code will get reported here.

In FME Workbench add a Creator, a WebSocketReceiver and a Logger. The WebSocketReceiver Connection Preamble needs to be set up to connect to the stream in the postgis2ws.js file, in this example the WebSocket Stream is 'postgres'.

{
   "ws_op"         : "open",
   "ws_stream_ids" : ["postgres"]
}

Once the workspace is complete, run it. Only one feature will travel from the Creator to the WebSocketReceiver.

creatorwsreceiverlogger.png

You may notice in the Translation Log that the WebSocketReceiver is waiting for incoming WebSocket messages. This workspace will continue running until cancelled and will stay waiting for messages.

WebSocketReceiver: Attempting to connect to host 'localhost' on port '7078'
WebSocketReceiver: Successfully connected to host 'localhost' on port '7078'
WebSocketReceiver: Performing handshake with remote host
WebSocketReceiver: Handshake successful
WebSocketReceiver: Transmitted Connection Preamble
WebSocketReceiver: Waiting for incoming WebSocket messages...

Now is the time to go and edit, create or delete some features from the PostGIS table. This can be using FME (in a separate workspace) or using a GIS.

This workspace can be published to FME Server and ran or set as a Schedule. The workspace will use one engine for the whole time the job is running. This may prove less 'expensive' than running lots of quick, smaller jobs constantly if lots of features come through as topic messages, creating one job per feature.

FME Server now has Advanced Properties when Running Jobs or Scheduling:

advancedsettings.png

This will allow the job to continue until cancelled or cancel until a set expiry time. This would allow the WebSocketReceiver workspace to run during business hours or whenever there will be activity on the database table.

As the _output attribute is JSON, a JSONFlattener would be helpful to turn each key into an attribute. The AttributeManager could then be used to create a message that could be used to notify someone of the changes to features.

 

Results

As features are created, deleted or changed in PostGIS features should start streaming through the workspace. The message sent will appear in the Translation Log window due to the Logger transformer.

websocketresults.png

This workspace could easily be extended to start performing validation processes on new features or notifying interested parties.

 

Was this article helpful?

Comments

0 comments

Please sign in to leave a comment.