Connect MultiMail to any Prefect workflow to send, receive, and gate email with human approval steps — without restructuring your pipeline's retry, scheduling, or observability logic.
Prefect is a Python workflow orchestration platform built for production reliability: retries, scheduling, observability, and deployments on any compute infrastructure. It is a natural fit for AI agent systems that process data, call external services, and need durable execution around those calls.
Email sits at the edge of many agent workflows — notifying stakeholders, parsing inbound requests, routing decisions to humans for review. Without a purpose-built API, agents reach for SMTP libraries or transactional email services designed for marketing teams, not for programmatic agents that need to read replies, wait for approvals, and maintain audit trails.
MultiMail provides a REST API designed for this pattern. Each endpoint maps directly to the operations a Prefect task would perform: send, reply, check inbox, read a message, tag for routing, or pause and wait for a human decision. Oversight modes let you configure how much autonomy the agent has at each stage of the flow, and changing the mode requires no flow restructuring.
MultiMail's gated_send and gated_all oversight modes hold a message in a pending queue until a human approves it. A Prefect task can poll list_pending on a retry schedule and raise until the approval clears — this is standard Prefect retry behavior with no custom webhook infrastructure required.
Prefect retries failed tasks automatically. MultiMail's send_email endpoint is idempotent — if a task retries after a transient error, resubmitting the same request returns the existing message_id rather than queuing a duplicate. Retry logic in your flow needs no deduplication guard.
MultiMail webhooks emit a structured payload on every inbound message, including sender, parsed body, thread ID, and tags. Point the webhook at a Prefect deployment's HTTP trigger endpoint to start a new flow run automatically for each incoming message.
Every MultiMail API call produces a structured audit event — sender, recipient, oversight mode, approval status, and timestamps. These surface in the MultiMail dashboard alongside delivery and approval history, and can be exported to the same observability stack your Prefect flows report to.
A single Prefect flow may need different email policies at different stages: gated_all during development, gated_send in staging, autonomous in production. Pass oversight_mode as a task parameter rather than baking it into the flow, so a single deployment covers all environments.
import os
import requests
from prefect import flow, task
MULTIMAIL_API_KEY = os.environ["MULTIMAIL_API_KEY"]
MULTIMAIL_BASE = "https://api.multimail.dev/v1"
@task(retries=3, retry_delay_seconds=10)
def send_email(
to: str,
subject: str,
body: str,
oversight_mode: str = "gated_send",
) -> str:
resp = requests.post(
f"{MULTIMAIL_BASE}/send_email",
headers={"Authorization": f"Bearer {MULTIMAIL_API_KEY}"},
json={
"from": "[email protected]",
"to": to,
"subject": subject,
"body": body,
"oversight_mode": oversight_mode,
},
timeout=15,
)
resp.raise_for_status()
return resp.json()["message_id"]
@flow(name="notify-stakeholder")
def notify_stakeholder_flow(report_url: str, recipient: str) -> None:
message_id = send_email(
to=recipient,
subject="Pipeline run complete",
body=f"The pipeline finished. Report: {report_url}",
)
print(f"Queued message: {message_id}")Wrap MultiMail's send_email endpoint in a Prefect task to get retries, state tracking, and observability automatically. The task raises on non-2xx responses so Prefect handles transient failures without extra logic in the flow.
import os
import requests
from prefect import flow, task
MULTIMAIL_API_KEY = os.environ["MULTIMAIL_API_KEY"]
MULTIMAIL_BASE = "https://api.multimail.dev/v1"
HEADERS = {"Authorization": f"Bearer {MULTIMAIL_API_KEY}"}
@task(retries=3, retry_delay_seconds=10)
def queue_gated_email(to: str, subject: str, body: str) -> str:
resp = requests.post(
f"{MULTIMAIL_BASE}/send_email",
headers=HEADERS,
json={
"from": "[email protected]",
"to": to,
"subject": subject,
"body": body,
"oversight_mode": "gated_send",
},
)
resp.raise_for_status()
return resp.json()["message_id"]
@task(retries=30, retry_delay_seconds=60)
def wait_for_approval(message_id: str) -> str:
resp = requests.get(
f"{MULTIMAIL_BASE}/list_pending",
headers=HEADERS,
)
resp.raise_for_status()
statuses = {m["message_id"]: m["status"] for m in resp.json()["messages"]}
status = statuses.get(message_id)
if status == "approved":
return "approved"
if status == "cancelled":
raise ValueError(f"Email {message_id} was cancelled by approver")
raise RuntimeError(f"Still pending approval: {message_id}")
@flow(name="gated-customer-notification")
def gated_notification_flow(customer_email: str, content: str) -> None:
message_id = queue_gated_email(
to=customer_email,
subject="Action required",
body=content,
)
result = wait_for_approval(message_id)
print(f"Message {message_id} cleared with status: {result}")With oversight_mode set to gated_send, a message waits in a pending queue until a human approves or cancels it. This task polls list_pending and raises until the message clears, which maps directly to Prefect's retry-until-success pattern.
import os
import requests
from prefect import flow, task
from datetime import timedelta
MULTIMAIL_API_KEY = os.environ["MULTIMAIL_API_KEY"]
MULTIMAIL_BASE = "https://api.multimail.dev/v1"
HEADERS = {"Authorization": f"Bearer {MULTIMAIL_API_KEY}"}
@task(retries=3, retry_delay_seconds=10)
def fetch_unprocessed(mailbox: str) -> list[dict]:
resp = requests.get(
f"{MULTIMAIL_BASE}/check_inbox",
headers=HEADERS,
params={"mailbox": mailbox, "exclude_tag": "processed"},
)
resp.raise_for_status()
return resp.json()["messages"]
@task
def read_and_tag(message_id: str) -> dict:
body = requests.get(
f"{MULTIMAIL_BASE}/read_email",
headers=HEADERS,
params={"message_id": message_id},
)
body.raise_for_status()
requests.post(
f"{MULTIMAIL_BASE}/tag_email",
headers=HEADERS,
json={"message_id": message_id, "tags": ["processed"]},
).raise_for_status()
return body.json()
@flow(name="process-inbound-emails")
def process_inbound_flow(mailbox: str = "[email protected]") -> None:
messages = fetch_unprocessed(mailbox)
for msg in messages:
data = read_and_tag(msg["message_id"])
print(f"Processed: {data[&"cm">#039;subject']} from {data['from']}")Poll check_inbox on a Prefect schedule to fetch unprocessed messages, read each one, and tag it after processing to avoid reprocessing on the next run.
import os
import requests
from prefect import flow, task
MULTIMAIL_API_KEY = os.environ["MULTIMAIL_API_KEY"]
MULTIMAIL_BASE = "https://api.multimail.dev/v1"
HEADERS = {"Authorization": f"Bearer {MULTIMAIL_API_KEY}"}
@task(retries=3, retry_delay_seconds=10)
def request_decision(to: str, question: str, options: list[str]) -> str:
resp = requests.post(
f"{MULTIMAIL_BASE}/decide_email",
headers=HEADERS,
json={
"from": "[email protected]",
"to": to,
"question": question,
"options": options,
},
)
resp.raise_for_status()
return resp.json()["thread_id"]
@task(retries=30, retry_delay_seconds=60)
def poll_for_decision(thread_id: str) -> str:
resp = requests.get(
f"{MULTIMAIL_BASE}/get_thread",
headers=HEADERS,
params={"thread_id": thread_id},
)
resp.raise_for_status()
for msg in resp.json().get("messages", []):
if msg.get("is_reply") and msg.get("decision"):
return msg["decision"]
raise RuntimeError("No decision reply yet")
@flow(name="anomaly-decision-routing")
def decision_routing_flow(analyst_email: str, anomaly_summary: str) -> None:
thread_id = request_decision(
to=analyst_email,
question=f"Anomaly detected: {anomaly_summary}. How should the pipeline proceed?",
options=["escalate", "suppress", "investigate"],
)
choice = poll_for_decision(thread_id)
if choice == "escalate":
print("Routing to escalation subflow")
elif choice == "suppress":
print("Logging and suppressing anomaly")
else:
print("Starting investigation subflow")Use decide_email to present a human with a question and branching options inside a Prefect flow. Poll get_thread for the reply, then branch based on the chosen option.
Install Prefect and the requests library. Store your MultiMail API key as an environment variable for local development, or as a Prefect secret block for deployed flows.
pip install prefect requests
"cm"># Local development
export MULTIMAIL_API_KEY="mm_live_your_key_here"
"cm"># Prefect secret block for deployed flows
python - <<&"cm">#039;EOF'
from prefect.blocks.system import Secret
Secret(value="mm_live_your_key_here").save("multimail-api-key", overwrite=True)
EOFCreate a mailbox using MultiMail's create_mailbox endpoint. Set the default oversight_mode for the mailbox — individual API calls can override this per message.
import os, requests
resp = requests.post(
"https://api.multimail.dev/v1/create_mailbox",
headers={"Authorization": f"Bearer {os.environ[&"cm">#039;MULTIMAIL_API_KEY']}"},
json={
"address": "[email protected]",
"oversight_mode": "gated_send",
},
)
print(resp.json())
# {"mailbox_id": "mbx_...", "address": "[email protected]"}Define each email operation as a Prefect task with a retry policy. Tasks raise on non-2xx responses so Prefect records failures and retries automatically.
import os, requests
from prefect import task
MULTIMAIL_API_KEY = os.environ["MULTIMAIL_API_KEY"]
@task(retries=3, retry_delay_seconds=10)
def send_email(to: str, subject: str, body: str, oversight_mode: str = "gated_send") -> str:
resp = requests.post(
"https://api.multimail.dev/v1/send_email",
headers={"Authorization": f"Bearer {MULTIMAIL_API_KEY}"},
json={
"from": "[email protected]",
"to": to,
"subject": subject,
"body": body,
"oversight_mode": oversight_mode,
},
)
resp.raise_for_status()
return resp.json()["message_id"]Deploy the flow to a Prefect work pool with a schedule. The MULTIMAIL_API_KEY environment variable must be available in the worker environment.
from prefect import flow
from prefect.client.schemas.schedules import IntervalSchedule
from datetime import timedelta
@flow(name="scheduled-email-pipeline")
def email_pipeline(recipient: str) -> None:
message_id = send_email(
to=recipient,
subject="Daily digest",
body="Here is your daily pipeline summary.",
)
print(f"Queued: {message_id}")
if __name__ == "__main__":
email_pipeline.serve(
name="daily-digest",
interval=timedelta(hours=24),
parameters={"recipient": "[email protected]"},
)Email infrastructure built for AI agents. Verifiable identity, graduated oversight, and a 38-tool MCP server. Formally verified in Lean 4.