Implementing SSE in MCP Servers
This guide explains how Server-Sent Events (SSE) are implemented in MCP servers and how to build clients that can consume these events.
Server-Side Implementation
MCP servers implement SSE endpoints following the Server-Sent Events specification in the WHATWG HTML Standard, which also defines the EventSource API. The implementation uses a long-lived HTTP response with specific headers and a text-based message format.
Response Headers
MCP server SSE endpoints return the following HTTP headers:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no
The X-Accel-Buffering: no header is particularly important behind proxy servers like Nginx: it ensures that the response is not buffered and that events are delivered in real time.
Message Format
SSE messages follow this format:
event: <event-type>
id: <event-id>
data: <JSON payload>
Each message must end with a blank line: the last field line is followed by two newline characters (\n\n). The SSE specification requires this blank line to mark the end of a message.
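The format above can be sketched as a small serializer. This is a minimal illustration, not the MCP server's actual code: the function name formatSseMessage is hypothetical, and it assumes the payload is JSON-serializable so that it fits on a single data line.

```javascript
// Hypothetical helper (not part of any MCP library): serialize one SSE message.
function formatSseMessage(eventType, id, payload) {
  // JSON.stringify without an indent argument produces a single line,
  // so the payload fits in one "data:" field.
  const json = JSON.stringify(payload);
  // The final "\n\n" is the blank line that terminates the message.
  return `event: ${eventType}\nid: ${id}\ndata: ${json}\n\n`;
}

const message = formatSseMessage('status', 42, { state: 'running' });
process.stdout.write(message);
```

A payload that has to span several lines would need one data: field per line; the sketch avoids that case by relying on compact JSON.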
Example Server Implementation
Here's a simplified example of a Node.js implementation in an MCP server:
// Express.js route handler for SSE
app.get('/api/servers/:id/events', authenticate, async (req, res) => {
  const serverId = req.params.id;

  // Set headers for SSE
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no'
  });

  // Send a comment line to open the stream; clients ignore lines starting with ":"
  res.write(':\n\n');

  // Serialize one event; the trailing blank line ends the message
  const sendEvent = (eventType, data, id = Date.now()) => {
    res.write(`event: ${eventType}\n`);
    res.write(`id: ${id}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };

  // Send initial status event
  const server = await getServerStatus(serverId);
  sendEvent('status', server);

  // Poll metrics for this server every 5 seconds
  const metricsInterval = setInterval(async () => {
    try {
      const metrics = await getServerMetrics(serverId);
      sendEvent('metrics', metrics);
    } catch (err) {
      // A failed poll should not take down the stream
      console.error('Failed to fetch metrics:', err);
    }
  }, 5000);

  // Forward only the logs that belong to this server
  const logListener = (log) => {
    if (log.serverId === serverId) {
      sendEvent('log', log);
    }
  };

  // Register log listener with the event emitter
  eventEmitter.on('serverLog', logListener);

  // Handle client disconnect: stop polling and remove the listener
  req.on('close', () => {
    clearInterval(metricsInterval);
    eventEmitter.off('serverLog', logListener);
    console.log('SSE connection closed');
  });
});
Client-Side Implementation
There are multiple ways to consume SSE events from MCP servers depending on your platform and requirements.
Browser JavaScript
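Modern browsers provide the EventSource API for consuming SSE streams. The native constructor does not accept custom request headers; its only option is withCredentials, which sends cookies on cross-origin requests. The example below therefore assumes cookie-based authentication. Sending a bearer token in an Authorization header requires a fetch-based client or an EventSource polyfill:
// Create an EventSource; withCredentials sends cookies on cross-origin requests
const eventSource = new EventSource('https://api.mcp-cloud.ai/api/servers/123/events', {
  withCredentials: true
});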
// Listen for specific event types
eventSource.addEventListener('metrics', (event) => {
  const metrics = JSON.parse(event.data);
  console.log('Server metrics:', metrics);
  updateDashboard(metrics);
});

eventSource.addEventListener('log', (event) => {
  const log = JSON.parse(event.data);
  console.log('Server log:', log);
  appendToLogViewer(log);
});

eventSource.addEventListener('status', (event) => {
  const status = JSON.parse(event.data);
  console.log('Server status:', status);
  updateStatusIndicator(status);
});

// Handle connection errors
eventSource.onerror = (error) => {
  console.error('SSE connection error:', error);
  // Implement reconnection strategy if needed
};

// Close the connection when no longer needed
function closeConnection() {
  eventSource.close();
}
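The browser's native EventSource constructor accepts only a withCredentials option, so when a bearer token has to travel in an Authorization header, one option is to read the stream with fetch() and parse it by hand. The following is a minimal sketch under stated assumptions: createSseParser and streamEvents are hypothetical names, the stream is assumed to use "\n" line endings (as the server example above does), and retry: fields are ignored.

```javascript
// Hypothetical helper: incremental parser for an SSE text stream.
// Returns a feed(chunk) function; onEvent is called once per complete message.
function createSseParser(onEvent) {
  let buffer = '';
  return function feed(chunk) {
    buffer += chunk;
    let boundary;
    // A blank line ("\n\n") terminates each message.
    while ((boundary = buffer.indexOf('\n\n')) !== -1) {
      const block = buffer.slice(0, boundary);
      buffer = buffer.slice(boundary + 2);
      const message = { event: 'message', id: '', data: [] };
      for (const line of block.split('\n')) {
        if (line === '' || line.startsWith(':')) continue; // comment or keep-alive
        const colon = line.indexOf(':');
        const field = colon === -1 ? line : line.slice(0, colon);
        let value = colon === -1 ? '' : line.slice(colon + 1);
        if (value.startsWith(' ')) value = value.slice(1); // strip one leading space
        if (field === 'event') message.event = value;
        else if (field === 'id') message.id = value;
        else if (field === 'data') message.data.push(value);
      }
      // Messages without any data field are not dispatched.
      if (message.data.length > 0) {
        onEvent({ event: message.event, id: message.id, data: message.data.join('\n') });
      }
    }
  };
}

// Hypothetical usage: stream events with an Authorization header via fetch().
async function streamEvents(url, token, onEvent) {
  const response = await fetch(url, {
    headers: { Authorization: `Bearer ${token}`, Accept: 'text/event-stream' },
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  const feedChunk = createSseParser(onEvent);
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    feedChunk(decoder.decode(value, { stream: true }));
  }
}
```

Unlike EventSource, this approach does not reconnect on its own, so any retry logic has to be added around streamEvents.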
Node.js Client
For Node.js applications, you can use the eventsource package. The headers option shown here applies to versions of the package before 3.0; later versions are configured differently, so check the package documentation for the version you install:
const EventSource = require('eventsource');

// Create options with headers for authentication
const options = {
  headers: {
    'Authorization': 'Bearer YOUR_API_TOKEN'
  }
};

// Create EventSource instance
const eventSource = new EventSource('https://api.mcp-cloud.ai/api/servers/123/events', options);

// onmessage only receives events that have no "event:" field (or "event: message").
// Events named 'status', 'metrics', or 'log' need addEventListener instead.
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received data:', data);
};

// Listen for specific event types
eventSource.addEventListener('metrics', (event) => {
  const metrics = JSON.parse(event.data);
  console.log('Server metrics:', metrics);
});

// Handle errors; closing here disables the automatic reconnection
eventSource.onerror = (error) => {
  console.error('SSE Error:', error);
  eventSource.close();
};

// Close the connection when done
function cleanup() {
  eventSource.close();
}
Python Client
For Python applications, you can use the sseclient-py package (imported as sseclient) or aiohttp. The example below uses sseclient-py together with requests:
import json
import sseclient  # provided by the sseclient-py package
import requests

# Create a session with authentication
session = requests.Session()
session.headers.update({
    'Authorization': 'Bearer YOUR_API_TOKEN',
    'Accept': 'text/event-stream'
})

# Connect to SSE endpoint
response = session.get('https://api.mcp-cloud.ai/api/servers/123/events', stream=True)
response.raise_for_status()
client = sseclient.SSEClient(response)

# Process events
for event in client.events():
    if event.event == 'metrics':
        metrics = json.loads(event.data)
        print(f"Server metrics: {metrics}")
    elif event.event == 'log':
        log = json.loads(event.data)
        print(f"Server log: {log}")
    elif event.event == 'status':
        status = json.loads(event.data)
        print(f"Server status: {status}")
Advanced Usage
Reconnection Handling
The EventSource API automatically attempts to reconnect when the connection is lost, but you may want to implement custom reconnection logic:
let reconnectAttempts = 0;
const maxReconnectAttempts = 5;
const baseReconnectDelay = 1000; // 1 second
let eventSource; // always refers to the current connection

function connectSSE() {
  // Native EventSource accepts only withCredentials; custom headers are not supported
  const source = new EventSource('https://api.mcp-cloud.ai/api/servers/123/events', {
    withCredentials: true
  });
  eventSource = source;

  source.onopen = () => {
    console.log('SSE connection established');
    reconnectAttempts = 0;
  };

  source.onerror = (error) => {
    console.error('SSE connection error:', error);
    // Close explicitly to turn off the built-in reconnection before scheduling our own
    source.close();
    if (reconnectAttempts < maxReconnectAttempts) {
      reconnectAttempts++;
      const delay = baseReconnectDelay * Math.pow(2, reconnectAttempts - 1);
      console.log(`Reconnecting in ${delay}ms (attempt ${reconnectAttempts})`);
      setTimeout(connectSSE, delay);
    } else {
      console.error('Maximum reconnection attempts reached');
    }
  };

  return source;
}

connectSSE();
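When many clients lose their connection at the same moment, purely exponential delays make them all retry in lockstep. A common refinement is to cap the delay and add random jitter. The helper below is a sketch only: the name backoffDelay, the 30-second cap, and the injectable random parameter are assumptions made for illustration.

```javascript
// Hypothetical helper: exponential backoff with a cap and "equal jitter".
// attempt is 1-based; random is injectable so the function can be tested.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000, random = Math.random) {
  // Double the delay on every attempt, but never exceed maxMs.
  const exponential = Math.min(maxMs, baseMs * 2 ** (attempt - 1));
  // Keep half of the delay fixed and randomize the other half.
  return Math.floor(exponential / 2 + random() * (exponential / 2));
}

// Print the delay chosen for the first five attempts.
for (let attempt = 1; attempt <= 5; attempt++) {
  console.log(`attempt ${attempt}: ${backoffDelay(attempt)}ms`);
}
```

In the reconnection logic above, the result of backoffDelay(reconnectAttempts) could be passed to setTimeout in place of the fixed exponential delay.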
Event Buffering and Resume
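MCP servers support event resumption using event IDs. When the native EventSource reconnects on its own, it sends the last ID it received in the Last-Event-ID request header. When you open a new connection yourself, for example after a page reload, you can pass the last ID you stored as a lastEventId query parameter to continue from where you left off:
let lastEventId = localStorage.getItem('lastEventId') || '';
const url = `https://api.mcp-cloud.ai/api/servers/123/events?lastEventId=${encodeURIComponent(lastEventId)}`;
// Native EventSource accepts only withCredentials; custom headers are not supported
const eventSource = new EventSource(url, { withCredentials: true });

const handleEvent = (event) => {
  // Store the event ID for resumption
  if (event.lastEventId) {
    lastEventId = event.lastEventId;
    localStorage.setItem('lastEventId', lastEventId);
  }
  // Process the event data
  const data = JSON.parse(event.data);
  console.log('Event data:', data);
};

// onmessage only fires for unnamed events, so register each named type explicitly
for (const type of ['status', 'metrics', 'log']) {
  eventSource.addEventListener(type, handleEvent);
}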
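On the server side, resumption requires keeping recent events around so they can be replayed. The class below is one possible sketch, not the MCP server's actual mechanism: EventReplayBuffer is a hypothetical name, and it assumes monotonically increasing integer IDs rather than the Date.now() default used in the server example, since timestamps can repeat within a millisecond.

```javascript
// Hypothetical bounded buffer that keeps the most recent events for replay.
class EventReplayBuffer {
  constructor(capacity = 1000) {
    this.capacity = capacity;
    this.events = []; // oldest first
    this.nextId = 1;  // monotonically increasing event ID
  }

  // Record an event and return it with its assigned ID.
  append(eventType, data) {
    const event = { id: this.nextId++, eventType, data };
    this.events.push(event);
    if (this.events.length > this.capacity) {
      this.events.shift(); // drop the oldest event
    }
    return event;
  }

  // Return buffered events newer than lastEventId.
  // A missing or non-numeric ID yields everything still buffered.
  since(lastEventId) {
    const last = Number.parseInt(lastEventId, 10);
    if (Number.isNaN(last)) return [...this.events];
    return this.events.filter((event) => event.id > last);
  }
}

const buffer = new EventReplayBuffer(3);
buffer.append('status', { state: 'running' });
buffer.append('metrics', { cpu: 0.4 });
console.log(buffer.since('1').map((event) => event.eventType));
```

A route handler could call since() with the value of the Last-Event-ID header or the lastEventId query parameter and write the returned events before switching to live delivery. Events older than the buffer's capacity are lost, so clients should not assume a gap-free replay.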
Filtering Events
You can filter SSE events on the server side by adding query parameters:
const eventTypes = ['metrics', 'status'];
const url = `https://api.mcp-cloud.ai/api/servers/123/events?eventTypes=${encodeURIComponent(eventTypes.join(','))}`;
// Native EventSource accepts only withCredentials; custom headers are not supported
const eventSource = new EventSource(url, { withCredentials: true });
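How a server interprets the eventTypes parameter is not specified here. As a rough sketch, a handler could parse the comma-separated value into a set and consult it before writing each event. The names parseEventTypes and KNOWN_EVENT_TYPES are hypothetical, and treating a missing parameter as "send everything" is an assumption.

```javascript
// Event types used in this guide's examples.
const KNOWN_EVENT_TYPES = new Set(['status', 'metrics', 'log']);

// Hypothetical helper: turn "metrics,status" into a Set of allowed event types.
function parseEventTypes(rawValue) {
  // Assumption: no parameter (or an empty one) means no filtering.
  if (typeof rawValue !== 'string' || rawValue.trim() === '') {
    return new Set(KNOWN_EVENT_TYPES);
  }
  const requested = rawValue
    .split(',')
    .map((type) => type.trim())
    .filter((type) => KNOWN_EVENT_TYPES.has(type)); // ignore unknown names
  return new Set(requested);
}

const allowed = parseEventTypes('metrics,status');
console.log(allowed.has('metrics'), allowed.has('log'));
```

In an Express handler, the set could be built once from req.query.eventTypes, and the function that writes events could return early when the set does not contain the event type.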
Best Practices
- Always implement error handling - SSE connections can fail for various reasons
- Use exponential backoff for reconnection - Prevents overwhelming the server on repeated failures
- Store and use event IDs - Allows resumption of event streams after disconnections
- Close connections when not needed - Conserves server resources
- Filter events on the server side - Reduces bandwidth and processing requirements
- Implement timeout handling - Handle cases where events stop flowing without connection errors
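The timeout-handling practice can be sketched as a small watchdog that records when the last event arrived. This is an illustration only: StallDetector is a hypothetical name, and the 15-second threshold is an assumption chosen as three times the 5-second metrics interval from the server example.

```javascript
// Hypothetical watchdog: reports a stall when no event has arrived for timeoutMs.
class StallDetector {
  // now is injectable so the class can be tested without real timers.
  constructor(timeoutMs, now = Date.now) {
    this.timeoutMs = timeoutMs;
    this.now = now;
    this.lastSeen = now();
  }

  // Call this from every event handler.
  touch() {
    this.lastSeen = this.now();
  }

  // True when more than timeoutMs has passed since the last event.
  isStalled() {
    return this.now() - this.lastSeen > this.timeoutMs;
  }
}

const detector = new StallDetector(15000);
detector.touch();
console.log('stalled:', detector.isStalled());
```

A client would typically call touch() inside each event listener and check isStalled() from a setInterval callback, closing and reopening the connection when it returns true.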
Troubleshooting
Common Issues
- Connection refused - Check server status and network connectivity
- Authentication errors - Verify API token validity and permissions
- Proxy server buffering - Ensure proxy servers are configured correctly for SSE
- Memory leaks - Ensure proper cleanup of event listeners in client code
- Missing events - Implement proper event ID tracking and resumption
Testing SSE Connections
You can test SSE connections using command-line tools like curl (the -N flag disables output buffering so events appear as they arrive):
curl -N -H "Authorization: Bearer YOUR_API_TOKEN" https://api.mcp-cloud.ai/api/servers/123/events
For more interactive testing, browser tools like SSE Tester or Postman can be useful.