TUSHAR.
Article18 min read

CoderSathi: Building a Full-Stack AI Coding Agent with LangGraph and MCP

A detailed walkthrough of building CoderSathi — a production-style AI coding assistant with a LangGraph ReAct agent, real-time WebSocket streaming, JWT auth, per-conversation workspaces, and a custom MCP server providing 25+ developer tools.

What Is CoderSathi?

CoderSathi ("Coder's Friend" in Nepali) is a full-stack AI coding assistant I built to explore what a real, production-oriented coding agent looks like — not a toy demo, but a system with auth, persistent conversations, workspace isolation, streaming responses, and safe tool execution.

The agent can read and write code files, run shell commands, inspect Git history, install packages, start dev servers, and answer questions about a codebase — all through a conversational interface with real-time streaming.

GitHub: github.com/tusharrayamajhi/codersathi


The Full Stack at a Glance

LayerTechnology
Backend frameworkFastAPI + Uvicorn (async)
AI agentLangGraph ReAct agent
LLM providersGoogle Gemini 2.5, Groq (Llama 3.3 70B, Qwen3 32B)
Developer toolsCustom MCP server — devtoolkit-mcp
DatabasePostgreSQL 15 + pgvector extension
ORMSQLAlchemy (async) + asyncpg
AuthJWT (python-jose) + bcrypt
RealtimeWebSockets
FrontendReact + Vite + TypeScript

Why LangGraph?

When I started, I tried a simple while True loop: call LLM → parse tool call → execute tool → feed result back → repeat. It worked for demos but broke under real conditions: no retry logic, no state management, no clean way to inject system context per step.

LangGraph solves all of this. It models the agent as a state machine where nodes (LLM call, tool execution) and edges (conditional routing) are explicit. The state — the conversation's message list plus workspace metadata — flows through the graph in a typed TypedDict.

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    workspace_path: str
    conv_id: str

The graph has exactly two nodes:

[agent_node] → (has tool calls?) → [sequential_tool_node] → [agent_node]
                                  ↓ (no tool calls)
                                 END

agent_node calls the LLM with the full message history. If the LLM returns tool calls, sequential_tool_node executes them one by one (not in parallel — some tools depend on previous results). After every tool batch, control returns to agent_node so the LLM can decide what to do next.


The ReAct Loop in Detail

ReAct (Reason + Act) is the prompting pattern that makes this work. The LLM alternates between reasoning in plain text and calling tools:

User: "Add error handling to the login function"

Agent (reasoning): I need to read the login function first.
Agent (tool): read_file("src/auth/login.ts")
Tool result: [file contents]
Agent (reasoning): The function doesn't catch database errors. I'll add try/catch.
Agent (tool): write_file("src/auth/login.ts", [updated content])
Tool result: file written
Agent: I've added a try/catch block around the database call in login.ts. 
       The error is now logged and a 500 is returned to the client instead of crashing.

Every step is streamed to the frontend as it happens, so the user sees the agent's reasoning and tool calls in real time.


Real-Time Streaming via WebSockets

REST endpoints are wrong for AI agents. A single agent turn can take 30+ seconds and involve multiple LLM calls and tool executions. HTTP would timeout or force you into polling.

WebSockets let me stream every event as it happens:

# ws.py — simplified
@router.websocket("/ws/{conv_id}")
async def websocket_endpoint(ws: WebSocket, conv_id: str, ...):
    await ws.accept()
    
    async def ws_send(event: dict):
        await ws.send_json(event)
    
    async for event in run_agent_stream(user_message, history, mcp_session, ...):
        await ws.send_json(event)

The event types sent to the frontend:

{ "type": "token", "content": "..." }          // LLM text token
{ "type": "tool_start", "tool": "read_file", "description": "Reading src/index.ts" }
{ "type": "tool_end", "tool": "read_file", "output": "..." }
{ "type": "file_changed", "path": "src/foo.ts" } // triggers file panel refresh
{ "type": "terminal_output", "command": "npm install", "output": "..." }
{ "type": "rate_limit", "hint": "Try again in 30 seconds." }
{ "type": "done", "full_response": "..." }

The frontend renders these as a live activity feed — the user can watch the agent think and act.


MCP Integration

Rather than hardcoding tool implementations inside the Python backend, I offloaded all developer tools to devtoolkit-mcp — a TypeScript MCP server I built separately.

The backend spawns the MCP server as a subprocess per conversation and communicates via the stdio transport:

class MCPSession:
    def __init__(self, workspace_path: str):
        self.workspace_path = workspace_path
    
    async def start(self):
        params = StdioServerParameters(command="node", args=[MCP_SERVER_PATH])
        self._stdio_cm = stdio_client(params)
        read, write = await self._stdio_cm.__aenter__()
        self._session = ClientSession(read, write)
        await self._session.initialize()
        tools_result = await self._session.list_tools()
        self._tools = tools_result.tools

Each MCP tool is wrapped in a LangGraph BaseTool subclass with a Pydantic schema generated from the MCP tool's JSON Schema. This lets LangGraph's type system validate tool arguments before they're sent to the MCP server.

The global session registry maps conv_id → MCPSession, so each conversation gets its own isolated server process and workspace directory:

workspaces/
  1/                  ← conversation 1
    abc-def-12.../    ← workspace UUID
      src/
      package.json
  2/                  ← conversation 2
    xyz-789.../
      main.py

Permission Gating

Every tool call requires explicit user approval before execution. When the agent wants to call a tool, the backend:

  1. Sends a permission_request event to the frontend via WebSocket
  2. Waits for the user to click Allow or Deny
  3. Executes (or skips) the tool based on the response
async def permission_callback(tool_name: str, args: dict) -> bool:
    await ws_send({
        "type": "permission_request",
        "tool": tool_name,
        "args": args,
        "description": get_tool_description(tool_name, args),
    })
    # Wait for user response
    response = await permission_queue.get()
    return response["granted"]

The frontend shows a modal: "The agent wants to run npm install in /workspace/abc123. Allow?" This is critical for destructive operations — no file gets written or deleted without the user seeing it first.


Multi-LLM Support

I built the backend to support multiple providers so users can choose cost vs. quality:

AVAILABLE_MODELS = {
    "gemini-2.5-flash-lite": { "provider": "gemini", "rpm": 10, "rpd": 20 },
    "gemini-2.5-flash":      { "provider": "gemini", "rpm": 5,  "rpd": 20 },
    "groq/llama-3.3-70b-versatile": { "provider": "groq", "rpm": 30, "rpd": 1000 },
    "groq/qwen/qwen3-32b":   { "provider": "groq", "rpm": 60, "rpd": 1000 },
}

def _build_llm(model_id: str) -> BaseChatModel:
    info = AVAILABLE_MODELS[model_id]
    if info["provider"] == "groq":
        return ChatGroq(model=model_id[len("groq/"):], api_key=GROQ_API_KEY)
    return ChatGoogleGenerativeAI(model=model_id, google_api_key=GEMINI_API_KEY)

Both providers are free-tier compatible. Gemini 2.5 Flash Lite is the default — it's fast and handles tool use well. Groq's Llama 3.3 70B gives the best reasoning quality at the cost of lower daily request limits.

I also handle rate limit errors gracefully — if Gemini returns a 429, the frontend shows a user-friendly message with the retry delay extracted from the error, rather than a raw stack trace.


Database Design

PostgreSQL with pgvector handles conversations and message history:

-- Conversations
CREATE TABLE conversations (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    title TEXT,
    workspace_path TEXT,
    model TEXT DEFAULT 'gemini-2.5-flash-lite',
    created_at TIMESTAMP DEFAULT NOW()
);

-- Messages
CREATE TABLE messages (
    id SERIAL PRIMARY KEY,
    conversation_id INTEGER REFERENCES conversations(id),
    role TEXT,           -- 'user' | 'assistant'
    content TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);

The pgvector extension is included for future semantic search over conversation history — letting the agent retrieve relevant past exchanges rather than naively prepending all 50 previous messages.


Auth: JWT + bcrypt

Standard stateless auth:

POST /api/auth/register  →  hash password, store user, return JWT
POST /api/auth/login     →  verify password, return JWT
WS   /ws/{conv_id}       →  validate JWT from query param before upgrade

JWT tokens are validated on every WebSocket handshake so session state doesn't need to be stored server-side. Each conversation is tied to the authenticated user — no cross-user data leakage.


Frontend: React + Vite

The React frontend has three main panels:

  • Chat panel — message thread with streaming token display and tool activity feed
  • File panel — live file tree for the conversation's workspace, refreshed on file_changed events
  • Terminal panel — shell command output streamed in as terminal_output events

State is managed with Zustand. The WebSocket connection is opened when the user selects or creates a conversation and closed on unmount or navigation.


Error Handling

Rate limits — Gemini and Groq both impose RPM/RPD caps. The agent catches 429 errors, parses the retry delay from the error message, and sends a structured rate_limit event with a human-readable hint.

failed_generation — Gemini occasionally fails to form a valid tool call (usually with complex nested schemas). The agent detects this, injects a recovery prompt ("please use ONE tool at a time with only required arguments"), and retries up to 2 times before giving up.

Tool errors — If an MCP tool throws (e.g., file not found, command exits non-zero), the error is returned as the tool's result string. The LLM sees the error and can decide to retry, try a different approach, or explain the problem to the user.


What I Learned

LangGraph's explicit state graph pays off. The ability to add retry logic, inject messages between nodes, and trace execution through astream_events would be very hard with a plain while loop.

MCP as a subprocess is surprisingly robust. stdio transport has no network overhead, no auth complexity, and process isolation means a crashing tool doesn't take down the backend.

Permission gating is table stakes for agentic systems. Users trust the agent more when they can see and approve every action. Without it, a hallucination or prompt injection could cause real damage.

Rate limits are a product problem, not just a tech problem. Showing users a clear message ("the AI hit its rate limit, try again in 30 seconds") is far better than a generic error or silent failure.


What's Next

  • Vector-based conversation memory using pgvector
  • Agent memory: persist facts across conversations
  • Shareable workspaces (collaboration)
  • Support for Claude and OpenAI models

Source code: github.com/tusharrayamajhi/codersathi

MCP server: github.com/tusharrayamajhi/devtoolkit-mcp

If you have questions about the architecture or want to contribute, open an issue on GitHub.

Tushar Rayamajhi | AI Engineer & Backend Developer