This guide walks you through five levels of Discord agent complexity, from a 20-line keyword responder to a fully autonomous production agent. Each level builds on the previous one, and every example is complete, runnable code.
The Five Levels
| Level | Name | Lines | What It Does |
|---|---|---|---|
| 1 | Reactive Bot | ~20 | Responds to keywords |
| 2 | Context-Aware | ~35 | Fetches conversation history before responding |
| 3 | Proactive Agent | ~60 | Creates threads, streams responses, detects intent |
| 4 | Multi-Action | ~90 | Orchestrates multiple Discord actions per event |
| 5 | Full Autonomous | ~120 | Production-ready with safety, error handling, graceful degradation |
Level 1: Reactive Bot
The simplest possible agent. It listens for messages that mention the bot and responds based on keyword matching. Uses `discli listen` for events and `discli message reply` for responses.
```python
import json
import subprocess

def reply(channel_id, message_id, text):
    subprocess.run(
        ["discli", "message", "reply", channel_id, message_id, text],
        capture_output=True,
    )

proc = subprocess.Popen(
    ["discli", "--json", "listen", "--events", "messages"],
    stdout=subprocess.PIPE,
    text=True,
)

for line in proc.stdout:
    event = json.loads(line.strip())
    if not event.get("mentions_bot"):
        continue
    content = event["content"].lower()
    if "help" in content:
        reply(event["channel_id"], event["message_id"], "Hi! How can I help?")
    elif "pricing" in content:
        reply(event["channel_id"], event["message_id"], "Check example.com/pricing")
    else:
        reply(event["channel_id"], event["message_id"], "Thanks for reaching out!")
```

What makes this work
- `discli --json listen` streams JSONL events to stdout, one per line
- Each event includes `mentions_bot`, `channel_id`, `message_id`, and `content`
- `discli message reply` is a fire-and-forget CLI call — no connection management
- The `for line in proc.stdout` loop blocks until the next event arrives
What could go wrong: Every `reply()` call spawns a new `discli` process, which creates a new Discord connection, authenticates, sends the message, and disconnects. This takes 2-3 seconds per reply. Fine for low-traffic bots, but it will not scale.
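You can observe this cost directly by timing each CLI round trip. A minimal sketch — the `timed_run` helper is ours, not part of discli:

```python
import subprocess
import sys
import time

def timed_run(cmd):
    """Run a CLI command and report how long the whole round trip took."""
    start = time.perf_counter()
    result = subprocess.run(cmd, capture_output=True, text=True)
    elapsed = time.perf_counter() - start
    print(f"[timing] {cmd[0]} took {elapsed:.2f}s", file=sys.stderr)
    return result, elapsed

# Usage inside the Level 1 loop (IDs are whatever the event carried):
# timed_run(["discli", "message", "reply", channel_id, message_id, "Hi!"])
```

If the log shows a couple of seconds per reply, that is the per-process connection setup, and the persistent-connection levels below are the fix.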
Level 2: Context-Aware
The Level 1 bot responds to each message in isolation. A context-aware agent fetches recent conversation history before responding, so it understands what the user is actually asking about.
```python
import json
import subprocess

def reply(channel_id, message_id, text):
    subprocess.run(
        ["discli", "message", "reply", channel_id, message_id, text],
        capture_output=True,
    )

def get_context(channel_id, limit=5):
    """Fetch recent messages for conversation context."""
    result = subprocess.run(
        ["discli", "--json", "message", "list", channel_id, "--limit", str(limit)],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        return []
    return json.loads(result.stdout)

def build_response(event, context):
    """Use conversation context to build a smarter response."""
    content = event["content"].lower()
    # Check if user already asked this question recently
    recent_questions = [
        m["content"] for m in context if m["author_id"] == event["author_id"]
    ]
    if any("help" in q.lower() for q in recent_questions[:3]):
        return "Looks like you've asked before -- let me connect you with a human. Stand by!"
    if "help" in content:
        return "Hi! How can I help? I can answer questions about pricing, setup, and billing."
    if "pricing" in content:
        return "Check example.com/pricing -- plans start at $10/mo."
    return "Thanks for reaching out! Could you tell me more about what you need?"

proc = subprocess.Popen(
    ["discli", "--json", "listen", "--events", "messages"],
    stdout=subprocess.PIPE,
    text=True,
)

for line in proc.stdout:
    event = json.loads(line.strip())
    if not event.get("mentions_bot"):
        continue
    context = get_context(event["channel_id"])
    response = build_response(event, context)
    reply(event["channel_id"], event["message_id"], response)
```

What changed from Level 1
- Added `get_context()` to fetch the last 5 messages before responding
- The bot now detects repeated questions and changes its behavior
- Response logic has access to conversation history, not just the current message
What could go wrong: Each response now requires two Discord connections — one for `message list` and one for `message reply`. If the channel is busy, the context fetch adds noticeable latency. The bot also has no memory between restarts.
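The restart-amnesia problem can be patched without a database: persist the small amount of state you care about to a JSON file between events. A sketch under our own naming — `bot_state.json` is an arbitrary path, not a discli convention:

```python
import json
import os

def load_state(path="bot_state.json"):
    """Load persisted per-user history; start fresh if the file is missing or corrupt."""
    try:
        with open(path) as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}

def record_message(state, author_id, content, path="bot_state.json", keep=10):
    """Remember a user's recent messages and write the state back atomically."""
    history = state.setdefault(author_id, [])
    history.append(content)
    state[author_id] = history[-keep:]  # cap per-user history
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(state, f)
    os.replace(tmp, path)  # atomic rename: no half-written state after a crash
```

With something like this in place, `build_response` could consult `load_state()` for history that predates the current process instead of relying only on what `get_context` fetches.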
Level 3: Proactive Agent
This is where the architecture shifts. Instead of fire-and-forget CLI calls, Level 3 uses `discli serve` for a persistent bidirectional connection. The bot can stream responses token-by-token, create threads, and act without being explicitly mentioned.
```python
import json
import subprocess
import sys

def start_serve():
    """Start discli serve as a subprocess."""
    return subprocess.Popen(
        ["discli", "serve", "--events", "messages"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )

def send_action(proc, action):
    """Send a JSONL action to discli serve via stdin."""
    proc.stdin.write(json.dumps(action) + "\n")
    proc.stdin.flush()

def read_events(proc):
    """Read JSONL events from discli serve stdout."""
    for line in proc.stdout:
        line = line.strip()
        if line:
            yield json.loads(line)

def detect_intent(content):
    """Simple intent detection without explicit commands."""
    content = content.lower()
    if any(w in content for w in ["help", "issue", "problem", "broken", "error"]):
        return "support"
    if any(w in content for w in ["how do i", "how to", "tutorial", "guide"]):
        return "question"
    return None

proc = start_serve()

# Wait for ready event
for event in read_events(proc):
    if event.get("event") == "ready":
        print(f"Bot ready as {event['bot_name']}", file=sys.stderr)
        break

# Main event loop
for event in read_events(proc):
    if event.get("event") != "message":
        continue
    if event.get("is_bot"):
        continue

    intent = detect_intent(event["content"])
    if intent is None:
        continue

    channel_id = event["channel_id"]
    message_id = event["message_id"]
    author = event["author"]

    if intent == "support":
        # Create a support thread for the user
        send_action(proc, {
            "action": "thread_create",
            "channel_id": channel_id,
            "message_id": message_id,
            "name": f"Support: {author}",
            "content": f"Hi {author}! I've created this thread to help you. What's going on?",
        })

    elif intent == "question":
        # Stream a response token-by-token
        send_action(proc, {"action": "typing_start", "channel_id": channel_id})
        response = "Great question! Let me walk you through this step by step..."

        # Start a stream
        send_action(proc, {
            "action": "stream_start",
            "channel_id": channel_id,
            "reply_to": message_id,
            "req_id": "stream-1",
        })

        # In a real agent, you'd stream from an LLM here.
        # For this example, we send the whole response as one chunk.
        # Wait for stream_start response to get stream_id,
        # then send chunks. See the Streaming guide for details.
```

What changed from Level 2
- Persistent connection: `discli serve` maintains a single Discord WebSocket
- Bidirectional JSONL: Send actions via stdin, receive events via stdout
- Thread creation: Automatically creates support threads without being asked
- Streaming: Can show responses being typed in real-time
- Intent detection: Responds to patterns, not just direct mentions
What could go wrong: The `read_events` generator blocks the main thread. If you send an action that returns a response (like `stream_start`), you need to read from stdout to get the `stream_id` back. Without threading or async, this creates a deadlock risk. Level 4 addresses this.
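Short of a full reader thread, one way to reduce (not eliminate) the risk is to scan stdout for the response carrying your `req_id` while buffering unrelated events instead of dropping them. This sketch assumes serve echoes back the `req_id` you sent, the same correlation the Level 4 code relies on:

```python
import json

def wait_for_response(lines, req_id, backlog):
    """Read JSONL lines until the response tagged with req_id appears.

    Events that arrive in the meantime go into `backlog` so the main
    loop can process them later instead of losing them.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue
        data = json.loads(line)
        if data.get("req_id") == req_id:
            return data
        backlog.append(data)
    return None  # stream ended without a matching response
```

For example, after sending `stream_start` with `"req_id": "stream-1"`, you could call `wait_for_response(proc.stdout, "stream-1", backlog)`. Note this still blocks if serve never replies, which is why Level 4 moves reading into a dedicated thread.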
Level 4: Multi-Action Agent
A production agent often needs to perform multiple Discord actions in response to a single event. Level 4 uses threading to handle stdin/stdout independently and orchestrates complex workflows like onboarding new members.
```python
import json
import subprocess
import sys
import threading
from queue import Queue

class DiscordAgent:
    def __init__(self):
        self.proc = subprocess.Popen(
            ["discli", "serve", "--events", "messages,members"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            text=True,
        )
        self.events = Queue()
        self.pending = {}  # req_id -> Queue
        self._req_counter = 0

        # Start stdout reader thread
        self._reader = threading.Thread(target=self._read_loop, daemon=True)
        self._reader.start()

    def _read_loop(self):
        for line in self.proc.stdout:
            line = line.strip()
            if not line:
                continue
            data = json.loads(line)
            req_id = data.get("req_id")
            if req_id and req_id in self.pending:
                self.pending[req_id].put(data)
            else:
                self.events.put(data)

    def send(self, action, wait=False):
        """Send an action. If wait=True, block until the response arrives."""
        self._req_counter += 1
        req_id = f"req-{self._req_counter}"
        action["req_id"] = req_id

        if wait:
            q = Queue()
            self.pending[req_id] = q

        self.proc.stdin.write(json.dumps(action) + "\n")
        self.proc.stdin.flush()

        if wait:
            result = q.get(timeout=10)
            del self.pending[req_id]
            return result
        return None

    def next_event(self):
        return self.events.get()

# Configuration
WELCOME_CHANNEL_ID = "1234567890"  # #welcome channel
MODS_CHANNEL_ID = "9876543210"     # #mod-log channel
NEWCOMER_ROLE_ID = "1111111111"    # @Newcomer role

def handle_member_join(agent, event):
    """Full onboarding workflow for new members."""
    member_id = event["member_id"]
    member_name = event["member"]
    server_id = event["server_id"]

    # 1. Send a welcome DM
    agent.send({
        "action": "dm_send",
        "user_id": member_id,
        "content": (
            f"Welcome to the server, {member_name}! 👋\n\n"
            "Here's how to get started:\n"
            "1. Introduce yourself in #introductions\n"
            "2. Pick your roles in #roles\n"
            "3. Ask questions in #help\n\n"
            "Have fun!"
        ),
    })

    # 2. Assign the Newcomer role
    agent.send({
        "action": "role_assign",
        "guild_id": server_id,
        "member_id": member_id,
        "role_id": NEWCOMER_ROLE_ID,
    })

    # 3. Create an introduction thread in #welcome
    result = agent.send({
        "action": "thread_create",
        "channel_id": WELCOME_CHANNEL_ID,
        "name": f"Welcome {member_name}!",
        "content": f"Hey everyone, {member_name} just joined! Say hi! 🎉",
    }, wait=True)

    thread_id = result.get("thread_id")

    # 4. Notify moderators
    agent.send({
        "action": "send",
        "channel_id": MODS_CHANNEL_ID,
        "content": (
            f"📋 New member joined: **{member_name}** (ID: {member_id})\n"
            f"Welcome thread: <#{thread_id}>"
            if thread_id
            else f"📋 New member joined: **{member_name}** (ID: {member_id})"
        ),
    })

    print(f"[onboard] Completed onboarding for {member_name}", file=sys.stderr)

def handle_message(agent, event):
    """Handle incoming messages with multi-action responses."""
    if event.get("is_bot"):
        return
    content = event["content"].lower()
    channel_id = event["channel_id"]
    message_id = event["message_id"]

    if not event.get("mentions_bot"):
        return

    if "status" in content:
        # React to acknowledge, then send a multi-part status report
        agent.send({
            "action": "reaction_add",
            "channel_id": channel_id,
            "message_id": message_id,
            "emoji": "⏳",
        })

        # Gather data
        servers = agent.send({"action": "server_list"}, wait=True)
        server_count = len(servers.get("servers", []))

        agent.send({
            "action": "reply",
            "channel_id": channel_id,
            "message_id": message_id,
            "content": f"All systems operational. Monitoring {server_count} server(s).",
        })

        # Replace the hourglass with a checkmark
        agent.send({
            "action": "reaction_remove",
            "channel_id": channel_id,
            "message_id": message_id,
            "emoji": "⏳",
        })
        agent.send({
            "action": "reaction_add",
            "channel_id": channel_id,
            "message_id": message_id,
            "emoji": "✅",
        })

# Main loop
agent = DiscordAgent()

# Wait for ready
while True:
    event = agent.next_event()
    if event.get("event") == "ready":
        print(f"Bot ready as {event['bot_name']}", file=sys.stderr)
        break

while True:
    event = agent.next_event()
    event_type = event.get("event")
    if event_type == "member_join":
        handle_member_join(agent, event)
    elif event_type == "message":
        handle_message(agent, event)
```

What changed from Level 3
- `DiscordAgent` class: Encapsulates the serve subprocess with proper threading
- Request/response correlation: Uses `req_id` to match responses to actions with `wait=True`
- Multi-action workflows: `handle_member_join` performs 4 actions (DM, role assign, thread create, notification) for a single event
- Non-blocking sends: Actions without `wait=True` are fire-and-forget, keeping the event loop fast
What could go wrong: The `q.get(timeout=10)` in `send(wait=True)` can time out if Discord is slow or rate-limited. DMs can fail if the user has DMs disabled. The hardcoded channel/role IDs break if the server configuration changes. Level 5 addresses all of these.
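The hardcoded IDs are the easiest of these to fix: read them from the environment at startup and fail fast when one is missing. The variable names here are illustrative, not a discli convention:

```python
import os

REQUIRED_IDS = ["WELCOME_CHANNEL_ID", "MODS_CHANNEL_ID", "NEWCOMER_ROLE_ID"]

def load_ids(env=None):
    """Pull channel/role IDs from the environment; fail fast if any are unset."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_IDS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required config: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_IDS}
```

At startup you would call `ids = load_ids()` and use `ids["WELCOME_CHANNEL_ID"]` in place of the module-level constants, so a misconfigured deployment dies with a clear message instead of silently posting to the wrong channel.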
Level 5: Full Autonomous Agent
A production-ready agent with permission profiles for safety, structured error handling, rate limit awareness, audit logging, and graceful degradation when permissions are restricted.
```python
import json
import subprocess
import sys
import threading
import time
from queue import Queue, Empty

class DiscordAgent:
    """Production Discord agent with safety and error handling."""

    def __init__(self, profile="chat", slash_commands=None):
        cmd = [
            "discli", "serve",
            "--profile", profile,
            "--events", "messages,members,reactions",
        ]
        if slash_commands:
            cmd.extend(["--slash-commands", slash_commands])

        self.proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            text=True,
        )
        self.events = Queue()
        self.pending = {}
        self._req_counter = 0
        self._lock = threading.Lock()

        self._reader = threading.Thread(target=self._read_loop, daemon=True)
        self._reader.start()

    def _read_loop(self):
        try:
            for line in self.proc.stdout:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    print(f"[warn] Invalid JSON from serve: {line[:100]}", file=sys.stderr)
                    continue
                req_id = data.get("req_id")
                if req_id and req_id in self.pending:
                    self.pending[req_id].put(data)
                else:
                    self.events.put(data)
        except Exception as e:
            print(f"[fatal] Reader thread crashed: {e}", file=sys.stderr)
            self.events.put({"event": "error", "message": str(e)})

    def send(self, action, wait=False, timeout=15):
        """Send an action with optional response waiting and error handling."""
        with self._lock:
            self._req_counter += 1
            req_id = f"req-{self._req_counter}"
        action["req_id"] = req_id

        if wait:
            q = Queue()
            self.pending[req_id] = q

        try:
            self.proc.stdin.write(json.dumps(action) + "\n")
            self.proc.stdin.flush()
        except (BrokenPipeError, OSError) as e:
            print(f"[fatal] Cannot write to serve process: {e}", file=sys.stderr)
            self.pending.pop(req_id, None)  # don't leak the pending entry
            return {"error": "serve process died"}

        if wait:
            try:
                result = q.get(timeout=timeout)
            except Empty:
                result = {"error": "timeout waiting for response"}
            finally:
                self.pending.pop(req_id, None)
            return result
        return {"ok": True}

    def safe_send(self, action, wait=False, fallback=None):
        """Send an action, returning fallback on any error."""
        result = self.send(action, wait=wait)
        if "error" in result:
            print(f"[warn] Action failed: {action.get('action')} -> {result['error']}",
                  file=sys.stderr)
            return fallback if fallback is not None else result
        return result

    def next_event(self, timeout=None):
        try:
            return self.events.get(timeout=timeout)
        except Empty:
            return None

    def shutdown(self):
        try:
            self.proc.stdin.close()
            self.proc.wait(timeout=5)
        except Exception:
            self.proc.kill()
```
```python
class SupportAgent:
    """Autonomous support agent with safety boundaries."""

    def __init__(self, config):
        self.config = config
        self.agent = DiscordAgent(
            profile=config.get("profile", "chat"),
            slash_commands=config.get("slash_commands"),
        )
        self.active_threads = {}  # user_id -> thread_id

    def run(self):
        """Main event loop with error recovery."""
        # Wait for ready
        while True:
            event = self.agent.next_event(timeout=30)
            if event is None:
                print("[fatal] Timed out waiting for ready event", file=sys.stderr)
                return
            if event.get("event") == "ready":
                print(f"[info] Bot ready as {event['bot_name']}", file=sys.stderr)
                break
            if event.get("event") == "error":
                print(f"[error] Startup error: {event.get('message')}", file=sys.stderr)

        # Process events
        try:
            while True:
                event = self.agent.next_event(timeout=60)
                if event is None:
                    continue  # heartbeat timeout, keep going
                try:
                    self._dispatch(event)
                except Exception as e:
                    print(f"[error] Event handler crashed: {e}", file=sys.stderr)
        except KeyboardInterrupt:
            print("[info] Shutting down...", file=sys.stderr)
        finally:
            self.agent.shutdown()

    def _dispatch(self, event):
        handlers = {
            "message": self._on_message,
            "member_join": self._on_member_join,
            "reaction_add": self._on_reaction,
            "slash_command": self._on_slash_command,
            "error": self._on_error,
        }
        handler = handlers.get(event.get("event"))
        if handler:
            handler(event)

    def _on_message(self, event):
        if event.get("is_bot"):
            return
        if not event.get("mentions_bot"):
            return

        channel_id = event["channel_id"]
        message_id = event["message_id"]
        user_id = event["author_id"]
        content = event["content"].lower()

        # Show typing indicator while processing
        self.agent.send({"action": "typing_start", "channel_id": channel_id})

        try:
            # Check if user has an active support thread
            if user_id in self.active_threads:
                self.agent.safe_send({
                    "action": "reply",
                    "channel_id": channel_id,
                    "message_id": message_id,
                    "content": f"I'm already helping you in <#{self.active_threads[user_id]}>!",
                })
                return

            # Detect intent and respond
            if any(w in content for w in ["help", "issue", "problem", "bug"]):
                self._create_support_thread(event)
            elif any(w in content for w in ["thanks", "solved", "fixed"]):
                self.agent.safe_send({
                    "action": "reaction_add",
                    "channel_id": channel_id,
                    "message_id": message_id,
                    "emoji": "💚",
                })
            else:
                self._stream_response(
                    event,
                    "How can I help? Mention an issue and I'll create a support thread.",
                )
        finally:
            self.agent.send({"action": "typing_stop", "channel_id": channel_id})

    def _create_support_thread(self, event):
        """Create a support thread with error handling."""
        result = self.agent.safe_send({
            "action": "thread_create",
            "channel_id": event["channel_id"],
            "message_id": event["message_id"],
            "name": f"Support: {event['author'][:50]}",
            "content": (
                f"Hi {event['author']}! I've created this thread to help you.\n\n"
                "Could you describe your issue in detail?"
            ),
        }, wait=True, fallback=None)

        if result and result.get("thread_id"):
            self.active_threads[event["author_id"]] = result["thread_id"]
        else:
            # Graceful degradation: reply in channel instead
            self.agent.safe_send({
                "action": "reply",
                "channel_id": event["channel_id"],
                "message_id": event["message_id"],
                "content": "I couldn't create a thread (I may not have permission). Could you describe your issue here?",
            })

    def _stream_response(self, event, text):
        """Stream a response with proper cleanup."""
        result = self.agent.send({
            "action": "stream_start",
            "channel_id": event["channel_id"],
            "reply_to": event["message_id"],
        }, wait=True)

        if "error" in result:
            # Fallback to regular reply
            self.agent.safe_send({
                "action": "reply",
                "channel_id": event["channel_id"],
                "message_id": event["message_id"],
                "content": text,
            })
            return

        stream_id = result["stream_id"]

        # Send content as chunks (simulating LLM streaming)
        words = text.split()
        for i in range(0, len(words), 3):
            chunk = " ".join(words[i:i + 3]) + " "
            self.agent.send({
                "action": "stream_chunk",
                "stream_id": stream_id,
                "content": chunk,
            })
            time.sleep(0.1)

        self.agent.send({
            "action": "stream_end",
            "stream_id": stream_id,
        }, wait=True)

    def _on_member_join(self, event):
        """Welcome new members with graceful degradation."""
        member_id = event["member_id"]
        member_name = event["member"]

        # Try to send a welcome DM (may fail if DMs are disabled)
        self.agent.safe_send({
            "action": "dm_send",
            "user_id": member_id,
            "content": f"Welcome, {member_name}! Check out #help if you have questions.",
        })

        # Post in welcome channel if configured
        welcome_channel = self.config.get("welcome_channel_id")
        if welcome_channel:
            self.agent.safe_send({
                "action": "send",
                "channel_id": welcome_channel,
                "content": f"Welcome to the server, **{member_name}**!",
            })

    def _on_reaction(self, event):
        """Handle reaction-based interactions."""
        # Example: close support thread when a mod reacts with a checkmark
        if event.get("emoji") == "✅":
            # Remove user from active threads if this message is in their thread
            for user_id, thread_id in list(self.active_threads.items()):
                if event.get("channel_id") == thread_id:
                    del self.active_threads[user_id]
                    self.agent.safe_send({
                        "action": "send",
                        "channel_id": thread_id,
                        "content": "This support thread has been marked as resolved. Thanks!",
                    })
                    break

    def _on_slash_command(self, event):
        """Handle slash commands with interaction followup."""
        command = event.get("command")
        token = event.get("interaction_token")

        if command == "status":
            servers = self.agent.send({"action": "server_list"}, wait=True)
            count = len(servers.get("servers", []))
            active = len(self.active_threads)
            self.agent.safe_send({
                "action": "interaction_followup",
                "interaction_token": token,
                "content": f"Online. Monitoring {count} server(s), {active} active support thread(s).",
            })
        else:
            self.agent.safe_send({
                "action": "interaction_followup",
                "interaction_token": token,
                "content": f"Unknown command: {command}",
            })

    def _on_error(self, event):
        print(f"[error] Discord error: {event.get('message')}", file=sys.stderr)

# ── Run ────────────────────────────────────────────────────────────
if __name__ == "__main__":
    config = {
        "profile": "chat",                   # Safe default: no destructive actions
        "welcome_channel_id": "1234567890",  # Set to your #welcome channel
        "slash_commands": "commands.json",   # Optional slash commands file
    }
    SupportAgent(config).run()
```

What changed from Level 4
- Permission profile: Runs with the `chat` profile by default, preventing accidental destructive actions
- `safe_send()`: Wraps every action in error handling with optional fallbacks
- Graceful degradation: If thread creation fails (missing permissions), falls back to in-channel replies
- Structured dispatch: Clean event routing with `_dispatch()` instead of if/elif chains
- Thread-safe: Uses a lock for the request counter, handles broken pipe on process death
- Timeout handling: Every `wait=True` call has a timeout; no infinite hangs
- Audit awareness: The `chat` profile ensures all actions are logged and restricted to safe operations
- Slash command support: Handles `/status` with interaction followup
- Cleanup: `shutdown()` closes stdin and waits for the process to exit
What could go wrong at this level: Very little, by design. The remaining risks are: Discord API outages (mitigated by timeouts), bot token compromise (mitigate with environment variables, not hardcoded tokens), and logic bugs in intent detection (mitigate with logging and monitoring).
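For the token risk, keep the token out of source entirely and resolve it from the environment at startup. `DISCORD_TOKEN` is an assumed variable name for illustration; use whatever your deployment (and discli) actually expects:

```python
import os

def require_token(env=None, var="DISCORD_TOKEN"):
    """Fetch the bot token from the environment; refuse to start without it.

    The variable name is illustrative -- adapt it to your deployment.
    """
    env = os.environ if env is None else env
    token = env.get(var)
    if not token:
        raise RuntimeError(f"{var} is not set; refusing to start with no token")
    return token
```

Calling `require_token()` before constructing the agent turns a missing or revoked credential into an immediate, explicit startup failure rather than a confusing auth error deep in the event loop.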
Choosing Your Level
Levels 1-2
Best for: Quick scripts, personal bots, prototyping. No persistent connection needed. Each action is a standalone CLI call.
Levels 3-5
Best for: Production agents, real-time interactions, streaming responses. Uses discli serve for a single persistent connection.
Architecture Decision: listen vs serve
| Feature | discli listen (Levels 1-2) | discli serve (Levels 3-5) |
|---|---|---|
| Connection | New connection per action | Single persistent connection |
| Latency | 2-3s per action | Under 100ms per action |
| Streaming | Not possible | Full support |
| Threads | Can create (via CLI) | Can create and manage |
| Typing indicator | Not practical | Full support |
| Slash commands | Not supported | Full support |
| Complexity | Minimal | Moderate |
Next Steps
- Serve Mode — Deep dive into the bidirectional JSONL protocol
- Streaming Responses — Real-time token-by-token output
- Slash Commands — Register and handle slash commands
- Security & Permissions — Lock down your agent