WebSockets
Real-time, low-latency email event streaming
WebSockets provide a persistent, bidirectional connection to AgentMail for receiving email events in real-time. Unlike webhooks, WebSockets don’t require a public URL or external tools like ngrok.
Why Use WebSockets?
| Feature | Webhook | WebSocket |
|---|---|---|
| Setup | Requires public URL + ngrok | No external tools needed |
| Connection | HTTP request per event | Persistent connection |
| Direction | AgentMail → Your server | Bidirectional |
| Firewall | Must expose port | Outbound only |
| Latency | HTTP round-trip | Instant streaming |
Python SDK
The Python SDK provides both synchronous and asynchronous WebSocket clients.
Async Usage
1 import asyncio 2 from agentmail import AsyncAgentMail, Subscribe, Subscribed, MessageReceivedEvent 3 4 client = AsyncAgentMail(api_key="YOUR_API_KEY") 5 6 async def main(): 7 async with client.websockets.connect() as socket: 8 # Subscribe to inboxes 9 await socket.send_subscribe(Subscribe(inbox_ids=["agent@agentmail.to"])) 10 11 # Process events as they arrive 12 async for event in socket: 13 if isinstance(event, Subscribed): 14 print(f"Subscribed to: {event.inbox_ids}") 15 elif isinstance(event, MessageReceivedEvent): 16 print(f"New email from: {event.message.from_}") 17 print(f"Subject: {event.message.subject}") 18 19 asyncio.run(main())
Sync Usage
1 from agentmail import AgentMail, Subscribe, Subscribed, MessageReceivedEvent 2 3 client = AgentMail(api_key="YOUR_API_KEY") 4 5 with client.websockets.connect() as socket: 6 # Subscribe to inboxes 7 socket.send_subscribe(Subscribe(inbox_ids=["agent@agentmail.to"])) 8 9 # Process events as they arrive 10 for event in socket: 11 if isinstance(event, Subscribed): 12 print(f"Subscribed to: {event.inbox_ids}") 13 elif isinstance(event, MessageReceivedEvent): 14 print(f"New email from: {event.message.from_}") 15 print(f"Subject: {event.message.subject}")
Event Handler Pattern
You can also use event handlers instead of iterating:
1 import asyncio 2 from agentmail import AsyncAgentMail, Subscribe, EventType 3 4 client = AsyncAgentMail(api_key="YOUR_API_KEY") 5 6 async def main(): 7 async with client.websockets.connect() as socket: 8 # Register event handlers 9 socket.on(EventType.OPEN, lambda _: print("Connected")) 10 socket.on(EventType.MESSAGE, lambda msg: print("Received:", msg)) 11 socket.on(EventType.CLOSE, lambda _: print("Disconnected")) 12 socket.on(EventType.ERROR, lambda err: print("Error:", err)) 13 14 # Subscribe and start listening 15 await socket.send_subscribe(Subscribe(inbox_ids=["agent@agentmail.to"])) 16 await socket.start_listening() 17 18 asyncio.run(main())
For sync usage with event handlers, run the listener in a background thread:
1 import threading 2 from agentmail import AgentMail, Subscribe, EventType 3 4 client = AgentMail(api_key="YOUR_API_KEY") 5 6 with client.websockets.connect() as socket: 7 socket.on(EventType.OPEN, lambda _: print("Connected")) 8 socket.on(EventType.MESSAGE, lambda msg: print("Received:", msg)) 9 socket.on(EventType.CLOSE, lambda _: print("Disconnected")) 10 socket.on(EventType.ERROR, lambda err: print("Error:", err)) 11 12 socket.send_subscribe(Subscribe(inbox_ids=["agent@agentmail.to"])) 13 14 # Start listening in background thread 15 listener = threading.Thread(target=socket.start_listening, daemon=True) 16 listener.start() 17 listener.join()
TypeScript SDK
The TypeScript SDK provides a WebSocket client with automatic reconnection.
Basic Usage
1 import { AgentMailClient, AgentMail } from "agentmail"; 2 3 const client = new AgentMailClient({ 4 apiKey: process.env.AGENTMAIL_API_KEY, 5 }); 6 7 async function main() { 8 const socket = await client.websockets.connect(); 9 10 // Handle events 11 socket.on("open", () => { 12 console.log("Connected"); 13 14 // Subscribe to inboxes after connection is open 15 socket.sendSubscribe({ 16 type: "subscribe", 17 inboxIds: ["agent@agentmail.to"], 18 }); 19 }); 20 21 socket.on("message", (event: AgentMail.Subscribed | AgentMail.MessageReceivedEvent) => { 22 if (event.type === "subscribed") { 23 console.log("Subscribed to:", event.inboxIds); 24 } else if (event.type === "message_received") { 25 console.log("New email from:", event.message.from_); 26 console.log("Subject:", event.message.subject); 27 } 28 }); 29 30 socket.on("close", (event) => { 31 console.log("Disconnected:", event.code, event.reason); 32 }); 33 34 socket.on("error", (error) => { 35 console.error("Error:", error); 36 }); 37 } 38 39 main();
React/Next.js Usage
Using the SDK with React:
1 import { useEffect, useState } from "react"; 2 import { AgentMailClient, AgentMail } from "agentmail"; 3 4 function useAgentMailWebSocket(apiKey: string, inboxIds: string[]) { 5 const [lastMessage, setLastMessage] = useState<AgentMail.MessageReceivedEvent | null>(null); 6 const [isConnected, setIsConnected] = useState(false); 7 8 useEffect(() => { 9 const client = new AgentMailClient({ apiKey }); 10 11 let socket: Awaited<ReturnType<typeof client.websockets.connect>>; 12 13 async function connect() { 14 socket = await client.websockets.connect(); 15 16 socket.on("open", () => { 17 setIsConnected(true); 18 socket.sendSubscribe({ 19 type: "subscribe", 20 inboxIds, 21 }); 22 }); 23 24 socket.on("message", (event) => { 25 if (event.type === "message_received") { 26 setLastMessage(event); 27 } 28 }); 29 30 socket.on("close", () => setIsConnected(false)); 31 } 32 33 connect(); 34 return () => socket?.close(); 35 }, [apiKey, inboxIds.join(",")]); 36 37 return { lastMessage, isConnected }; 38 }
Subscribe Options
When subscribing to events, you can filter by inbox, pod, or event type:
Python:
1 from agentmail import Subscribe 2 3 # Subscribe to specific inboxes 4 Subscribe(inbox_ids=["inbox1@agentmail.to", "inbox2@agentmail.to"]) 5 6 # Subscribe to pods 7 Subscribe(pod_ids=["pod-id-1", "pod-id-2"]) 8 9 # Subscribe to specific event types 10 Subscribe( 11 inbox_ids=["agent@agentmail.to"], 12 event_types=["message.received", "message.sent"] 13 )
TypeScript:
1 // Subscribe to specific inboxes 2 socket.sendSubscribe({ 3 type: "subscribe", 4 inboxIds: ["inbox1@agentmail.to", "inbox2@agentmail.to"], 5 }); 6 7 // Subscribe to pods 8 socket.sendSubscribe({ 9 type: "subscribe", 10 podIds: ["pod-id-1", "pod-id-2"], 11 }); 12 13 // Subscribe to specific event types 14 socket.sendSubscribe({ 15 type: "subscribe", 16 inboxIds: ["agent@agentmail.to"], 17 eventTypes: ["message.received", "message.sent"], 18 });
Event Types
Connection Events
| Event | Python | TypeScript | Description |
|---|---|---|---|
subscribed | Subscribed | AgentMail.Subscribed | Subscription confirmed |
Message Events
| Event | Python | TypeScript | Description |
|---|---|---|---|
message_received | MessageReceivedEvent | AgentMail.MessageReceivedEvent | New email received |
message_sent | MessageSentEvent | AgentMail.MessageSentEvent | Email was sent |
message_delivered | MessageDeliveredEvent | AgentMail.MessageDeliveredEvent | Email was delivered |
message_bounced | MessageBouncedEvent | AgentMail.MessageBouncedEvent | Email bounced |
message_complained | MessageComplainedEvent | AgentMail.MessageComplainedEvent | Email marked as spam |
message_rejected | MessageRejectedEvent | AgentMail.MessageRejectedEvent | Email was rejected |
Domain Events
| Event | Python | TypeScript | Description |
|---|---|---|---|
domain_verified | DomainVerifiedEvent | AgentMail.DomainVerifiedEvent | Domain verification completed |
Message Properties
The event.message object contains:
| Python | TypeScript | Description |
|---|---|---|
inbox_id | inboxId | Inbox that received the email |
message_id | messageId | Unique message ID |
thread_id | threadId | Conversation thread ID |
from_ | from_ | Sender email address |
to | to | Recipients list |
subject | subject | Subject line |
text | text | Plain text body |
html | html | HTML body (if present) |
attachments | attachments | List of attachments |
Error Handling
Python:
1 from agentmail import AsyncAgentMail, Subscribe, MessageReceivedEvent 2 from agentmail.core.api_error import ApiError 3 4 client = AsyncAgentMail(api_key="YOUR_API_KEY") 5 6 async def main(): 7 try: 8 async with client.websockets.connect() as socket: 9 await socket.send_subscribe(Subscribe(inbox_ids=["agent@agentmail.to"])) 10 11 async for event in socket: 12 if isinstance(event, MessageReceivedEvent): 13 await process_email(event.message) 14 15 except ApiError as e: 16 print(f"API error: {e.status_code} - {e.body}") 17 except Exception as e: 18 print(f"Connection error: {e}")
TypeScript:
1 import { AgentMailClient, AgentMail, AgentMailError } from "agentmail"; 2 3 const client = new AgentMailClient({ 4 apiKey: process.env.AGENTMAIL_API_KEY, 5 }); 6 7 async function main() { 8 try { 9 const socket = await client.websockets.connect(); 10 11 socket.on("open", () => { 12 socket.sendSubscribe({ 13 type: "subscribe", 14 inboxIds: ["agent@agentmail.to"], 15 }); 16 }); 17 18 socket.on("message", (event: AgentMail.MessageReceivedEvent) => { 19 if (event.type === "message_received") { 20 processEmail(event.message); 21 } 22 }); 23 24 socket.on("error", (error) => { 25 console.error("WebSocket error:", error); 26 }); 27 28 socket.on("close", (event) => { 29 console.log("Disconnected:", event.code, event.reason); 30 }); 31 } catch (err) { 32 if (err instanceof AgentMailError) { 33 console.error(`API error: ${err.statusCode} - ${err.message}`); 34 } else { 35 console.error("Connection error:", err); 36 } 37 } 38 } 39 40 main();
