WebSockets

Real-time, low-latency email event streaming

WebSockets provide a persistent, bidirectional connection to AgentMail for receiving email events in real-time. Unlike webhooks, WebSockets don’t require a public URL or external tools like ngrok.

Why Use WebSockets?

FeatureWebhookWebSocket
SetupRequires public URL + ngrokNo external tools needed
ConnectionHTTP request per eventPersistent connection
DirectionAgentMail → Your serverBidirectional
FirewallMust expose portOutbound only
LatencyHTTP round-tripInstant streaming

Python SDK

The Python SDK provides both synchronous and asynchronous WebSocket clients.

Async Usage

1import asyncio
2from agentmail import AsyncAgentMail, Subscribe, Subscribed, MessageReceivedEvent
3
4client = AsyncAgentMail(api_key="YOUR_API_KEY")
5
6async def main():
7 async with client.websockets.connect() as socket:
8 # Subscribe to inboxes
9 await socket.send_subscribe(Subscribe(inbox_ids=["agent@agentmail.to"]))
10
11 # Process events as they arrive
12 async for event in socket:
13 if isinstance(event, Subscribed):
14 print(f"Subscribed to: {event.inbox_ids}")
15 elif isinstance(event, MessageReceivedEvent):
16 print(f"New email from: {event.message.from_}")
17 print(f"Subject: {event.message.subject}")
18
19asyncio.run(main())

Sync Usage

1from agentmail import AgentMail, Subscribe, Subscribed, MessageReceivedEvent
2
3client = AgentMail(api_key="YOUR_API_KEY")
4
5with client.websockets.connect() as socket:
6 # Subscribe to inboxes
7 socket.send_subscribe(Subscribe(inbox_ids=["agent@agentmail.to"]))
8
9 # Process events as they arrive
10 for event in socket:
11 if isinstance(event, Subscribed):
12 print(f"Subscribed to: {event.inbox_ids}")
13 elif isinstance(event, MessageReceivedEvent):
14 print(f"New email from: {event.message.from_}")
15 print(f"Subject: {event.message.subject}")

Event Handler Pattern

You can also use event handlers instead of iterating:

1import asyncio
2from agentmail import AsyncAgentMail, Subscribe, EventType
3
4client = AsyncAgentMail(api_key="YOUR_API_KEY")
5
6async def main():
7 async with client.websockets.connect() as socket:
8 # Register event handlers
9 socket.on(EventType.OPEN, lambda _: print("Connected"))
10 socket.on(EventType.MESSAGE, lambda msg: print("Received:", msg))
11 socket.on(EventType.CLOSE, lambda _: print("Disconnected"))
12 socket.on(EventType.ERROR, lambda err: print("Error:", err))
13
14 # Subscribe and start listening
15 await socket.send_subscribe(Subscribe(inbox_ids=["agent@agentmail.to"]))
16 await socket.start_listening()
17
18asyncio.run(main())

For sync usage with event handlers, run the listener in a background thread:

1import threading
2from agentmail import AgentMail, Subscribe, EventType
3
4client = AgentMail(api_key="YOUR_API_KEY")
5
6with client.websockets.connect() as socket:
7 socket.on(EventType.OPEN, lambda _: print("Connected"))
8 socket.on(EventType.MESSAGE, lambda msg: print("Received:", msg))
9 socket.on(EventType.CLOSE, lambda _: print("Disconnected"))
10 socket.on(EventType.ERROR, lambda err: print("Error:", err))
11
12 socket.send_subscribe(Subscribe(inbox_ids=["agent@agentmail.to"]))
13
14 # Start listening in background thread
15 listener = threading.Thread(target=socket.start_listening, daemon=True)
16 listener.start()
17 listener.join()

TypeScript SDK

The TypeScript SDK provides a WebSocket client with automatic reconnection.

Basic Usage

1import { AgentMailClient, AgentMail } from "agentmail";
2
3const client = new AgentMailClient({
4 apiKey: process.env.AGENTMAIL_API_KEY,
5});
6
7async function main() {
8 const socket = await client.websockets.connect();
9
10 // Handle events
11 socket.on("open", () => {
12 console.log("Connected");
13
14 // Subscribe to inboxes after connection is open
15 socket.sendSubscribe({
16 type: "subscribe",
17 inboxIds: ["agent@agentmail.to"],
18 });
19 });
20
21 socket.on("message", (event: AgentMail.Subscribed | AgentMail.MessageReceivedEvent) => {
22 if (event.type === "subscribed") {
23 console.log("Subscribed to:", event.inboxIds);
24 } else if (event.type === "message_received") {
25 console.log("New email from:", event.message.from_);
26 console.log("Subject:", event.message.subject);
27 }
28 });
29
30 socket.on("close", (event) => {
31 console.log("Disconnected:", event.code, event.reason);
32 });
33
34 socket.on("error", (error) => {
35 console.error("Error:", error);
36 });
37}
38
39main();

React/Next.js Usage

Using the SDK with React:

1import { useEffect, useState } from "react";
2import { AgentMailClient, AgentMail } from "agentmail";
3
4function useAgentMailWebSocket(apiKey: string, inboxIds: string[]) {
5 const [lastMessage, setLastMessage] = useState<AgentMail.MessageReceivedEvent | null>(null);
6 const [isConnected, setIsConnected] = useState(false);
7
8 useEffect(() => {
9 const client = new AgentMailClient({ apiKey });
10
11 let socket: Awaited<ReturnType<typeof client.websockets.connect>>;
12
13 async function connect() {
14 socket = await client.websockets.connect();
15
16 socket.on("open", () => {
17 setIsConnected(true);
18 socket.sendSubscribe({
19 type: "subscribe",
20 inboxIds,
21 });
22 });
23
24 socket.on("message", (event) => {
25 if (event.type === "message_received") {
26 setLastMessage(event);
27 }
28 });
29
30 socket.on("close", () => setIsConnected(false));
31 }
32
33 connect();
34 return () => socket?.close();
35 }, [apiKey, inboxIds.join(",")]);
36
37 return { lastMessage, isConnected };
38}

Subscribe Options

When subscribing to events, you can filter by inbox, pod, or event type:

Python:

1from agentmail import Subscribe
2
3# Subscribe to specific inboxes
4Subscribe(inbox_ids=["inbox1@agentmail.to", "inbox2@agentmail.to"])
5
6# Subscribe to pods
7Subscribe(pod_ids=["pod-id-1", "pod-id-2"])
8
9# Subscribe to specific event types
10Subscribe(
11 inbox_ids=["agent@agentmail.to"],
12 event_types=["message.received", "message.sent"]
13)

TypeScript:

1// Subscribe to specific inboxes
2socket.sendSubscribe({
3 type: "subscribe",
4 inboxIds: ["inbox1@agentmail.to", "inbox2@agentmail.to"],
5});
6
7// Subscribe to pods
8socket.sendSubscribe({
9 type: "subscribe",
10 podIds: ["pod-id-1", "pod-id-2"],
11});
12
13// Subscribe to specific event types
14socket.sendSubscribe({
15 type: "subscribe",
16 inboxIds: ["agent@agentmail.to"],
17 eventTypes: ["message.received", "message.sent"],
18});

Event Types

Connection Events

EventPythonTypeScriptDescription
subscribedSubscribedAgentMail.SubscribedSubscription confirmed

Message Events

EventPythonTypeScriptDescription
message_receivedMessageReceivedEventAgentMail.MessageReceivedEventNew email received
message_sentMessageSentEventAgentMail.MessageSentEventEmail was sent
message_deliveredMessageDeliveredEventAgentMail.MessageDeliveredEventEmail was delivered
message_bouncedMessageBouncedEventAgentMail.MessageBouncedEventEmail bounced
message_complainedMessageComplainedEventAgentMail.MessageComplainedEventEmail marked as spam
message_rejectedMessageRejectedEventAgentMail.MessageRejectedEventEmail was rejected

Domain Events

EventPythonTypeScriptDescription
domain_verifiedDomainVerifiedEventAgentMail.DomainVerifiedEventDomain verification completed

Message Properties

The event.message object contains:

PythonTypeScriptDescription
inbox_idinboxIdInbox that received the email
message_idmessageIdUnique message ID
thread_idthreadIdConversation thread ID
from_from_Sender email address
totoRecipients list
subjectsubjectSubject line
texttextPlain text body
htmlhtmlHTML body (if present)
attachmentsattachmentsList of attachments

Error Handling

Python:

1from agentmail import AsyncAgentMail, Subscribe, MessageReceivedEvent
2from agentmail.core.api_error import ApiError
3
4client = AsyncAgentMail(api_key="YOUR_API_KEY")
5
6async def main():
7 try:
8 async with client.websockets.connect() as socket:
9 await socket.send_subscribe(Subscribe(inbox_ids=["agent@agentmail.to"]))
10
11 async for event in socket:
12 if isinstance(event, MessageReceivedEvent):
13 await process_email(event.message)
14
15 except ApiError as e:
16 print(f"API error: {e.status_code} - {e.body}")
17 except Exception as e:
18 print(f"Connection error: {e}")

TypeScript:

1import { AgentMailClient, AgentMail, AgentMailError } from "agentmail";
2
3const client = new AgentMailClient({
4 apiKey: process.env.AGENTMAIL_API_KEY,
5});
6
7async function main() {
8 try {
9 const socket = await client.websockets.connect();
10
11 socket.on("open", () => {
12 socket.sendSubscribe({
13 type: "subscribe",
14 inboxIds: ["agent@agentmail.to"],
15 });
16 });
17
18 socket.on("message", (event: AgentMail.MessageReceivedEvent) => {
19 if (event.type === "message_received") {
20 processEmail(event.message);
21 }
22 });
23
24 socket.on("error", (error) => {
25 console.error("WebSocket error:", error);
26 });
27
28 socket.on("close", (event) => {
29 console.log("Disconnected:", event.code, event.reason);
30 });
31 } catch (err) {
32 if (err instanceof AgentMailError) {
33 console.error(`API error: ${err.statusCode} - ${err.message}`);
34 } else {
35 console.error("Connection error:", err);
36 }
37 }
38}
39
40main();