Imagine you are on a train commuting home after a long day. You pull out your phone, open Telegram, and type: /deploy staging. Within two minutes, Claude Code on your dev machine spins up, runs the entire deployment pipeline, and sends you back a confirmation message with the deployment URL — all from your phone, without ever opening a laptop. This is not science fiction. You can build it today, in a single afternoon, with nothing more than a free messaging bot and a short Python script.
The moment I first set this up, it changed the way I think about development workflows. Suddenly, Claude Code was not something I could only use while sitting at my desk. It became an always-available assistant I could reach from anywhere — the grocery store, the gym, a coffee shop in another city. And the best part? The implementation is shockingly simple.
In this guide, I will walk you through building complete, production-ready bridges between Claude Code and the most popular messaging platforms: Telegram, Slack, Discord, and a generic webhook approach that works with virtually anything else. You will get full Python scripts, systemd service files, Docker configurations, and battle-tested security practices. By the end, you will have a remote control for Claude Code that fits in your pocket.
Why Remote Control Claude Code?
Before we dive into code, let us consider why you would want this in the first place. Claude Code is an extraordinarily powerful tool, but by default it is tethered to your terminal. You need to be at your machine, in your shell, actively watching the output. That constraint eliminates an enormous number of use cases.
The Case for Remote Access
Work from anywhere. Trigger builds, deployments, code generation, and analysis from your phone. You do not need your laptop. You do not even need a computer. Any device that can send a text message becomes a development terminal.
Asynchronous workflows. Send Claude Code a complex task — refactor a module, write tests for an entire package, generate a comprehensive code review — and then go about your day. You will get a notification when the work is done. No more staring at a terminal waiting for a long-running task to complete.
Team collaboration. Put the bot in a shared Slack channel, and suddenly anyone on the engineering team can trigger shared workflows. Your junior developer can run the deployment pipeline without SSH access to the server. Your PM can generate the daily status report without asking you to do it.
Emergency fixes. You are at the airport when production goes down. Instead of frantically searching for a quiet corner, opening your laptop, and tethering to your phone’s hotspot, you simply type /claude run fix the null pointer in src/auth.py and deploy to production from the Slack app on your phone.
Monitoring and response. Set up proactive alerts. When your CI/CD pipeline fails, get a Telegram notification with a one-tap command to retry or investigate. When server health degrades, get a Slack alert with an action button to restart the service.
Platform Comparison
Not all messaging platforms are created equal for this use case. Here is how the major options stack up:
| Feature | Telegram | Slack | Discord | MS Teams |
|---|---|---|---|---|
| Bot API ease | Excellent | Good | Good | Complex |
| Webhook support | Native polling + webhooks | Events API + Socket Mode | Gateway (WebSocket) | Outgoing webhooks |
| Free tier limits | Unlimited | 90-day message history | Unlimited | Requires M365 |
| Message length limit | 4,096 chars | 40,000 chars | 2,000 chars | 28,000 chars |
| Mobile app quality | Excellent | Excellent | Good | Good |
| Setup time | ~15 minutes | ~30 minutes | ~20 minutes | ~60 minutes |
| Best for | Personal use | Team workflows | Community/hobby | Enterprise |
Architecture Overview
Regardless of which messaging platform you choose, the architecture follows the same pattern. Understanding this pattern is key, because once you grasp it, you can adapt it to any platform in minutes.
The Message Flow
Here is the complete flow from your phone to Claude Code and back:
┌──────────┐    ┌───────────────┐    ┌──────────────┐    ┌─────────────┐
│ Your     │───▶│ Messaging     │───▶│ Bridge       │───▶│ Claude      │
│ Phone    │    │ Platform      │    │ Server       │    │ Code CLI    │
│          │◀───│ (Telegram)    │◀───│ (Python)     │◀───│ (claude)    │
└──────────┘    └───────────────┘    └───────┬──────┘    └─────────────┘
                                             │
                                       ┌─────┴─────┐
                                       │ Auth      │
                                       │ Rate      │
                                       │ Limit     │
                                       │ Logging   │
                                       └───────────┘
The critical piece is the bridge server — a lightweight Python (or Node.js) application that does three things:
- Receives messages from the messaging platform’s bot API (via polling or webhooks)
- Validates and routes them through security checks (authentication, rate limiting, command allowlisting)
- Executes Claude Code as a subprocess and returns the result to the chat
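The three steps above can be sketched as one platform-agnostic function. This is a minimal illustration, not the bridge built later in this guide: `handle_message`, `ALLOWED_USERS`, `ALLOWED_COMMANDS`, and the injected `execute` callable are hypothetical names chosen for the example, and rate limiting is left out to keep it short.

```python
from typing import Callable

# Hypothetical configuration for this sketch only
ALLOWED_USERS = {111, 222}
ALLOWED_COMMANDS = {"run", "test", "status"}


def handle_message(user_id: int, text: str, execute: Callable[[str], str]) -> str:
    """Receive a chat message, validate it, execute it, and return the reply text."""
    # 1. Receive: parse "/command rest of the prompt"
    if not text.startswith("/"):
        return "Commands must start with '/'."
    command, _, argument = text[1:].partition(" ")

    # 2. Validate: authentication and command allowlisting
    if user_id not in ALLOWED_USERS:
        return "Unauthorized."
    if command not in ALLOWED_COMMANDS:
        return f"Unknown command: /{command}"

    # 3. Execute: hand the prompt to the backend (Claude Code in the real bridge)
    return execute(argument.strip())


def fake_backend(prompt: str) -> str:
    """Stand-in backend so the sketch runs without Claude Code installed."""
    return f"ran: {prompt}"


print(handle_message(111, "/run list files", fake_backend))
```

Passing the backend in as a callable keeps the validation logic testable without invoking the real CLI.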
The bridge server runs on the same machine where Claude Code is installed. If Claude Code is on your local dev machine, the bridge runs there too. If you want a more robust setup, you can run the bridge on a VPS and have it SSH into your dev machine to invoke Claude Code — but let us start with the simplest version first.
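For the VPS variant mentioned above, the bridge would wrap the same `claude -p` call in an `ssh` invocation. Here is a sketch of how that command line might be built. The host name and project path are placeholders, `build_ssh_command` is a hypothetical helper, and `shlex.quote` is used so the prompt reaches the remote shell as a single argument.

```python
import shlex


def build_ssh_command(host: str, project_dir: str, prompt: str) -> list[str]:
    """Build an argv list that runs `claude -p <prompt>` on a remote host via ssh."""
    # The remote side is interpreted by a shell, so both values are quoted
    remote = f"cd {shlex.quote(project_dir)} && claude -p {shlex.quote(prompt)}"
    return ["ssh", host, remote]


# Placeholder host and path for illustration
cmd = build_ssh_command("dev@devbox.example", "/srv/myapp", "What's left in TODO.md?")
print(cmd)
```

The resulting list can be passed to `subprocess.run` in place of `["claude", "-p", prompt]`.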
Why a Bridge Server?
You might wonder: why not connect the messaging platform directly to Claude Code? Because Claude Code is a CLI tool — it reads from stdin and writes to stdout. It does not speak HTTP or WebSocket natively. The bridge translates between the messaging platform’s API protocol and Claude Code’s command-line interface. Think of it as a thin adapter layer.
Running Claude Code Non-Interactively
Before we build any bot, you need to understand how to run Claude Code without an interactive terminal. This is the foundation that every bridge server relies on.
The Print Flag
The most important flag is -p (or --print). This runs Claude Code in non-interactive mode — it takes a prompt, processes it, prints the result, and exits. No interactive UI, no REPL, no terminal manipulation.
# Basic non-interactive usage
claude -p "List all Python files in the current directory"
# With a specific working directory
cd /path/to/project && claude -p "Explain the architecture of this project"
# JSON output for structured parsing
claude -p "List all functions in src/main.py" --output-format json
Key CLI Flags for Non-Interactive Use
| Flag | Purpose | Example |
|---|---|---|
| -p / --print | Non-interactive mode, prints output | claude -p "fix the bug" |
| --output-format json | Structured JSON output | claude -p "list files" --output-format json |
| --max-turns N | Limit agentic turns | claude -p "refactor" --max-turns 10 |
| --allowedTools | Restrict which tools Claude can use | claude -p "check" --allowedTools Read Grep |
| --model | Specify model to use | claude -p "analyze" --model sonnet |
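If you use `--output-format json`, the bridge has to pull the reply text out of the JSON before sending it to chat. The sketch below assumes the CLI prints a single JSON object with a `result` string and an optional `is_error` flag. Those field names are an assumption, so check them against the output of your installed version. `parse_claude_json` is a hypothetical helper.

```python
import json


def parse_claude_json(stdout: str) -> dict:
    """Extract the reply text from `claude -p ... --output-format json` output.

    Assumes a single JSON object with a "result" string and an optional
    "is_error" flag; falls back to the raw text if parsing fails.
    """
    try:
        payload = json.loads(stdout)
    except json.JSONDecodeError:
        return {"success": False, "text": stdout.strip()}
    if not isinstance(payload, dict):
        return {"success": False, "text": stdout.strip()}
    return {
        "success": not payload.get("is_error", False),
        "text": str(payload.get("result", "")),
    }


# Hand-written sample in the assumed shape, not captured CLI output
sample = '{"type": "result", "is_error": false, "result": "3 functions found"}'
print(parse_claude_json(sample))
```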
Calling Claude Code from Python
Here is the core function that every bridge server will use. This is the heart of the entire system:
import subprocess
import os


def run_claude(prompt: str, working_dir: str = None, timeout: int = 300) -> dict:
    """
    Run Claude Code non-interactively and return the result.

    Args:
        prompt: The prompt to send to Claude Code
        working_dir: Directory to run in (uses CLAUDE_WORK_DIR env var as default)
        timeout: Maximum seconds to wait (default 5 minutes)

    Returns:
        dict with 'success' (bool), 'output' (str), and 'error' (str)
    """
    work_dir = working_dir or os.getenv("CLAUDE_WORK_DIR", os.path.expanduser("~"))
    try:
        result = subprocess.run(
            ["claude", "-p", prompt],
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=work_dir,
            env={**os.environ, "TERM": "dumb"}  # Prevent terminal escape codes
        )
        if result.returncode == 0:
            return {
                "success": True,
                "output": result.stdout.strip(),
                "error": None
            }
        else:
            return {
                "success": False,
                "output": result.stdout.strip(),
                "error": result.stderr.strip()
            }
    except subprocess.TimeoutExpired:
        return {
            "success": False,
            "output": None,
            "error": f"Command timed out after {timeout} seconds"
        }
    except FileNotFoundError:
        return {
            "success": False,
            "output": None,
            "error": "Claude Code CLI not found. Is it installed and in PATH?"
        }
    except Exception as e:
        return {
            "success": False,
            "output": None,
            "error": str(e)
        }
TERM=dumb in the environment prevents Claude Code from emitting terminal escape codes (colors, cursor movements) that would clutter your chat messages. This is a small detail that makes a big difference in output readability.
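The flags from the table earlier can be added to the same argument list. A small sketch, assuming you want to cap agentic turns and restrict tools for commands triggered from chat. `build_claude_command` is a hypothetical helper, not part of the scripts in this guide.

```python
from typing import Optional, Sequence


def build_claude_command(
    prompt: str,
    max_turns: Optional[int] = None,
    allowed_tools: Optional[Sequence[str]] = None,
) -> list[str]:
    """Build the argv list for a non-interactive Claude Code call."""
    cmd = ["claude", "-p", prompt]
    if max_turns is not None:
        cmd += ["--max-turns", str(max_turns)]
    if allowed_tools:
        # Each tool name is passed as its own argument, as in the flag table
        cmd += ["--allowedTools", *allowed_tools]
    return cmd


print(build_claude_command("check the config", max_turns=5, allowed_tools=["Read", "Grep"]))
```

Because the result is a list handed to `subprocess.run` without `shell=True`, the prompt text is never interpreted by a shell.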
Handling Long-Running Tasks
Some Claude Code tasks can run for several minutes — refactoring a large file, running a full test suite, generating comprehensive documentation. You need to handle this gracefully:
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=3)


async def run_claude_async(prompt: str, working_dir: str = None, timeout: int = 600):
    """Run Claude Code in a thread pool to avoid blocking the bot's event loop."""
    # get_running_loop() is the recommended call from inside a coroutine
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        executor,
        lambda: run_claude(prompt, working_dir, timeout)
    )
This pattern is essential. Messaging bot libraries like python-telegram-bot and slack-bolt run on async event loops. If you call subprocess.run directly, you block the entire bot — it cannot process any other messages while waiting for Claude Code to finish. Running the subprocess in a thread pool executor keeps the bot responsive.
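To see the effect without involving Claude Code at all, here is a self-contained sketch in which `slow_task` stands in for `run_claude` and `heartbeat` stands in for the bot handling other messages. Both names are made up for this demonstration.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=3)


def slow_task(seconds: float) -> str:
    """Stand-in for run_claude: blocks the calling thread."""
    time.sleep(seconds)
    return "task finished"


async def heartbeat(ticks: list) -> None:
    """Simulates the bot handling other messages while the task runs."""
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append(time.monotonic())


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    ticks: list = []
    # Both run concurrently: the blocking call lives in a worker thread
    result, _ = await asyncio.gather(
        loop.run_in_executor(executor, slow_task, 0.3),
        heartbeat(ticks),
    )
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```

If `slow_task` were called directly inside `main`, the heartbeat could not start until the sleep had finished.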
Method 1: Telegram Bot — Complete Implementation
Telegram is the best starting point. Its bot API is free, unlimited, needs no public-facing server (the bot can poll for updates instead of receiving webhooks), and the mobile app is excellent. You can go from zero to a working remote control in fifteen minutes.
Step 1: Create a Telegram Bot
Open Telegram on your phone or desktop and search for @BotFather. This is Telegram’s official bot for creating and managing bots. Start a conversation and follow these steps:
- Send /newbot
- Enter a display name for your bot (e.g., “My Claude Code Bot”)
- Enter a username (must end in “bot”, e.g., “my_claude_code_bot”)
- BotFather will respond with your API token — save this securely
Next, set up the bot’s command menu so you get nice autocomplete in the chat:
# Send this to @BotFather:
/setcommands
# Then select your bot and paste:
run - Run a Claude Code prompt
deploy - Deploy to an environment
test - Run project tests
status - Check current task status
git - Run git commands (log, status, diff)
help - List available commands
Finally, you need your Telegram user ID for authentication. Send a message to @userinfobot and it will reply with your numeric user ID. Save this — it ensures only you can control the bot.
Step 2: Build the Bridge Server
Here is the complete, production-ready Telegram bridge server. This is not a toy example — it includes authentication, rate limiting, async execution, output truncation, and proper error handling:
#!/usr/bin/env python3
"""
Telegram Bridge for Claude Code
================================
Controls Claude Code sessions from Telegram messages.
Usage:
python telegram_bridge.py
Environment variables (in .env):
TELEGRAM_BOT_TOKEN - Bot token from @BotFather
TELEGRAM_ALLOWED_USERS - Comma-separated list of allowed user IDs
CLAUDE_WORK_DIR - Working directory for Claude Code
"""
import asyncio
import logging
import os
import subprocess
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import wraps
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
load_dotenv()
# --- Configuration ---
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
ALLOWED_USERS = set(
int(uid.strip())
for uid in os.getenv("TELEGRAM_ALLOWED_USERS", "").split(",")
if uid.strip()
)
WORK_DIR = os.getenv("CLAUDE_WORK_DIR", os.path.expanduser("~/projects"))
MAX_MESSAGE_LENGTH = 4000 # Telegram limit is 4096, leave margin
RATE_LIMIT = 10 # Max commands per hour per user
COMMAND_TIMEOUT = 600 # 10 minutes max per command
# --- Logging ---
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
handlers=[
logging.StreamHandler(),
logging.FileHandler("telegram_bridge.log"),
],
)
logger = logging.getLogger(__name__)
# --- State ---
executor = ThreadPoolExecutor(max_workers=3)
rate_limits = defaultdict(list) # user_id -> list of timestamps
active_tasks = {} # user_id -> task description
# --- Helpers ---
def run_claude(prompt: str, working_dir: str = None, timeout: int = COMMAND_TIMEOUT) -> dict:
    """Run Claude Code non-interactively."""
    work_dir = working_dir or WORK_DIR
    try:
        result = subprocess.run(
            ["claude", "-p", prompt],
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=work_dir,
            env={**os.environ, "TERM": "dumb"},
        )
        return {
            "success": result.returncode == 0,
            "output": result.stdout.strip(),
            "error": result.stderr.strip() if result.returncode != 0 else None,
        }
    except subprocess.TimeoutExpired:
        return {"success": False, "output": None, "error": f"Timed out after {timeout}s"}
    except FileNotFoundError:
        return {"success": False, "output": None, "error": "Claude CLI not found in PATH"}
    except Exception as e:
        return {"success": False, "output": None, "error": str(e)}


def check_rate_limit(user_id: int) -> bool:
    """Return True if user is within rate limits."""
    now = time.time()
    hour_ago = now - 3600
    rate_limits[user_id] = [t for t in rate_limits[user_id] if t > hour_ago]
    if len(rate_limits[user_id]) >= RATE_LIMIT:
        return False
    rate_limits[user_id].append(now)
    return True


def truncate_output(text: str, max_len: int = MAX_MESSAGE_LENGTH) -> str:
    """Truncate output to fit Telegram's message limit."""
    if not text or len(text) <= max_len:
        return text
    return text[: max_len - 100] + f"\n\n... (truncated, {len(text)} chars total)"


def auth_required(func):
    """Decorator to restrict commands to allowed users."""
    @wraps(func)
    async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE):
        user_id = update.effective_user.id
        if ALLOWED_USERS and user_id not in ALLOWED_USERS:
            logger.warning(f"Unauthorized access attempt by user {user_id}")
            await update.message.reply_text("Unauthorized. Your user ID is not in the allow list.")
            return
        if not check_rate_limit(user_id):
            await update.message.reply_text(
                f"Rate limit exceeded. Max {RATE_LIMIT} commands per hour."
            )
            return
        return await func(update, context)
    return wrapper

# --- Command Handlers ---
@auth_required
async def cmd_run(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Run an arbitrary Claude Code prompt."""
    if not context.args:
        await update.message.reply_text("Usage: /run <prompt>\nExample: /run list all Python files")
        return
    prompt = " ".join(context.args)
    user_id = update.effective_user.id
    logger.info(f"User {user_id} running: {prompt}")
    status_msg = await update.message.reply_text("Working on it...")
    active_tasks[user_id] = prompt
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(executor, lambda: run_claude(prompt))
    finally:
        # pop() rather than del: an overlapping /run from the same user may
        # already have removed the entry, and del would raise KeyError
        active_tasks.pop(user_id, None)
    if result["success"]:
        output = truncate_output(result["output"]) or "(no output)"
        await status_msg.edit_text(f"Done:\n\n{output}")
    else:
        error = result["error"] or "Unknown error"
        await status_msg.edit_text(f"Failed:\n\n{error}")

@auth_required
async def cmd_deploy(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Trigger a deployment."""
    env = context.args[0] if context.args else "staging"
    allowed_envs = ["staging", "production", "dev"]
    if env not in allowed_envs:
        await update.message.reply_text(f"Invalid environment. Choose from: {', '.join(allowed_envs)}")
        return
    if env == "production":
        # Telegram command names may only contain letters, digits, and
        # underscores, so this must match the "confirm_deploy" handler in main()
        await update.message.reply_text(
            "You requested a PRODUCTION deployment. Send /confirm_deploy to proceed."
        )
        context.user_data["pending_deploy"] = "production"
        return
    status_msg = await update.message.reply_text(f"Deploying to {env}...")
    prompt = f"Run the deployment pipeline for the {env} environment. Show the deployment URL when done."
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, lambda: run_claude(prompt))
    output = truncate_output(result["output"]) if result["success"] else result["error"]
    outcome = "deployed" if result["success"] else "failed"
    await status_msg.edit_text(f"Deployment {outcome}:\n\n{output}")


@auth_required
async def cmd_confirm_deploy(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Confirm a pending production deployment."""
    pending = context.user_data.get("pending_deploy")
    if pending != "production":
        await update.message.reply_text("No pending deployment to confirm.")
        return
    del context.user_data["pending_deploy"]
    status_msg = await update.message.reply_text("Deploying to PRODUCTION...")
    prompt = "Run the deployment pipeline for the production environment. Show the deployment URL and run health checks."
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, lambda: run_claude(prompt))
    output = truncate_output(result["output"]) if result["success"] else result["error"]
    await status_msg.edit_text(f"Production deployment result:\n\n{output}")

@auth_required
async def cmd_test(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Run project tests."""
    status_msg = await update.message.reply_text("Running tests...")
    prompt = "Run the project's test suite and report results. Show passed, failed, and error counts."
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, lambda: run_claude(prompt))
    output = truncate_output(result["output"]) if result["success"] else result["error"]
    await status_msg.edit_text(f"Test results:\n\n{output}")


@auth_required
async def cmd_git(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Run git commands (allowlisted subcommands only)."""
    if not context.args:
        await update.message.reply_text("Usage: /git <command>\nExamples: /git status, /git log --oneline -10")
        return
    git_cmd = " ".join(context.args)
    # The allowlist checks the subcommand only. Arguments are not inspected
    # (e.g. `branch -D` would pass), and the command is still relayed through
    # a Claude prompt rather than executed directly.
    safe_commands = ["status", "log", "diff", "branch", "show", "remote", "tag"]
    first_word = context.args[0]
    if first_word not in safe_commands:
        await update.message.reply_text(
            f"Only read-only git commands are allowed: {', '.join(safe_commands)}"
        )
        return
    prompt = f"Run this git command and show the output: git {git_cmd}"
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, lambda: run_claude(prompt))
    output = truncate_output(result["output"]) if result["success"] else result["error"]
    await update.message.reply_text(f"git {git_cmd}:\n\n{output}")


@auth_required
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show currently active tasks."""
    if not active_tasks:
        await update.message.reply_text("No active tasks.")
        return
    lines = [f"User {uid}: {task}" for uid, task in active_tasks.items()]
    await update.message.reply_text("Active tasks:\n\n" + "\n".join(lines))

async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show available commands."""
    help_text = """Available commands:
/run - Run any Claude Code prompt
/deploy - Deploy (staging/production/dev)
/test - Run project tests
/git - Run read-only git commands
/status - Show active tasks
/help - Show this message

Examples:
/run fix the TypeError in src/auth.py
/deploy staging
/git log --oneline -5
/run write tests for src/utils.py"""
    await update.message.reply_text(help_text)


# --- Main ---
def main():
    if not BOT_TOKEN:
        logger.error("TELEGRAM_BOT_TOKEN not set in .env")
        return
    if not ALLOWED_USERS:
        logger.warning("TELEGRAM_ALLOWED_USERS not set — bot is open to everyone!")
    app = Application.builder().token(BOT_TOKEN).build()
    app.add_handler(CommandHandler("run", cmd_run))
    app.add_handler(CommandHandler("deploy", cmd_deploy))
    app.add_handler(CommandHandler("confirm_deploy", cmd_confirm_deploy))
    app.add_handler(CommandHandler("test", cmd_test))
    app.add_handler(CommandHandler("git", cmd_git))
    app.add_handler(CommandHandler("status", cmd_status))
    app.add_handler(CommandHandler("help", cmd_help))
    app.add_handler(CommandHandler("start", cmd_help))
    logger.info("Telegram bridge started. Polling for messages...")
    app.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == "__main__":
    main()
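The script above truncates anything longer than a single Telegram message. If you would rather keep the full output, one alternative is to split it into several messages. A minimal sketch follows. `split_message` is a hypothetical helper, the 4,000-character default mirrors MAX_MESSAGE_LENGTH, and blank lines that fall exactly on a chunk boundary are dropped.

```python
def split_message(text: str, limit: int = 4000) -> list[str]:
    """Split text into chunks of at most `limit` characters, preferring line breaks."""
    chunks = []
    while len(text) > limit:
        # Break at the last newline inside the window, if there is one
        cut = text.rfind("\n", 0, limit)
        if cut <= 0:
            cut = limit  # no usable newline: hard split
        chunks.append(text[:cut])
        text = text[cut:].lstrip("\n")
    if text:
        chunks.append(text)
    return chunks


parts = split_message("line one\nline two\nline three", limit=12)
print(parts)  # ['line one', 'line two', 'line three']
```

In cmd_run you could then put the first chunk into status_msg.edit_text and send each remaining chunk with update.message.reply_text.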
Step 3: Configuration
Create a .env file for the bridge server:
# .env for Telegram bridge
TELEGRAM_BOT_TOKEN=7123456789:AAH-your-token-here
TELEGRAM_ALLOWED_USERS=123456789,987654321
CLAUDE_WORK_DIR=/home/youruser/projects/myapp
And a requirements.txt:
python-telegram-bot>=21.0
python-dotenv>=1.0.0
Install and run:
pip install -r requirements.txt
python telegram_bridge.py
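The script logs a warning and keeps running when TELEGRAM_ALLOWED_USERS is empty. For a bot that can run deployments you may prefer to fail closed. Here is a sketch of a stricter parser you could call at startup. `parse_allowed_users` is a hypothetical helper, not part of the script above.

```python
def parse_allowed_users(raw: str) -> set[int]:
    """Parse a comma-separated list of numeric user IDs, rejecting bad entries."""
    users = set()
    for part in raw.split(","):
        part = part.strip()
        if not part:
            continue
        if not part.isdigit():
            raise ValueError(f"Not a numeric user ID: {part!r}")
        users.add(int(part))
    if not users:
        # Fail closed: an empty allow list would otherwise mean "everyone"
        raise ValueError("TELEGRAM_ALLOWED_USERS is empty; refusing to start an open bot")
    return users


print(parse_allowed_users("123456789, 987654321"))
```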
Step 4: Test It
Open Telegram on your phone and send a message to your bot:
/run list all Python files in the project and count them
You should see “Working on it…” followed by the actual output within a minute or so. If something goes wrong, check the telegram_bridge.log file for error details.
Note: Make sure the claude binary is in your PATH when running the bridge server. If you installed Claude Code via npm, you may need to set the full path in the run_claude function, e.g., /home/youruser/.npm-global/bin/claude.
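One way to avoid hard-coding the path is to resolve it once at startup with `shutil.which`. In this sketch, `resolve_claude_binary` and the `CLAUDE_BIN` environment variable are hypothetical names introduced for illustration.

```python
import os
import shutil


def resolve_claude_binary(override=None) -> str:
    """Return the path of the claude executable, or raise if it cannot be found."""
    # Precedence: explicit argument, then CLAUDE_BIN (assumed name), then PATH lookup
    candidate = override or os.getenv("CLAUDE_BIN") or "claude"
    path = shutil.which(candidate)
    if path is None:
        raise FileNotFoundError(f"{candidate!r} not found; set CLAUDE_BIN to its full path")
    return path


try:
    print(resolve_claude_binary())
except FileNotFoundError as exc:
    print(exc)
```

The returned path can replace the literal "claude" as the first element of the subprocess argument list.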
Common Issues and Debugging
Bot does not respond: Check that your TELEGRAM_BOT_TOKEN is correct. Try sending /start — if you get no response at all, the token is wrong or the bot process is not running.
“Unauthorized” error: Your Telegram user ID is not in TELEGRAM_ALLOWED_USERS. Use @userinfobot to verify your ID.
Claude command times out: The default timeout is 10 minutes. For very long tasks, increase COMMAND_TIMEOUT. Also make sure Claude Code itself is authenticated (run claude in your terminal first to verify).
Garbled output: Make sure TERM=dumb is set in the subprocess environment. Without it, Claude Code may emit ANSI escape codes.
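If escape codes still slip through, for example from a tool that Claude Code invoked, you can strip them before replying. A small sketch, assuming only CSI-style sequences (colors, cursor movement) need to be removed. `strip_ansi` is a hypothetical helper.

```python
import re

# Matches CSI sequences such as "\x1b[31m" (colors) and "\x1b[2K" (erase line)
ANSI_CSI = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")


def strip_ansi(text: str) -> str:
    """Remove ANSI CSI escape sequences from text."""
    return ANSI_CSI.sub("", text)


print(strip_ansi("\x1b[32mAll tests passed\x1b[0m"))  # prints: All tests passed
```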
Method 2: Slack Bot — Complete Implementation
Slack is the natural choice for team environments. Its bot platform is more complex than Telegram’s, but it offers richer features: threads, file uploads, interactive buttons, and deep integration with other workplace tools.
Step 1: Create a Slack App
- Go to api.slack.com/apps
- Click Create New App → From scratch
- Name it (e.g., “Claude Code Bot”) and select your workspace
- Under OAuth & Permissions, add these Bot Token Scopes:
  - chat:write (send messages)
  - commands (handle slash commands)
  - files:write (upload files, for long output)
  - app_mentions:read (respond to @mentions)
  - reactions:write (add and remove the status reactions used by the bridge)
- Under Socket Mode, enable it and create an app-level token with the connections:write scope (needed for local development without a public URL)
- Under Event Subscriptions, subscribe to the app_mention bot event
- Under Slash Commands, create a command called /claude
- Install the app to your workspace
- Copy the Bot User OAuth Token (starts with xoxb-) and the App-Level Token (starts with xapp-)
Step 2: Build the Slack Bridge
#!/usr/bin/env python3
"""
Slack Bridge for Claude Code
==============================
Controls Claude Code sessions via Slack slash commands and mentions.
Usage:
python slack_bridge.py
Environment variables (in .env):
SLACK_BOT_TOKEN - Bot User OAuth Token (xoxb-...)
SLACK_APP_TOKEN - App-Level Token for Socket Mode (xapp-...)
SLACK_ALLOWED_CHANNELS - Comma-separated channel IDs (optional)
CLAUDE_WORK_DIR - Working directory for Claude Code
"""
import asyncio
import logging
import os
import subprocess
import tempfile
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
load_dotenv()
# --- Configuration ---
BOT_TOKEN = os.getenv("SLACK_BOT_TOKEN")
APP_TOKEN = os.getenv("SLACK_APP_TOKEN")
ALLOWED_CHANNELS = set(
ch.strip()
for ch in os.getenv("SLACK_ALLOWED_CHANNELS", "").split(",")
if ch.strip()
)
WORK_DIR = os.getenv("CLAUDE_WORK_DIR", os.path.expanduser("~/projects"))
RATE_LIMIT = 10
COMMAND_TIMEOUT = 600
MAX_SLACK_LENGTH = 3900 # Stay under the roughly 4,000 characters Slack recommends per message text
# --- Logging ---
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
handlers=[
logging.StreamHandler(),
logging.FileHandler("slack_bridge.log"),
],
)
logger = logging.getLogger(__name__)
# --- State ---
executor = ThreadPoolExecutor(max_workers=3)
rate_limits = defaultdict(list)
app = App(token=BOT_TOKEN)
def run_claude(prompt: str, working_dir: str = None, timeout: int = COMMAND_TIMEOUT) -> dict:
    """Run Claude Code non-interactively."""
    work_dir = working_dir or WORK_DIR
    try:
        result = subprocess.run(
            ["claude", "-p", prompt],
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=work_dir,
            env={**os.environ, "TERM": "dumb"},
        )
        return {
            "success": result.returncode == 0,
            "output": result.stdout.strip(),
            "error": result.stderr.strip() if result.returncode != 0 else None,
        }
    except subprocess.TimeoutExpired:
        return {"success": False, "output": None, "error": f"Timed out after {timeout}s"}
    except Exception as e:
        return {"success": False, "output": None, "error": str(e)}


def check_rate_limit(user_id: str) -> bool:
    """Return True if user is within rate limits (sliding one-hour window)."""
    now = time.time()
    hour_ago = now - 3600
    rate_limits[user_id] = [t for t in rate_limits[user_id] if t > hour_ago]
    if len(rate_limits[user_id]) >= RATE_LIMIT:
        return False
    rate_limits[user_id].append(now)
    return True


def upload_as_file(client, channel: str, thread_ts: str, content: str, filename: str):
    """Upload long output as a file snippet."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
        f.write(content)
    # The file is closed (and flushed) here, so it can be reopened for upload
    try:
        client.files_upload_v2(
            channel=channel,
            thread_ts=thread_ts,
            file=f.name,
            filename=filename,
            title="Claude Code Output",
        )
    finally:
        # Remove the temp file even if the upload fails
        os.unlink(f.name)

@app.command("/claude")
def handle_claude_command(ack, respond, command, client):
    """Handle /claude slash commands."""
    ack()  # Acknowledge within 3 seconds
    user_id = command["user_id"]
    channel_id = command["channel_id"]
    text = command.get("text", "").strip()

    # Channel restriction. respond() answers a slash command ephemerally by
    # default, so only the caller sees these messages.
    if ALLOWED_CHANNELS and channel_id not in ALLOWED_CHANNELS:
        respond("This command is not allowed in this channel.")
        return

    # Rate limiting
    if not check_rate_limit(user_id):
        respond(f"Rate limit exceeded. Max {RATE_LIMIT} commands per hour.")
        return

    if not text:
        respond(
            "Usage: `/claude <action> [args]`\n"
            "Actions: `run`, `deploy`, `test`, `git`, `status`\n"
            "Example: `/claude run list all Python files`"
        )
        return

    parts = text.split(maxsplit=1)
    action = parts[0].lower()
    args = parts[1] if len(parts) > 1 else ""
    logger.info(f"User {user_id} in {channel_id}: /claude {action} {args}")

    # Post the initial "working" message; all replies are threaded under it
    response = client.chat_postMessage(
        channel=channel_id,
        text=f"Working on: `{action} {args}`...",
    )
    thread_ts = response["ts"]

    # Add reaction to show we're working
    client.reactions_add(channel=channel_id, timestamp=thread_ts, name="hourglass_flowing_sand")

    # Route command
    if action == "run":
        prompt = args or "Show project status"
    elif action == "deploy":
        env = args or "staging"
        prompt = f"Run the deployment pipeline for the {env} environment."
    elif action == "test":
        prompt = "Run the project test suite and report results."
    elif action == "git":
        safe = ["status", "log", "diff", "branch", "show"]
        first = args.split()[0] if args else ""
        if first not in safe:
            client.chat_postMessage(
                channel=channel_id, thread_ts=thread_ts,
                text=f"Only these git commands are allowed: {', '.join(safe)}",
            )
            return
        prompt = f"Run this git command and show the output: git {args}"
    else:
        prompt = text  # Treat the whole thing as a prompt

    # Execute in thread pool
    import concurrent.futures
    future = executor.submit(run_claude, prompt)
    try:
        result = future.result(timeout=COMMAND_TIMEOUT + 30)
    except concurrent.futures.TimeoutError:
        result = {"success": False, "output": None, "error": "Execution timed out"}

    # Remove working reaction, add result reaction
    try:
        client.reactions_remove(channel=channel_id, timestamp=thread_ts, name="hourglass_flowing_sand")
    except Exception:
        pass

    if result["success"]:
        client.reactions_add(channel=channel_id, timestamp=thread_ts, name="white_check_mark")
        output = result["output"] or "(no output)"
        if len(output) > MAX_SLACK_LENGTH:
            # Upload as file for long output
            client.chat_postMessage(
                channel=channel_id, thread_ts=thread_ts,
                text="Output is too long for a message. Uploading as file...",
            )
            upload_as_file(client, channel_id, thread_ts, output, "claude_output.txt")
        else:
            client.chat_postMessage(
                channel=channel_id, thread_ts=thread_ts,
                text=f"```\n{output}\n```",
            )
    else:
        client.reactions_add(channel=channel_id, timestamp=thread_ts, name="x")
        error = result["error"] or "Unknown error"
        client.chat_postMessage(
            channel=channel_id, thread_ts=thread_ts,
            text=f"Failed:\n```\n{error}\n```",
        )

@app.event("app_mention")
def handle_mention(event, say, client):
    """Handle @bot mentions in channels."""
    # Apply the same channel and rate-limit checks as the slash command
    if ALLOWED_CHANNELS and event.get("channel") not in ALLOWED_CHANNELS:
        return
    if not check_rate_limit(event.get("user", "")):
        say(f"Rate limit exceeded. Max {RATE_LIMIT} commands per hour.", thread_ts=event["ts"])
        return

    text = event.get("text", "")
    # Strip the bot mention to get just the prompt
    # Mentions look like <@U12345> prompt here
    import re
    prompt = re.sub(r"<@\w+>\s*", "", text).strip()
    if not prompt:
        say("Mention me with a prompt! Example: `@Claude Code Bot list Python files`", thread_ts=event["ts"])
        return
    say("Working on it...", thread_ts=event["ts"])

    import concurrent.futures
    future = executor.submit(run_claude, prompt)
    try:
        result = future.result(timeout=COMMAND_TIMEOUT + 30)
    except concurrent.futures.TimeoutError:
        result = {"success": False, "output": None, "error": "Timed out"}

    output = (result["output"] if result["success"] else result["error"]) or "(no output)"
    # Keep the reply within the length budget used elsewhere in this script
    say(f"```\n{output[:MAX_SLACK_LENGTH]}\n```", thread_ts=event["ts"])

if __name__ == "__main__":
    if not BOT_TOKEN or not APP_TOKEN:
        logger.error("SLACK_BOT_TOKEN and SLACK_APP_TOKEN must be set in .env")
        raise SystemExit(1)  # the bare exit() helper is meant for interactive sessions
    logger.info("Slack bridge starting in Socket Mode...")
    handler = SocketModeHandler(app, APP_TOKEN)
    handler.start()
The corresponding .env file:
# .env for Slack bridge
SLACK_BOT_TOKEN=xoxb-your-bot-token
SLACK_APP_TOKEN=xapp-your-app-level-token
SLACK_ALLOWED_CHANNELS=C01ABCDEF,C02GHIJKL
CLAUDE_WORK_DIR=/home/youruser/projects/myapp
And requirements.txt:
slack-bolt>=1.18.0
python-dotenv>=1.0.0
Step 3: Advanced Slack Features
Slack’s Block Kit enables interactive messages with buttons. Here is how to add a confirmation dialog for deployments:
# Add this handler for interactive buttons
@app.action("approve_deploy")
def handle_approve(ack, body, client):
    ack()
    user = body["user"]["id"]
    channel = body["channel"]["id"]
    thread_ts = body["message"]["ts"]
    client.chat_postMessage(
        channel=channel, thread_ts=thread_ts,
        text=f"<@{user}> approved the deployment. Deploying now...",
    )
    result = run_claude("Deploy to production and run health checks.")
    output = result["output"] if result["success"] else result["error"]
    client.chat_postMessage(
        channel=channel, thread_ts=thread_ts,
        text=f"Deployment result:\n```\n{output}\n```",
    )


@app.action("reject_deploy")
def handle_reject(ack, body, client):
    ack()
    user = body["user"]["id"]
    channel = body["channel"]["id"]
    thread_ts = body["message"]["ts"]
    client.chat_postMessage(
        channel=channel, thread_ts=thread_ts,
        text=f"<@{user}> cancelled the deployment.",
    )
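These handlers only fire if some message contains buttons whose action_id values are approve_deploy and reject_deploy. Here is a sketch of what such a message payload might look like in Block Kit. `build_deploy_confirmation` is a hypothetical helper, and the wording of the prompt text is illustrative.

```python
def build_deploy_confirmation(environment: str) -> list[dict]:
    """Return Block Kit blocks asking the channel to approve or reject a deployment."""
    return [
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": f"Deploy to *{environment}*?"},
        },
        {
            "type": "actions",
            "elements": [
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "Approve"},
                    "style": "primary",
                    "action_id": "approve_deploy",  # matches the approve handler
                },
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "Reject"},
                    "style": "danger",
                    "action_id": "reject_deploy",  # matches the reject handler
                },
            ],
        },
    ]


blocks = build_deploy_confirmation("production")
print(blocks[1]["elements"][0]["action_id"])  # prints: approve_deploy
```

You would post it with client.chat_postMessage, passing the list as the blocks argument together with a plain text fallback.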
Thread-based responses keep your channel clean. Every command response is posted as a thread reply to the initial “Working on it…” message, so your #engineering channel does not get flooded with Claude Code output.
Method 3: Discord Bot
Discord works particularly well for open-source communities and hobby projects. The setup is slightly different from Telegram and Slack, but follows the same bridge pattern.
Create a Discord Bot
- Go to discord.com/developers/applications
- Click New Application, name it, and create it
- Go to Bot → click Add Bot
- Copy the Bot Token
- Under Privileged Gateway Intents, enable Message Content Intent
- Go to OAuth2 → URL Generator, select scopes bot and applications.commands, and permissions Send Messages, Read Message History, Attach Files
- Use the generated URL to invite the bot to your server
Discord Bridge Server
#!/usr/bin/env python3
"""
Discord Bridge for Claude Code
================================
Controls Claude Code sessions via Discord slash commands.
"""
import asyncio
import io
import logging
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor

import discord
from discord import app_commands
from dotenv import load_dotenv

load_dotenv()

BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN")
# Comma-separated role names; leaving this unset allows every user.
ALLOWED_ROLES = [r.strip() for r in os.getenv("DISCORD_ALLOWED_ROLES", "").split(",") if r.strip()]
WORK_DIR = os.getenv("CLAUDE_WORK_DIR", os.path.expanduser("~/projects"))
COMMAND_TIMEOUT = 600
MAX_INLINE_CHARS = 1900  # Discord rejects messages over 2,000 characters

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
executor = ThreadPoolExecutor(max_workers=3)


def run_claude(prompt: str, timeout: int = COMMAND_TIMEOUT) -> dict:
    try:
        result = subprocess.run(
            ["claude", "-p", prompt],
            capture_output=True, text=True, timeout=timeout,
            cwd=WORK_DIR, env={**os.environ, "TERM": "dumb"},
        )
        return {
            "success": result.returncode == 0,
            "output": result.stdout.strip(),
            "error": result.stderr.strip() if result.returncode != 0 else None,
        }
    except subprocess.TimeoutExpired:
        return {"success": False, "output": None, "error": f"Timed out after {timeout}s"}
    except Exception as e:
        return {"success": False, "output": None, "error": str(e)}


async def run_claude_async(prompt: str) -> dict:
    """Run the blocking subprocess in a worker thread so the bot stays responsive."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, run_claude, prompt)


async def send_output(interaction: discord.Interaction, output: str, label: str = "Output"):
    """Send output inline, or as a file attachment when it is too long."""
    output = output or "(no output)"
    if len(output) > MAX_INLINE_CHARS:
        # An in-memory file avoids concurrent commands overwriting a shared /tmp path.
        data = io.BytesIO(output.encode("utf-8"))
        await interaction.followup.send(
            f"{label} (see attached file):",
            file=discord.File(data, filename="claude_output.txt"),
        )
    else:
        await interaction.followup.send(f"{label}:\n```\n{output}\n```")


class ClaudeBot(discord.Client):
    def __init__(self):
        intents = discord.Intents.default()
        intents.message_content = True
        super().__init__(intents=intents)
        self.tree = app_commands.CommandTree(self)

    async def setup_hook(self):
        await self.tree.sync()
        logger.info("Slash commands synced.")


bot = ClaudeBot()


def has_permission(interaction: discord.Interaction) -> bool:
    if not ALLOWED_ROLES:
        return True
    # In direct messages the user has no roles attribute.
    user_roles = [r.name for r in getattr(interaction.user, "roles", [])]
    return any(role in ALLOWED_ROLES for role in user_roles)


@bot.tree.command(name="claude", description="Run a Claude Code prompt")
@app_commands.describe(prompt="The prompt to send to Claude Code")
async def claude_command(interaction: discord.Interaction, prompt: str):
    if not has_permission(interaction):
        await interaction.response.send_message("You do not have permission.", ephemeral=True)
        return
    await interaction.response.send_message(f"Working on: `{prompt}`...")
    result = await run_claude_async(prompt)
    if result["success"]:
        await send_output(interaction, result["output"])
    else:
        await interaction.followup.send(f"Failed: {result['error']}")


@bot.tree.command(name="deploy", description="Deploy to an environment")
@app_commands.describe(environment="Target environment (staging/production)")
async def deploy_command(interaction: discord.Interaction, environment: str = "staging"):
    if not has_permission(interaction):
        await interaction.response.send_message("You do not have permission.", ephemeral=True)
        return
    await interaction.response.send_message(f"Deploying to {environment}...")
    prompt = f"Run the deployment pipeline for {environment}. Show the URL when done."
    result = await run_claude_async(prompt)
    output = result["output"] if result["success"] else result["error"]
    await send_output(interaction, output, label="Deploy result")


@bot.tree.command(name="test", description="Run project tests")
async def test_command(interaction: discord.Interaction):
    if not has_permission(interaction):
        await interaction.response.send_message("You do not have permission.", ephemeral=True)
        return
    await interaction.response.send_message("Running tests...")
    result = await run_claude_async("Run the test suite and report results.")
    output = result["output"] if result["success"] else result["error"]
    await send_output(interaction, output, label="Test results")


if __name__ == "__main__":
    if not BOT_TOKEN:
        logger.error("DISCORD_BOT_TOKEN not set")
        raise SystemExit(1)
    bot.run(BOT_TOKEN)
Discord’s 2,000-character message limit is the most restrictive of the platforms covered here. The bot handles this by automatically uploading long output as a file attachment, a pattern you will want for any platform with tight limits.
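When a file attachment is not convenient, an alternative is to split long output into several messages. The following is a minimal sketch of that idea, not part of the bridge script above: split_for_chat is a hypothetical helper, and the 1,900-character default is an assumption that leaves room for code-fence markers around each chunk.

```python
def split_for_chat(text: str, limit: int = 1900) -> list:
    """Split text into chunks of at most `limit` characters, preferring line breaks."""
    chunks = []
    current = ""
    for line in text.splitlines(keepends=True):
        # A single line longer than the limit has to be cut mid-line.
        while len(line) > limit:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(line[:limit])
            line = line[limit:]
        # Start a new chunk when adding this line would exceed the limit.
        if len(current) + len(line) > limit:
            chunks.append(current)
            current = ""
        current += line
    if current:
        chunks.append(current)
    return chunks
```

Each chunk could then be sent as its own follow-up message. Joining the chunks back together reproduces the original text, so nothing is lost or reordered.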
Method 4: Generic Webhook Approach
What if you use Microsoft Teams, WhatsApp, LINE, or some other platform? Instead of writing a platform-specific bot, you can build a generic webhook server that any platform can call. This is the most flexible approach.
FastAPI Webhook Server
#!/usr/bin/env python3
"""
Generic Webhook Bridge for Claude Code
========================================
A simple HTTP server that accepts webhook requests and runs Claude Code.
Works with any messaging platform that supports outgoing webhooks.

Usage:
    uvicorn webhook_bridge:app --host 0.0.0.0 --port 8080
"""
import asyncio
import hashlib
import hmac
import logging
import os
import subprocess
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Header, Request
from pydantic import BaseModel

load_dotenv()

WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")
if not WEBHOOK_SECRET:
    # Refuse to start with a missing or guessable default secret.
    raise RuntimeError("WEBHOOK_SECRET must be set")
WORK_DIR = os.getenv("CLAUDE_WORK_DIR", os.path.expanduser("~/projects"))
COMMAND_TIMEOUT = 600
RATE_LIMIT = 10  # commands per user per hour

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
executor = ThreadPoolExecutor(max_workers=3)
rate_limits = defaultdict(list)
app = FastAPI(title="Claude Code Webhook Bridge")


class CommandRequest(BaseModel):
    command: str
    working_dir: str | None = None
    timeout: int | None = None
    user_id: str | None = None


class CommandResponse(BaseModel):
    success: bool
    output: str | None
    error: str | None
    duration_seconds: float


def verify_signature(payload: bytes, signature: str) -> bool:
    """Verify HMAC-SHA256 webhook signature."""
    expected = hmac.new(
        WEBHOOK_SECRET.encode(), payload, hashlib.sha256
    ).hexdigest()
    # Compare as bytes so a non-ASCII header cannot raise TypeError.
    return hmac.compare_digest(f"sha256={expected}".encode(), signature.encode())


def check_rate_limit(user_key: str) -> bool:
    now = time.time()
    hour_ago = now - 3600
    rate_limits[user_key] = [t for t in rate_limits[user_key] if t > hour_ago]
    if len(rate_limits[user_key]) >= RATE_LIMIT:
        return False
    rate_limits[user_key].append(now)
    return True


def run_claude(prompt: str, working_dir: str | None = None, timeout: int = COMMAND_TIMEOUT) -> dict:
    # NOTE: working_dir comes from the caller; validate it against an allowed
    # root before exposing this endpoint beyond trusted clients.
    work_dir = working_dir or WORK_DIR
    try:
        result = subprocess.run(
            ["claude", "-p", prompt],
            capture_output=True, text=True, timeout=timeout,
            cwd=work_dir, env={**os.environ, "TERM": "dumb"},
        )
        return {
            "success": result.returncode == 0,
            "output": result.stdout.strip(),
            "error": result.stderr.strip() if result.returncode != 0 else None,
        }
    except subprocess.TimeoutExpired:
        return {"success": False, "output": None, "error": f"Timed out after {timeout}s"}
    except Exception as e:
        return {"success": False, "output": None, "error": str(e)}


@app.post("/webhook/claude", response_model=CommandResponse)
async def handle_webhook(
    cmd: CommandRequest,
    request: Request,
    x_webhook_signature: str = Header(None),
):
    """Execute a Claude Code command via webhook."""
    # Verify signature. A missing header is rejected, not skipped.
    body = await request.body()
    if not x_webhook_signature or not verify_signature(body, x_webhook_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    # Rate limiting
    user_key = cmd.user_id or request.client.host
    if not check_rate_limit(user_key):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    logger.info(f"Webhook from {user_key}: {cmd.command[:100]}")
    # Callers may shorten the timeout but not extend it past the server limit.
    timeout = min(cmd.timeout or COMMAND_TIMEOUT, COMMAND_TIMEOUT)
    start_time = time.time()
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        executor, run_claude, cmd.command, cmd.working_dir, timeout
    )
    duration = time.time() - start_time
    return CommandResponse(
        success=result["success"],
        output=result["output"],
        error=result["error"],
        duration_seconds=round(duration, 2),
    )


@app.get("/health")
async def health():
    return {"status": "ok", "timestamp": time.time()}
To call this webhook from any platform, you simply send a POST request:
curl -X POST http://your-server:8080/webhook/claude \
-H "Content-Type: application/json" \
-H "X-Webhook-Signature: sha256=..." \
-d '{"command": "list all Python files", "user_id": "user123"}'
This approach works with Microsoft Teams (outgoing webhooks), WhatsApp (via Twilio webhooks), LINE (via messaging API webhooks), and essentially any platform that can send HTTP POST requests. You configure the platform to send messages to your webhook URL, and the bridge handles the rest.
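The curl example leaves the X-Webhook-Signature value as sha256=.... A client could compute it as sketched below, mirroring the server's verify_signature function. The helper name sign_payload and the secret "example-secret" are placeholders for illustration; the one firm requirement is that the bytes you sign are exactly the bytes you send as the request body.

```python
import hashlib
import hmac
import json


def sign_payload(secret: str, payload: bytes) -> str:
    """Return the header value the webhook server expects for this payload."""
    digest = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


# Serialize once, then sign and send the same bytes.
body = json.dumps({"command": "list all Python files", "user_id": "user123"}).encode()
headers = {
    "Content-Type": "application/json",
    "X-Webhook-Signature": sign_payload("example-secret", body),
}
```

Re-serializing the JSON after signing (for example by letting an HTTP library encode a dict for you) can change whitespace or key order and make the signature check fail, so pass the pre-encoded body to whatever HTTP client you use.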
Security Best Practices
You are about to give a chat message the power to execute code on your machine. This is powerful and also dangerous if done carelessly. Security is not optional here — it is the most important part of the entire setup.
The Security Checklist
| Layer | What to Do | Why |
|---|---|---|
| Authentication | User ID / role allowlist | Only authorized users can run commands |
| Command allowlisting | Restrict to known safe actions | Prevent arbitrary shell execution |
| Rate limiting | Max N commands per hour | Prevent abuse and runaway costs |
| Directory sandboxing | Lock Claude Code to specific directories | Prevent access to sensitive files |
| Secrets management | Never pass secrets through chat | Chat history is not a secure channel |
| Audit logging | Log every command with user and timestamp | Traceability and incident response |
| Two-factor for danger | Require confirmation for deploy/delete | Prevent accidental destructive actions |
| Network security | HTTPS, firewall rules, VPN | Protect data in transit |
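The directory sandboxing row deserves a concrete illustration, because the webhook bridge accepts a working directory from the caller. Here is a minimal sketch, assuming a single allowed root such as the CLAUDE_WORK_DIR used throughout this guide; resolve_work_dir is a hypothetical helper, not something the scripts above already define.

```python
from pathlib import Path
from typing import Optional


def resolve_work_dir(requested: Optional[str], allowed_root: str) -> Path:
    """Return a directory under allowed_root, or raise ValueError if it escapes."""
    root = Path(allowed_root).resolve()
    if not requested:
        return root
    # resolve() collapses ".." segments and follows symlinks before the check.
    candidate = (root / requested).resolve()
    if candidate != root and root not in candidate.parents:
        raise ValueError(f"{requested!r} is outside the allowed root")
    return candidate
```

A bridge could call this before passing cwd to subprocess.run and reject the request on ValueError. Absolute paths such as /etc and traversal attempts such as ../.. both resolve outside the root and are refused.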
Implementing a Command Allowlist
Instead of letting users run arbitrary prompts, define a set of allowed command patterns:
import re

ALLOWED_PATTERNS = [
    r"^list\s",                          # List files, functions, etc.
    r"^explain\s",                       # Explain code
    r"^run tests",                       # Run test suite
    r"^deploy\s",                        # Deploy
    r"^fix\s",                           # Fix bugs
    r"^review\s",                        # Code review
    r"^git\s(status|log|diff|branch)",   # Read-only git
    r"^show\s",                          # Show file contents
    r"^analyze\s",                       # Analyze code
    r"^write tests",                     # Write tests
]

BLOCKED_PATTERNS = [
    r"rm\s+-rf",                          # Never allow recursive delete
    r"curl.*\|.*sh",                      # No pipe-to-shell
    r"eval\(",                            # No eval
    r"exec\(",                            # No exec
    r"__import__",                        # No dynamic imports
    r"(password|secret|token|key)\s*=",   # No credential setting
]


def is_command_allowed(prompt: str) -> tuple[bool, str]:
    """Check if a command is allowed. Returns (allowed, reason)."""
    prompt_lower = prompt.lower().strip()
    # Check blocklist first
    for pattern in BLOCKED_PATTERNS:
        if re.search(pattern, prompt_lower):
            return False, f"Blocked pattern detected: {pattern}"
    # Check allowlist (strict mode).
    # For permissive mode, you can skip this check and return True here.
    for pattern in ALLOWED_PATTERNS:
        if re.search(pattern, prompt_lower):
            return True, "Matched allowed pattern"
    return False, "Command does not match any allowed pattern"
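Because the prompt is natural language, pattern checks alone do not constrain what Claude Code actually does. Also configure Claude Code's own permissions (.claude/settings.json) and consider restricting its tool access with --allowedTools when running from a bot.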
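As a rough sketch of the --allowedTools idea, the bridge could build its command line through a small helper like the one below. The helper name build_claude_argv, the READ_ONLY_TOOLS list, and the comma-joined format are assumptions for illustration; check the Claude Code CLI documentation for the tool names and the exact argument syntax your version accepts.

```python
from typing import List, Optional

# Assumed set of read-only tools; adjust to what your installation provides.
READ_ONLY_TOOLS = ["Read", "Grep", "Glob"]


def build_claude_argv(prompt: str, allowed_tools: Optional[List[str]] = None) -> List[str]:
    """Build the argument list for subprocess.run, optionally restricting tools."""
    argv = ["claude", "-p", prompt]
    if allowed_tools:
        # Passed as one comma-separated argument (assumed format, verify locally).
        argv += ["--allowedTools", ",".join(allowed_tools)]
    return argv
```

Passing the result to subprocess.run as a list (not a shell string) keeps the prompt a single argument, so it is never interpreted by a shell.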
Audit Logging
Every command that comes through the bot should be logged with full context. This is crucial for debugging, accountability, and security incident response:
import json
from datetime import datetime, timezone


def log_command(user_id: str, platform: str, command: str, result: dict):
    """Log a command execution to an audit file."""
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "platform": platform,
        "command": command,
        "success": result["success"],
        "output_length": len(result["output"]) if result["output"] else 0,
        "error": result["error"],
    }
    # One JSON object per line (JSONL) keeps the log append-only and easy to parse.
    with open("audit_log.jsonl", "a") as f:
        f.write(json.dumps(entry) + "\n")
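An audit log only helps if you read it. As a small sketch of what that could look like, the hypothetical helper below counts total and failed commands per user from JSONL lines in the format written by log_command; it relies only on the user_id and success fields.

```python
import json
from collections import Counter


def summarize_audit_log(lines) -> dict:
    """Count total and failed commands per user from JSONL audit entries."""
    totals, failures = Counter(), Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        entry = json.loads(line)
        totals[entry["user_id"]] += 1
        if not entry["success"]:
            failures[entry["user_id"]] += 1
    return {user: {"total": totals[user], "failed": failures[user]} for user in totals}
```

Because an open file iterates line by line, calling summarize_audit_log(open("audit_log.jsonl")) works directly. A sudden spike in one user's total or failure count is a reasonable first signal of abuse or a misbehaving integration.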
Production Deployment
Running the bridge with python telegram_bridge.py in a terminal works for testing. For production, you need it to start automatically, restart on failure, and run in the background.
Systemd Service File
Create /etc/systemd/system/claude-telegram-bridge.service:
[Unit]
Description=Claude Code Telegram Bridge
After=network.target
[Service]
Type=simple
User=youruser
WorkingDirectory=/home/youruser/claude-bridge
ExecStart=/home/youruser/claude-bridge/venv/bin/python telegram_bridge.py
Restart=always
RestartSec=10
StandardOutput=append:/var/log/claude-bridge.log
StandardError=append:/var/log/claude-bridge-error.log
Environment=PATH=/home/youruser/.local/bin:/usr/bin:/bin
Environment=HOME=/home/youruser
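# Security hardening
NoNewPrivileges=true
ProtectSystem=strict
# ProtectSystem=strict makes everything outside ReadWritePaths read-only,
# including the rest of the home directory. If Claude Code fails with
# read-only errors when saving its own state, add that path here as well.
ReadWritePaths=/home/youruser/claude-bridge /home/youruser/projects
PrivateTmp=true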
[Install]
WantedBy=multi-user.target
Enable and start it:
sudo systemctl daemon-reload
sudo systemctl enable claude-telegram-bridge
sudo systemctl start claude-telegram-bridge
# Check status
sudo systemctl status claude-telegram-bridge
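# View service events (start, stop, restart)
sudo journalctl -u claude-telegram-bridge -f
# View bridge output (the unit redirects stdout and stderr to these files)
sudo tail -f /var/log/claude-bridge.log /var/log/claude-bridge-error.log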
Docker Deployment
For containerized deployments, here is a Dockerfile:
FROM python:3.12-slim
WORKDIR /app

# Install Claude Code CLI (Node.js required)
RUN apt-get update && apt-get install -y curl && \
    curl -fsSL https://deb.nodesource.com/setup_20.x | bash - && \
    apt-get install -y nodejs && \
    npm install -g @anthropic-ai/claude-code && \
    apt-get clean && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY telegram_bridge.py .

# Do not COPY .env into the image: secrets would be baked into a layer.
# Supply them at runtime instead (for example with env_file in Compose).
CMD ["python", "telegram_bridge.py"]
And a docker-compose.yml:
version: "3.8"
services:
  claude-bridge:
    build: .
    restart: always
    env_file: .env
    volumes:
      - /home/youruser/projects:/projects:rw
      - claude-config:/root/.claude
    environment:
      - CLAUDE_WORK_DIR=/projects
    logging:
      driver: json-file
      options:
        max-size: "10m"
        max-file: "3"
volumes:
  claude-config:
SSH Tunnel Approach
If you want the bridge server on a VPS (for reliability and a public IP) but Claude Code on your local machine, you can use an SSH tunnel. The bridge SSHes into your dev machine to run Claude Code:
import shlex
import subprocess


def run_claude_via_ssh(prompt: str, ssh_host: str = "dev-machine") -> dict:
    """Run Claude Code on a remote machine via SSH."""
    # Quote the prompt so the remote shell treats it as a single argument
    safe_prompt = shlex.quote(prompt)
    try:
        result = subprocess.run(
            ["ssh", ssh_host, f"cd ~/projects && claude -p {safe_prompt}"],
            capture_output=True, text=True, timeout=COMMAND_TIMEOUT,
        )
        return {
            "success": result.returncode == 0,
            "output": result.stdout.strip(),
            "error": result.stderr.strip() if result.returncode != 0 else None,
        }
    except subprocess.TimeoutExpired:
        return {"success": False, "output": None, "error": f"Timed out after {COMMAND_TIMEOUT}s"}
    except Exception as e:
        return {"success": False, "output": None, "error": str(e)}
This pattern gives you the best of both worlds: the bridge server is always-on (VPS), and Claude Code runs on your powerful dev machine with access to all your projects. Set up SSH key authentication so no password is needed, and use autossh to keep the connection alive.
Practical Workflow Examples
Theory is great, but let us look at real-world scenarios where remote-controlling Claude Code shines.
Morning Standup from Your Phone
It is 8:55 AM. You are walking to the office with a coffee. You pull out your phone and send:
/run Summarize: last 3 git commits, current branch status, any failing tests, and open PRs
By the time you sit down at your desk, Claude Code has replied with a clean summary of the project state. You walk into standup knowing exactly where things stand.
Deploy from Anywhere
Your PM pings you: “Can we push the latest to staging for the client demo in an hour?” You are at lunch. No problem:
/deploy staging
The bot responds with the build log, deployment URL, and health check results. You forward the staging URL to your PM and go back to your meal.
Quick Bug Fix
An error alert fires at 10 PM. You are watching a movie. Instead of getting up:
/run The error log shows a TypeError in src/auth.py line 42. Fix it, write a test for the fix, and show me the diff.
Claude Code analyzes the error, fixes the bug, writes a regression test, runs the test suite, and sends you back the diff and test results. You review the diff on your phone screen, and if it looks good:
/run Commit the changes with message "fix: handle None auth token in validate_session" and push to a new branch fix/auth-none-check, then create a PR
Code Review on the Go
A team member submitted a PR while you are commuting:
/run Review PR #123 on GitHub. Summarize changes, identify potential issues, check test coverage, and give your recommendation.
You get back a structured review with file-by-file analysis, flagged concerns, and an overall recommendation. All from the train.
Monitoring and Notifications
So far we have talked about reactive usage — you send a command, you get a response. But you can also set up proactive monitoring, where the system sends you alerts and you respond with actions.
Scheduled Monitoring Script
#!/usr/bin/env python3
"""
Scheduled monitoring that sends alerts via Telegram.
Run via cron: */30 * * * * /path/to/monitor.py
"""
import os
import subprocess

import requests
from dotenv import load_dotenv

load_dotenv()

BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
CHAT_ID = os.getenv("TELEGRAM_ALERT_CHAT_ID")
WORK_DIR = os.getenv("CLAUDE_WORK_DIR")


def send_telegram(message: str):
    url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
    # A timeout keeps the cron job from hanging if Telegram is unreachable.
    requests.post(url, json={"chat_id": CHAT_ID, "text": message}, timeout=10)


def check_tests():
    """Run tests and alert on failure."""
    try:
        result = subprocess.run(
            ["claude", "-p", "Run the test suite. If every test passes, reply with exactly PASS. Otherwise report only the failures."],
            capture_output=True, text=True, timeout=300, cwd=WORK_DIR,
            env={**os.environ, "TERM": "dumb"},
        )
    except subprocess.TimeoutExpired:
        send_telegram("Test run timed out after 300s")
        return
    output = result.stdout.strip()
    # Require an exact PASS: a failure report such as "3 passed, 1 failed"
    # also contains the substring "PASS" and would otherwise be missed.
    if result.returncode != 0 or output.strip(" .!\n").upper() != "PASS":
        send_telegram(f"Test failure detected:\n\n{output[:3000]}")


def check_server_health():
    """Check if the production server is healthy."""
    try:
        r = requests.get("https://your-app.com/health", timeout=10)
        if r.status_code != 200:
            send_telegram(f"Server health check failed: HTTP {r.status_code}")
    except Exception as e:
        send_telegram(f"Server unreachable: {e}")


if __name__ == "__main__":
    check_tests()
    check_server_health()
Add this to your crontab to run every 30 minutes. When something fails, you get a Telegram notification and can immediately respond with a command to fix it — all from your phone.
CI/CD Integration
Add a webhook call to your CI/CD pipeline (GitHub Actions, GitLab CI, etc.) so that when a build fails, it notifies your bot:
# In your GitHub Actions workflow (.github/workflows/ci.yml)
- name: Notify on failure
  if: failure()
  run: |
    # --data-urlencode keeps spaces and slashes in the message intact
    curl -s -X POST "https://api.telegram.org/bot${{ secrets.TELEGRAM_BOT_TOKEN }}/sendMessage" \
      --data-urlencode "chat_id=${{ secrets.TELEGRAM_CHAT_ID }}" \
      --data-urlencode "text=CI failed on ${{ github.ref }} by ${{ github.actor }}. Reply /run investigate the CI failure and suggest fixes."
This creates a natural loop: CI fails → you get notified → you send a fix command from your phone → CI passes. All without opening a laptop.
Limitations and Workarounds
This setup is powerful, but it has real limitations. Being aware of them will save you frustration.
| Limitation | Impact | Workaround |
|---|---|---|
| Message length limits | Telegram: 4,096 chars; Discord: 2,000 chars | Auto-upload as file attachment when exceeded |
| No real-time streaming | You wait for the full result; no progressive output | Send periodic “still working” updates; split into smaller tasks |
| Claude Code token limits | Very large tasks may exceed context window | Break into subtasks; use --max-turns flag |
| Network latency | SSH-based setups add latency | Async execution with callback; keep bridge on same machine |
| No interactive prompts | Cannot handle Claude Code’s confirmation dialogs | Use --allowedTools to pre-authorize or auto-accept permissions |
| Limited concurrency | The thread pool caps parallel runs (3 workers in the scripts above) | Queue commands and process sequentially; increase pool size carefully |
| Machine must be on | If your dev machine sleeps, the bridge goes down | Run on always-on VPS; use Wake-on-LAN for local machine |
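The "queue commands and process sequentially" workaround can be sketched with an asyncio queue and a single worker. This is an illustrative pattern under stated assumptions, not code from the bridge scripts: command_worker and submit are hypothetical names, and fake_run stands in for run_claude so the sketch runs without the CLI installed.

```python
import asyncio


async def command_worker(queue: asyncio.Queue, run) -> None:
    """Pull (prompt, future) pairs off the queue and run them one at a time."""
    while True:
        prompt, future = await queue.get()
        try:
            # `run` is blocking, so push it to a thread to keep the loop responsive.
            result = await asyncio.to_thread(run, prompt)
        except Exception as exc:
            if not future.done():
                future.set_exception(exc)
        else:
            if not future.done():
                future.set_result(result)
        finally:
            queue.task_done()


async def submit(queue: asyncio.Queue, prompt: str):
    """Enqueue a prompt and wait for its result."""
    future = asyncio.get_running_loop().create_future()
    await queue.put((prompt, future))
    return await future


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    # Stand-in for run_claude (assumption for this demo only).
    fake_run = lambda prompt: {"success": True, "output": prompt.upper(), "error": None}
    worker = asyncio.create_task(command_worker(queue, fake_run))
    results = await asyncio.gather(submit(queue, "run tests"), submit(queue, "deploy staging"))
    worker.cancel()
    print([r["output"] for r in results])


if __name__ == "__main__":
    asyncio.run(main())
```

With one worker, commands never overlap, which avoids two Claude Code runs editing the same working tree at once. Starting more workers on the same queue raises the concurrency limit in a controlled way.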
Handling Long Output Gracefully
This is the most common issue you will encounter. Claude Code can generate very long output — test results, code reviews, diffs. Here is a robust pattern that works across all platforms:
import tempfile


def format_output(output: str, max_length: int, platform: str) -> dict:
    """
    Format output for a messaging platform.
    Returns {text: str, file: str|None} where file is a path to upload if needed.
    The caller should delete the file after uploading it.
    """
    if not output:
        return {"text": "(no output)", "file": None}
    if len(output) <= max_length:
        return {"text": output, "file": None}
    # Create a summary + file for long output
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as tmp:
        tmp.write(output)
    # Reserve 200 characters for the truncation notice
    summary = output[:max_length - 200]
    summary += f"\n\n... Output truncated ({len(output)} chars). Full output attached as file."
    return {"text": summary, "file": tmp.name}
Adding Progress Updates
For long-running tasks, silence is anxiety-inducing. Here is how to send periodic "still working" updates:
import asyncio


async def run_with_progress(prompt, send_update, interval=30):
    """Run Claude Code with periodic progress updates."""
    loop = asyncio.get_running_loop()
    # None selects the event loop's default thread pool
    future = loop.run_in_executor(None, run_claude, prompt)
    elapsed = 0
    while True:
        # Wake up as soon as the task finishes, or after `interval` seconds
        done, _ = await asyncio.wait({future}, timeout=interval)
        if done:
            return future.result()
        elapsed += interval
        await send_update(f"Still working... ({elapsed}s elapsed)")
Conclusion
What started as a simple idea — controlling Claude Code from my phone — has fundamentally changed how I work. The ability to trigger deployments, fix bugs, run tests, and review code from anywhere, at any time, removes the last friction point between having an idea and acting on it.
The technical implementation is surprisingly straightforward. At its core, it is just a messaging bot that calls claude -p in a subprocess. The complexity is in the details — security, reliability, output handling — and we have covered all of those thoroughly.
Here is what I recommend as your path forward:
- Start with Telegram. It takes 15 minutes, costs nothing, and requires no infrastructure. Copy the Telegram bridge script from this guide and run it.
- Add security. Set up user authentication, rate limiting, and command allowlisting before you share access with anyone.
- Graduate to Slack if you want team access, or stay with Telegram for personal use.
- Deploy properly with systemd or Docker once you rely on it daily.
- Add monitoring for proactive alerts and scheduled reports.
The bridge pattern described here is platform-agnostic. Once you understand it, you can adapt it to WhatsApp, LINE, Microsoft Teams, or any messaging platform that supports bots or webhooks. The core remains the same: receive a message, run claude -p, send back the result.
The future of development is not about being tethered to a desk. It is about having your tools available wherever you are. Claude Code already does the hard work of understanding and modifying code. The messaging bridge just makes it accessible from the device you carry everywhere — your phone.