Email infrastructure for Prefect AI agent flows

Connect MultiMail to any Prefect workflow to send, receive, and gate email with human approval steps — without restructuring your pipeline's retry, scheduling, or observability logic.


Prefect is a Python workflow orchestration platform built for production reliability: retries, scheduling, observability, and deployments on any compute infrastructure. It is a natural fit for AI agent systems that process data, call external services, and need durable execution around those calls.

Email sits at the edge of many agent workflows — notifying stakeholders, parsing inbound requests, routing decisions to humans for review. Without a purpose-built API, agents reach for SMTP libraries or transactional email services designed for marketing teams, not for programmatic agents that need to read replies, wait for approvals, and maintain audit trails.

MultiMail provides a REST API designed for this pattern. Each endpoint maps directly to the operations a Prefect task would perform: send, reply, check inbox, read a message, tag for routing, or pause and wait for a human decision. Oversight modes let you configure how much autonomy the agent has at each stage of the flow, and changing the mode requires no flow restructuring.

Built for Prefect AI Agents developers

Approval steps fit Prefect's retry model

MultiMail's gated_send and gated_all oversight modes hold a message in a pending queue until a human approves it. A Prefect task can poll list_pending on a retry schedule and raise until the approval clears — this is standard Prefect retry behavior with no custom webhook infrastructure required.

Idempotent sends survive task retries

Prefect retries failed tasks automatically. MultiMail's send_email endpoint is idempotent — if a task retries after a transient error, resubmitting the same request returns the existing message_id rather than queuing a duplicate. Retry logic in your flow needs no deduplication guard.

Inbound email as a Prefect trigger

MultiMail webhooks emit a structured payload on every inbound message, including sender, parsed body, thread ID, and tags. Point the webhook at a Prefect deployment's HTTP trigger endpoint to start a new flow run automatically for each incoming message.

Observable email actions

Every MultiMail API call produces a structured audit event — sender, recipient, oversight mode, approval status, and timestamps. These surface in the MultiMail dashboard alongside delivery and approval history, and can be exported to the same observability stack your Prefect flows report to.

Per-call oversight mode overrides

A single Prefect flow may need different email policies at different stages: gated_all during development, gated_send in staging, autonomous in production. Pass oversight_mode as a task parameter rather than baking it into the flow, so a single deployment covers all environments.


Get started in minutes

Send email from a Prefect task
python
import os
import requests
from prefect import flow, task

MULTIMAIL_API_KEY = os.environ["MULTIMAIL_API_KEY"]
MULTIMAIL_BASE = "https://api.multimail.dev/v1"

@task(retries=3, retry_delay_seconds=10)
def send_email(
    to: str,
    subject: str,
    body: str,
    oversight_mode: str = "gated_send",
) -> str:
    resp = requests.post(
        f"{MULTIMAIL_BASE}/send_email",
        headers={"Authorization": f"Bearer {MULTIMAIL_API_KEY}"},
        json={
            "from": "[email protected]",
            "to": to,
            "subject": subject,
            "body": body,
            "oversight_mode": oversight_mode,
        },
        timeout=15,
    )
    resp.raise_for_status()
    return resp.json()["message_id"]

@flow(name="notify-stakeholder")
def notify_stakeholder_flow(report_url: str, recipient: str) -> None:
    message_id = send_email(
        to=recipient,
        subject="Pipeline run complete",
        body=f"The pipeline finished. Report: {report_url}",
    )
    print(f"Queued message: {message_id}")

Wrap MultiMail's send_email endpoint in a Prefect task to get retries, state tracking, and observability automatically. The task raises on non-2xx responses so Prefect handles transient failures without extra logic in the flow.

Poll for human approval before continuing
python
import os
import requests
from prefect import flow, task

MULTIMAIL_API_KEY = os.environ["MULTIMAIL_API_KEY"]
MULTIMAIL_BASE = "https://api.multimail.dev/v1"
HEADERS = {"Authorization": f"Bearer {MULTIMAIL_API_KEY}"}

@task(retries=3, retry_delay_seconds=10)
def queue_gated_email(to: str, subject: str, body: str) -> str:
    resp = requests.post(
        f"{MULTIMAIL_BASE}/send_email",
        headers=HEADERS,
        json={
            "from": "[email protected]",
            "to": to,
            "subject": subject,
            "body": body,
            "oversight_mode": "gated_send",
        },
    )
    resp.raise_for_status()
    return resp.json()["message_id"]

@task(retries=30, retry_delay_seconds=60)
def wait_for_approval(message_id: str) -> str:
    resp = requests.get(
        f"{MULTIMAIL_BASE}/list_pending",
        headers=HEADERS,
    )
    resp.raise_for_status()
    statuses = {m["message_id"]: m["status"] for m in resp.json()["messages"]}
    status = statuses.get(message_id)
    if status == "approved":
        return "approved"
    if status == "cancelled":
        raise ValueError(f"Email {message_id} was cancelled by approver")
    raise RuntimeError(f"Still pending approval: {message_id}")

@flow(name="gated-customer-notification")
def gated_notification_flow(customer_email: str, content: str) -> None:
    message_id = queue_gated_email(
        to=customer_email,
        subject="Action required",
        body=content,
    )
    result = wait_for_approval(message_id)
    print(f"Message {message_id} cleared with status: {result}")

With oversight_mode set to gated_send, a message waits in a pending queue until a human approves or cancels it. This task polls list_pending and raises until the message clears, which maps directly to Prefect's retry-until-success pattern.

Process inbound emails on a schedule
python
import os
import requests
from prefect import flow, task
from datetime import timedelta

MULTIMAIL_API_KEY = os.environ["MULTIMAIL_API_KEY"]
MULTIMAIL_BASE = "https://api.multimail.dev/v1"
HEADERS = {"Authorization": f"Bearer {MULTIMAIL_API_KEY}"}

@task(retries=3, retry_delay_seconds=10)
def fetch_unprocessed(mailbox: str) -> list[dict]:
    resp = requests.get(
        f"{MULTIMAIL_BASE}/check_inbox",
        headers=HEADERS,
        params={"mailbox": mailbox, "exclude_tag": "processed"},
    )
    resp.raise_for_status()
    return resp.json()["messages"]

@task
def read_and_tag(message_id: str) -> dict:
    body = requests.get(
        f"{MULTIMAIL_BASE}/read_email",
        headers=HEADERS,
        params={"message_id": message_id},
    )
    body.raise_for_status()
    requests.post(
        f"{MULTIMAIL_BASE}/tag_email",
        headers=HEADERS,
        json={"message_id": message_id, "tags": ["processed"]},
    ).raise_for_status()
    return body.json()

@flow(name="process-inbound-emails")
def process_inbound_flow(mailbox: str = "[email protected]") -> None:
    messages = fetch_unprocessed(mailbox)
    for msg in messages:
        data = read_and_tag(msg["message_id"])
        print(f"Processed: {data[&"cm">#039;subject']} from {data['from']}")

Poll check_inbox on a Prefect schedule to fetch unprocessed messages, read each one, and tag it after processing to avoid reprocessing on the next run.

Route flow execution based on a human email decision
python
import os
import requests
from prefect import flow, task

MULTIMAIL_API_KEY = os.environ["MULTIMAIL_API_KEY"]
MULTIMAIL_BASE = "https://api.multimail.dev/v1"
HEADERS = {"Authorization": f"Bearer {MULTIMAIL_API_KEY}"}

@task(retries=3, retry_delay_seconds=10)
def request_decision(to: str, question: str, options: list[str]) -> str:
    resp = requests.post(
        f"{MULTIMAIL_BASE}/decide_email",
        headers=HEADERS,
        json={
            "from": "[email protected]",
            "to": to,
            "question": question,
            "options": options,
        },
    )
    resp.raise_for_status()
    return resp.json()["thread_id"]

@task(retries=30, retry_delay_seconds=60)
def poll_for_decision(thread_id: str) -> str:
    resp = requests.get(
        f"{MULTIMAIL_BASE}/get_thread",
        headers=HEADERS,
        params={"thread_id": thread_id},
    )
    resp.raise_for_status()
    for msg in resp.json().get("messages", []):
        if msg.get("is_reply") and msg.get("decision"):
            return msg["decision"]
    raise RuntimeError("No decision reply yet")

@flow(name="anomaly-decision-routing")
def decision_routing_flow(analyst_email: str, anomaly_summary: str) -> None:
    thread_id = request_decision(
        to=analyst_email,
        question=f"Anomaly detected: {anomaly_summary}. How should the pipeline proceed?",
        options=["escalate", "suppress", "investigate"],
    )
    choice = poll_for_decision(thread_id)
    if choice == "escalate":
        print("Routing to escalation subflow")
    elif choice == "suppress":
        print("Logging and suppressing anomaly")
    else:
        print("Starting investigation subflow")

Use decide_email to present a human with a question and branching options inside a Prefect flow. Poll get_thread for the reply, then branch based on the chosen option.


Step by step

1

Install Prefect and store your MultiMail API key

Install Prefect and the requests library. Store your MultiMail API key as an environment variable for local development, or as a Prefect secret block for deployed flows.

bash
pip install prefect requests

"cm"># Local development
export MULTIMAIL_API_KEY="mm_live_your_key_here"

"cm"># Prefect secret block for deployed flows
python - <<&"cm">#039;EOF'
from prefect.blocks.system import Secret
Secret(value="mm_live_your_key_here").save("multimail-api-key", overwrite=True)
EOF
2

Provision a mailbox for your agent

Create a mailbox using MultiMail's create_mailbox endpoint. Set the default oversight_mode for the mailbox — individual API calls can override this per message.

bash
import os, requests

resp = requests.post(
    "https://api.multimail.dev/v1/create_mailbox",
    headers={"Authorization": f"Bearer {os.environ[&"cm">#039;MULTIMAIL_API_KEY']}"},
    json={
        "address": "[email protected]",
        "oversight_mode": "gated_send",
    },
)
print(resp.json())
# {"mailbox_id": "mbx_...", "address": "[email protected]"}
3

Wrap MultiMail calls in Prefect tasks

Define each email operation as a Prefect task with a retry policy. Tasks raise on non-2xx responses so Prefect records failures and retries automatically.

bash
import os, requests
from prefect import task

MULTIMAIL_API_KEY = os.environ["MULTIMAIL_API_KEY"]

@task(retries=3, retry_delay_seconds=10)
def send_email(to: str, subject: str, body: str, oversight_mode: str = "gated_send") -> str:
    resp = requests.post(
        "https://api.multimail.dev/v1/send_email",
        headers={"Authorization": f"Bearer {MULTIMAIL_API_KEY}"},
        json={
            "from": "[email protected]",
            "to": to,
            "subject": subject,
            "body": body,
            "oversight_mode": oversight_mode,
        },
    )
    resp.raise_for_status()
    return resp.json()["message_id"]
4

Deploy and schedule the flow

Deploy the flow to a Prefect work pool with a schedule. The MULTIMAIL_API_KEY environment variable must be available in the worker environment.

bash
from prefect import flow
from prefect.client.schemas.schedules import IntervalSchedule
from datetime import timedelta

@flow(name="scheduled-email-pipeline")
def email_pipeline(recipient: str) -> None:
    message_id = send_email(
        to=recipient,
        subject="Daily digest",
        body="Here is your daily pipeline summary.",
    )
    print(f"Queued: {message_id}")

if __name__ == "__main__":
    email_pipeline.serve(
        name="daily-digest",
        interval=timedelta(hours=24),
        parameters={"recipient": "[email protected]"},
    )

Common questions

Does MultiMail have a native Prefect block or integration package?
Not yet. MultiMail exposes a REST API at https://api.multimail.dev/v1 and all operations are available via standard HTTP calls using the requests library. Wrapping each call in a Prefect task gives you retries, state tracking, and logging without a dedicated block. A first-party Prefect integration is on the roadmap.
How does oversight mode interact with Prefect's retry logic?
They are independent. Prefect retries happen when a task raises an exception — for example, on a network error or a non-2xx response from MultiMail. Oversight mode controls what MultiMail does with the message after it is accepted: hold for approval, send immediately, or log only. If a task retries after a transient failure, resubmitting the same request returns the existing message_id rather than creating a duplicate send.
Can inbound email trigger a Prefect flow run automatically?
Yes. Configure a MultiMail webhook on your mailbox pointing to a Prefect deployment's HTTP trigger endpoint. MultiMail will POST a structured payload for each inbound message — sender, subject, parsed body, thread ID, and any existing tags. Prefect creates a new flow run for each webhook call with the message data passed as parameters.
What happens if a flow is cancelled while an email is awaiting approval?
The message stays in the MultiMail pending queue until explicitly approved or cancelled. You should call cancel_message in a flow cancellation handler or a cleanup task to withdraw pending messages when the flow exits unexpectedly. Otherwise the email may be approved and sent after the flow that queued it has already been marked cancelled.
Which oversight mode should I use during local development?
Use gated_all during development so no emails are sent without explicit approval regardless of which endpoint the task calls. This gives you full visibility into what the agent would send before promoting to gated_send in staging or monitored and autonomous in production.
Can I run multiple concurrent flows that share the same mailbox?
Yes. MultiMail mailboxes are shared resources. If multiple flows call check_inbox on the same mailbox, use tag_email immediately after reading a message to mark it as claimed, and filter by the absence of that tag in subsequent check_inbox calls. This prevents two concurrent flow runs from processing the same message.

Explore more

The only agent email with a verifiable sender

Email infrastructure built for AI agents. Verifiable identity, graduated oversight, and a 38-tool MCP server. Formally verified in Lean 4.