ArgoFusion SSH is a web-based SSH management platform for managing servers, running commands, and automating tasks from a browser. This quick start guide walks through each major feature with practical examples and code excerpts.
1. Getting Started: Registration and Login
ArgoFusion offers multiple authentication methods to suit different organizational needs:
Email Registration
# Registration process with email verification
POST /api/register
{
  "email": "admin@company.com",
  "password": "SecurePassword123",
  "code": "123456"  // Email verification code
}
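A client-side sketch of the request above, using only the standard library. The base URL is a placeholder, `build_register_request` is a hypothetical helper, and the six-digit check is an assumption drawn from the example payload; the source shows only the endpoint and its three fields.

```python
import json
import re
import urllib.request

def build_register_request(base_url, email, password, code):
    """Build (but do not send) the POST /api/register request."""
    # Assumption: verification codes are six digits, as in the example payload.
    if not re.fullmatch(r"\d{6}", code):
        raise ValueError("verification code must be six digits")
    body = json.dumps({"email": email, "password": password, "code": code})
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/register",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_register_request(
    "https://argofusion.example", "admin@company.com", "SecurePassword123", "123456"
)
# urllib.request.urlopen(req) would send it; omitted so the sketch runs offline.
```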
OAuth Integration
ArgoFusion supports seamless OAuth login with popular platforms:
- Linux.do OAuth - Perfect for Linux community members
- Google OAuth - Enterprise-friendly single sign-on
- Custom OAuth - Integrate with your organization's identity provider
# OAuth implementation example (from app.py)
@app.route('/oauth2/callback')
async def oauth2_callback():
    """Handle OAuth2 callback with automatic user provisioning"""
    try:
        # Exchange code for access token
        token_response = await exchange_oauth_code(request.args.get('code'))
        # Get user information
        user_info = await get_oauth_user_info(token_response['access_token'])
        # Check if user exists, create if not. The helper is assumed here to
        # also report whether the account was just created.
        user_id, is_new_user = await find_or_create_user(user_info)
        # Set session with extended expiry for OAuth users
        session['user_id'] = user_id
        session['email'] = user_info.get('email')
        session.permanent = True
        session['expires'] = (datetime.now(timezone.utc) + timedelta(days=30)).isoformat()
        # Welcome bonus for new users
        if is_new_user:
            await config.give_welcome_membership(user_id)
            session['show_welcome_gift'] = True
        return redirect('/')
    except Exception as e:
        logger.error(f"OAuth callback error: {str(e)}")
        return redirect('/login?error=oauth_failed')
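The callback stores the session expiry as an ISO 8601 string, so later requests need to compare that string against the clock. The source does not show this check; the following is a minimal sketch of one way to do it, with `session_is_expired` being a hypothetical helper and a plain dict standing in for the session object.

```python
from datetime import datetime, timedelta, timezone

def session_is_expired(session, now=None):
    """Return True when the stored 'expires' timestamp is missing or in the past."""
    expires_raw = session.get("expires")
    if not expires_raw:
        return True
    # isoformat() on an aware datetime round-trips through fromisoformat().
    expires_at = datetime.fromisoformat(expires_raw)
    now = now or datetime.now(timezone.utc)
    return now >= expires_at

issued = datetime(2024, 1, 1, tzinfo=timezone.utc)
session = {"user_id": 1, "expires": (issued + timedelta(days=30)).isoformat()}
```

Passing `now` explicitly keeps the check easy to test; in a request handler it would default to the current UTC time.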
2. Dashboard Overview: Your Command Center
Upon login, you'll see ArgoFusion's intuitive dashboard that provides:
Main Navigation Features
- Host Management - Add, organize, and manage your servers
- Task Scheduler - Create and monitor automated tasks
- Quick Connect - Instant SSH access without configuration
- File Manager - SFTP file operations through web interface
- Group Operations - Batch commands across server groups
# Dashboard initialization (from app.py)
@app.route('/')
@login_required
async def home():
    """Main dashboard with personalized user experience"""
    user_id = session.get('user_id')
    # Set user context for personalized experience
    config.set_current_user(user_id)
    # Get user's language preference
    lang = await config.get_user_config(user_id, 'LANGUAGE')
    if not lang:
        lang = session.get('preferred_language', config.LANGUAGE)
    # Check for welcome gift notification
    show_welcome_gift = session.pop('show_welcome_gift', False)
    gift_expiry = None
    if show_welcome_gift:
        user_level = await config.get_user_level(user_id)
        if user_level and user_level.get('expires_at'):
            gift_expiry = user_level.get('expires_at').strftime('%Y-%m-%d')
    return await render_template('index.html',
        lang=lang,
        translations=get_translation(language=lang),
        authenticated=True,
        email=session.get('email'),
        username=session.get('username'),
        show_welcome_gift=show_welcome_gift,
        gift_expiry=gift_expiry
    )
3. Host Management: Organizing Your Infrastructure
ArgoFusion's host management system allows you to organize servers efficiently:
Adding Your First Server
# Server configuration example
{
  "customhostname": "web-server-01",
  "hostname": "192.168.1.100",
  "port": 22,
  "username": "admin",
  "authentication_method": "key",  // or "password"
  "private_key": "-----BEGIN OPENSSH PRIVATE KEY-----...",
  "group": "web-servers",
  "description": "Primary web server",
  "tags": ["production", "web", "nginx"]
}
Host Groups for Scalable Management
Organize servers into logical groups for batch operations:
# Host group management implementation
import re
from datetime import datetime

class HostGroupManager:
    """Manage server groups with advanced filtering and operations"""
    def __init__(self, user_id):
        self.user_id = user_id
        self.hosts = self.load_user_hosts()

    def create_group(self, group_name, host_filters=None):
        """Create dynamic host group with filtering"""
        if host_filters:
            # Filter hosts by tags, environment, or custom criteria
            filtered_hosts = self.filter_hosts(host_filters)
        else:
            filtered_hosts = []
        group_config = {
            'name': group_name,
            'hosts': filtered_hosts,
            'filters': host_filters,
            'created_at': datetime.now().isoformat(),
            'user_id': self.user_id
        }
        return self.save_group_config(group_config)

    def filter_hosts(self, filters):
        """Advanced host filtering"""
        result = self.hosts
        if 'tags' in filters:
            # Keep hosts that carry at least one of the requested tags
            result = [h for h in result if any(tag in h.get('tags', []) for tag in filters['tags'])]
        if 'environment' in filters:
            result = [h for h in result if h.get('environment') == filters['environment']]
        if 'hostname_pattern' in filters:
            # pattern.match() anchors at the start of the hostname
            pattern = re.compile(filters['hostname_pattern'])
            result = [h for h in result if pattern.match(h.get('hostname', ''))]
        return result
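To make the filter semantics concrete, here is a standalone sketch of the same three filters applied to a few made-up hosts. The sample hosts and the `environment` values are illustrative assumptions, not data from ArgoFusion.

```python
import re

def filter_hosts(hosts, filters):
    """Apply tag, environment, and hostname-pattern filters in sequence."""
    result = hosts
    if "tags" in filters:
        wanted = set(filters["tags"])
        # A host passes if it shares at least one tag with the filter.
        result = [h for h in result if wanted & set(h.get("tags", []))]
    if "environment" in filters:
        result = [h for h in result if h.get("environment") == filters["environment"]]
    if "hostname_pattern" in filters:
        pattern = re.compile(filters["hostname_pattern"])
        # re.match anchors at the start of the string only.
        result = [h for h in result if pattern.match(h.get("hostname", ""))]
    return result

hosts = [
    {"hostname": "192.168.1.100", "tags": ["production", "web"], "environment": "prod"},
    {"hostname": "192.168.1.101", "tags": ["staging", "web"], "environment": "staging"},
    {"hostname": "10.0.0.5", "tags": ["production", "db"], "environment": "prod"},
]
prod_web = filter_hosts(hosts, {"tags": ["web"], "environment": "prod"})
lan_hosts = filter_hosts(hosts, {"hostname_pattern": r"192\.168\."})
```

Filters combine with AND across keys, while the values inside `tags` combine with OR.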
4. SSH Key Management: Security Made Simple
ArgoFusion provides enterprise-grade SSH key management:
Key Upload and Validation
# Key management implementation (from upload_keys.py)
class SSHKeyManager:
    """Secure SSH key handling with validation and encryption"""
    async def upload_key(self, key_data, key_type='private'):
        """Upload and validate SSH keys"""
        try:
            # Validate key format
            if key_type == 'private':
                # Check if it's a valid private key
                if not self.is_valid_private_key(key_data):
                    raise ValueError("Invalid private key format")
                # Extract public key from private key
                public_key = self.extract_public_key(key_data)
            elif key_type == 'public':
                if not self.is_valid_public_key(key_data):
                    raise ValueError("Invalid public key format")
                public_key = key_data
            else:
                # Without this branch, public_key would be undefined below
                raise ValueError(f"Unsupported key type: {key_type}")
            # Encrypt key material for storage
            encrypted_key = await self.encrypt_key(key_data)
            # Store key with metadata
            key_record = {
                'user_id': self.user_id,
                'key_type': key_type,
                'encrypted_key': encrypted_key,
                'public_key': public_key,
                'fingerprint': self.generate_fingerprint(public_key),
                'created_at': datetime.now(),
                'last_used': None
            }
            return await self.save_key_record(key_record)
        except Exception as e:
            logger.error(f"Key upload failed: {str(e)}")
            raise

    def is_valid_private_key(self, key_data):
        """Validate private key format (PEM or OpenSSH)"""
        from cryptography.hazmat.primitives import serialization
        # Keys in "BEGIN OPENSSH PRIVATE KEY" format need the SSH loader;
        # the PEM loader alone rejects them.
        loaders = (
            serialization.load_pem_private_key,
            serialization.load_ssh_private_key,
        )
        for loader in loaders:
            try:
                loader(key_data.encode(), password=None)
                return True
            except Exception:
                continue
        return False
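The key record stores a fingerprint, but `generate_fingerprint` is not shown in the source. A minimal sketch of the usual OpenSSH-style SHA256 fingerprint follows, using only the standard library. The function name `sha256_fingerprint` and the dummy key blob are assumptions for illustration; ArgoFusion's own method may differ.

```python
import base64
import hashlib

def sha256_fingerprint(public_key_line):
    """Return the SHA256 fingerprint of an OpenSSH public key line."""
    # Expected layout: "<key-type> <base64-blob> [comment]"
    parts = public_key_line.split()
    if len(parts) < 2:
        raise ValueError("expected '<type> <base64> [comment]'")
    blob = base64.b64decode(parts[1], validate=True)
    digest = hashlib.sha256(blob).digest()
    # OpenSSH prints the digest base64-encoded with trailing '=' padding removed.
    return "SHA256:" + base64.b64encode(digest).decode("ascii").rstrip("=")

# Dummy blob for illustration only; this is not a real key.
demo_line = "ssh-ed25519 " + base64.b64encode(b"not-a-real-key").decode() + " demo@example"
fingerprint = sha256_fingerprint(demo_line)
```

The fingerprint depends only on the decoded key blob, so the trailing comment does not affect it.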
5. CRON Task Scheduling: Automation at Scale
Create sophisticated automation workflows with ArgoFusion's advanced scheduler:
Creating Your First Automated Task
# Task creation example
{
  "name": "Daily System Backup",
  "schedule_type": "cron",
  "cron_expression": "0 2 * * *",  // Daily at 2 AM
  "target_hosts": ["web-servers", "db-servers"],
  "commands": [
    "sudo systemctl stop nginx",
    "sudo tar -czf /backup/$(date +%Y%m%d).tar.gz /var/www",
    "sudo systemctl start nginx",
    "echo \"Backup completed at $(date)\""  // double quotes so the shell expands $(date)
  ],
  "timeout": 3600,  // 1 hour timeout
  "retry_count": 3,
  "notification_channels": ["email", "telegram"],
  "end_time": "2024-12-31T23:59:59Z"
}
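The five fields of a cron expression are minute, hour, day of month, month, and day of week. The sketch below checks whether a given moment matches an expression. It is deliberately limited to `*` and single integers (no ranges, lists, or steps), and it is an illustration of the format, not ArgoFusion's scheduler.

```python
from datetime import datetime

def _field_matches(field, value):
    """Match one cron field; this sketch supports only '*' and a single integer."""
    return field == "*" or int(field) == value

def cron_matches(expression, moment):
    """Return True if `moment` falls on a minute selected by a 5-field expression."""
    minute, hour, dom, month, dow = expression.split()
    # Cron counts Sunday as 0 (7 is also accepted); weekday() counts Monday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    dom_ok = _field_matches(dom, moment.day)
    dow_ok = dow == "*" or int(dow) % 7 == cron_weekday
    # Standard cron rule: if both day fields are restricted, either may match.
    if dom != "*" and dow != "*":
        day_ok = dom_ok or dow_ok
    else:
        day_ok = dom_ok and dow_ok
    return (
        _field_matches(minute, moment.minute)
        and _field_matches(hour, moment.hour)
        and _field_matches(month, moment.month)
        and day_ok
    )

daily = cron_matches("0 2 * * *", datetime(2024, 1, 1, 2, 0))
weekly = cron_matches("0 3 * * 0", datetime(2024, 1, 7, 3, 0))  # a Sunday
```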
Advanced Scheduling Features
# Complex scheduling with dependencies
{
  "name": "Database Maintenance Workflow",
  "schedule_type": "cron",
  "cron_expression": "0 3 * * 0",  // Weekly on Sunday 3 AM
  "workflow": [
    {
      "phase": "preparation",
      "target_hosts": ["app-servers"],
      "commands": ["sudo systemctl stop application"]
    },
    {
      "phase": "backup",
      "target_hosts": ["db-servers"],
      "commands": [
        "mysqldump --all-databases > /backup/weekly_backup.sql",
        "gzip /backup/weekly_backup.sql"
      ],
      "depends_on": ["preparation"]
    },
    {
      "phase": "maintenance",
      "target_hosts": ["db-servers"],
      "commands": [
        "mysql -e 'OPTIMIZE TABLE mysql.innodb_table_stats'",
        "mysql -e 'ANALYZE TABLE mysql.innodb_index_stats'"
      ],
      "depends_on": ["backup"]
    },
    {
      "phase": "restart",
      "target_hosts": ["app-servers"],
      "commands": ["sudo systemctl start application"],
      "depends_on": ["maintenance"]
    }
  ]
}
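The `depends_on` entries describe a dependency graph, and a valid run order is a topological sort of that graph. How ArgoFusion resolves the order internally is not shown in the source; the sketch below uses the standard library's `graphlib` (Python 3.9+) and a hypothetical `execution_order` helper.

```python
from graphlib import TopologicalSorter  # Python 3.9+

workflow = [
    {"phase": "preparation"},
    {"phase": "backup", "depends_on": ["preparation"]},
    {"phase": "maintenance", "depends_on": ["backup"]},
    {"phase": "restart", "depends_on": ["maintenance"]},
]

def execution_order(phases):
    """Return phase names ordered so each runs after everything it depends on."""
    # Map each phase to the set of phases that must finish first.
    graph = {p["phase"]: set(p.get("depends_on", [])) for p in phases}
    unknown = {d for deps in graph.values() for d in deps} - set(graph)
    if unknown:
        raise ValueError(f"unknown phases in depends_on: {sorted(unknown)}")
    # static_order() raises graphlib.CycleError if the dependencies loop.
    return list(TopologicalSorter(graph).static_order())

order = execution_order(workflow)
```

Validating unknown phase names up front catches typos in `depends_on` before anything runs.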
6. WebSSH Terminal: Browser-Based Access
Access your servers directly from your browser with ArgoFusion's advanced WebSSH implementation:
WebSocket-Based Terminal
# WebSSH connection implementation (from websocket_sftp_proxy.py)
class WebSSHHandler:
    """Advanced WebSSH with real-time terminal emulation"""
    async def handle_websocket_connection(self, websocket, host_info):
        """Handle WebSSH WebSocket connection"""
        try:
            # Establish SSH connection
            ssh_client = await self.create_ssh_connection(host_info)
            # Create interactive shell
            channel = ssh_client.invoke_shell()
            channel.settimeout(0.1)
            # Set terminal size
            channel.resize_pty(width=80, height=24)
            # Handle bidirectional communication
            await asyncio.gather(
                self.forward_ssh_to_websocket(channel, websocket),
                self.forward_websocket_to_ssh(websocket, channel)
            )
        except Exception as e:
            logger.error(f"WebSSH connection failed: {str(e)}")
            await websocket.send(json.dumps({
                'type': 'error',
                'message': f'Connection failed: {str(e)}'
            }))

    async def forward_ssh_to_websocket(self, channel, websocket):
        """Forward SSH output to WebSocket"""
        # Stop once the SSH channel has closed instead of polling forever
        while not channel.closed:
            try:
                if channel.recv_ready():
                    data = channel.recv(1024).decode('utf-8', errors='ignore')
                    await websocket.send(json.dumps({
                        'type': 'output',
                        'data': data
                    }))
                await asyncio.sleep(0.01)
            except Exception as e:
                logger.error(f"SSH to WebSocket forwarding error: {str(e)}")
                break

    async def forward_websocket_to_ssh(self, websocket, channel):
        """Forward WebSocket input to SSH"""
        async for message in websocket:
            try:
                data = json.loads(message)
                if data['type'] == 'input':
                    channel.send(data['data'])
                elif data['type'] == 'resize':
                    channel.resize_pty(
                        width=data['cols'],
                        height=data['rows']
                    )
            except Exception as e:
                logger.error(f"WebSocket to SSH forwarding error: {str(e)}")
                break
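One detail worth noting in the forwarding loop: each 1024-byte chunk is decoded on its own, so a multi-byte UTF-8 character that straddles two chunks is silently dropped by `errors='ignore'`. A sketch of one way around this, using the standard library's incremental decoder; `TerminalOutputEncoder` is a hypothetical name, not part of ArgoFusion.

```python
import codecs
import json

class TerminalOutputEncoder:
    """Turn raw SSH byte chunks into JSON 'output' messages without splitting characters."""

    def __init__(self):
        # The incremental decoder buffers an incomplete trailing byte sequence.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    def feed(self, chunk):
        """Return a JSON message for the decodable part of `chunk`, or None."""
        text = self._decoder.decode(chunk)
        if not text:
            return None
        return json.dumps({"type": "output", "data": text})

encoder = TerminalOutputEncoder()
first = encoder.feed(b"caf\xc3")   # ends mid-character
second = encoder.feed(b"\xa9\n")   # completes the character
```

One encoder instance would be kept per connection so that the buffered bytes carry over between reads.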
7. File Management: SFTP Made Easy
ArgoFusion's integrated file manager provides full SFTP capabilities through a web interface:
File Operations
# SFTP file operations (from ftp_async.py)
class SFTPManager:
    """Asynchronous SFTP operations with progress tracking"""
    async def upload_file(self, local_path, remote_path, progress_callback=None):
        """Upload file with progress tracking"""
        try:
            sftp = await self.get_sftp_client()
            file_size = os.path.getsize(local_path)
            bytes_uploaded = 0

            async def progress_tracker(bytes_transferred):
                nonlocal bytes_uploaded
                bytes_uploaded += bytes_transferred
                if progress_callback:
                    # Guard against division by zero for empty files
                    progress = (bytes_uploaded / file_size) * 100 if file_size else 100.0
                    await progress_callback(progress)

            # Upload with progress tracking
            await sftp.put(local_path, remote_path, callback=progress_tracker)
            return {
                'status': 'success',
                'bytes_transferred': bytes_uploaded,
                'remote_path': remote_path
            }
        except Exception as e:
            logger.error(f"File upload failed: {str(e)}")
            return {'status': 'error', 'message': str(e)}

    async def batch_download(self, file_list, local_dir):
        """Download multiple files concurrently"""
        tasks = []
        for remote_file in file_list:
            local_file = os.path.join(local_dir, os.path.basename(remote_file))
            task = self.download_file(remote_file, local_file)
            tasks.append(task)
        # return_exceptions=True keeps one failed download from cancelling the rest
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successful = [r for r in results if isinstance(r, dict) and r.get('status') == 'success']
        failed = [r for r in results if not (isinstance(r, dict) and r.get('status') == 'success')]
        return {
            'successful': len(successful),
            'failed': len(failed),
            'details': results
        }
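Progress callbacks differ between SFTP libraries. Paramiko's `SFTPClient.put`, for example, calls its callback with the cumulative bytes transferred and the total size, not with per-chunk increments. Whether ArgoFusion's client behaves this way is an assumption; the sketch below shows a callback written for that cumulative style, with `make_progress_callback` as a hypothetical helper.

```python
def make_progress_callback(report):
    """Build a callback taking (bytes_so_far, total_bytes) that reports whole-percent steps."""
    last_percent = -1

    def callback(bytes_so_far, total_bytes):
        nonlocal last_percent
        # Treat an empty file as complete instead of dividing by zero.
        percent = 100 if total_bytes == 0 else int(bytes_so_far * 100 / total_bytes)
        # Report only when the whole-number percentage changes.
        if percent != last_percent:
            last_percent = percent
            report(percent)

    return callback

seen = []
cb = make_progress_callback(seen.append)
for done in (0, 512, 1024, 1536, 2048):
    cb(done, 2048)
```

Reporting only on change keeps a fast transfer from flooding the browser with identical updates.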
8. Group Operations: Batch Command Execution
Execute commands across multiple servers simultaneously with real-time output:
# Group operations implementation (from group_run.py)
class GroupCommandExecutor:
    """Execute commands across server groups with real-time monitoring"""
    async def execute_group_command(self, group_hosts, command, timeout=300):
        """Execute command on multiple hosts concurrently"""
        tasks = []
        results = {}
        # Create execution tasks for each host; create_task starts them right away
        for host in group_hosts:
            task = asyncio.create_task(
                self.execute_single_host_command(host, command, timeout)
            )
            tasks.append((host['customhostname'], task))
        # Collect results; the tasks are already running concurrently
        for hostname, task in tasks:
            try:
                result = await task
                results[hostname] = {
                    'status': 'success',
                    'output': result['output'],
                    'exit_code': result['exit_code'],
                    'execution_time': result['execution_time']
                }
            except Exception as e:
                results[hostname] = {
                    'status': 'error',
                    'error': str(e),
                    'execution_time': 0
                }
        return results

    async def execute_single_host_command(self, host, command, timeout):
        """Execute command on single host with detailed logging"""
        start_time = time.time()
        # Initialize so the finally block is safe if the connection attempt fails
        ssh_client = None
        try:
            # Create SSH connection
            ssh_client = await self.create_ssh_connection(host)
            # Execute command
            stdin, stdout, stderr = ssh_client.exec_command(
                command, timeout=timeout
            )
            # Read output streams
            output = stdout.read().decode('utf-8')
            error = stderr.read().decode('utf-8')
            exit_code = stdout.channel.recv_exit_status()
            execution_time = time.time() - start_time
            return {
                'output': output,
                'error': error,
                'exit_code': exit_code,
                'execution_time': execution_time
            }
        except Exception as e:
            logger.error(f"Command execution failed on {host['customhostname']}: {str(e)}")
            # Chain the original exception so the traceback is preserved
            raise Exception(f"Execution failed: {str(e)}") from e
        finally:
            if ssh_client:
                ssh_client.close()
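Note that `execute_group_command` sets `status` to `success` whenever the command ran, even if it exited with a non-zero code. A caller that wants a pass/fail view therefore has to look at `exit_code` as well. The sketch below shows one way to do that; `summarize_results` and the sample data are hypothetical.

```python
def summarize_results(results):
    """Group host names by outcome, treating a non-zero exit code as a failure."""
    summary = {"succeeded": [], "failed": [], "errored": []}
    for hostname, result in results.items():
        if result["status"] == "error":
            summary["errored"].append(hostname)   # could not run at all
        elif result.get("exit_code", 0) != 0:
            summary["failed"].append(hostname)    # ran, but the command failed
        else:
            summary["succeeded"].append(hostname)
    return summary

results = {
    "web-server-01": {"status": "success", "output": "ok", "exit_code": 0, "execution_time": 0.4},
    "web-server-02": {"status": "success", "output": "", "exit_code": 1, "execution_time": 0.3},
    "db-server-01": {"status": "error", "error": "Execution failed: timeout", "execution_time": 0},
}
summary = summarize_results(results)
```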
9. Monitoring and Notifications
Stay informed about your infrastructure with comprehensive monitoring:
Task Execution Monitoring
# Real-time task monitoring
class TaskMonitor:
    """Monitor task execution with alerts and notifications"""
    async def monitor_task_execution(self, task_id):
        """Monitor running task with real-time updates"""
        while task_id in executing_tasks:
            try:
                # Get current task status
                task_status = await self.get_task_status(task_id)
                # Send status update to WebSocket clients
                await self.broadcast_task_update(task_id, task_status)
                # Check for timeout or failure conditions
                if self.should_alert(task_status):
                    await self.send_alert(task_id, task_status)
                await asyncio.sleep(5)  # Check every 5 seconds
            except Exception as e:
                logger.error(f"Task monitoring error: {str(e)}")
                break

    async def send_alert(self, task_id, status):
        """Send alerts through configured channels"""
        alert_data = {
            'task_id': task_id,
            'status': status,
            'timestamp': datetime.now().isoformat(),
            'severity': self.determine_severity(status)
        }
        # Send to configured notification channels
        channels = await self.get_notification_channels()
        for channel in channels:
            if channel['type'] == 'telegram':
                await self.send_telegram_alert(channel, alert_data)
            elif channel['type'] == 'email':
                await self.send_email_alert(channel, alert_data)
            elif channel['type'] == 'webhook':
                await self.send_webhook_alert(channel, alert_data)
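In the loop above, an exception raised by one channel stops delivery to the channels after it, and an unknown channel type is skipped without a trace. A possible alternative, sketched under the assumption that each sender is an async callable taking `(channel, alert_data)`, is a lookup table with per-channel error handling. `dispatch_alert` and `fake_sender` are hypothetical names.

```python
import asyncio

async def dispatch_alert(channels, alert_data, senders):
    """Send `alert_data` to every channel that has a registered sender."""
    delivered, skipped = [], []
    for channel in channels:
        sender = senders.get(channel["type"])
        if sender is None:
            skipped.append(channel["type"])  # unknown type: record it
            continue
        try:
            await sender(channel, alert_data)
            delivered.append(channel["type"])
        except Exception:
            skipped.append(channel["type"])  # one failing channel must not block the rest
    return delivered, skipped

sent = []

async def fake_sender(channel, alert_data):
    # Stand-in for send_telegram_alert / send_email_alert / send_webhook_alert.
    sent.append((channel["type"], alert_data["task_id"]))

channels = [{"type": "telegram"}, {"type": "email"}, {"type": "sms"}]
senders = {"telegram": fake_sender, "email": fake_sender, "webhook": fake_sender}
delivered, skipped = asyncio.run(dispatch_alert(channels, {"task_id": 42}, senders))
```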
10. Best Practices for Production Use
Security Configuration
- Key Rotation - Regularly rotate SSH keys using ArgoFusion's key management
- Access Control - Use host groups to implement least-privilege access
- Audit Logging - Enable comprehensive logging for compliance
- Network Security - Configure firewall rules and VPN access
Performance Optimization
# Optimize for large-scale operations
{
  "batch_size": 50,        // Process hosts in batches
  "concurrent_limit": 10,  // Maximum concurrent connections
  "timeout_settings": {
    "connection": 30,
    "command_execution": 300,
    "file_transfer": 1800
  },
  "retry_policy": {
    "max_retries": 3,
    "backoff_factor": 2,
    "retry_on": ["connection_error", "timeout"]
  }
}
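To show what `concurrent_limit` and `backoff_factor` could mean in practice, here is a sketch that caps in-flight work with `asyncio.Semaphore` and computes exponential retry delays. The one-second base delay, the helper names, and the fake worker are assumptions; the source does not say how ArgoFusion interprets these settings.

```python
import asyncio

def backoff_delays(max_retries, backoff_factor, base_delay=1.0):
    """Delays before each retry: base, base*factor, base*factor**2, ..."""
    return [base_delay * backoff_factor ** attempt for attempt in range(max_retries)]

async def run_limited(hosts, worker, concurrent_limit):
    """Run `worker(host)` for every host with at most `concurrent_limit` in flight."""
    semaphore = asyncio.Semaphore(concurrent_limit)

    async def guarded(host):
        async with semaphore:
            return await worker(host)

    return await asyncio.gather(*(guarded(h) for h in hosts), return_exceptions=True)

stats = {"in_flight": 0, "peak": 0}

async def fake_worker(host):
    # Stand-in for an SSH command; tracks how many workers run at once.
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)
    stats["in_flight"] -= 1
    return host

hosts = [f"host-{i}" for i in range(25)]
results = asyncio.run(run_limited(hosts, fake_worker, concurrent_limit=10))
delays = backoff_delays(max_retries=3, backoff_factor=2)
```

With `max_retries` of 3 and `backoff_factor` of 2, the retries wait 1, 2, and 4 seconds under the assumed base delay.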
11. Integration and API Usage
ArgoFusion provides comprehensive APIs for integration with existing tools:
# API integration example
import requests

class ArgoFusionAPI:
    """Python client for ArgoFusion API"""
    def __init__(self, base_url, api_token):
        self.base_url = base_url.rstrip('/')
        self.headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }

    # requests is a blocking library, so these are plain (non-async) methods
    def create_task(self, task_config):
        """Create scheduled task via API"""
        response = requests.post(
            f'{self.base_url}/api/tasks',
            json=task_config,
            headers=self.headers,
            timeout=30
        )
        response.raise_for_status()  # surface HTTP errors instead of parsing an error body
        return response.json()

    def execute_group_command(self, group_name, command):
        """Execute command on host group"""
        payload = {
            'group': group_name,
            'command': command,
            'timeout': 300
        }
        response = requests.post(
            f'{self.base_url}/api/groups/execute',
            json=payload,
            headers=self.headers,
            timeout=330  # slightly longer than the server-side command timeout
        )
        response.raise_for_status()
        return response.json()

# Usage example
api = ArgoFusionAPI('https://your-argofusion-instance.com', 'your-api-token')
# Create automated backup task
backup_task = {
    'name': 'Daily Database Backup',
    'schedule_type': 'cron',
    'cron_expression': '0 2 * * *',
    'target_hosts': ['db-servers'],
    'commands': ['mysqldump --all-databases | gzip > /backup/daily.sql.gz']
}
result = api.create_task(backup_task)
print(f"Task created: {result['task_id']}")
Conclusion
ArgoFusion SSH provides a web-based platform that combines the power of SSH with the convenience of a graphical interface. With host groups, CRON scheduling, WebSSH terminals, and task monitoring, it suits teams that need to scale their infrastructure management.
Key benefits of ArgoFusion SSH:
- Unified Interface - Manage all servers from a single web dashboard
- Advanced Automation - Sophisticated CRON scheduling with dependency management
- Team Collaboration - Host groups and role-based access control
- Enterprise Security - Comprehensive audit logging and key management
- Modern Technology - WebSocket-based real-time communication
Ready to Get Started?
Experience the future of SSH management with ArgoFusion's comprehensive platform:
- Free Trial - 3-day premium access for new users
- No Credit Card - Start immediately with email or OAuth registration
- Full Features - Access to all enterprise features during trial
- Expert Support - Get help from our technical team