Tutorial 2025-01-08 6 min

ArgoFusion Quick Start: Modern SSH Management Platform

Step-by-step guide to getting started with the ArgoFusion SSH platform, from setup to advanced host group management.

ArgoFusion SSH is a web-based SSH management platform that lets teams manage servers, execute commands, and automate tasks from one place. This quick start guide walks through each major feature with practical examples and code excerpts.

1. Getting Started: Registration and Login

ArgoFusion offers multiple authentication methods to suit different organizational needs:

Email Registration

# Registration process with email verification
POST /api/register
{
    "email": "admin@company.com",
    "password": "SecurePassword123",
    "code": "123456"  // Email verification code
}

OAuth Integration

ArgoFusion supports seamless OAuth login with popular platforms:

  • Linux.do OAuth - Perfect for Linux community members
  • Google OAuth - Enterprise-friendly single sign-on
  • Custom OAuth - Integrate with your organization's identity provider

# OAuth implementation example (from app.py)
from datetime import datetime, timedelta, timezone

@app.route('/oauth2/callback')
async def oauth2_callback():
    """Handle OAuth2 callback with automatic user provisioning"""
    try:
        # Exchange code for access token
        token_response = await exchange_oauth_code(request.args.get('code'))
        
        # Get user information
        user_info = await get_oauth_user_info(token_response['access_token'])
        
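        # Check if user exists, create if not.
        # Assumption: the helper returns (user_id, is_new_user) so the welcome check below has a value to test.
        user_id, is_new_user = await find_or_create_user(user_info)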
        
        # Set session with extended expiry for OAuth users
        session['user_id'] = user_id
        session['email'] = user_info.get('email')
        session.permanent = True
        session['expires'] = (datetime.now(timezone.utc) + timedelta(days=30)).isoformat()
        
        # Welcome bonus for new users
        if is_new_user:
            await config.give_welcome_membership(user_id)
            session['show_welcome_gift'] = True
            
        return redirect('/')
        
    except Exception as e:
        logger.error(f"OAuth callback error: {str(e)}")
        return redirect('/login?error=oauth_failed')
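
The callback above stores the session deadline as an ISO 8601 string under session['expires']. A later request can compare that value with the current time. The following is a minimal sketch of such a check, not ArgoFusion's own code: the helper name session_is_expired is hypothetical, and it assumes the session behaves like a dictionary.

```python
from datetime import datetime, timedelta, timezone


def session_is_expired(session, now=None):
    """Return True when the 'expires' value is missing, malformed, or in the past.

    `session` is any dict-like object; this helper is hypothetical.
    """
    raw = session.get('expires')
    if not raw:
        return True
    try:
        expires = datetime.fromisoformat(raw)
    except (TypeError, ValueError):
        return True
    if expires.tzinfo is None:
        # Treat naive timestamps as UTC so the comparison below is valid
        expires = expires.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return now >= expires


# Example: a session created the same way as in the callback above
session = {'expires': (datetime.now(timezone.utc) + timedelta(days=30)).isoformat()}
print(session_is_expired(session))
```

Treating a missing or unparseable timestamp as expired is a deliberate fail-closed choice: a damaged session forces a new login rather than granting access.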

2. Dashboard Overview: Your Command Center

Upon login, you'll see ArgoFusion's intuitive dashboard that provides:

Main Navigation Features

  • Host Management - Add, organize, and manage your servers
  • Task Scheduler - Create and monitor automated tasks
  • Quick Connect - Instant SSH access without configuration
  • File Manager - SFTP file operations through web interface
  • Group Operations - Batch commands across server groups

# Dashboard initialization (from app.py)
@app.route('/')
@login_required
async def home():
    """Main dashboard with personalized user experience"""
    user_id = session.get('user_id')
    
    # Set user context for personalized experience
    config.set_current_user(user_id)
    
    # Get user's language preference
    lang = await config.get_user_config(user_id, 'LANGUAGE')
    if not lang:
        lang = session.get('preferred_language', config.LANGUAGE)
    
    # Check for welcome gift notification
    show_welcome_gift = session.pop('show_welcome_gift', False)
    gift_expiry = None
    
    if show_welcome_gift:
        user_level = await config.get_user_level(user_id)
        if user_level and user_level.get('expires_at'):
            gift_expiry = user_level.get('expires_at').strftime('%Y-%m-%d')
    
    return await render_template('index.html',
        lang=lang,
        translations=get_translation(language=lang),
        authenticated=True,
        email=session.get('email'),
        username=session.get('username'),
        show_welcome_gift=show_welcome_gift,
        gift_expiry=gift_expiry
    )

3. Host Management: Organizing Your Infrastructure

ArgoFusion's host management system allows you to organize servers efficiently:

Adding Your First Server

# Server configuration example
{
    "customhostname": "web-server-01",
    "hostname": "192.168.1.100",
    "port": 22,
    "username": "admin",
    "authentication_method": "key",  // or "password"
    "private_key": "-----BEGIN OPENSSH PRIVATE KEY-----...",
    "group": "web-servers",
    "description": "Primary web server",
    "tags": ["production", "web", "nginx"]
}
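
Before submitting a server configuration like the one above, it can help to check it locally. The sketch below mirrors the fields from the example in a dataclass and reports obvious problems. The class name HostConfig, the validate method, and the password field are assumptions made for illustration; ArgoFusion's actual validation rules are not documented here.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class HostConfig:
    """Local mirror of the server configuration fields (hypothetical class)."""
    customhostname: str
    hostname: str
    username: str
    port: int = 22
    authentication_method: str = 'key'
    private_key: Optional[str] = None
    password: Optional[str] = None  # assumed field for password authentication
    group: str = ''
    description: str = ''
    tags: List[str] = field(default_factory=list)

    def validate(self):
        """Return a list of problems; an empty list means nothing obvious is wrong."""
        problems = []
        if not 1 <= self.port <= 65535:
            problems.append('port must be between 1 and 65535')
        if self.authentication_method not in ('key', 'password'):
            problems.append('authentication_method must be "key" or "password"')
        if self.authentication_method == 'key' and not self.private_key:
            problems.append('key authentication requires private_key')
        if self.authentication_method == 'password' and not self.password:
            problems.append('password authentication requires password')
        return problems


host = HostConfig(
    customhostname='web-server-01',
    hostname='192.168.1.100',
    username='admin',
    private_key='-----BEGIN OPENSSH PRIVATE KEY-----...',
    group='web-servers',
    tags=['production', 'web', 'nginx'],
)
print(host.validate())
```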

Host Groups for Scalable Management

Organize servers into logical groups for batch operations:

# Host group management implementation
class HostGroupManager:
    """Manage server groups with advanced filtering and operations"""
    
    def __init__(self, user_id):
        self.user_id = user_id
        self.hosts = self.load_user_hosts()
        
    def create_group(self, group_name, host_filters=None):
        """Create dynamic host group with filtering"""
        if host_filters:
            # Filter hosts by tags, environment, or custom criteria
            filtered_hosts = self.filter_hosts(host_filters)
        else:
            filtered_hosts = []
            
        group_config = {
            'name': group_name,
            'hosts': filtered_hosts,
            'filters': host_filters,
            'created_at': datetime.now().isoformat(),
            'user_id': self.user_id
        }
        
        return self.save_group_config(group_config)
        
    def filter_hosts(self, filters):
        """Advanced host filtering"""
        result = self.hosts
        
        if 'tags' in filters:
            result = [h for h in result if any(tag in h.get('tags', []) for tag in filters['tags'])]
            
        if 'environment' in filters:
            result = [h for h in result if h.get('environment') == filters['environment']]
            
        if 'hostname_pattern' in filters:
            import re
            pattern = re.compile(filters['hostname_pattern'])
            result = [h for h in result if pattern.match(h.get('hostname', ''))]
            
        return result
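
To see how the filters combine, the same logic can be tried as a standalone function on sample data. This is a sketch for experimentation: the host records and the example.com hostnames are made up, and the function simply restates filter_hosts outside the class.

```python
import re


def filter_hosts(hosts, filters):
    """Standalone restatement of the filtering logic, for experimentation."""
    result = hosts
    if 'tags' in filters:
        # A host matches when it carries ANY of the requested tags
        wanted = set(filters['tags'])
        result = [h for h in result if wanted & set(h.get('tags', []))]
    if 'environment' in filters:
        result = [h for h in result if h.get('environment') == filters['environment']]
    if 'hostname_pattern' in filters:
        # re.match anchors at the start of the hostname
        pattern = re.compile(filters['hostname_pattern'])
        result = [h for h in result if pattern.match(h.get('hostname', ''))]
    return result


hosts = [
    {'hostname': 'web-01.example.com', 'tags': ['production', 'web'], 'environment': 'production'},
    {'hostname': 'web-02.example.com', 'tags': ['staging', 'web'], 'environment': 'staging'},
    {'hostname': 'db-01.example.com', 'tags': ['production', 'db'], 'environment': 'production'},
]

selected = filter_hosts(hosts, {'tags': ['web'], 'environment': 'production'})
print([h['hostname'] for h in selected])
```

Two details are easy to miss: the tag filter matches a host that has any one of the listed tags, while separate filter keys narrow the result together. Because the pattern is applied with match, a pattern such as example selects nothing here; it would need to be written as .*example to match in the middle of a hostname.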

4. SSH Key Management: Security Made Simple

ArgoFusion provides enterprise-grade SSH key management:

Key Upload and Validation

# Key management implementation (from upload_keys.py)
class SSHKeyManager:
    """Secure SSH key handling with validation and encryption"""
    
    async def upload_key(self, key_data, key_type='private'):
        """Upload and validate SSH keys"""
        try:
            # Validate key format
            if key_type == 'private':
                # Check if it's a valid private key
                if not self.is_valid_private_key(key_data):
                    raise ValueError("Invalid private key format")
                    
                # Extract public key from private key
                public_key = self.extract_public_key(key_data)
                
                # Encrypt private key for storage
                encrypted_key = await self.encrypt_key(key_data)
                
            elif key_type == 'public':
                if not self.is_valid_public_key(key_data):
                    raise ValueError("Invalid public key format")
                public_key = key_data
                encrypted_key = None  # a public key holds no secret to encrypt
                
            else:
                # Without this branch, public_key would be undefined below
                raise ValueError(f"Unsupported key type: {key_type}")
            
            # Store key with metadata
            key_record = {
                'user_id': self.user_id,
                'key_type': key_type,
                'encrypted_key': encrypted_key,
                'public_key': public_key,
                'fingerprint': self.generate_fingerprint(public_key),
                'created_at': datetime.now(),
                'last_used': None
            }
            
            return await self.save_key_record(key_record)
            
        except Exception as e:
            logger.error(f"Key upload failed: {str(e)}")
            raise
            
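    def is_valid_private_key(self, key_data):
        """Validate private key format (PEM or OpenSSH, without passphrase)"""
        from cryptography.hazmat.primitives import serialization
        data = key_data.encode()
        # Keys starting with "BEGIN OPENSSH PRIVATE KEY" need the SSH loader;
        # load_pem_private_key alone rejects them.
        for loader in (serialization.load_pem_private_key,
                       serialization.load_ssh_private_key):
            try:
                loader(data, password=None)
                return True
            except Exception:
                continue
        return False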
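
The key record above stores a fingerprint, but the generate_fingerprint method itself is not shown. One common choice is the SHA256 fingerprint format printed by OpenSSH tools: the SHA-256 digest of the decoded key blob, base64-encoded without padding. The sketch below computes it with the standard library only; whether ArgoFusion uses this exact format is an assumption, and the function name is hypothetical.

```python
import base64
import hashlib


def sha256_fingerprint(public_key_line):
    """Compute an OpenSSH-style SHA256 fingerprint from a one-line public key.

    Expects the "<type> <base64-blob> [comment]" layout used in authorized_keys.
    """
    parts = public_key_line.split()
    if len(parts) < 2:
        raise ValueError('expected "<type> <base64-blob> [comment]"')
    blob = base64.b64decode(parts[1], validate=True)
    digest = hashlib.sha256(blob).digest()
    # OpenSSH prints the digest as base64 with the trailing '=' padding removed
    return 'SHA256:' + base64.b64encode(digest).decode('ascii').rstrip('=')


# Demo input: placeholder bytes, not a real key
sample_blob = base64.b64encode(b'not a real key, just demo bytes').decode('ascii')
print(sha256_fingerprint(f'ssh-ed25519 {sample_blob} demo@example'))
```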

5. CRON Task Scheduling: Automation at Scale

Create sophisticated automation workflows with ArgoFusion's advanced scheduler:

Creating Your First Automated Task

# Task creation example
{
    "name": "Daily System Backup",
    "schedule_type": "cron",
    "cron_expression": "0 2 * * *",  // Daily at 2 AM
    "target_hosts": ["web-servers", "db-servers"],
    "commands": [
        "sudo systemctl stop nginx",
        "sudo tar -czf /backup/$(date +%Y%m%d).tar.gz /var/www",
        "sudo systemctl start nginx",
        "echo \"Backup completed at $(date)\""  // double quotes so the shell expands $(date)
    ],
    "timeout": 3600,  // 1 hour timeout
    "retry_count": 3,
    "notification_channels": ["email", "telegram"],
    "end_time": "2025-12-31T23:59:59Z"  // must lie in the future, or the task never runs
}
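
A cron expression has five fields: minute, hour, day of month, month, and day of week. To make the meaning of an expression such as "0 2 * * *" concrete, the sketch below checks whether a given moment matches an expression. It is a simplified illustration, not ArgoFusion's scheduler: it supports only numbers, "*", lists, ranges, and steps, and it requires all five fields to match, whereas classic cron treats day of month and day of week as alternatives when both are restricted.

```python
from datetime import datetime


def _field_matches(field, value, minimum, maximum):
    """Check one cron field. Supports '*', 'a,b', 'a-b', '*/n', and 'a-b/n'."""
    for part in field.split(','):
        step = 1
        if '/' in part:
            part, step_text = part.split('/', 1)
            step = int(step_text)
        if part == '*':
            start, end = minimum, maximum
        elif '-' in part:
            start_text, end_text = part.split('-', 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_matches(expression, moment):
    """Return True if `moment` satisfies all five fields of `expression`."""
    minute, hour, day, month, weekday = expression.split()
    # cron counts Sunday as 0; datetime.weekday() counts Monday as 0
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        _field_matches(minute, moment.minute, 0, 59)
        and _field_matches(hour, moment.hour, 0, 23)
        and _field_matches(day, moment.day, 1, 31)
        and _field_matches(month, moment.month, 1, 12)
        and _field_matches(weekday, cron_weekday, 0, 6)
    )


print(cron_matches('0 2 * * *', datetime(2025, 1, 8, 2, 0)))   # daily at 02:00
print(cron_matches('0 3 * * 0', datetime(2025, 1, 12, 3, 0)))  # a Sunday at 03:00
```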

Advanced Scheduling Features

# Complex scheduling with dependencies
{
    "name": "Database Maintenance Workflow",
    "schedule_type": "cron", 
    "cron_expression": "0 3 * * 0",  // Weekly on Sunday 3 AM
    "workflow": [
        {
            "phase": "preparation",
            "target_hosts": ["app-servers"],
            "commands": ["sudo systemctl stop application"]
        },
        {
            "phase": "backup",
            "target_hosts": ["db-servers"],
            "commands": [
                "mysqldump --all-databases > /backup/weekly_backup.sql",
                "gzip /backup/weekly_backup.sql"
            ],
            "depends_on": ["preparation"]
        },
        {
            "phase": "maintenance",
            "target_hosts": ["db-servers"],
            "commands": [
                "mysql -e 'OPTIMIZE TABLE mysql.innodb_table_stats'",
                "mysql -e 'ANALYZE TABLE mysql.innodb_index_stats'"
            ],
            "depends_on": ["backup"]
        },
        {
            "phase": "restart",
            "target_hosts": ["app-servers"],
            "commands": ["sudo systemctl start application"],
            "depends_on": ["maintenance"]
        }
    ]
}
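
The depends_on entries in the workflow above describe a dependency graph, and a valid execution order is a topological sort of that graph. The sketch below derives such an order with Python's standard graphlib module (Python 3.9 or later). It illustrates the ordering idea only; how ArgoFusion resolves dependencies internally is not shown in this guide, and the function name is hypothetical.

```python
from graphlib import TopologicalSorter


def execution_order(workflow):
    """Return phase names in an order that satisfies every depends_on entry."""
    graph = {phase['phase']: set(phase.get('depends_on', [])) for phase in workflow}
    unknown = {dep for deps in graph.values() for dep in deps} - set(graph)
    if unknown:
        raise ValueError(f'depends_on references unknown phases: {sorted(unknown)}')
    # static_order() raises graphlib.CycleError when phases depend on each other in a loop
    return list(TopologicalSorter(graph).static_order())


workflow = [
    {'phase': 'restart', 'depends_on': ['maintenance']},
    {'phase': 'maintenance', 'depends_on': ['backup']},
    {'phase': 'backup', 'depends_on': ['preparation']},
    {'phase': 'preparation'},
]
print(execution_order(workflow))
```

The phases are listed in reverse on purpose: the resulting order depends on the declared dependencies, not on the position of each phase in the list.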

6. WebSSH Terminal: Browser-Based Access

Access your servers directly from your browser with ArgoFusion's advanced WebSSH implementation:

WebSocket-Based Terminal

# WebSSH connection implementation (from websocket_sftp_proxy.py)
class WebSSHHandler:
    """Advanced WebSSH with real-time terminal emulation"""
    
    async def handle_websocket_connection(self, websocket, host_info):
        """Handle WebSSH WebSocket connection"""
        ssh_client = None
        try:
            # Establish SSH connection
            ssh_client = await self.create_ssh_connection(host_info)
            
            # Create interactive shell
            channel = ssh_client.invoke_shell()
            channel.settimeout(0.1)
            
            # Set terminal size
            channel.resize_pty(width=80, height=24)
            
            # Handle bidirectional communication; stop as soon as either side ends
            tasks = [
                asyncio.create_task(self.forward_ssh_to_websocket(channel, websocket)),
                asyncio.create_task(self.forward_websocket_to_ssh(websocket, channel))
            ]
            _, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for task in pending:
                task.cancel()
            
        except Exception as e:
            logger.error(f"WebSSH connection failed: {str(e)}")
            await websocket.send(json.dumps({
                'type': 'error',
                'message': f'Connection failed: {str(e)}'
            }))
            
        finally:
            # Always release the SSH connection, including after errors
            if ssh_client:
                ssh_client.close()
            
    async def forward_ssh_to_websocket(self, channel, websocket):
        """Forward SSH output to WebSocket"""
        while True:
            try:
                if channel.recv_ready():
                    data = channel.recv(1024).decode('utf-8', errors='ignore')
                    await websocket.send(json.dumps({
                        'type': 'output',
                        'data': data
                    }))
                elif channel.closed:
                    # Paramiko-style channel assumed: the remote side has closed
                    # and no buffered output remains, so stop polling
                    break
                await asyncio.sleep(0.01)
                
            except Exception as e:
                logger.error(f"SSH to WebSocket forwarding error: {str(e)}")
                break
                
    async def forward_websocket_to_ssh(self, websocket, channel):
        """Forward WebSocket input to SSH"""
        async for message in websocket:
            try:
                data = json.loads(message)
                if data['type'] == 'input':
                    channel.send(data['data'])
                elif data['type'] == 'resize':
                    channel.resize_pty(
                        width=data['cols'], 
                        height=data['rows']
                    )
            except Exception as e:
                logger.error(f"WebSocket to SSH forwarding error: {str(e)}")
                break
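
One detail in the forwarding loop deserves attention: output is read in 1024-byte chunks and each chunk is decoded on its own with errors='ignore'. A multi-byte UTF-8 character that straddles a chunk boundary is then silently dropped. An incremental decoder from the standard codecs module buffers the partial bytes until the rest arrives. The wrapper class below is a hypothetical sketch of that idea, not part of ArgoFusion.

```python
import codecs


class TerminalDecoder:
    """Decode SSH output chunks without breaking multi-byte UTF-8 characters."""

    def __init__(self):
        # errors='replace' turns invalid bytes into U+FFFD instead of dropping them
        self._decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')

    def feed(self, chunk):
        """Return the text that is complete so far; partial characters stay buffered."""
        return self._decoder.decode(chunk)


decoder = TerminalDecoder()
data = 'café ✓'.encode('utf-8')
# Split in the middle of the two-byte 'é' to simulate a chunk boundary
first, second = data[:4], data[4:]
print(repr(decoder.feed(first)), repr(decoder.feed(second)))
```

One decoder instance should be kept per terminal session, because the buffered bytes belong to that session's output stream.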

7. File Management: SFTP Made Easy

ArgoFusion's integrated file manager provides full SFTP capabilities through a web interface:

File Operations

# SFTP file operations (from ftp_async.py)
class SFTPManager:
    """Asynchronous SFTP operations with progress tracking"""
    
    async def upload_file(self, local_path, remote_path, progress_callback=None):
        """Upload file with progress tracking"""
        try:
            sftp = await self.get_sftp_client()
            file_size = os.path.getsize(local_path)
            bytes_uploaded = 0
            
            async def progress_tracker(bytes_transferred):
                nonlocal bytes_uploaded
                bytes_uploaded += bytes_transferred
                if progress_callback:
                    # Guard against division by zero when uploading an empty file
                    progress = (bytes_uploaded / file_size) * 100 if file_size else 100.0
                    await progress_callback(progress)
            
            # Upload with progress tracking
            await sftp.put(local_path, remote_path, callback=progress_tracker)
            
            return {
                'status': 'success',
                'bytes_transferred': bytes_uploaded,
                'remote_path': remote_path
            }
            
        except Exception as e:
            logger.error(f"File upload failed: {str(e)}")
            return {'status': 'error', 'message': str(e)}
            
    async def batch_download(self, file_list, local_dir):
        """Download multiple files concurrently"""
        tasks = []
        for remote_file in file_list:
            local_file = os.path.join(local_dir, os.path.basename(remote_file))
            task = self.download_file(remote_file, local_file)
            tasks.append(task)
            
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        successful = [r for r in results if isinstance(r, dict) and r.get('status') == 'success']
        failed = [r for r in results if not (isinstance(r, dict) and r.get('status') == 'success')]
        
        return {
            'successful': len(successful),
            'failed': len(failed),
            'details': results
        }
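
batch_download starts every transfer at once, which can exhaust connections when the file list is long. A semaphore is a common way to cap the number of transfers in flight. The sketch below shows the pattern with a stand-in coroutine instead of a real SFTP call; gather_with_limit and fake_download are hypothetical names, and the limit of 5 is arbitrary.

```python
import asyncio


async def gather_with_limit(coroutine_factories, limit=10):
    """Run coroutines concurrently, but never more than `limit` at the same time."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory):
        async with semaphore:
            return await factory()

    return await asyncio.gather(*(run(f) for f in coroutine_factories),
                                return_exceptions=True)


async def demo():
    active = 0
    peak = 0

    async def fake_download(name):
        # Stand-in for a real download call; it only sleeps briefly
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return {'status': 'success', 'file': name}

    files = [f'/var/log/app-{i}.log' for i in range(20)]
    # Factories delay coroutine creation until a semaphore slot is free
    factories = [lambda name=name: fake_download(name) for name in files]
    results = await gather_with_limit(factories, limit=5)
    return results, peak


results, peak = asyncio.run(demo())
print(len(results), peak)
```

Passing factories rather than ready-made coroutines keeps each transfer from being created before its turn, and return_exceptions=True preserves the per-file error reporting that batch_download relies on.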

8. Group Operations: Batch Command Execution

Execute commands across multiple servers simultaneously with real-time output:

# Group operations implementation (from group_run.py)
class GroupCommandExecutor:
    """Execute commands across server groups with real-time monitoring"""
    
    async def execute_group_command(self, group_hosts, command, timeout=300):
        """Execute command on multiple hosts concurrently"""
        tasks = []
        results = {}
        
        # Create execution tasks for each host
        for host in group_hosts:
            task = asyncio.create_task(
                self.execute_single_host_command(host, command, timeout)
            )
            tasks.append((host['customhostname'], task))
            
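        # Collect results; the tasks created above are already running concurrently
        for hostname, task in tasks:
            try:
                result = await task
                results[hostname] = {
                    # A non-zero exit code means the command itself failed
                    'status': 'success' if result['exit_code'] == 0 else 'failed',
                    'output': result['output'],
                    'error': result['error'],
                    'exit_code': result['exit_code'],
                    'execution_time': result['execution_time']
                }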
            except Exception as e:
                results[hostname] = {
                    'status': 'error',
                    'error': str(e),
                    'execution_time': 0
                }
                
        return results
        
    async def execute_single_host_command(self, host, command, timeout):
        """Execute command on single host with detailed logging"""
        start_time = time.time()
        ssh_client = None  # keeps the finally block safe if the connection fails
        
        try:
            # Create SSH connection
            ssh_client = await self.create_ssh_connection(host)
            
            # Execute command
            stdin, stdout, stderr = ssh_client.exec_command(
                command, timeout=timeout
            )
            
            # Read output streams
            output = stdout.read().decode('utf-8')
            error = stderr.read().decode('utf-8')
            exit_code = stdout.channel.recv_exit_status()
            
            execution_time = time.time() - start_time
            
            return {
                'output': output,
                'error': error,
                'exit_code': exit_code,
                'execution_time': execution_time
            }
            
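        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(
                f"Command execution failed on {host['customhostname']} "
                f"after {execution_time:.1f}s: {str(e)}"
            )
            # Chain the original exception so the traceback is preserved
            raise RuntimeError(f"Execution failed: {str(e)}") from e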
        
        finally:
            if ssh_client:
                ssh_client.close()
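
execute_group_command returns one result dictionary per host. With many hosts, a short summary is easier to act on than the raw mapping. The helper below is a hypothetical sketch that counts successes and lists the hosts that need attention, based on the status, exit_code, and error keys used above.

```python
def summarize_results(results):
    """Condense a per-host results mapping into counts and a list of problem hosts."""
    summary = {'total': len(results), 'ok': 0, 'problem_hosts': []}
    for hostname, result in sorted(results.items()):
        succeeded = result.get('status') == 'success' and result.get('exit_code', 0) == 0
        if succeeded:
            summary['ok'] += 1
        else:
            reason = result.get('error') or f"exit code {result.get('exit_code')}"
            summary['problem_hosts'].append((hostname, reason))
    return summary


# Sample data in the shape returned by execute_group_command
sample = {
    'web-server-01': {'status': 'success', 'output': 'ok', 'exit_code': 0, 'execution_time': 0.4},
    'web-server-02': {'status': 'success', 'output': '', 'exit_code': 2, 'execution_time': 0.5},
    'db-server-01': {'status': 'error', 'error': 'Execution failed: timed out', 'execution_time': 0},
}
print(summarize_results(sample))
```

The exit code is checked in addition to the status field, so a command that ran but returned a non-zero code is still reported as a problem.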

9. Monitoring and Notifications

Stay informed about your infrastructure with comprehensive monitoring:

Task Execution Monitoring

# Real-time task monitoring
class TaskMonitor:
    """Monitor task execution with alerts and notifications"""
    
    async def monitor_task_execution(self, task_id):
        """Monitor running task with real-time updates"""
        while task_id in executing_tasks:
            try:
                # Get current task status
                task_status = await self.get_task_status(task_id)
                
                # Send status update to WebSocket clients
                await self.broadcast_task_update(task_id, task_status)
                
                # Check for timeout or failure conditions
                if self.should_alert(task_status):
                    await self.send_alert(task_id, task_status)
                    
                await asyncio.sleep(5)  # Check every 5 seconds
                
            except Exception as e:
                logger.error(f"Task monitoring error: {str(e)}")
                break
                
    async def send_alert(self, task_id, status):
        """Send alerts through configured channels"""
        alert_data = {
            'task_id': task_id,
            'status': status,
            'timestamp': datetime.now().isoformat(),
            'severity': self.determine_severity(status)
        }
        
        # Send to configured notification channels
        channels = await self.get_notification_channels()
        
        for channel in channels:
            if channel['type'] == 'telegram':
                await self.send_telegram_alert(channel, alert_data)
            elif channel['type'] == 'email':
                await self.send_email_alert(channel, alert_data)
            elif channel['type'] == 'webhook':
                await self.send_webhook_alert(channel, alert_data)

10. Best Practices for Production Use

Security Configuration

  • Key Rotation - Regularly rotate SSH keys using ArgoFusion's key management
  • Access Control - Use host groups to implement least-privilege access
  • Audit Logging - Enable comprehensive logging for compliance
  • Network Security - Configure firewall rules and VPN access

Performance Optimization

# Optimize for large-scale operations
{
    "batch_size": 50,  // Process hosts in batches
    "concurrent_limit": 10,  // Maximum concurrent connections
    "timeout_settings": {
        "connection": 30,
        "command_execution": 300,
        "file_transfer": 1800
    },
    "retry_policy": {
        "max_retries": 3,
        "backoff_factor": 2,
        "retry_on": ["connection_error", "timeout"]
    }
}
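
The retry_policy above combines a retry limit with a backoff factor. A common reading is exponential backoff, where the wait before each retry grows by that factor. The sketch below follows that reading; how ArgoFusion interprets backoff_factor is an assumption here, and base_delay, backoff_delays, and call_with_retry are hypothetical names.

```python
import time


def backoff_delays(max_retries=3, backoff_factor=2, base_delay=1.0):
    """Seconds to wait before each retry: base_delay * backoff_factor ** attempt."""
    return [base_delay * backoff_factor ** attempt for attempt in range(max_retries)]


def call_with_retry(operation, retry_on=(ConnectionError, TimeoutError),
                    max_retries=3, backoff_factor=2, base_delay=1.0, sleep=time.sleep):
    """Call `operation`, retrying on the listed exception types with growing delays."""
    for delay in backoff_delays(max_retries, backoff_factor, base_delay):
        try:
            return operation()
        except retry_on:
            sleep(delay)
    # Final attempt: any exception now propagates to the caller
    return operation()


print(backoff_delays())
```

The sleep function is a parameter so that tests can record delays instead of waiting. Only the exception types named in retry_on trigger a retry, matching the intent of the retry_on list in the configuration.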

11. Integration and API Usage

ArgoFusion provides comprehensive APIs for integration with existing tools:

# API integration example
import requests

class ArgoFusionAPI:
    """Python client for ArgoFusion API"""
    
    def __init__(self, base_url, api_token, timeout=30):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
    def _post(self, path, payload):
        """Send a JSON POST request and return the decoded response"""
        response = requests.post(
            f'{self.base_url}{path}',
            json=payload,
            headers=self.headers,
            timeout=self.timeout
        )
        # Surface HTTP errors instead of parsing an error body as a result
        response.raise_for_status()
        return response.json()
    
    # requests is a blocking library, so these are regular (non-async) methods
    def create_task(self, task_config):
        """Create scheduled task via API"""
        return self._post('/api/tasks', task_config)
        
    def execute_group_command(self, group_name, command):
        """Execute command on host group"""
        payload = {
            'group': group_name,
            'command': command,
            'timeout': 300
        }
        return self._post('/api/groups/execute', payload)

# Usage example
api = ArgoFusionAPI('https://your-argofusion-instance.com', 'your-api-token')

# Create automated backup task
backup_task = {
    'name': 'Daily Database Backup',
    'schedule_type': 'cron',
    'cron_expression': '0 2 * * *',
    'target_hosts': ['db-servers'],
    'commands': ['mysqldump --all-databases | gzip > /backup/daily.sql.gz']
}

result = api.create_task(backup_task)
print(f"Task created: {result['task_id']}")

Conclusion

ArgoFusion SSH combines the power of SSH with the convenience of a web-based graphical interface. With host groups, CRON scheduling, WebSSH terminals, and task monitoring, it is aimed at teams that need to scale their infrastructure management.

Key benefits of ArgoFusion SSH:

  • Unified Interface - Manage all servers from a single web dashboard
  • Advanced Automation - Sophisticated CRON scheduling with dependency management
  • Team Collaboration - Host groups and role-based access control
  • Enterprise Security - Comprehensive audit logging and key management
  • Modern Technology - WebSocket-based real-time communication

Ready to Get Started?

Experience the future of SSH management with ArgoFusion's comprehensive platform:

  • Free Trial - 3-day premium access for new users
  • No Credit Card - Start immediately with email or OAuth registration
  • Full Features - Access to all enterprise features during trial
  • Expert Support - Get help from our technical team
