Skip to main content

Build an email drip campaign with Python

Temporal Python SDK

Introduction

In this tutorial, you'll build an email subscription web application using Temporal and Python. You'll create a web server using the Flask framework to handle requests and use Temporal Workflows, Activities, and Queries to build the core of the application. Your web server will handle requests from the end user and interact with a Temporal Workflow to manage the email subscription process. Since you're building the business logic with Temporal's Workflows and Activities, you'll be able to use Temporal to manage each subscription rather than relying on a separate database or task queue. This reduces the complexity of the code you have to write and support.

You'll create an endpoint for users to give their email address, and then create a new Workflow execution using that email address which will simulate sending an email message at certain intervals. The user can check on the status of their subscription, which you'll handle using a Query, and they can end the subscription at any time by unsubscribing, which you'll handle by cancelling the Workflow Execution. You can view the user's entire process through Temporal's Web UI. For this tutorial, you'll simulate sending emails, but you can adapt this example to call a live email service in the future.

By the end of this tutorial, you'll have a clear understand how to use Temporal to create and manage long-running Workflows within a web application.

You'll find the code for this tutorial on GitHub in the email-subscription-project-python repository.

Prerequisites

Before starting this tutorial:

Develop the Workflow

A Workflow defines a sequence of steps defined by writing code, known as a Workflow Definition, and are carried out by running that code, which results in a Workflow Execution.

The Temporal Python SDK recommends the use of a single data class for parameters and return types. This lets you add fields without breaking compatibility. Before writing the Workflow Definition, you'll define the data objects used by the Workflow Definitions. You'll also define the Task Queue name you'll use in your Worker.

Create a new file called shared_objects.py in your project directory.

Add the following code to the shared_objects.py file which will:

  1. Import the dataclasses library.
  2. Set the Task Queue variable name to email_subscription.
  3. Add WorkflowOptions and EmailDetails data classes.

shared_objects.py

from dataclasses import dataclass

task_queue_name = "email_subscription"


@dataclass
class WorkflowOptions:
email: str


@dataclass
class EmailDetails:
email: str = ""
message: str = ""
count: int = 0
subscribed: bool = False


The following describes each data class and their objects.

  • WorkflowOptions: this data class starts the Workflow Execution.
    • It will contain the following field:
      • email: a string to pass the user's email
  • EmailDetails: this data class holds data about the current state of the subscription.
    • It will contain the following field:
      • email: as a string to pass a user's email
      • message: as a string to pass a message to the user
      • count: as an integer to track the number of emails sent
      • subscribed: as a boolean to track whether the user is currently subscribed

When you Query your Workflow to retrieve the current statue of the Workflow, you'll use the EmailDetails data class.

Now that you have the Task Queue and the data classes defined, you can write the Workflow Definition.

To create a new Workflow Definition, create a new file called workflows.py. This file will contain the SendEmailWorkflow class and its attributes.

Use the workflows.py file to write deterministic logic inside your Workflow Definition and to execute the Activity.

Add the following code to define the Workflow:

workflows.py


import asyncio
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from activities import send_email
from shared_objects import EmailDetails, WorkflowOptions


@workflow.defn
class SendEmailWorkflow:
def __init__(self) -> None:
self.email_details = EmailDetails()

@workflow.run
async def run(self, data: WorkflowOptions) -> None:
duration = 12
self.email_details.email = data.email
self.email_details.message = "Welcome to our Subscription Workflow!"
self.email_details.subscribed = True
self.email_details.count = 0

while self.email_details.subscribed:
self.email_details.count += 1
if self.email_details.count > 1:
self.email_details.message = "Thank you for staying subscribed!"

try:
await workflow.execute_activity(
send_email,
self.email_details,
start_to_close_timeout=timedelta(seconds=10),
)
await asyncio.sleep(duration)

except asyncio.CancelledError as err:
# Cancelled by the user. Send them a goodbye message.
self.email_details.subscribed = False
self.email_details.message = "Sorry to see you go"
await workflow.execute_activity(
send_email,
self.email_details,
start_to_close_timeout=timedelta(seconds=10),
)
# raise error so workflow shows as cancelled.
raise err

The run() method, decorated with @workflow.run, takes in the email address as an argument. This method initializes the _email, _message, _subscribed, and _count attributes of the SendEmailWorkflow instance.

The SendEmailWorkflow class has a loop that checks if the subscription is active by checking if self.email_details.subscribed is True. If it is, it starts the send_email() Activity.

The while loop increments the self.email_details.count attribute and calls the send_email() Activity with the current EmailDetails object. The loop continues as long as the self.email_details.subscribed attribute is true.

The execute_activity() method executes the send_email() Activity with the following parameters:

  • The send_email() Activity Definition
  • The EmailDetails data class
  • A start_to_close_timeout parameter, which tells the Temporal Server to time out the Activity 10 seconds from when the Activity starts

The loop also includes a asyncio.sleep() statement that causes the Workflow to pause for a set amount of time between email. You can define this in seconds, days, months, or even years, depending on your business logic.

If there's a cancellation request, the request raises asyncio.CancelledError, which you can catch and respond. In this application, you'll use cancellation requests to unsubscribe users. You'll send one last email when they unsubscribe, before completing the Workflow Execution.

Since the user's email address is set to the Workflow Id, attempting to subscribe with the same email address twice will result in a Workflow Execution already started error and prevent the Workflow Execution from spawning again.

Therefore, only one running Workflow Execution per email address can exist within the associated Namespace. This ensures that the user won't receive multiple email subscriptions. This also helps reduce the complexity of the code you have to write and maintain.

With this Workflow Definition in place, you can now develop an Activity to send emails.

Develop an Activity

You'll need an Activity to send the email to the subscriber so you can handle failures.

Create a new file called activities.py and add the following code to define the asynchronous Activity Definition:

activities.py

from temporalio import activity

from shared_objects import EmailDetails


@activity.defn
async def send_email(details: EmailDetails) -> str:
print(
f"Sending email to {details.email} with message: {details.message}, count: {details.count}"
)
return "success"

This implementation only prints a message, but you could replace the implementation with one that uses an email API.

Each iteration of the Workflow loop will execute this Activity, which simulates sending a message to the user.

Now that you have the Activity Definition and Workflow Definition, it's time to write the Worker process.

Create the Worker to handle the Workflow and Activity Executions

Create a new file called run_worker.py and develop the Worker process to execute your Workflow and Activity Definitions.

run_worker.py

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from activities import send_email
from shared_objects import task_queue_name
from workflows import SendEmailWorkflow


async def main():
client = await Client.connect("localhost:7233")

worker = Worker(
client,
task_queue=task_queue_name,
workflows=[SendEmailWorkflow],
activities=[send_email],
)
await worker.run()


if __name__ == "__main__":
asyncio.run(main())

Now that you've written the logic to execute the Workflow and Activity Definitions, try to build the gateway.

Build the API server to handle subscription requests

This tutorial uses the Flask web framework to build a web server that acts as the entry point for initiating Workflow Execution and communicating with the subscribe, get-details, and unsubscribe routes. The web server will handle HTTP requests and perform the appropriate operations with the Workflow.

Create a new file called run_flask.py to develop your Flask endpoints.

First, register the Temporal Client function to run before the first request to this instance of the application. A Temporal Client enables you to communicate with the Temporal Cluster. Communication with a Temporal Cluster includes, but isn't limited to, the following:

  • Starting Workflow Executions
  • Sending Queries to Workflow Executions
  • Getting the results of a Workflow Execution

Add the following code to import your libraries and connect to the Temporal Server.

run_flask.py

# ...
import asyncio

from flask import Flask, current_app, jsonify, make_response, request
from temporalio.client import Client

from run_worker import SendEmailWorkflow
from shared_objects import WorkflowOptions, task_queue_name

app = Flask(__name__)


async def connect_temporal(app):
client = await Client.connect("localhost:7233")
app.temporal_client = client


def get_client() -> Client:
return current_app.temporal_client

The get_client() function retrieves the Client connection from the Flask app once it's initialized. You'll use this in your endpoints.

Now that your connection to the Temporal Server is open, define your first Flask endpoint.

First, build the /subscribe endpoint.

In the run_flask.py file, define a /subscribe endpoint as an asynchronous function, so that users can subscribe to the emails.

run_flask.py

# ...
@app.route("/subscribe", methods=["POST"])
async def start_subscription():
client = get_client()

email: str = str(request.json.get("email"))
data: WorkflowOptions = WorkflowOptions(email=email)
await client.start_workflow(
SendEmailWorkflow.run,
data,
id=data.email,
task_queue=task_queue_name,
)

message = jsonify({"message": "Resource created successfully"})
response = make_response(message, 201)
return response

In the start_subscription() function, get the Temporal Server connection from the Flask application. The WorkflowOptions object is used to pass the email address given by the user to the Workflow Execution and sets the Workflow Id. This ensures that the email is unique across all Workflows so that the user can't sign up multiple times, only receive the emails they've subscribed to, and when they cancel; they cancel the Workflow run.

With this endpoint in place, you can now send a POST request to /subscribe with an email address in the request body to start a new Workflow that sends an email to that address.

But how would you get details about the subscription? In the next section, you'll query your Workflow to get back information on the state of things in the next section.

Add a Query

Now create a method in which a user can get information about their subscription details. Add a new method called details() to the SendEmailWorkflow class and use the @workflow.query decorator.

To allow users to retrieve information about their subscription details, add a new method called details() to the SendEmailWorkflow class in the workflows.py file. Decorate this method with @workflow.query.

workflows.py

# ...
@workflow.query
def details(self) -> EmailDetails:
return self.email_details

The email_details object is an instance of EmailDetails. Queries can be used even if after the Workflow completes, which is useful for when the user unsubscribes but still wants to retrieve information about their subscription.

Queries should never mutate anything in the Workflow.

Now that you've added the ability to Query your Workflow, add the ability to Query from the Flask application.

To enable users to query the Workflow from the Flask application, add a new endpoint called /get_details to the run_flask.py file.

Use the get_workflow_handle() function to return a Workflow handle by a Workflow Id.

run_flask.py

# ...
@app.route("/get_details", methods=["GET"])
async def get_query():
client = get_client()
email = request.args.get("email")
handle = client.get_workflow_handle_for(SendEmailWorkflow.run, email)
results = await handle.query(SendEmailWorkflow.details)
message = jsonify(
{
"email": results.email,
"message": results.message,
"subscribed": results.subscribed,
"numberOfEmailsSent": results.count,
}
)

response = make_response(message, 200)
return response

Using handle.query() creates a Handle on the Workflow and calls the Query method on the handle to get the value of the variables. This function enables you to return all the information about the user's email subscription that's declared in the Workflow.

Now that users can subscribe and view the details of their subscription, you need to provide them with a way to unsubscribe.

Unsubscribe users with a Workflow Cancellation Request

Users will want to unsubscribe from the email list at some point, so give them a way to do that.

You cancel a Workflow by sending a cancellation request to the Workflow Execution. Your Workflow code can respond to this cancellation and perform additional operations in response. This is how you will handle unsubscribe requests.

With the run_flask.py file open, add a new endpoint called /unsubscribe to the Flask application.

To send a cancellation notice to an endpoint, use the HTTP DELETE method on the unsubscribe endpoint to return a cancel() method on the Workflow's handle.

run_flask.py

# ...

@app.route("/unsubscribe", methods=["DELETE"])
async def end_subscription():
client = get_client()
email: str = str(request.json.get("email"))
handle = client.get_workflow_handle(
email,
)
await handle.cancel()
message = jsonify({"message": "Requesting cancellation"})

# Return 202 because this is a request to cancel and the API has accepted
# the request but has not processed yet.
response = make_response(message, 202)
return response


if __name__ == "__main__":
# Create Temporal connection.
asyncio.run(connect_temporal(app))

# Start API
app.run(debug=True)

The handle.cancel() method sends a cancellation request to the Workflow Execution that was started with the /subscribe endpoint.

When the Temporal Service receives the cancellation request, it will cancel the Workflow Execution and return a CancelledError to the Workflow Execution, which your Workflow Definition already handles in the try/except block. Here's the relevant section as a reminder:

workflows.py

# ...
try:
await workflow.execute_activity(
send_email,
self.email_details,
start_to_close_timeout=timedelta(seconds=10),
)
await asyncio.sleep(duration)

except asyncio.CancelledError as err:
# Cancelled by the user. Send them a goodbye message.
self.email_details.subscribed = False
self.email_details.message = "Sorry to see you go"
await workflow.execute_activity(
send_email,
self.email_details,
start_to_close_timeout=timedelta(seconds=10),
)
# raise error so workflow shows as cancelled.
raise err

With this endpoint in place, users can send a DELETE request to /unsubscribe with an email address in the request body to cancel the Workflow associated with that email address. This allows users to unsubscribe from the email list and prevent any further emails from sending.

Now that you've added the ability to unsubscribe from the email list, test your application code to ensure it works as expected.

Create an integration test

Integration testing is an essential part of software development that helps ensure that different components of an application work together correctly.

The Temporal Python SDK includes functions that help you test your Workflow Executions.

Workflow testing can be done in an integration test fashion against a test server or from a given Client.

In this section, you'll write an integration test using the Temporal Python SDK to test the cancellation of a Workflow. Now, you can add tests to the application to ensure the Cancellation works as expected.

To set up the test environment, create two new files called test_run_worker.py and __init__.py in the tests directory.

The Temporal Python SDK includes functions that help you test your Workflow Executions. In this section, you will import the necessary modules and classes to test the cancellation of a Workflow.

In this code, you are defining two test functions test_create_email() and test_cancel_workflow() that use the Temporal SDK to create and cancel a Workflow Execution.

tests/test_run_worker.py


import pytest
from temporalio.client import WorkflowExecutionStatus, WorkflowFailureError
from temporalio.exceptions import CancelledError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from activities import send_email
from run_worker import SendEmailWorkflow
from shared_objects import EmailDetails


@pytest.mark.asyncio
async def test_create_email() -> None:
task_queue_name: str = "email_subscription"

async with await WorkflowEnvironment.start_local() as env:
data: EmailDetails = EmailDetails(
email="test@example.com", message="Here's your message!"
)

async with Worker(
env.client,
task_queue=task_queue_name,
workflows=[SendEmailWorkflow],
activities=[send_email],
):

handle = await env.client.start_workflow(
SendEmailWorkflow.run,
data,
id=data.email,
task_queue=task_queue_name,
)

assert WorkflowExecutionStatus.RUNNING == (await handle.describe()).status


@pytest.mark.asyncio
async def test_cancel_workflow() -> None:
task_queue_name: str = "email_subscription"

async with await WorkflowEnvironment.start_local() as env:
data: EmailDetails = EmailDetails(
email="test@example.com", message="Here's your message!"
)

async with Worker(
env.client,
task_queue=task_queue_name,
workflows=[SendEmailWorkflow],
activities=[send_email],
):

handle = await env.client.start_workflow(
SendEmailWorkflow.run,
data,
id=data.email,
task_queue=task_queue_name,
)

await handle.cancel()

# Cancelling a workflow requests cancellation. Need to wait for the
# workflow to complete.
with pytest.raises(WorkflowFailureError) as err:
await handle.result()

assert isinstance(err.value.cause, CancelledError)

assert WorkflowExecutionStatus.CANCELED == (await handle.describe()).status


The test_create_email() function creates a Workflow Execution by starting the SendEmailWorkflow with some test data. The function then asserts that the status of the Workflow Execution is RUNNING.

The test_cancel_workflow() function also starts a Workflow Execution, but it then immediately cancels it using the cancel() method on the Workflow's handle. It then waits for the Workflow Execution to complete and asserts that the status is CANCELED. Finally, the function checks that the Workflow Execution was cancelled due to a CancelledError.

Now that you've created a test function for the Workflow Cancellation, run pytest to see if that works.

To test the function, run pytest from the command line to automatically discover and execute tests.

============================= test session starts ==============================
platform darwin -- Python 3.11.3, pytest-7.2.2, pluggy-1.0.0
rootdir: email-subscription-project-python, configfile: pyproject.toml
plugins: asyncio-0.20.3, anyio-3.6.2
asyncio: mode=Mode.AUTO
collected 2 items

tests/test_run_worker.py::test_create_email
-------------------------------- live log call ---------------------------------
12:01:12 [ INFO] Beginning worker shutdown, will wait 0:00:00 before cancelling activities (_worker.py:425)
PASSED [ 50%]
tests/test_run_worker.py::test_cancel_workflow
-------------------------------- live log call ---------------------------------
12:01:23 [ INFO] Beginning worker shutdown, will wait 0:00:00 before cancelling activities (_worker.py:425)
PASSED [100%]

============================== 2 passed in 13.24s ==============================

You've successfully written, executed, and passed a Cancellation Workflow test, just as you would any other code written in Python. Temporal's Python SDK provides a number of functions that help you test your Workflow Executions. By following the best practices for testing your code, you can be confident that your Workflows are reliable and performant.

Conclusion

This tutorial demonstrates how to build an email subscription web application using Temporal and Python. By leveraging Temporal's Workflows, Activities, and Queries, the tutorial shows how to create a web server that interacts with Temporal to manage the email subscription process.

With this knowledge, you will be able to take on more complex Workflows and Activities to create even stronger applications.