This guide walks you through five levels of Discord agent complexity, from a 20-line keyword responder to a fully autonomous production agent. Each level builds on the previous one, and every example is complete, runnable code.

The Five Levels

LevelNameLinesWhat It Does
1Reactive Bot~20Responds to keywords
2Context-Aware~35Fetches conversation history before responding
3Proactive Agent~60Creates threads, streams responses, detects intent
4Multi-Action~90Orchestrates multiple Discord actions per event
5Full Autonomous~120Production-ready with safety, error handling, graceful degradation

Level 1: Reactive Bot

The simplest possible agent. It listens for messages that mention the bot and responds based on keyword matching. Uses discli listen for events and discli message reply for responses.

import json
import subprocess
def reply(channel_id, message_id, text):
subprocess.run(
["discli", "message", "reply", channel_id, message_id, text],
capture_output=True,
)
proc = subprocess.Popen(
["discli", "--json", "listen", "--events", "messages"],
stdout=subprocess.PIPE,
text=True,
)
for line in proc.stdout:
event = json.loads(line.strip())
if not event.get("mentions_bot"):
continue
content = event["content"].lower()
if "help" in content:
reply(event["channel_id"], event["message_id"], "Hi! How can I help?")
elif "pricing" in content:
reply(event["channel_id"], event["message_id"], "Check example.com/pricing")
else:
reply(event["channel_id"], event["message_id"], "Thanks for reaching out!")

What makes this work

  • discli --json listen streams JSONL events to stdout, one per line
  • Each event includes mentions_bot, channel_id, message_id, and content
  • discli message reply is a fire-and-forget CLI call — no connection management
  • The for line in proc.stdout loop blocks until the next event arrives
Warning

What could go wrong: Every reply() call spawns a new discli process, which creates a new Discord connection, authenticates, sends the message, and disconnects. This takes 2-3 seconds per reply. Fine for low-traffic bots, but it will not scale.


Level 2: Context-Aware

The Level 1 bot responds to each message in isolation. A context-aware agent fetches recent conversation history before responding, so it understands what the user is actually asking about.

import json
import subprocess
def reply(channel_id, message_id, text):
subprocess.run(
["discli", "message", "reply", channel_id, message_id, text],
capture_output=True,
)
def get_context(channel_id, limit=5):
"""Fetch recent messages for conversation context."""
result = subprocess.run(
["discli", "--json", "message", "list", channel_id, "--limit", str(limit)],
capture_output=True,
text=True,
)
if result.returncode != 0:
return []
return json.loads(result.stdout)
def build_response(event, context):
"""Use conversation context to build a smarter response."""
content = event["content"].lower()
# Check if user already asked this question recently
recent_questions = [m["content"] for m in context if m["author_id"] == event["author_id"]]
if any("help" in q.lower() for q in recent_questions[:3]):
return "Looks like you've asked before -- let me connect you with a human. Stand by!"
if "help" in content:
return "Hi! How can I help? I can answer questions about pricing, setup, and billing."
if "pricing" in content:
return "Check example.com/pricing -- plans start at $10/mo."
return "Thanks for reaching out! Could you tell me more about what you need?"
proc = subprocess.Popen(
["discli", "--json", "listen", "--events", "messages"],
stdout=subprocess.PIPE,
text=True,
)
for line in proc.stdout:
event = json.loads(line.strip())
if not event.get("mentions_bot"):
continue
context = get_context(event["channel_id"])
response = build_response(event, context)
reply(event["channel_id"], event["message_id"], response)

What changed from Level 1

  • Added get_context() to fetch the last 5 messages before responding
  • The bot now detects repeated questions and changes its behavior
  • Response logic has access to conversation history, not just the current message
Warning

What could go wrong: Each response now requires two Discord connections — one for message list and one for message reply. If the channel is busy, the context fetch adds noticeable latency. The bot also has no memory between restarts.


Level 3: Proactive Agent

This is where the architecture shifts. Instead of fire-and-forget CLI calls, Level 3 uses discli serve for a persistent bidirectional connection. The bot can stream responses token-by-token, create threads, and act without being explicitly mentioned.

import json
import subprocess
import sys
import threading
def start_serve():
"""Start discli serve as a subprocess."""
return subprocess.Popen(
["discli", "serve", "--events", "messages"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True,
)
def send_action(proc, action):
"""Send a JSONL action to discli serve via stdin."""
proc.stdin.write(json.dumps(action) + "\n")
proc.stdin.flush()
def read_events(proc):
"""Read JSONL events from discli serve stdout."""
for line in proc.stdout:
line = line.strip()
if line:
yield json.loads(line)
def detect_intent(content):
"""Simple intent detection without explicit commands."""
content = content.lower()
if any(w in content for w in ["help", "issue", "problem", "broken", "error"]):
return "support"
if any(w in content for w in ["how do i", "how to", "tutorial", "guide"]):
return "question"
return None
proc = start_serve()
# Wait for ready event
for event in read_events(proc):
if event.get("event") == "ready":
print(f"Bot ready as {event['bot_name']}", file=sys.stderr)
break
# Main event loop
for event in read_events(proc):
if event.get("event") != "message":
continue
if event.get("is_bot"):
continue
intent = detect_intent(event["content"])
if intent is None:
continue
channel_id = event["channel_id"]
message_id = event["message_id"]
author = event["author"]
if intent == "support":
# Create a support thread for the user
send_action(proc, {
"action": "thread_create",
"channel_id": channel_id,
"message_id": message_id,
"name": f"Support: {author}",
"content": f"Hi {author}! I've created this thread to help you. What's going on?",
})
elif intent == "question":
# Stream a response token-by-token
send_action(proc, {"action": "typing_start", "channel_id": channel_id})
response = "Great question! Let me walk you through this step by step..."
# Start a stream
send_action(proc, {
"action": "stream_start",
"channel_id": channel_id,
"reply_to": message_id,
"req_id": "stream-1",
})
# In a real agent, you'd stream from an LLM here.
# For this example, we send the whole response as one chunk.
# Wait for stream_start response to get stream_id,
# then send chunks. See the Streaming guide for details.

What changed from Level 2

  • Persistent connection: discli serve maintains a single Discord WebSocket
  • Bidirectional JSONL: Send actions via stdin, receive events via stdout
  • Thread creation: Automatically creates support threads without being asked
  • Streaming: Can show responses being typed in real-time
  • Intent detection: Responds to patterns, not just direct mentions
Warning

What could go wrong: The read_events generator blocks the main thread. If you send an action that returns a response (like stream_start), you need to read from stdout to get the stream_id back. Without threading or async, this creates a deadlock risk. Level 4 addresses this.


Level 4: Multi-Action Agent

A production agent often needs to perform multiple Discord actions in response to a single event. Level 4 uses threading to handle stdin/stdout independently and orchestrates complex workflows like onboarding new members.

import json
import subprocess
import sys
import threading
import time
from collections import defaultdict
from queue import Queue
class DiscordAgent:
def __init__(self):
self.proc = subprocess.Popen(
["discli", "serve", "--events", "messages,members"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True,
)
self.responses = Queue()
self.events = Queue()
self.pending = {} # req_id -> Queue
self._req_counter = 0
# Start stdout reader thread
self._reader = threading.Thread(target=self._read_loop, daemon=True)
self._reader.start()
def _read_loop(self):
for line in self.proc.stdout:
line = line.strip()
if not line:
continue
data = json.loads(line)
req_id = data.get("req_id")
if req_id and req_id in self.pending:
self.pending[req_id].put(data)
else:
self.events.put(data)
def send(self, action, wait=False):
"""Send an action. If wait=True, block until the response arrives."""
self._req_counter += 1
req_id = f"req-{self._req_counter}"
action["req_id"] = req_id
if wait:
q = Queue()
self.pending[req_id] = q
self.proc.stdin.write(json.dumps(action) + "\n")
self.proc.stdin.flush()
if wait:
result = q.get(timeout=10)
del self.pending[req_id]
return result
return None
def next_event(self):
return self.events.get()
# Configuration
WELCOME_CHANNEL_ID = "1234567890" # #welcome channel
MODS_CHANNEL_ID = "9876543210" # #mod-log channel
NEWCOMER_ROLE_ID = "1111111111" # @Newcomer role
def handle_member_join(agent, event):
"""Full onboarding workflow for new members."""
member_id = event["member_id"]
member_name = event["member"]
server_id = event["server_id"]
# 1. Send a welcome DM
agent.send({
"action": "dm_send",
"user_id": member_id,
"content": (
f"Welcome to the server, {member_name}! 👋\n\n"
"Here's how to get started:\n"
"1. Introduce yourself in #introductions\n"
"2. Pick your roles in #roles\n"
"3. Ask questions in #help\n\n"
"Have fun!"
),
})
# 2. Assign the Newcomer role
agent.send({
"action": "role_assign",
"guild_id": server_id,
"member_id": member_id,
"role_id": NEWCOMER_ROLE_ID,
})
# 3. Create an introduction thread in #welcome
result = agent.send({
"action": "thread_create",
"channel_id": WELCOME_CHANNEL_ID,
"name": f"Welcome {member_name}!",
"content": f"Hey everyone, {member_name} just joined! Say hi! 🎉",
}, wait=True)
thread_id = result.get("thread_id")
# 4. Notify moderators
agent.send({
"action": "send",
"channel_id": MODS_CHANNEL_ID,
"content": (
f"📋 New member joined: **{member_name}** (ID: {member_id})\n"
f"Welcome thread: <#{thread_id}>" if thread_id else
f"📋 New member joined: **{member_name}** (ID: {member_id})"
),
})
print(f"[onboard] Completed onboarding for {member_name}", file=sys.stderr)
def handle_message(agent, event):
"""Handle incoming messages with multi-action responses."""
if event.get("is_bot"):
return
content = event["content"].lower()
channel_id = event["channel_id"]
message_id = event["message_id"]
if not event.get("mentions_bot"):
return
if "status" in content:
# React to acknowledge, then send a multi-part status report
agent.send({
"action": "reaction_add",
"channel_id": channel_id,
"message_id": message_id,
"emoji": "⏳",
})
# Gather data
servers = agent.send({"action": "server_list"}, wait=True)
server_count = len(servers.get("servers", []))
agent.send({
"action": "reply",
"channel_id": channel_id,
"message_id": message_id,
"content": f"All systems operational. Monitoring {server_count} server(s).",
})
# Replace the hourglass with a checkmark
agent.send({
"action": "reaction_remove",
"channel_id": channel_id,
"message_id": message_id,
"emoji": "⏳",
})
agent.send({
"action": "reaction_add",
"channel_id": channel_id,
"message_id": message_id,
"emoji": "✅",
})
# Main loop
agent = DiscordAgent()
# Wait for ready
while True:
event = agent.next_event()
if event.get("event") == "ready":
print(f"Bot ready as {event['bot_name']}", file=sys.stderr)
break
while True:
event = agent.next_event()
event_type = event.get("event")
if event_type == "member_join":
handle_member_join(agent, event)
elif event_type == "message":
handle_message(agent, event)

What changed from Level 3

  • DiscordAgent class: Encapsulates the serve subprocess with proper threading
  • Request/response correlation: Uses req_id to match responses to actions with wait=True
  • Multi-action workflows: handle_member_join performs 4 actions (DM, role assign, thread create, notification) for a single event
  • Non-blocking sends: Actions without wait=True fire-and-forget, keeping the event loop fast
Warning

What could go wrong: The q.get(timeout=10) in send(wait=True) can timeout if Discord is slow or rate-limited. DMs can fail if the user has DMs disabled. The hardcoded channel/role IDs break if the server configuration changes. Level 5 addresses all of these.


Level 5: Full Autonomous Agent

A production-ready agent with permission profiles for safety, structured error handling, rate limit awareness, audit logging, and graceful degradation when permissions are restricted.

import json
import subprocess
import sys
import threading
import time
from queue import Queue, Empty
class DiscordAgent:
"""Production Discord agent with safety and error handling."""
def __init__(self, profile="chat", slash_commands=None):
cmd = [
"discli", "serve",
"--profile", profile,
"--events", "messages,members,reactions",
]
if slash_commands:
cmd.extend(["--slash-commands", slash_commands])
self.proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True,
)
self.events = Queue()
self.pending = {}
self._req_counter = 0
self._lock = threading.Lock()
self._reader = threading.Thread(target=self._read_loop, daemon=True)
self._reader.start()
def _read_loop(self):
try:
for line in self.proc.stdout:
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
except json.JSONDecodeError:
print(f"[warn] Invalid JSON from serve: {line[:100]}", file=sys.stderr)
continue
req_id = data.get("req_id")
if req_id and req_id in self.pending:
self.pending[req_id].put(data)
else:
self.events.put(data)
except Exception as e:
print(f"[fatal] Reader thread crashed: {e}", file=sys.stderr)
self.events.put({"event": "error", "message": str(e)})
def send(self, action, wait=False, timeout=15):
"""Send an action with optional response waiting and error handling."""
with self._lock:
self._req_counter += 1
req_id = f"req-{self._req_counter}"
action["req_id"] = req_id
if wait:
q = Queue()
self.pending[req_id] = q
try:
self.proc.stdin.write(json.dumps(action) + "\n")
self.proc.stdin.flush()
except (BrokenPipeError, OSError) as e:
print(f"[fatal] Cannot write to serve process: {e}", file=sys.stderr)
return {"error": "serve process died"}
if wait:
try:
result = q.get(timeout=timeout)
except Empty:
result = {"error": "timeout waiting for response"}
finally:
self.pending.pop(req_id, None)
return result
return {"ok": True}
def safe_send(self, action, wait=False, fallback=None):
"""Send an action, returning fallback on any error."""
result = self.send(action, wait=wait)
if "error" in result:
print(f"[warn] Action failed: {action.get('action')} -> {result['error']}", file=sys.stderr)
return fallback if fallback is not None else result
return result
def next_event(self, timeout=None):
try:
return self.events.get(timeout=timeout)
except Empty:
return None
def shutdown(self):
try:
self.proc.stdin.close()
self.proc.wait(timeout=5)
except Exception:
self.proc.kill()
class SupportAgent:
"""Autonomous support agent with safety boundaries."""
def __init__(self, config):
self.config = config
self.agent = DiscordAgent(
profile=config.get("profile", "chat"),
slash_commands=config.get("slash_commands"),
)
self.active_threads = {} # user_id -> thread_id
def run(self):
"""Main event loop with error recovery."""
# Wait for ready
while True:
event = self.agent.next_event(timeout=30)
if event is None:
print("[fatal] Timed out waiting for ready event", file=sys.stderr)
return
if event.get("event") == "ready":
print(f"[info] Bot ready as {event['bot_name']}", file=sys.stderr)
break
if event.get("event") == "error":
print(f"[error] Startup error: {event.get('message')}", file=sys.stderr)
# Process events
try:
while True:
event = self.agent.next_event(timeout=60)
if event is None:
continue # heartbeat timeout, keep going
try:
self._dispatch(event)
except Exception as e:
print(f"[error] Event handler crashed: {e}", file=sys.stderr)
except KeyboardInterrupt:
print("[info] Shutting down...", file=sys.stderr)
finally:
self.agent.shutdown()
def _dispatch(self, event):
handlers = {
"message": self._on_message,
"member_join": self._on_member_join,
"reaction_add": self._on_reaction,
"slash_command": self._on_slash_command,
"error": self._on_error,
}
handler = handlers.get(event.get("event"))
if handler:
handler(event)
def _on_message(self, event):
if event.get("is_bot"):
return
if not event.get("mentions_bot"):
return
channel_id = event["channel_id"]
message_id = event["message_id"]
user_id = event["author_id"]
content = event["content"].lower()
# Show typing indicator while processing
self.agent.send({"action": "typing_start", "channel_id": channel_id})
try:
# Check if user has an active support thread
if user_id in self.active_threads:
self.agent.safe_send({
"action": "reply",
"channel_id": channel_id,
"message_id": message_id,
"content": f"I'm already helping you in <#{self.active_threads[user_id]}>!",
})
return
# Detect intent and respond
if any(w in content for w in ["help", "issue", "problem", "bug"]):
self._create_support_thread(event)
elif any(w in content for w in ["thanks", "solved", "fixed"]):
self.agent.safe_send({
"action": "reaction_add",
"channel_id": channel_id,
"message_id": message_id,
"emoji": "💚",
})
else:
self._stream_response(event, "How can I help? Mention an issue and I'll create a support thread.")
finally:
self.agent.send({"action": "typing_stop", "channel_id": channel_id})
def _create_support_thread(self, event):
"""Create a support thread with error handling."""
result = self.agent.safe_send({
"action": "thread_create",
"channel_id": event["channel_id"],
"message_id": event["message_id"],
"name": f"Support: {event['author'][:50]}",
"content": f"Hi {event['author']}! I've created this thread to help you.\n\nCould you describe your issue in detail?",
}, wait=True, fallback=None)
if result and result.get("thread_id"):
self.active_threads[event["author_id"]] = result["thread_id"]
else:
# Graceful degradation: reply in channel instead
self.agent.safe_send({
"action": "reply",
"channel_id": event["channel_id"],
"message_id": event["message_id"],
"content": "I couldn't create a thread (I may not have permission). Could you describe your issue here?",
})
def _stream_response(self, event, text):
"""Stream a response with proper cleanup."""
result = self.agent.send({
"action": "stream_start",
"channel_id": event["channel_id"],
"reply_to": event["message_id"],
}, wait=True)
if "error" in result:
# Fallback to regular reply
self.agent.safe_send({
"action": "reply",
"channel_id": event["channel_id"],
"message_id": event["message_id"],
"content": text,
})
return
stream_id = result["stream_id"]
# Send content as chunks (simulating LLM streaming)
words = text.split()
for i in range(0, len(words), 3):
chunk = " ".join(words[i:i+3]) + " "
self.agent.send({
"action": "stream_chunk",
"stream_id": stream_id,
"content": chunk,
})
time.sleep(0.1)
self.agent.send({
"action": "stream_end",
"stream_id": stream_id,
}, wait=True)
def _on_member_join(self, event):
"""Welcome new members with graceful degradation."""
member_id = event["member_id"]
member_name = event["member"]
# Try to send a welcome DM (may fail if DMs are disabled)
self.agent.safe_send({
"action": "dm_send",
"user_id": member_id,
"content": f"Welcome, {member_name}! Check out #help if you have questions.",
})
# Post in welcome channel if configured
welcome_channel = self.config.get("welcome_channel_id")
if welcome_channel:
self.agent.safe_send({
"action": "send",
"channel_id": welcome_channel,
"content": f"Welcome to the server, **{member_name}**!",
})
def _on_reaction(self, event):
"""Handle reaction-based interactions."""
# Example: close support thread when a mod reacts with checkmark
if event.get("emoji") == "✅":
# Remove user from active threads if this message is in their thread
for user_id, thread_id in list(self.active_threads.items()):
if event.get("channel_id") == thread_id:
del self.active_threads[user_id]
self.agent.safe_send({
"action": "send",
"channel_id": thread_id,
"content": "This support thread has been marked as resolved. Thanks!",
})
break
def _on_slash_command(self, event):
"""Handle slash commands with interaction followup."""
command = event.get("command")
token = event.get("interaction_token")
if command == "status":
servers = self.agent.send({"action": "server_list"}, wait=True)
count = len(servers.get("servers", []))
active = len(self.active_threads)
self.agent.safe_send({
"action": "interaction_followup",
"interaction_token": token,
"content": f"Online. Monitoring {count} server(s), {active} active support thread(s).",
})
else:
self.agent.safe_send({
"action": "interaction_followup",
"interaction_token": token,
"content": f"Unknown command: {command}",
})
def _on_error(self, event):
print(f"[error] Discord error: {event.get('message')}", file=sys.stderr)
# ── Run ────────────────────────────────────────────────────────────
if __name__ == "__main__":
config = {
"profile": "chat", # Safe default: no destructive actions
"welcome_channel_id": "1234567890", # Set to your #welcome channel
"slash_commands": "commands.json", # Optional slash commands file
}
SupportAgent(config).run()

What changed from Level 4

  • Permission profile: Runs with chat profile by default, preventing accidental destructive actions
  • safe_send(): Wraps every action in error handling with optional fallbacks
  • Graceful degradation: If thread creation fails (missing permissions), falls back to in-channel replies
  • Structured dispatch: Clean event routing with _dispatch() instead of if/elif chains
  • Thread-safe: Uses a lock for request counter, handles broken pipe on process death
  • Timeout handling: Every wait=True call has a timeout; no infinite hangs
  • Audit awareness: The chat profile ensures all actions are logged and restricted to safe operations
  • Slash command support: Handles /status with interaction followup
  • Cleanup: shutdown() closes stdin and waits for the process to exit
Warning

What could go wrong at this level: Very little, by design. The remaining risks are: Discord API outages (mitigated by timeouts), bot token compromise (mitigate with environment variables, not hardcoded tokens), and logic bugs in intent detection (mitigate with logging and monitoring).


Choosing Your Level

terminal

Levels 1-2

Best for: Quick scripts, personal bots, prototyping. No persistent connection needed. Each action is a standalone CLI call.

robot

Levels 3-5

Best for: Production agents, real-time interactions, streaming responses. Uses discli serve for a single persistent connection.

Architecture Decision: listen vs serve

Featurediscli listen (Levels 1-2)discli serve (Levels 3-5)
ConnectionNew connection per actionSingle persistent connection
Latency2-3s per actionUnder 100ms per action
StreamingNot possibleFull support
ThreadsCan create (via CLI)Can create and manage
Typing indicatorNot practicalFull support
Slash commandsNot supportedFull support
ComplexityMinimalModerate

Next Steps