Sales Agent with WebSocket

Overview

Learn how to build a real-time sales agent that processes emails instantly using WebSocket connections. Unlike webhook-based agents that require ngrok and public URLs, this WebSocket approach connects directly to AgentMail for true real-time processing with minimal setup.

This agent demonstrates a practical sales workflow: a manager delegates customer outreach to the agent, which then handles the entire conversation autonomously while keeping the manager informed of key signals.

What You’ll Build

By the end of this guide, you’ll have a working sales agent that:

  1. Connects via WebSocket for instant, real-time email processing
  2. Handles manager emails by extracting customer info and sending personalized outreach
  3. Processes customer replies with AI-powered, context-aware responses
  4. Notifies the manager when customers show strong buying signals

Here’s the workflow:

Manager sends email with customer info
Agent extracts customer email
Agent generates AI sales pitch → Sends to customer
Agent confirms to manager
[Customer replies]
Agent detects intent + generates AI response
If interested → Notifies manager

WebSocket vs Webhook: Why WebSocket?

Feature      | Webhook Approach            | WebSocket Approach
Setup        | Requires ngrok + public URL | No external tools needed
Architecture | Flask server + HTTP         | Pure async Python
Latency      | HTTP round-trip             | Instant streaming
Firewall     | Must expose port            | Outbound only

For more details, see the WebSocket API Reference and the Python SDK WebSocket documentation.

Prerequisites

Before you begin, make sure you have:

Required:

  • Python 3.11 or later
  • An AgentMail API key
  • An OpenAI API key

Project Setup

Step 1: Create Project Directory

Create a new directory for your agent:

$ mkdir sales-agent-websocket
$ cd sales-agent-websocket

Step 2: Create the Agent Code

Create a file named main.py and paste the following code:

"""
Sales Agent using AgentMail WebSocket

This is a simple example showing how to:
- Connect to AgentMail via WebSocket for real-time email processing
- Use OpenAI to handle sales conversations
- Send emails to customers and respond to replies
"""

import asyncio
import os
import re
from dotenv import load_dotenv
from agentmail import AsyncAgentMail, Subscribe, Subscribed, MessageReceivedEvent
from openai import AsyncOpenAI

# Load environment variables
load_dotenv()

# Initialize clients
agentmail = AsyncAgentMail(api_key=os.getenv("AGENTMAIL_API_KEY"))
openai = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Simple conversation history (thread_id -> messages)
conversations = {}

# Store manager email for notifications
manager_email = None


def extract_email(from_field):
    """Extract email address from 'Name <email@example.com>' format"""
    match = re.search(r'<(.+?)>', from_field)
    return match.group(1) if match else from_field


def is_from_manager(email_body):
    """Simple check if email is from sales manager (contains customer info)"""
    keywords = ['customer', 'lead', 'contact', 'reach out', 'email']
    return any(keyword in email_body.lower() for keyword in keywords)


def extract_customer_info(email_body):
    """Extract customer email from manager's message"""
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    emails = re.findall(email_pattern, email_body)

    # Return the first email found (should be the customer's email in the message body)
    if emails:
        return emails[0]
    return None


async def get_ai_response(messages, system_prompt):
    """Get response from OpenAI"""
    try:
        response = await openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                *messages
            ],
            temperature=0.7,
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error getting AI response: {e}")
        return "I apologize, but I encountered an error. Please try again."


async def send_email(inbox_id, to_email, subject, body):
    """Send a new email"""
    try:
        await agentmail.inboxes.messages.send(
            inbox_id=inbox_id,
            to=[to_email],
            subject=subject,
            text=body
        )
        print(f"✓ Sent email to {to_email}")
    except Exception as e:
        print(f"Error sending email: {e}")


async def reply_to_email(inbox_id, message_id, to_email, body):
    """Reply to an email"""
    try:
        await agentmail.inboxes.messages.reply(
            inbox_id=inbox_id,
            message_id=message_id,
            to=[to_email],  # Required parameter for replies
            text=body
        )
        print(f"✓ Sent reply to {to_email}")
    except Exception as e:
        print(f"Error replying: {e}")


async def handle_manager_email(inbox_id, message_id, from_email, subject, body):
    """Handle email from sales manager - extract customer and send sales pitch"""
    global manager_email
    manager_email = from_email  # Remember manager for future notifications

    print(f"\n📧 Email from MANAGER: {from_email}")

    # Extract customer email
    customer_email = extract_customer_info(body)
    print(f"→ Extracted customer email: {customer_email}")

    if not customer_email:
        await reply_to_email(
            inbox_id,
            message_id,
            from_email,  # Reply back to the manager
            "I couldn't find a customer email address. Please include it in your message."
        )
        return

    # Generate sales pitch using AI
    system_prompt = """You are a helpful sales agent. Generate a brief, professional sales email
    based on the manager's request. Keep it under 150 words. Be friendly and professional."""

    messages = [{"role": "user", "content": f"Create a sales email based on this: {body}"}]
    sales_pitch = await get_ai_response(messages, system_prompt)

    # Send email to customer
    await send_email(
        inbox_id,
        customer_email,
        f"Introduction: {subject}" if subject else "Quick Introduction",
        sales_pitch
    )

    # Confirm to manager
    await reply_to_email(
        inbox_id,
        message_id,
        from_email,  # Reply back to the manager
        f"✓ I've sent an introduction email to {customer_email}.\n\nHere's what I sent:\n\n{sales_pitch}"
    )


async def handle_customer_email(inbox_id, message_id, thread_id, from_email, subject, body):
    """Handle email from customer - track conversation, detect intent, and notify manager"""
    print(f"\n📧 Email from CUSTOMER: {from_email}")

    # Track conversation history
    if thread_id not in conversations:
        conversations[thread_id] = []
    conversations[thread_id].append({"role": "user", "content": body})

    # Detect customer intent. 'not_interested' is checked first because the
    # phrase "not interested" also contains the keyword "interested".
    intent_keywords = {
        'not_interested': ['not interested', 'no thank', 'not right now', 'maybe later'],
        'interested': ['interested', 'demo', 'meeting', 'tell me more', 'sounds good'],
        'question': ['?', 'how', 'what', 'when', 'why', 'can you']
    }

    body_lower = body.lower()
    intent = 'question'  # default
    for key, keywords in intent_keywords.items():
        if any(keyword in body_lower for keyword in keywords):
            intent = key
            break

    # Generate AI response
    system_prompt = """You are a helpful sales agent. Answer customer questions professionally
    and helpfully. Keep responses brief (under 100 words). Be friendly but professional."""

    response = await get_ai_response(conversations[thread_id], system_prompt)

    # Reply to customer
    await reply_to_email(inbox_id, message_id, from_email, response)

    # Notify manager if strong intent signal
    if manager_email and intent in ['interested', 'not_interested']:
        status = "showing interest" if intent == 'interested' else "not interested at this time"
        await send_email(
            inbox_id,
            manager_email,
            f"Update: {from_email}",
            f"Customer {from_email} is {status}.\n\nTheir message:\n{body}\n\nMy response:\n{response}"
        )
        print(f"→ Notified manager about customer's {intent}")

    # Update conversation history
    conversations[thread_id].append({"role": "assistant", "content": response})


async def handle_new_email(message):
    """Process incoming email from WebSocket"""
    try:
        # Extract message data using object attributes
        inbox_id = message.inbox_id
        message_id = message.message_id
        thread_id = message.thread_id
        from_field = message.from_ or ""  # SDK uses from_
        from_email = extract_email(from_field)
        subject = message.subject or ""
        body = message.text or ""  # SDK uses text for the body

        print(f"\n{'='*60}")
        print(f"New email from: {from_email}")
        print(f"Subject: {subject}")
        print(f"{'='*60}")

        # Determine if from manager or customer
        if is_from_manager(body):
            await handle_manager_email(inbox_id, message_id, from_email, subject, body)
        else:
            await handle_customer_email(inbox_id, message_id, thread_id, from_email, subject, body)

    except Exception as e:
        print(f"Error handling email: {e}")


async def main():
    """Main WebSocket loop"""
    inbox_username = os.getenv("INBOX_USERNAME", "sales-agent")
    inbox_id = f"{inbox_username}@agentmail.to"

    print("\nSales Agent starting...")
    print(f"Inbox: {inbox_id}")
    print("✓ Connecting to AgentMail WebSocket...")

    # Connect to WebSocket
    try:
        async with agentmail.websockets.connect() as socket:
            print("✓ Connected! Listening for emails...\n")

            # Subscribe to inbox
            await socket.send_subscribe(Subscribe(inbox_ids=[inbox_id]))

            # Listen for events
            async for event in socket:
                if isinstance(event, Subscribed):
                    print(f"✓ Subscribed to: {event.inbox_ids}\n")

                elif isinstance(event, MessageReceivedEvent):
                    print("📨 New email received!")
                    await handle_new_email(event.message)

    except (KeyboardInterrupt, asyncio.CancelledError):
        print("\n\nShutting down gracefully...")
    except Exception as e:
        print(f"\nError: {e}")


def run():
    """Run the main function"""
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n✓ Shutdown complete")


if __name__ == "__main__":
    run()

Step 3: Create Requirements File

Create a file named requirements.txt:

agentmail>=0.0.19
openai>=1.0.0
python-dotenv>=1.0.0

Step 4: Install Dependencies

Install the required Python packages:

$ pip install -r requirements.txt
$ # or with pyproject.toml
$ pip install .

Step 5: Configure Environment Variables

Create a .env file with your credentials:

# AgentMail Configuration
AGENTMAIL_API_KEY=your_agentmail_api_key_here

# OpenAI Configuration
OPENAI_API_KEY=your_openai_api_key_here

# Inbox Settings
INBOX_USERNAME=sales-agent

Note: Unlike webhook-based agents, you don’t need ngrok or a public URL. The WebSocket connection is outbound only, so it works behind firewalls without any port forwarding.
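A missing key in .env otherwise only shows up later as a failed API call. The following is a minimal sketch of a startup check, assuming the variable names from the .env file above; `missing_env_vars` and `check_env` are hypothetical helpers and not part of the AgentMail SDK.

```python
import os

# Variable names taken from the .env file above.
REQUIRED_VARS = ("AGENTMAIL_API_KEY", "OPENAI_API_KEY")


def missing_env_vars(env=None, required=REQUIRED_VARS):
    """Return the required variable names that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in required if not (env.get(name) or "").strip()]


def check_env(env=None):
    """Raise a readable error before any client is created."""
    missing = missing_env_vars(env)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
```

Calling `check_env()` right after `load_dotenv()` in main.py would stop the agent early with a clear message instead of a connection error.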

Code Walkthrough

Let’s understand how the agent works by breaking down the key components.

Architecture Overview

┌─────────────────────────────────────────────────────────────┐
│                     Your Python Script                      │
│                                                             │
│  ┌──────────────┐     ┌──────────────┐     ┌─────────────┐  │
│  │ AsyncAgent   │────▶│  WebSocket   │────▶│   Event     │  │
│  │ Mail Client  │     │  Connection  │     │   Handler   │  │
│  └──────────────┘     └──────────────┘     └─────────────┘  │
│         │                     ▲                   │         │
│         │                     │                   ▼         │
│         │                 Real-time        ┌─────────────┐  │
│         │                  Events          │   OpenAI    │  │
│         │                                  │ Integration │  │
│         ▼                                  └─────────────┘  │
│  ┌──────────────┐                                           │
│  │  Send/Reply  │◀──────────────────────────────────────────│
│  │    Emails    │                                           │
│  └──────────────┘                                           │
└─────────────────────────────────────────────────────────────┘
                     ┌──────────────────┐
                     │    AgentMail     │
                     │      Cloud       │
                     └──────────────────┘

1. WebSocket Connection Setup

The core of this agent is the WebSocket connection:

from agentmail import AsyncAgentMail, Subscribe, Subscribed, MessageReceivedEvent

# Initialize the async client
agentmail = AsyncAgentMail(api_key=os.getenv("AGENTMAIL_API_KEY"))

# Connect and subscribe
async with agentmail.websockets.connect() as socket:
    await socket.send_subscribe(Subscribe(inbox_ids=[inbox_id]))

Key points:

  • AsyncAgentMail is the async version of the client
  • agentmail.websockets.connect() creates the WebSocket connection
  • Subscribe specifies which inboxes to monitor
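In the full listing, main() returns as soon as the connection raises an error, so the agent stops until it is restarted by hand. One common way to handle this is to reconnect with increasing delays. The sketch below only computes the delays; `backoff_delays` is a hypothetical helper, and whether the SDK already reconnects on its own is not stated in this guide.

```python
import itertools


def backoff_delays(base=1.0, factor=2.0, cap=30.0):
    """Yield reconnect delays in seconds: base, base*factor, ... capped at cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


# First seven delays with the default settings.
first_delays = list(itertools.islice(backoff_delays(), 7))
```

A wrapper around main() could sleep for `next(delays)` after each disconnect and create a fresh generator once a connection succeeds.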

2. Event Handling Loop

The agent uses an async iterator pattern to process events:

async for event in socket:
    if isinstance(event, Subscribed):
        print(f"✓ Subscribed to: {event.inbox_ids}")

    elif isinstance(event, MessageReceivedEvent):
        await handle_new_email(event.message)

Event types:

  • Subscribed - Confirmation that subscription was successful
  • MessageReceivedEvent - A new email arrived in the inbox
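The dispatch pattern can be tried offline without an API key. This is a sketch that replaces the socket with an async generator; `FakeSubscribed` and `FakeMessageReceived` are hypothetical stand-ins for the SDK's event classes and only carry the fields used here.

```python
import asyncio
from dataclasses import dataclass


# Hypothetical stand-ins for the SDK's Subscribed and MessageReceivedEvent
# classes, so the loop can be exercised without a network connection.
@dataclass
class FakeSubscribed:
    inbox_ids: list


@dataclass
class FakeMessageReceived:
    message: str


async def fake_socket():
    """Yield events one at a time, the way the real socket does when iterated."""
    yield FakeSubscribed(inbox_ids=["sales-agent@agentmail.to"])
    yield FakeMessageReceived(message="hello")
    yield FakeMessageReceived(message="second email")


async def run_loop(socket):
    """Dispatch on event type, mirroring the isinstance checks above."""
    handled = []
    async for event in socket:
        if isinstance(event, FakeSubscribed):
            handled.append(("subscribed", event.inbox_ids))
        elif isinstance(event, FakeMessageReceived):
            handled.append(("message", event.message))
    return handled


handled_events = asyncio.run(run_loop(fake_socket()))
```

Because each event is awaited in turn, a slow handler delays every email that arrives after it.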

3. Email Processing Flow

The agent routes emails based on content:

async def handle_new_email(message):
    # Extract fields from the message object
    inbox_id = message.inbox_id
    message_id = message.message_id
    thread_id = message.thread_id
    from_field = message.from_ or ""  # Note: SDK uses from_
    from_email = extract_email(from_field)
    subject = message.subject or ""
    body = message.text or ""

    # Route based on email content
    if is_from_manager(body):
        await handle_manager_email(inbox_id, message_id, from_email, subject, body)
    else:
        await handle_customer_email(inbox_id, message_id, thread_id, from_email, subject, body)
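Keyword routing is easy to fool: a customer reply such as "please email me your pricing" contains the keyword "email" and would be handled as a manager request. A possible alternative is to route on the sender address. The sketch below assumes a hypothetical allow-list of manager addresses; `is_manager_sender` and `MANAGERS` are illustrative names, not part of the guide's code.

```python
def is_manager_sender(from_email, manager_addresses):
    """Route by sender address instead of body keywords."""
    normalized = from_email.strip().lower()
    return normalized in {addr.strip().lower() for addr in manager_addresses}


# Hypothetical allow-list; in the agent this could be loaded from an
# environment variable (for example MANAGER_EMAILS, which the guide does not define).
MANAGERS = ["manager@example.com"]
```

With this check, `handle_new_email` would pass `from_email` rather than `body` to the routing function.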

Email extraction helper:

def extract_email(from_field):
    """Extract email from 'Name <email@example.com>' format"""
    match = re.search(r'<(.+?)>', from_field)
    return match.group(1) if match else from_field
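The standard library can do the same parsing and also copes with quoted display names. A sketch using `email.utils.parseaddr`; the function name `extract_email_parseaddr` is chosen here only to avoid clashing with the helper above.

```python
from email.utils import parseaddr


def extract_email_parseaddr(from_field):
    """Return the bare address from a From header value."""
    _name, address = parseaddr(from_field)
    # parseaddr returns an empty address when it cannot parse the input,
    # so fall back to the raw value as the regex helper does.
    return address or from_field
```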

4. Manager Email Handler

When the manager sends an email with customer info:

async def handle_manager_email(inbox_id, message_id, from_email, subject, body):
    global manager_email
    manager_email = from_email  # Remember for notifications

    # Extract customer email using regex
    customer_email = extract_customer_info(body)

    if not customer_email:
        await reply_to_email(inbox_id, message_id, from_email,
                             "I couldn't find a customer email. Please include it.")
        return

    # Generate AI sales pitch
    sales_pitch = await get_ai_response(
        [{"role": "user", "content": f"Create a sales email based on: {body}"}],
        "You are a helpful sales agent. Generate a brief, professional email..."
    )

    # Send to customer
    await send_email(inbox_id, customer_email, f"Introduction: {subject}", sales_pitch)

    # Confirm to manager
    await reply_to_email(inbox_id, message_id, from_email,
                         f"✓ Sent email to {customer_email}.\n\nContent:\n{sales_pitch}")
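Taking the first address in the body can pick the wrong one, for example when the manager's message quotes their own address or the agent's inbox before the customer's. The sketch below skips known addresses; `extract_customer_email` and its `exclude` parameter are hypothetical additions, not part of the guide's code.

```python
import re

EMAIL_PATTERN = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")


def extract_customer_email(body, exclude=()):
    """Return the first address in body that is not in exclude, or None."""
    excluded = {addr.lower() for addr in exclude}
    for candidate in EMAIL_PATTERN.findall(body):
        if candidate.lower() not in excluded:
            return candidate
    return None
```

In the handler, `exclude` could hold the manager's `from_email` and the agent's own `inbox_id`.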

5. Customer Email Handler with Intent Detection

The agent tracks conversations and detects customer intent:

async def handle_customer_email(inbox_id, message_id, thread_id, from_email, subject, body):
    # Track conversation history per thread
    if thread_id not in conversations:
        conversations[thread_id] = []
    conversations[thread_id].append({"role": "user", "content": body})

    # Detect intent with keyword matching. 'not_interested' comes first
    # because "not interested" also contains the keyword "interested".
    intent_keywords = {
        'not_interested': ['not interested', 'no thank', 'maybe later'],
        'interested': ['interested', 'demo', 'meeting', 'tell me more'],
        'question': ['?', 'how', 'what', 'when', 'why']
    }

    intent = 'question'  # default
    for key, keywords in intent_keywords.items():
        if any(kw in body.lower() for kw in keywords):
            intent = key
            break

    # Generate contextual AI response using conversation history
    response = await get_ai_response(conversations[thread_id], system_prompt)

    # Reply to customer
    await reply_to_email(inbox_id, message_id, from_email, response)

    # Notify manager of strong signals
    if manager_email and intent in ['interested', 'not_interested']:
        await send_email(inbox_id, manager_email, f"Update: {from_email}",
                         f"Customer is {intent}.\n\nTheir message:\n{body}")
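Plain substring matching has two pitfalls: the order of the checks decides which intent wins, and short keywords match inside longer words ("how" inside "show"). The sketch below pulls the detection into a standalone function and anchors each keyword at a word start. `detect_intent` and `INTENT_KEYWORDS` are hypothetical names, and the keyword lists are copied from the handler above with "?" left out, since "question" is already the default.

```python
import re

# Checked in order: more specific phrases first, so "not interested"
# is never swallowed by the shorter keyword "interested".
INTENT_KEYWORDS = [
    ("not_interested", ["not interested", "no thank", "maybe later"]),
    ("interested", ["interested", "demo", "meeting", "tell me more"]),
    ("question", ["how", "what", "when", "why"]),
]


def detect_intent(body, default="question"):
    """Return the first intent with a keyword that starts at a word boundary."""
    text = body.lower()
    for intent, keywords in INTENT_KEYWORDS:
        for keyword in keywords:
            # The leading \b stops "how" from matching inside "show", while
            # leaving the end open so "no thank" still matches "no thanks".
            if re.search(r"\b" + re.escape(keyword), text):
                return intent
    return default
```

Keeping the detection in its own function also makes it easy to test without sending any email.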

6. AI Response Generation

The agent uses OpenAI for generating responses:

async def get_ai_response(messages, system_prompt):
    try:
        response = await openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                *messages  # Include conversation history
            ],
            temperature=0.7,
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error: {e}")
        return "I apologize, but I encountered an error."

Key points:

  • Includes full conversation history for context
  • Graceful error handling with fallback message
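Sending the full history means every long thread produces a longer prompt, and the in-memory `conversations` dictionary grows for as long as the agent runs. One simple way to bound this is to send only the most recent messages. A sketch, where `trim_history` and the default limit of 10 are illustrative choices rather than anything the guide prescribes:

```python
def trim_history(messages, max_messages=10):
    """Keep only the most recent messages so prompts stay bounded."""
    if max_messages <= 0:
        return []
    return messages[-max_messages:]


# Example: a 25-message thread is cut down to its last 10 entries.
history = [{"role": "user", "content": str(i)} for i in range(25)]
trimmed = trim_history(history)
```

The handler could then call `get_ai_response(trim_history(conversations[thread_id]), system_prompt)`.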

Running the Agent

Start the agent:

$ python main.py

You should see output like this:

Sales Agent starting...
Inbox: sales-agent@agentmail.to
✓ Connecting to AgentMail WebSocket...
✓ Connected! Listening for emails...
✓ Subscribed to: ['sales-agent@agentmail.to']

Success! Your agent is now running and listening for emails in real-time.

Leave this terminal window open - closing it will stop the agent.

Testing Your Agent

Let’s verify everything works with some test scenarios.

Test Scenario: Manager Outreach Request

Send this email from your personal email:

To: sales-agent@agentmail.to
Subject: New lead - AI startup
Body: Please reach out to this customer: customer-email@gmail.com
They're interested in our API platform.

Expected console output:

============================================================
New email from: your-email@gmail.com
Subject: New lead - AI startup
============================================================
📧 Email from MANAGER: your-email@gmail.com
→ Extracted customer email: customer-email@gmail.com
✓ Sent email to customer-email@gmail.com
✓ Sent reply to your-email@gmail.com

You’ll receive: A confirmation email with the sales pitch that was sent.

It works! The agent processed the email in real time, generated an AI sales pitch for the customer, and confirmed the outreach to you.

Customization

Modifying Intent Detection

Update the intent_keywords dictionary in handle_customer_email():

# Order matters: the first intent with a matching keyword wins.
intent_keywords = {
    'not_interested': ['not interested', 'unsubscribe', 'remove me'],
    'interested': ['interested', 'demo', 'meeting', 'pricing', 'sign up'],
    'urgent': ['urgent', 'asap', 'immediately'],  # Add new intent
    'question': ['?', 'how', 'what', 'when', 'can you']
}

Adding More Inbox Subscriptions

Subscribe to multiple inboxes:

await socket.send_subscribe(Subscribe(inbox_ids=[
    "sales-agent@agentmail.to",
    "support-agent@agentmail.to",
    "info@yourdomain.com"
]))
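If the list of inboxes should be configurable, it can be read from a single setting instead of being hard-coded. The sketch below assumes a hypothetical comma-separated value (for example from an `INBOX_IDS` environment variable, which this guide does not define) and appends the agentmail.to domain used elsewhere in the guide to bare usernames.

```python
def parse_inbox_ids(raw, default_domain="agentmail.to"):
    """Split a comma-separated list and add the default domain where missing."""
    inbox_ids = []
    for item in raw.split(","):
        item = item.strip()
        if not item:
            continue  # Skip empty entries such as a trailing comma
        inbox_ids.append(item if "@" in item else f"{item}@{default_domain}")
    return inbox_ids
```

The result can be passed directly as `Subscribe(inbox_ids=parse_inbox_ids(...))`.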

Troubleshooting

Common Issues

Problem: Cannot connect to AgentMail WebSocket.

Solutions:

  1. Verify your API key is correct:
     # Run inside an async function
     client = AsyncAgentMail(api_key="your-key")
     print(await client.inboxes.list())  # Should succeed
  2. Check your internet connection and firewall settings
  3. Ensure you’re using agentmail>=0.0.19, which includes WebSocket support:
     $ pip show agentmail

Problem: Agent is running but not receiving emails.

Checklist:

  1. Verify the inbox exists:
     client = AsyncAgentMail()
     print(await client.inboxes.get("sales-agent@agentmail.to"))
  2. Check for the subscription confirmation in the console output
  3. Send the test email to the correct inbox address
  4. Verify the email isn’t being filtered as spam

Problem: Async-related exceptions.

Solutions:

  1. Ensure Python 3.11+ is installed:
     $ python --version
  2. Don’t call blocking synchronous code directly from async functions
  3. Use asyncio.run(main()) as the entry point
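When a handler has to call blocking code, one option is to move the call to a worker thread so the event loop can keep receiving WebSocket events. A minimal sketch using `asyncio.to_thread` from the standard library; `blocking_lookup` is a hypothetical stand-in for any synchronous call, such as a client library without async support.

```python
import asyncio
import time


def blocking_lookup(customer_email):
    """Stand-in for a synchronous call that would otherwise stall the loop."""
    time.sleep(0.05)
    return f"record for {customer_email}"


async def main():
    # asyncio.to_thread runs each blocking call in a worker thread,
    # and gather returns the results in the order the calls were passed.
    return await asyncio.gather(
        asyncio.to_thread(blocking_lookup, "a@example.com"),
        asyncio.to_thread(blocking_lookup, "b@example.com"),
    )


lookup_results = asyncio.run(main())
```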

If you build something cool with AgentMail, we’d love to hear about it. Share in our Discord community!