Automating Data Upload to Google BigQuery from Google Cloud Storage

Liz Sanderson

FME Version

  • FME 2020.2

Introduction

As many organizations start to modernize their data, they may find the need to integrate with different cloud platforms.
This guide will walk you through how to set up an Automation in FME Server that processes and uploads data into Google BigQuery as soon as it arrives in a Google Cloud Storage Bucket.
The data to be uploaded to Google Cloud Storage is CellSignals.csv. FME will create points from the latitude and longitude columns and write them to Google BigQuery, where they can be inspected spatially.

This article assumes:
  • You already have a Google Cloud Storage account and bucket set up.
  • Your FME Server is publicly accessible, or accessible by Google Cloud Functions.
  • You have access to Google BigQuery and have already created a dataset.

Step-by-Step Instructions

Part 1: Create the FME Workspaces

Download from Google Cloud Storage Workspace

To get the data from Google Cloud Storage to FME Server, we will create one workspace to download the data using the GoogleCloudStorageConnector transformer.

This workspace will be very simple, with a Creator, GoogleCloudStorageConnector and an Automations writer.

1. Add a Creator and connect it to the GoogleCloudStorageConnector.


2. Configure the GoogleCloudStorageConnector
You will need to add a credential source so that you can authenticate with Google Cloud. Once that is set up, enter or browse for the project where your bucket is.

Set the Action type to Download and set the Bucket and Path up as User Parameters. These parameters will receive their values from the Google Cloud Storage trigger.



The Destination should be a file in the FME Server temp location. It could go anywhere on FME Server, but for this demo it is written to the temp location so that it doesn't persist for long and take up space in the FME Server System Share.



For the Output Attributes, ensure _download_path is selected.

3. Add an Automations writer
The Automations writer will output the download location of the file to the next workspace for processing.
Make sure the _download_path attribute is defined on the writer.



4. Publish the workspace to FME Server

 

Process and Upload to Google BigQuery Workspace

This workspace will process the data that has been downloaded to FME Server before writing it to Google BigQuery. We will be using CellSignals.csv, which is available in the FME Data folder or can be found at the top of this article.

1. Add a CSV reader
Add a CSV reader, reading in the CellSignals.csv file.

2. Connect the CSV output to a VertexCreator transformer


We will use the latitude and longitude columns in the CSV to create a spatial dataset for Google BigQuery. (Remember to get the X/Y and longitude/latitude pairing the right way round!)
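The mapping the VertexCreator performs here can be sketched outside FME as well. This is a minimal illustration (the sample row and column names are hypothetical); the key point is that X comes from longitude and Y from latitude, not the other way round:

```python
import csv
import io

# Hypothetical CSV row standing in for CellSignals.csv.
sample = io.StringIO("latitude,longitude,signal_strength\n49.28,-123.12,-67\n")

points = []
for row in csv.DictReader(sample):
    x = float(row["longitude"])  # X = longitude
    y = float(row["latitude"])   # Y = latitude
    points.append((x, y))

print(points)  # [(-123.12, 49.28)]
```

Swapping the two columns would place the Vancouver points in Antarctica, so it is worth double-checking this step before publishing.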



3. Add a CoordinateSystemSetter
Connect the CoordinateSystemSetter to the VertexCreator, so the created points have a location. Set the coordinate system to LL-WGS84.

4. Add a Google BigQuery writer
Now that the data is spatial, we are ready to write it to Google BigQuery.
When you add the writer, click on Parameters to set your credential source and dataset.

Make sure the writer is set up for spatial data:



5. Test and publish your workspace to FME Server
Your final workspace should look like this:



You can test this in FME Desktop and confirm in Google BigQuery that the data was written:


Part 2: Create the FME Server Automation

This automation will receive a notification from Google Cloud Storage Triggers via a webhook, which will trigger the two workspaces created in Part 1.

1. Add a webhook trigger
With a webhook trigger, the webhook URL is generated once the automation is started. We will need this URL in Part 3.
On the Output Keys tab, add two webhook keys for Bucket and File:



In Part 3, when we create the Google Cloud Storage trigger, the function will send a JSON payload containing those two keys.
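As a sketch, the JSON body the Cloud Function will POST looks like this (the bucket and file names below are hypothetical placeholders; the keys Bucket and File must match the webhook output keys exactly):

```python
import json

# Hypothetical values; in Part 3 these come from the Cloud Storage event.
payload = {"Bucket": "my-gcs-bucket", "File": "CellSignals.csv"}
body = json.dumps(payload)
print(body)  # {"Bucket": "my-gcs-bucket", "File": "CellSignals.csv"}
```

If you want to dry-run the automation before Part 3 is in place, a body like this can be POSTed to the webhook URL with a Content-Type of application/json.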

2. Add a workspace action
Connect the workspace action to the success port of the webhook trigger:



This workspace needs to be the workspace that downloads data from Google Cloud Storage.

Set the published parameters to match the webhook output keys:



3. Add another workspace action
Add the final workspace action, which is the workspace that uploads the data to Google BigQuery. Connect this to the automations writer output:



Set the Source CSV File to be the _download_path from the previous workspace:



4. Start the automation
Save and start the automation.
 

Part 3: Create the Google Cloud Storage Triggers

To notify FME Server when a new file has been uploaded to Google Cloud Storage, we need to set up a Google Cloud Function.
This part of the workflow will be done in the Google Cloud Console.
You can read more about Google Cloud Storage Triggers here.

1. Create and Configure Function
In Google Cloud Platform Console, navigate to the Function service.
Create a new function.

For Step 1: Configuration, use the following settings:

Function name: GCS2FMEServer
Region: Choose the same region that your bucket is in
Trigger type: Cloud Storage
Event type: Finalize/Create
Bucket: Choose your bucket

Click Save and proceed to the next step (Code).

2. Add code to the function
Set the Runtime to Python 3.7.
In main.py, paste the following (the requests library should also be listed in the function's requirements.txt):

import json

import requests


def hello_gcs(event, context):
    """Triggered by a change to a Cloud Storage bucket.

    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    print('Bucket: {}'.format(event['bucket']))
    print('File: {}'.format(event['name']))

    final_url = 'INSERT WEBHOOK URL HERE'
    headers = {'content-type': 'application/json'}
    payload = {'Bucket': event['bucket'], 'File': event['name']}
    data = json.dumps(payload)
    response = requests.post(final_url, data=data, headers=headers)
    print('JSON payload', data)
    print('Response text', response.text)
    print('Response status', response.status_code, response.reason)


Make sure to replace the final_url placeholder with the FME Server webhook URL, which you can copy from the webhook trigger:



3. Deploy the function
Click Deploy to deploy your function. On the main functions page, wait for the function to become active:

 

Part 4: Test the Workflow

Now that we have all of the parts of this workflow in place, it is time to test it.

If your Google BigQuery writer is set to Overwrite or Append, you can go ahead and upload CellSignals.csv to your Google Cloud Storage bucket. If it is set to Write if Empty, you may need to delete the table beforehand.

1. Upload CellSignals.csv to your bucket


2. Check the Google Cloud Function Logs


You should see some information logged that shows the bucket, file, JSON payload and FME Server response:



3. Check the Automations log file
From the Automations page, open the Menu and click View Log File:



You should see two translations completed successfully.

4. View the data in Google BigQuery
You can check the table details to confirm that the Created or Last modified datetime is the same time that the workspace ran:

 

To visualize the data, click Export > Explore with GeoViz.



Once GeoViz loads, run the query.

You should see the Cell Signal data points over Vancouver:



You can click on any of the points to see the attributes written by FME.



Data Attribution

Unless otherwise stated, the data used throughout this series originates from open data made available by the City of Vancouver, British Columbia. It contains information licensed under the Open Government License - Vancouver.

 
