Building an Infostealer Data Pipeline: A Technical Deep Dive


Infostealers are lightweight malware that exfiltrate saved credentials from web browsers, along with cookies, autofill data, and system information. The stolen data gets packaged into “logs” and distributed through Telegram channels, often within minutes of exfiltration. I built a pipeline to collect, process, and store this data at scale for threat intelligence purposes. At peak ingestion, the system processes around 2.5 million credentials within 24 hours and has accumulated over 6.1 billion records.

This post covers the technical architecture, design decisions, and lessons learned from building this system.

Architecture Overview

The pipeline consists of four interconnected components:

Telegram Channels → Downloader → Decompressor → Parser → ClickHouse
                         ↓              ↓           ↓
                         └──────── SQLite DB ───────┘
                                  (tracking)

A central SQLite database tracks files through every stage of the pipeline: downloaded archives, extraction status, parsing progress, and upload completion. Each component runs as an independent Python process, communicating through the filesystem and updating the tracking database. This decoupled architecture allows each stage to scale independently and provides natural checkpointing. If the parser crashes, the decompressor continues extracting archives. If the database goes down, parsed JSON files accumulate until it recovers.
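The claim-and-advance hand-off between stages can be sketched with a small helper. The table and column names here (`files`, `status`) are illustrative, not the pipeline's actual schema:

```python
import sqlite3

# Sketch of how a stage claims pending work from the tracking DB.
# Table/column names are illustrative, not the pipeline's exact schema.
def claim_pending(conn: sqlite3.Connection, from_status: str, to_status: str):
    """Atomically fetch files in `from_status` and advance them to `to_status`."""
    with conn:  # one transaction: commit on success, roll back on error
        rows = conn.execute(
            "SELECT md5, original_name FROM files WHERE status = ?",
            (from_status,),
        ).fetchall()
        conn.execute(
            "UPDATE files SET status = ? WHERE status = ?",
            (to_status, from_status),
        )
    return rows
```

Because the select and update share one transaction, a crashed stage never leaves a file half-claimed; the next poll simply sees it again in its old status.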

The Downloader: Telegram Scraping at Scale

The downloader monitors Telegram channels using Telethon and downloads archives containing stealer logs. The challenge is handling the volume and variety of file formats while tracking what has already been downloaded.

File Tracking with SQLite

Early versions used filename matching to detect duplicates, which failed when channels posted the same file under different names. I implemented a central SQLite database that tracks files through the entire pipeline, from download through parsing and upload:

import hashlib
import json
import os
from datetime import datetime

def create_metadata_file(file_path, message, password=None, channel_username=None,
                         channel_title=None, channel_id=None):
    """Create a .meta JSON file with archive metadata."""
    filename = os.path.basename(file_path)
    file_size = os.path.getsize(file_path) if os.path.exists(file_path) else message.file.size
    
    # Calculate MD5 hash for deduplication
    md5_hash = None
    if os.path.exists(file_path):
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        md5_hash = hash_md5.hexdigest()
    
    metadata = {
        "filename": filename,
        "password": password,
        "source_url": f"https://t.me/{channel_username[1:]}" if channel_username else None,
        "downloaded_at": datetime.utcnow().isoformat() + "Z",
        "file_size": file_size,
        "md5": md5_hash,
        "channel_title": channel_title,
        "channel_id": channel_id,
        "message_id": message.id,
        "message_date": message.date.isoformat() if getattr(message, 'date', None) else None,
        "message_text": message.message
    }
    
    meta_path = f"{file_path}.meta"
    with open(meta_path, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)

The SQLite database tracks each file through all pipeline stages:

  • File hash (MD5) for exact deduplication
  • Original filename and sanitized filename
  • Source channel ID and message ID
  • Extracted password (if found in message)
  • Download timestamp and status
  • Extraction timestamp, status, and output path
  • Parse timestamp and credential count extracted
  • Upload timestamp and status
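A SQLite schema covering those fields might look like the following. The column names are my guesses for illustration, not the pipeline's actual DDL:

```sql
-- Illustrative tracking schema; one row per archive, advanced through
-- statuses downloaded → extracted → parsed → uploaded.
CREATE TABLE IF NOT EXISTS files (
    md5               TEXT PRIMARY KEY,   -- exact-dedup key
    original_name     TEXT NOT NULL,
    sanitized_name    TEXT,
    channel_id        INTEGER,
    message_id        INTEGER,
    password          TEXT,               -- NULL when no password was found
    status            TEXT NOT NULL DEFAULT 'downloaded',
    downloaded_at     TEXT,
    extracted_at      TEXT,
    extract_path      TEXT,
    parsed_at         TEXT,
    credential_count  INTEGER,
    uploaded_at       TEXT
);
```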

Password Extraction

Most password-protected archives use predictable passwords. Through analyzing hundreds of Telegram messages, I identified common patterns:

import re

def extract_password_from_message(message):
    """Enhanced password extraction with multiple patterns."""
    if not message.message:
        return None, None
    
    patterns = [
        r'(?i)(?:🗃\s*)?(?:пароль\s*/\s*)?pass(?:word)?\s*:\s*(@\w+)',
        r'(?i)(?:🔶🔶)?pass(?:word)?\s+(@\w+)',
        r'(?i)pass(?:word)?\s+for\s+archive\s*:\s*(@\w+)',
        r'(?i)(?:📁\s*)?pass\s*:\s*(https://t\.me/\w+)',
        r'(?i)pass(?:word)?\s+for\s+archive\s*:?\s*\(([^)]+)\)',
        r'(?i)pass(?:word)?\s*[:\s]*\(([^)]+)\)',
        r'(?i)pass(?:word)?\s*:\s*([^\s\n]+)',
        r'(?i)🔐\s*([^\s\n]+)',
    ]
    
    text = message.message
    
    for pattern in patterns:
        match = re.search(pattern, text)
        if match:
            pw = match.group(1).strip()
            if pw.startswith('@'):
                pw = pw[1:]  # Remove @ prefix
            if pw.startswith('https://t.me/'):
                pw = pw.replace('https://t.me/', '')
            return pw, re.sub(r'[^\w.-]', '_', pw)
    
    return None, None

The most common password format is simply the channel’s @username. Threat actors use this as a form of attribution and to drive traffic to their channels.

Parallel Downloads with Backpressure

Telegram rate-limits aggressive scraping. The downloader uses semaphore-based concurrency control:

async def download_multiple_channels_parallel(self):
    """Download from multiple channels in parallel."""
    channel_semaphore = asyncio.Semaphore(3)  # Max 3 channels concurrently
    
    async def process_channel(username):
        async with channel_semaphore:
            return await self.process_single_channel(username)
    
    tasks = [process_channel(username) for username in config.TARGETS.keys()]
    results = await asyncio.gather(*tasks, return_exceptions=True)

Within each channel, files download in batches of 12 with up to 3 concurrent downloads per batch:

ENABLE_PARALLEL_DOWNLOADS = True
MAX_CONCURRENT_DOWNLOADS = 3
BATCH_SIZE = 12
INTER_BATCH_DELAY = 0.5  # seconds between batches

I monitor 10-20 channels at any given time. The number fluctuates because Telegram frequently bans channels distributing stolen data. New channels appear constantly, requiring manual discovery and configuration updates.
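The `config.TARGETS` mapping referenced above is never shown; a plausible minimal shape (channel names and option keys invented for illustration) is a dict keyed by channel username:

```python
# Hypothetical shape of config.TARGETS; real channel names are omitted.
# The downloader iterates the keys, so values can carry per-channel options.
TARGETS = {
    "@example_cloud_logs": {"enabled": True, "max_file_mb": 2048},
    "@example_logs_free": {"enabled": True, "max_file_mb": 4096},
}
```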

The Decompressor: Handling Malformed Archives

Stealer logs come in various archive formats (ZIP, RAR, 7z) with varying levels of corruption. The decompressor extracts these while handling failures gracefully.
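Dispatching to the right tool by extension can be sketched as follows. It assumes `7z` and `unrar` binaries are on PATH, which the post implies but does not show:

```python
import shutil
from pathlib import Path

# Sketch: build the subprocess argv for one archive. Assumes 7z/unrar
# are installed; flags shown are the tools' standard options.
def extraction_cmd(archive: Path, dest: Path, password=None):
    suffix = archive.suffix.lower()
    if suffix == ".rar" and shutil.which("unrar"):
        cmd = ["unrar", "x", "-y"]
        cmd.append(f"-p{password}" if password else "-p-")  # -p- = skip password prompt
        return cmd + [str(archive), str(dest) + "/"]
    # 7z reads zip, 7z, and (with the right codecs) rar archives
    cmd = ["7z", "x", "-y", f"-o{dest}"]
    if password:
        cmd.append(f"-p{password}")
    return cmd + [str(archive)]
```

The returned argv plugs straight into a subprocess wrapper like the `ProcessManager` shown below.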

Resource Monitoring

Early versions would spawn extraction processes without limits, eventually exhausting system memory. I added resource monitoring that throttles extraction when the system is under load:

class ResourceMonitor:
    """Monitor system resources to prevent overload"""
    
    def __init__(self):
        self.memory_threshold = 80  # Percent
        self.cpu_threshold = 90     # Percent
        self.disk_threshold = 95    # Percent
        
    def check_resources(self):
        """Check if system resources are available"""
        memory = psutil.virtual_memory()
        if memory.percent > self.memory_threshold:
            logger.warning(f"High memory usage: {memory.percent:.1f}%")
            return False
            
        cpu_percent = psutil.cpu_percent(interval=1)
        if cpu_percent > self.cpu_threshold:
            logger.warning(f"High CPU usage: {cpu_percent:.1f}%")
            return False
            
        disk = psutil.disk_usage('/')
        if disk.percent > self.disk_threshold:
            logger.error(f"Disk space critically low: {disk.percent:.1f}%")
            return False
            
        return True

Process Management with Timeouts

Some archives are malformed in ways that cause extraction tools to hang indefinitely. The process manager wraps subprocesses with timeouts and ensures cleanup:

class ProcessManager:
    """Manages subprocesses with timeout and resource monitoring"""
    
    def __init__(self):
        self.active_processes = []
        self.max_processes = 2
        self.process_timeout = 300  # 5 minutes per extraction
        
    @contextmanager
    def managed_process(self, cmd, timeout=None):
        """Context manager for subprocess with automatic cleanup"""
        timeout = timeout or self.process_timeout
        process = None
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                preexec_fn=os.setsid  # Create new process group
            )
            self.active_processes.append(process)
            yield process
        finally:
            if process:
                self.cleanup_process(process)
    
    def cleanup_process(self, process):
        """Forcefully clean up a process and its children"""
        if process.poll() is None:
            try:
                pgid = os.getpgid(process.pid)
            except ProcessLookupError:
                return  # process exited between poll() and getpgid()
            os.killpg(pgid, signal.SIGTERM)
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                os.killpg(pgid, signal.SIGKILL)
                process.wait()

Failed Extraction Quarantine

Archives that fail extraction after multiple attempts get moved to a quarantine directory with a failure log:

def move_to_failed(self, archive_path, reason):
    """Move problematic archive to failed directory."""
    target_path = self.failed_dir / archive_path.name
    if target_path.exists():
        timestamp = int(time.time())
        name, ext = target_path.stem, target_path.suffix
        target_path = self.failed_dir / f"{name}_{timestamp}{ext}"
    
    shutil.move(str(archive_path), str(target_path))
    
    # Log failure reason (context managers avoid leaking file handles)
    failure_log = self.failed_dir / "failure_log.json"
    log_data = {}
    if failure_log.exists():
        with open(failure_log, 'r', encoding='utf-8') as f:
            log_data = json.load(f)
    
    log_data[target_path.name] = {
        'original_path': str(archive_path),
        'reason': reason,
        'timestamp': time.time()
    }
    
    with open(failure_log, 'w', encoding='utf-8') as f:
        json.dump(log_data, f, indent=2)

This allows manual inspection of problematic files and prevents the pipeline from getting stuck retrying the same corrupted archive.

The Parser: Credential Extraction and Risk Scoring

The parser transforms raw stealer log files into structured Profile Objects with risk assessments.

Credential Format Detection

Different malware families output credentials in different formats. The parser handles multiple patterns:

def extract_credentials(self, content: str) -> List[Dict[str, str]]:
    """Extract credentials from various stealer log formats."""
    credentials = []
    
    # Primary format: SOFT, URL, USER, PASS (LummaC2, Redline, etc.)
    pattern = (
        r'SOFT:\s*([^\n]+)\s*\n'
        r'URL:\s*([^\n]+)\s*\n'
        r'USER:\s*([^\n]+)\s*\n'
        r'PASS:\s*([^\n]+)'
    )
    
    for match in re.finditer(pattern, content, re.IGNORECASE):
        software = match.group(1).strip()
        url = match.group(2).strip()
        username = match.group(3).strip()
        password = match.group(4).strip()
        
        if not (is_valid_url(url) and is_valid_username(username)):
            continue
        
        cred_obj = {
            "software": software,
            "url": url,
            "username": username,
            "password": password
        }
        
        # Extract domain from URL
        if url.startswith('android://'):
            # Android app credential
            parsed = urlparse(url)
            cred_obj["app_identifier"] = parsed.netloc
        else:
            # Web credential
            parsed = urlparse(url)
            domain = parsed.netloc.lower()
            if domain.startswith('www.'):
                domain = domain[4:]
            cred_obj["domain"] = domain
        
        credentials.append(cred_obj)
    
    return credentials

Government and Corporate Classification

Not all credentials carry equal risk. A compromised personal Gmail is different from a compromised .gov email:

def is_government_credential(credential):
    """Detect and classify government credentials"""
    domain = credential.get('domain', '').lower()
    username = credential.get('username', '').lower()
    
    # Direct government domains
    if domain.endswith('.gov') or domain.endswith('.mil'):
        return True, ['government_domain'], classify_government_agency(domain)
    
    # Government email addresses
    if '@' in username:
        email_domain = username.split('@')[1].lower()
        if email_domain.endswith('.gov') or email_domain.endswith('.mil'):
            return True, ['government_email'], classify_government_agency(email_domain)
    
    # Defense contractor domains
    CONTRACTOR_DOMAINS = [
        'lockheedmartin.com', 'boeing.com', 'raytheon.com', 
        'northropgrumman.com', 'generaldynamics.com', 'boozallen.com'
    ]
    
    if any(contractor in domain for contractor in CONTRACTOR_DOMAINS):
        return True, ['defense_contractor'], 'defense_contractor'
    
    return False, [], None

def classify_government_agency(domain):
    """Classify the type of government agency"""
    FEDERAL_CLASSIFICATIONS = {
        'defense': ['defense.gov', 'army.mil', 'navy.mil', 'af.mil', 'dod.gov'],
        'intelligence': ['cia.gov', 'nsa.gov', 'fbi.gov', 'dhs.gov', 'dni.gov'],
        'executive': ['whitehouse.gov', 'state.gov', 'treasury.gov', 'justice.gov'],
    }
    
    for classification, domains in FEDERAL_CLASSIFICATIONS.items():
        if any(agency_domain in domain for agency_domain in domains):
            return f'federal_{classification}'
    
    if domain.endswith('.gov'):
        return 'federal_unclassified'
    if domain.endswith('.mil'):
        return 'military_unclassified'
    
    return 'government_unclassified'

Risk Scoring

Each credential receives a risk score based on multiple factors:

def assess_credential_risk(credential):
    """Risk assessment for credentials"""
    domain = credential.get('domain', '').lower()
    username = credential.get('username', '').lower()
    
    base_risk_score = 10
    risk_indicators = []
    
    # Government credentials are critical
    is_gov, gov_indicators, gov_classification = is_government_credential(credential)
    if is_gov:
        return assess_government_credential_risk(credential, gov_indicators, gov_classification)
    
    # Critical financial/infrastructure domains
    ALWAYS_CRITICAL = [
        'chase.com', 'bankofamerica.com', 'wellsfargo.com',
        'paypal.com', 'stripe.com', 'coinbase.com',
        'console.aws.amazon.com', 'console.cloud.google.com', 'portal.azure.com'
    ]
    
    if any(critical in domain for critical in ALWAYS_CRITICAL):
        base_risk_score += 80
        risk_indicators.append('critical_financial_or_infrastructure')
    
    # Privileged usernames
    if any(priv in username for priv in ['admin', 'administrator', 'root', 'service']):
        base_risk_score += 40
        risk_indicators.append('privileged_username')
    
    # Weak password indicator
    password = credential.get('password', '')
    if len(password) < 8:
        base_risk_score += 15
        risk_indicators.append('weak_password')
    
    return {
        'risk_score': min(base_risk_score, 100),
        'risk_category': get_risk_category(base_risk_score),
        'risk_indicators': risk_indicators
    }

Rate Limiting

Without rate limiting, the parser would consume all available CPU and eventually crash or hang. I implemented adaptive throttling based on system resources:

class RateLimiter:
    """Rate limiter with adaptive throttling based on system resources"""
    
    def __init__(self, max_files_per_minute=30, max_concurrent_files=3):
        self.max_files_per_minute = max_files_per_minute
        self.max_concurrent_files = max_concurrent_files
        self.processing_times = deque(maxlen=100)
        self.files_processed_minute = deque()
        self.active_processes = 0
        
        self.cpu_threshold = 80
        self.memory_threshold = 85
    
    def should_throttle(self):
        """Determine if processing should be throttled"""
        current_time = time.time()
        
        # Clean old entries
        while self.files_processed_minute and (current_time - self.files_processed_minute[0]) > 60:
            self.files_processed_minute.popleft()
        
        # Rate limit check
        if len(self.files_processed_minute) >= self.max_files_per_minute:
            return True, "Rate limit exceeded"
        
        # Concurrent processing limit
        if self.active_processes >= self.max_concurrent_files:
            return True, "Concurrent limit reached"
        
        # System resource check
        cpu_percent = psutil.cpu_percent(interval=0.1)
        if cpu_percent > self.cpu_threshold:
            return True, f"High CPU: {cpu_percent:.1f}%"
        
        memory = psutil.virtual_memory()
        if memory.percent > self.memory_threshold:
            return True, f"High memory: {memory.percent:.1f}%"
        
        return False, "OK"
    
    def wait_if_needed(self):
        """Block until processing may proceed (iterative rather than
        recursive, so sustained throttling cannot exhaust the stack)."""
        while True:
            should_wait, reason = self.should_throttle()
            if not should_wait:
                return True
            
            if "Rate limit" in reason:
                oldest_time = self.files_processed_minute[0]
                wait_time = 60 - (time.time() - oldest_time) + 1
                wait_time = max(1, min(wait_time, 60))
            else:
                wait_time = 5
            
            logger.info(f"Throttling: {reason}, waiting {wait_time:.1f}s")
            time.sleep(wait_time)

The Database: ClickHouse for Billions of Records

PostgreSQL was my first choice, but query performance degraded badly once tables grew past 100 million rows. ClickHouse handles billions of rows with sub-second query times due to its columnar storage and vectorized query execution.

Schema Design

The schema is optimized for the most common query patterns: filtering by domain, email, or username:

CREATE TABLE IF NOT EXISTS vault.creds
(
    ts           DateTime        DEFAULT now(),
    victim_id    String,
    source_name  String,
    url          String DEFAULT '',
    domain       LowCardinality(String) DEFAULT '',
    email        String DEFAULT '',
    username     String DEFAULT '',
    password     Nullable(String),
    phone        Nullable(String),
    name         Nullable(String),
    address      Nullable(String),
    country      Nullable(String),
    
    -- System context
    hostname     Nullable(String),
    ip_address   String DEFAULT '',
    os_version   Nullable(String),
    hwid         Nullable(String),
    
    -- Risk classification
    account_type        LowCardinality(String) DEFAULT 'personal',
    risk_score          UInt8 DEFAULT 0,
    risk_category       LowCardinality(String) DEFAULT 'low',
    is_privileged       Bool DEFAULT false,
    breach_impact       LowCardinality(String) DEFAULT 'low',

    -- Bloom filter indexes for exact match lookups
    INDEX bf_email      email        TYPE tokenbf_v1(1024, 3, 0) GRANULARITY 64,
    INDEX bf_user       username     TYPE tokenbf_v1(1024, 3, 0) GRANULARITY 64,
    INDEX bf_url        url          TYPE tokenbf_v1(1024, 3, 0) GRANULARITY 64,
    INDEX set_domain    domain       TYPE set(8192)              GRANULARITY 64,
    INDEX bf_ip         ip_address   TYPE tokenbf_v1(1024, 3, 0) GRANULARITY 64
)
ENGINE = ReplacingMergeTree(ts)
ORDER BY (domain, email, username, victim_id)
SETTINGS index_granularity = 8192;

Why ClickHouse?

ClickHouse excels at this workload for several reasons:

Columnar Storage: When querying by domain, ClickHouse only reads the domain column, not the entire row. For a table with 20+ columns, this reduces I/O by 90%+.

Vectorized Execution: Operations process data in batches (vectors) rather than row-by-row, leveraging SIMD instructions for parallel processing within a single CPU core.

LowCardinality Optimization: The domain and account_type columns use LowCardinality(String), which dictionary-encodes values. Since most domains appear thousands of times, this dramatically reduces storage and speeds up filtering.

Bloom Filter Indexes: The tokenbf_v1 indexes enable fast exact-match lookups without full table scans. A query like WHERE email = 'user@example.com' uses the bloom filter to skip irrelevant data blocks.

ReplacingMergeTree: This engine automatically deduplicates records with the same primary key during background merges, handling the case where the same credential appears in multiple stealer logs.
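One caveat: ReplacingMergeTree deduplicates only during background merges, so queries can still see duplicates until a merge runs. When exact results matter, FINAL forces deduplication at read time, at extra CPU cost:

```sql
-- FINAL collapses rows sharing the same ORDER BY key (domain, email,
-- username, victim_id), keeping the row with the latest ts version.
SELECT count() AS deduped_count
FROM vault.creds FINAL
WHERE domain = 'example.com';
```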

Batch Insertion

Individual inserts are inefficient. The uploader batches records and uses async inserts:

def insert_rows(self, rows):
    """Insert credential rows into ClickHouse."""
    if not rows:
        return
        
    self.client.insert(
        'vault.creds',
        rows,
        column_names=self.column_names,
        settings={
            'async_insert': 1,
            'wait_for_async_insert': 1
        }
    )

The async_insert setting buffers inserts server-side before writing to storage, reducing write amplification. A batch size of 20,000 rows strikes a good balance between memory usage and insertion efficiency.
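The buffering around insert_rows can be sketched like this; `BatchBuffer` is an illustrative name, not the uploader's actual class:

```python
# Sketch of the batching wrapper: accumulate rows, flush at a threshold.
# flush_cb stands in for the real uploader's insert_rows call.
class BatchBuffer:
    def __init__(self, flush_cb, batch_size=20_000):
        self.flush_cb = flush_cb
        self.batch_size = batch_size
        self.rows = []

    def add(self, row):
        self.rows.append(row)
        if len(self.rows) >= self.batch_size:
            self.flush()

    def flush(self):
        """Send the buffered rows downstream and reset the buffer."""
        if self.rows:
            self.flush_cb(self.rows)
            self.rows = []
```

The caller must remember a final `flush()` at shutdown, or the tail of the batch is lost.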

Query Performance

Example query returning credentials for a specific domain across 6.1 billion records:

SELECT domain, email, username, risk_score, risk_category
FROM vault.creds 
WHERE domain = 'example.com'
ORDER BY risk_score DESC
LIMIT 100;

This typically executes in under 500ms thanks to the set_domain index and columnar storage.

For aggregate analysis:

SELECT 
    account_type,
    risk_category,
    count() as credential_count,
    uniqExact(email) as unique_emails,
    uniqExact(domain) as unique_domains
FROM vault.creds 
GROUP BY account_type, risk_category
ORDER BY credential_count DESC;

Infrastructure and Costs

Server Configuration

Processing Server (Hetzner AX102 in Helsinki):

  • AMD Ryzen 9 5950X (16 cores, 32 threads)
  • 128 GB DDR4 RAM
  • 2x 22TB SATA Enterprise HDD (RAID 1 for redundancy)
  • Cost: €156/month + €79 one-time setup

This server handles downloading, decompression, and parsing. The large HDDs store raw archives and extracted files before processing.

Database (ClickHouse Cloud Scale Plan):

  • Managed ClickHouse with automatic scaling
  • Stores the processed credential data
  • Cost: Variable based on compute and storage

The $644 Lesson

My first production deployment included a dashboard that displayed a live record count. The implementation was naive:

# DON'T DO THIS
while True:
    count = client.query("SELECT count() FROM vault.creds").result_rows[0][0]
    update_dashboard(count)
    time.sleep(5)  # Query every 5 seconds

A full count() over a 6+ billion row table is expensive, even in ClickHouse. Running it every 5 seconds, 24/7, accumulated massive compute costs. When the billing period ended, I owed $644.28 USD.

The fix was using an approximate count that’s nearly free:

SELECT sum(rows) FROM system.parts 
WHERE database = 'vault' AND table = 'creds' AND active = 1;

This queries metadata rather than scanning the table. The count is eventually consistent but accurate enough for a dashboard.

Lesson learned: always consider the cost implications of queries, especially in managed cloud services where you pay for compute.

Current Monthly Costs

  Component          Cost
  Hetzner AX102      €156 (~$170)
  ClickHouse Cloud   Variable; ~$50-100 with careful query patterns
  Total              ~$220-270/month

This runs a pipeline capable of processing millions of credentials daily and storing billions of records.

Data Flow Example

To illustrate the complete flow, here’s what happens when a new stealer log archive appears on Telegram:

  1. Downloader detects new message with .rar attachment in monitored channel

  2. Extracts password from message text using regex patterns

  3. Downloads file to /storage/obscura/raw/CHANNEL_NAME/

  4. Creates .meta JSON file with password, hash, source info

  5. Records file in SQLite with status downloaded

  6. Decompressor queries SQLite for files with status downloaded

  7. Checks if already extracted (hash lookup)

  8. Verifies system resources are available

  9. Attempts extraction with detected password

  10. On failure, retries without password, then moves to quarantine

  11. Extracted files go to /storage/obscura/processed/VICTIM_ID/

  12. Updates SQLite with status extracted and output path

  13. Parser queries SQLite for files with status extracted

  14. Scans for credential files (All Passwords.txt, etc.)

  15. Extracts credentials using format-specific patterns

  16. Classifies each credential (government, corporate, personal)

  17. Calculates risk scores

  18. Generates Profile Object JSON in /storage/obscura/parsed/

  19. Updates SQLite with status parsed and credential count

  20. DB Uploader queries SQLite for files with status parsed

  21. Validates JSON file is complete (not being written)

  22. Transforms to ClickHouse row format

  23. Adds to batch buffer

  24. Flushes batch when size threshold reached (20,000 rows)

  25. Updates SQLite with status uploaded

Total time from Telegram post to queryable database: typically under 5 minutes for a single archive, with most time spent on download and extraction. The SQLite database provides a complete audit trail of every file through the pipeline.
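Step 21 above, validating that a JSON file is complete, can be approximated by checking that the file size is stable over a short interval. This is a sketch; the real uploader's check may differ:

```python
import os
import time

# Sketch of step 21: treat a JSON file as complete only when its size
# has been stable for `interval` seconds (the writer appends, then goes quiet).
def is_file_settled(path, interval=2.0):
    """Return True if the file exists and its size did not change over the interval."""
    try:
        size_before = os.path.getsize(path)
        time.sleep(interval)
        return os.path.getsize(path) == size_before
    except OSError:
        return False
```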

Monitoring and Observability

Each component logs to both file and stdout with structured formatting:

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        RotatingFileHandler('component.log', maxBytes=10*1024*1024, backupCount=5),
        logging.StreamHandler()
    ]
)

Key metrics tracked:

  • Files downloaded/extracted/parsed per hour
  • Credential extraction rate
  • Database insertion latency
  • System resource utilization
  • Failed extraction count and reasons

For alerting, I monitor the parsed directory size. If it grows beyond a threshold, the database uploader has fallen behind and needs attention.
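That check amounts to summing the pending JSON in the parsed directory. A minimal sketch, where the 5 GB threshold is illustrative:

```python
from pathlib import Path

# Sketch of the backlog alert: total bytes of parsed JSON awaiting upload.
# The threshold is illustrative; tune it to your ingest rate.
def parsed_backlog_bytes(parsed_dir):
    return sum(p.stat().st_size for p in Path(parsed_dir).rglob("*.json"))

def uploader_behind(parsed_dir, threshold_bytes=5 * 1024**3):  # 5 GB
    return parsed_backlog_bytes(parsed_dir) > threshold_bytes
```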

Future Improvements

The current architecture works but has room for optimization:

Message Queue: Replace filesystem-based communication with Redis or Kafka for better backpressure handling and exactly-once processing guarantees.

Streaming Parser: Currently the parser loads entire files into memory. For very large stealer logs (some exceed 1GB), a streaming approach would reduce memory pressure.

Distributed Processing: The parser is single-threaded by design (to avoid overwhelming the system). A proper job queue with worker processes would improve throughput while maintaining resource controls.

Better Deduplication: Current deduplication happens at the archive level (file hash) and database level (ReplacingMergeTree). Adding credential-level deduplication before database insertion would reduce storage costs.
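A pre-insert dedup pass could key a seen-set on a hash of (url, username, password). An in-memory sketch follows; a real deployment would want something persistent or probabilistic, such as a bloom filter:

```python
import hashlib

# Sketch of credential-level dedup before database insertion.
# The key covers the fields that define a duplicate credential.
def cred_key(cred):
    raw = "\x00".join((cred.get("url", ""),
                       cred.get("username", ""),
                       cred.get("password", "")))
    return hashlib.sha1(raw.encode("utf-8", "replace")).digest()

def dedupe(creds, seen=None):
    """Drop credentials whose (url, username, password) was already seen."""
    seen = set() if seen is None else seen
    out = []
    for c in creds:
        k = cred_key(c)
        if k not in seen:
            seen.add(k)
            out.append(c)
    return out
```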

Conclusion

Building a data pipeline at this scale taught me that the boring parts matter most. Resource monitoring, rate limiting, failure handling, and cost awareness are less exciting than the credential parsing logic, but they’re what keep the system running reliably.

The complete pipeline processes millions of credentials daily on infrastructure costing under $300/month. The key is choosing the right tool for each job: Telethon for Telegram API access, 7zip/unrar for extraction, regex for parsing, ClickHouse for storage and queries.

If you have questions about the implementation or want to discuss threat intelligence infrastructure, reach out on X @frankiejavv.