How to use the GCP Pub/Sub feature to trigger Vertex AI Pipelines from Airflow

Lasse Schmidt
4 min read · Oct 26, 2022
Photo by Rajeshwar Bachu on Unsplash

Vertex AI is a great tool for all Data Scientists out there. It facilitates prototyping models, testing them, and building pipelines for your Data Science projects. However, as it is still a fairly new Google Cloud product, some functionality is not yet neatly integrated. As of now, scheduling and the integration into Airflow are not well supported. There are a few ways of scheduling and triggering your pipeline from Airflow; one of them is using the Pub/Sub feature, and that is what I will show you here.

As a prerequisite I expect you to have a working Vertex AI pipeline, with the compiled pipeline saved as a JSON file in Google Cloud Storage. For many pipelines this would already be enough, but I want to show you how to run a customized container in that pipeline, so the second prerequisite is a Docker image of the code you want to run, pushed to Artifact Registry.

To sum up the pre-requisites:

— have a compiled Vertex AI pipeline JSON file saved in Google Cloud Storage

— have your custom code as a Docker image pushed to Artifact Registry

The task we now want to solve: trigger this pipeline from Airflow, so that it runs on a schedule.

The first step is to create a Cloud Function. For Pub/Sub to work, you first need to create a topic. The basic instructions for that can be found in the Google Cloud documentation here, but they do not directly show you how everything fits together, so let me guide you through the process. Your main.py needs to look something like this:
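Here is a minimal sketch of what such a main.py could look like, assuming a Pub/Sub-triggered (background) Cloud Function. The function name subscribe, the PROJECT_ID and REGION constants, the display name, and the image_uri attribute key are illustrative assumptions; adapt them to your setup.

```python
import base64

from google.cloud import aiplatform

PROJECT_ID = "your-gcp-project"   # assumption: replace with your project ID
REGION = "europe-west1"           # assumption: replace with your region


def subscribe(event, context):
    """Pub/Sub-triggered entry point: starts a Vertex AI pipeline run."""
    # The message body carries the GCS path of the compiled pipeline JSON;
    # the attributes carry everything else we need — here, the image URI.
    pipeline_spec_uri = base64.b64decode(event["data"]).decode("utf-8")
    attributes_dict = event["attributes"]

    aiplatform.init(project=PROJECT_ID, location=REGION)

    job = aiplatform.PipelineJob(
        display_name="my-custom-pipeline",   # assumption: any display name works
        template_path=pipeline_spec_uri,     # gs://... path from the message body
        parameter_values={
            # assumption: the compiled pipeline declares an `image_uri` parameter
            "image_uri": attributes_dict["image_uri"],
        },
        enable_caching=False,
    )
    # submit() returns immediately; the pipeline run continues inside Vertex AI
    job.submit()
```

Remember to add google-cloud-aiplatform to the Cloud Function's requirements.txt, otherwise the import will fail at deploy time.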

What exactly is this doing? First, it expects a message passed to the subscribe method, here called event. This message is what we will be sending from Airflow. From the message it receives, it extracts two things: first, the Google Cloud Storage path to your compiled JSON file, and second, the URI of your Docker image on Artifact Registry. After extracting this information, it simply uses the Python SDK for Vertex AI (which is, oddly enough, called aiplatform) to trigger your pipeline.

As you can already see, this is highly customizable. It is a plain Python function, and you can put anything you want in here. As mentioned before, I wanted to show you an example of how to run a customized Vertex AI pipeline with a Docker image, but this can be anything you want the pipeline to do (for example an AutoML task).

Last but not least, we want to use Airflow to send the message and trigger our pipeline. So let’s have a look at the Airflow DAG:
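Again a minimal sketch, assuming the apache-airflow-providers-google package is installed; the topic name, bucket path, image URI, and project ID are placeholders you will need to adapt.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.pubsub import PubSubPublishMessageOperator

PROJECT_ID = "your-gcp-project"       # assumption: replace with your project ID
TOPIC_ID = "trigger-vertex-pipeline"  # assumption: the topic your Cloud Function listens on

# A Pub/Sub message consists of a bytestring body ("data") plus arbitrary
# string attributes. We put the GCS path of the compiled pipeline JSON in
# the body and the Docker image URI in the attributes.
MESSAGE = {
    "data": b"gs://my-bucket/pipelines/pipeline.json",
    "attributes": {
        "image_uri": "europe-west1-docker.pkg.dev/my-project/my-repo/my-image:latest",
    },
}

with DAG(
    dag_id="trigger_vertex_ai_pipeline",
    start_date=datetime(2022, 10, 1),
    schedule_interval="0 11 * * *",  # every day at 11:00
    catchup=False,
) as dag:
    publish_message = PubSubPublishMessageOperator(
        task_id="publish_pipeline_trigger",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        messages=[MESSAGE],
    )
```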

What exactly is going on here? First, we use the PubSubPublishMessageOperator from Airflow. The MESSAGE, as you can see, consists of two parts: the data part (which is expected to be a bytestring) and the attributes part, which can be anything; here I use a dict. This exact MESSAGE is then in turn sent to the Cloud Function created earlier. If you look at the Cloud Function above, you can see that pipeline_spec_uri = base64.b64decode(event['data']).decode('utf-8') decodes the data part of the MESSAGE, and attributes_dict = event['attributes'] retrieves the dict we sent in the attributes part.

How does it know that we want to send the MESSAGE to this specific Cloud Function? Because we gave the Cloud Function a trigger topic, and that same topic is part of the Airflow operator's arguments. Whenever this DAG runs (in this case every day at 11), all Cloud Functions subscribed to that topic will receive the MESSAGE.

After the MESSAGE has been sent, you can see in your Vertex AI Pipelines tab that a job has been started. If that is the case, then congrats, you did it ;)

A few concluding notes on this approach. The Airflow operator will mark the task as completed once the MESSAGE is sent, meaning it does not know when the Vertex AI pipeline actually finishes or whether it crashes. So whenever you have tasks downstream of this Pub/Sub task that depend on the outcome of the pipeline, this approach might not be the best for you. In that case, I recommend writing your own Airflow operator with the Python SDK that Google Cloud provides (called aiplatform).
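A minimal sketch of what such a custom operator could look like; the class name and constructor arguments are hypothetical, and it assumes google-cloud-aiplatform is installed on your Airflow workers.

```python
from airflow.models import BaseOperator
from google.cloud import aiplatform


class VertexAIPipelineRunOperator(BaseOperator):
    """Hypothetical operator: runs a Vertex AI pipeline and waits for it to finish."""

    def __init__(self, project_id, region, pipeline_spec_uri, display_name,
                 parameter_values=None, **kwargs):
        super().__init__(**kwargs)
        self.project_id = project_id
        self.region = region
        self.pipeline_spec_uri = pipeline_spec_uri
        self.display_name = display_name
        self.parameter_values = parameter_values or {}

    def execute(self, context):
        aiplatform.init(project=self.project_id, location=self.region)
        job = aiplatform.PipelineJob(
            display_name=self.display_name,
            template_path=self.pipeline_spec_uri,
            parameter_values=self.parameter_values,
        )
        # run(sync=True) blocks until the pipeline succeeds and raises on failure,
        # so downstream Airflow tasks only start once Vertex AI is actually done.
        job.run(sync=True)
```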

I hope this article shed some light on how the Pub/Sub functionality in Google Cloud works, what the messages need to look like, and how to trigger it all from Airflow.

Cheers, Lasse
