Building an Infostealer Data Pipeline: A Technical Deep Dive
Infostealers are lightweight malware that exfiltrate saved credentials from web browsers, along with cookies, autofill data, and system information. The stolen data gets packaged into “logs” and distributed through Telegram channels, often within minutes of exfiltration. I built a pipeline to collect, process, and store this data at scale for threat intelligence purposes. At peak ingestion, the system processes around 2.5 million credentials within 24 hours and has accumulated over 6.1 billion records.
This post covers the technical architecture, design decisions, and lessons learned from building this system.
Architecture Overview
The pipeline consists of four interconnected components:
```
Telegram Channels → Downloader → Decompressor → Parser → ClickHouse
                        ↓              ↓           ↓
                        └───────── SQLite DB ──────┘
                                  (tracking)
```
A central SQLite database tracks files through every stage of the pipeline: downloaded archives, extraction status, parsing progress, and upload completion. Each component runs as an independent Python process, communicating through the filesystem and updating the tracking database. This decoupled architecture allows each stage to scale independently and provides natural checkpointing. If the parser crashes, the decompressor continues extracting archives. If the database goes down, parsed JSON files accumulate until it recovers.
The Downloader: Telegram Scraping at Scale
The downloader monitors Telegram channels using Telethon and downloads archives containing stealer logs. The challenge is handling the volume and variety of file formats while tracking what has already been downloaded.
File Tracking with SQLite
Early versions used filename matching to detect duplicates, which failed when channels posted the same file under different names. I implemented a central SQLite database that tracks files through the entire pipeline, from download through parsing and upload:
```python
import hashlib
import json
import os
from datetime import datetime

def create_metadata_file(file_path, message, password=None, channel_username=None,
                         channel_title=None, channel_id=None):
    """Create a .meta JSON file with archive metadata."""
    filename = os.path.basename(file_path)
    file_size = os.path.getsize(file_path) if os.path.exists(file_path) else message.file.size

    # Calculate MD5 hash for deduplication
    md5_hash = None
    if os.path.exists(file_path):
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        md5_hash = hash_md5.hexdigest()

    metadata = {
        "filename": filename,
        "password": password,
        # lstrip('@') handles usernames with or without the leading @
        "source_url": f"https://t.me/{channel_username.lstrip('@')}" if channel_username else None,
        "downloaded_at": datetime.utcnow().isoformat() + "Z",
        "file_size": file_size,
        "md5": md5_hash,
        "channel_title": channel_title,
        "channel_id": channel_id,
        "message_id": message.id,
        "message_date": message.date.isoformat() if hasattr(message, 'date') else None,
        "message_text": message.message
    }

    meta_path = f"{file_path}.meta"
    with open(meta_path, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)
```
The SQLite database tracks each file through all pipeline stages:
- File hash (MD5) for exact deduplication
- Original filename and sanitized filename
- Source channel ID and message ID
- Extracted password (if found in message)
- Download timestamp and status
- Extraction timestamp, status, and output path
- Parse timestamp and credential count extracted
- Upload timestamp and status
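A minimal sketch of what that tracking table can look like, with a helper that advances a file through the stages. The column names and the `advance_stage` helper are illustrative, not the production schema:

```python
import sqlite3

# Illustrative schema only; the real pipeline uses a file-backed database.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS files (
        md5              TEXT PRIMARY KEY,   -- exact-dedup key
        filename         TEXT NOT NULL,
        channel_id       INTEGER,
        message_id       INTEGER,
        password         TEXT,
        status           TEXT NOT NULL DEFAULT 'downloaded',
        downloaded_at    TEXT,
        extracted_at     TEXT,
        parsed_at        TEXT,
        uploaded_at      TEXT,
        credential_count INTEGER DEFAULT 0
    )
""")

def advance_stage(conn, md5, status, ts_column, ts):
    """Move a file to the next pipeline stage and stamp the transition.
    ts_column is trusted internal input, never user data."""
    conn.execute(
        f"UPDATE files SET status = ?, {ts_column} = ? WHERE md5 = ?",
        (status, ts, md5),
    )
    conn.commit()

# A file enters as 'downloaded', then each component advances it.
conn.execute(
    "INSERT INTO files (md5, filename, downloaded_at) VALUES (?, ?, ?)",
    ("abc123", "logs_2024.rar", "2024-01-01T00:00:00Z"),
)
advance_stage(conn, "abc123", "extracted", "extracted_at", "2024-01-01T00:01:00Z")
current = conn.execute("SELECT status FROM files WHERE md5 = 'abc123'").fetchone()[0]
```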
Password Extraction
Most password-protected archives use predictable passwords. Through analyzing hundreds of Telegram messages, I identified common patterns:
```python
import re

def extract_password_from_message(message):
    """Enhanced password extraction with multiple patterns."""
    if not message.message:
        return None, None

    patterns = [
        r'(?i)(?:🗃\s*)?(?:пароль\s*/\s*)?pass(?:word)?\s*:\s*(@\w+)',
        r'(?i)(?:🔶🔶)?pass(?:word)?\s+(@\w+)',
        r'(?i)pass(?:word)?\s+for\s+archive\s*:\s*(@\w+)',
        r'(?i)(?:📁\s*)?pass\s*:\s*(https://t\.me/\w+)',
        r'(?i)pass(?:word)?\s+for\s+archive\s*:?\s*\(([^)]+)\)',
        r'(?i)pass(?:word)?\s*[:\s]*\(([^)]+)\)',
        r'(?i)pass(?:word)?\s*:\s*([^\s\n]+)',
        r'(?i)🔐\s*([^\s\n]+)',
    ]

    text = message.message
    for pattern in patterns:
        match = re.search(pattern, text)
        if match:
            pw = match.group(1).strip()
            if pw.startswith('@'):
                pw = pw[1:]  # Remove @ prefix
            if pw.startswith('https://t.me/'):
                pw = pw.replace('https://t.me/', '')
            # Return the raw password plus a filesystem-safe variant
            return pw, re.sub(r'[^\w.-]', '_', pw)
    return None, None
```
The most common password format is simply the channel’s @username. Threat actors use this as a form of attribution and to drive traffic to their channels.
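As a quick illustration of that @username case, here is a stripped-down extractor covering only the first pattern above, exercised against a stand-in message object rather than a real Telethon message:

```python
import re
from types import SimpleNamespace

def extract_channel_password(text):
    """Simplified version of the first pattern: password is the channel @username."""
    match = re.search(r'(?i)pass(?:word)?\s*:\s*(@\w+)', text or '')
    if not match:
        return None
    return match.group(1).lstrip('@')

# Stand-in for a Telethon message object (only .message is needed here)
msg = SimpleNamespace(message="Fresh logs 🔥\nPassword: @stealer_channel")
pw = extract_channel_password(msg.message)
```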
Parallel Downloads with Backpressure
Telegram rate-limits aggressive scraping. The downloader uses semaphore-based concurrency control:
```python
async def download_multiple_channels_parallel(self):
    """Download from multiple channels in parallel."""
    channel_semaphore = asyncio.Semaphore(3)  # Max 3 channels concurrently

    async def process_channel(username):
        async with channel_semaphore:
            return await self.process_single_channel(username)

    tasks = [process_channel(username) for username in config.TARGETS.keys()]
    results = await asyncio.gather(*tasks, return_exceptions=True)
```
Within each channel, files download in batches of 12 with up to 3 concurrent downloads per batch:
```python
ENABLE_PARALLEL_DOWNLOADS = True
MAX_CONCURRENT_DOWNLOADS = 3
BATCH_SIZE = 12
INTER_BATCH_DELAY = 0.5  # seconds between batches
```
I monitor 10-20 channels at any given time. The number fluctuates because Telegram frequently bans channels distributing stolen data. New channels appear constantly, requiring manual discovery and configuration updates.
The Decompressor: Handling Malformed Archives
Stealer logs come in various archive formats (ZIP, RAR, 7z) with varying levels of corruption. The decompressor extracts these while handling failures gracefully.
Resource Monitoring
Early versions would spawn extraction processes without limits, eventually exhausting system memory. I added resource monitoring that throttles extraction when the system is under load:
```python
class ResourceMonitor:
    """Monitor system resources to prevent overload"""

    def __init__(self):
        self.memory_threshold = 80  # Percent
        self.cpu_threshold = 90     # Percent
        self.disk_threshold = 95    # Percent

    def check_resources(self):
        """Check if system resources are available"""
        memory = psutil.virtual_memory()
        if memory.percent > self.memory_threshold:
            logger.warning(f"High memory usage: {memory.percent:.1f}%")
            return False

        cpu_percent = psutil.cpu_percent(interval=1)
        if cpu_percent > self.cpu_threshold:
            logger.warning(f"High CPU usage: {cpu_percent:.1f}%")
            return False

        disk = psutil.disk_usage('/')
        if disk.percent > self.disk_threshold:
            logger.error(f"Disk space critically low: {disk.percent:.1f}%")
            return False

        return True
```
Process Management with Timeouts
Some archives are malformed in ways that cause extraction tools to hang indefinitely. The process manager wraps subprocesses with timeouts and ensures cleanup:
```python
class ProcessManager:
    """Manages subprocesses with timeout and resource monitoring"""

    def __init__(self):
        self.active_processes = []
        self.max_processes = 2
        self.process_timeout = 300  # 5 minutes per extraction

    @contextmanager
    def managed_process(self, cmd, timeout=None):
        """Context manager for subprocess with automatic cleanup"""
        timeout = timeout or self.process_timeout
        process = None
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                preexec_fn=os.setsid  # Create new process group
            )
            self.active_processes.append(process)
            yield process
        finally:
            if process:
                self.cleanup_process(process)

    def cleanup_process(self, process):
        """Forcefully clean up a process and its children"""
        if process.poll() is None:
            pgid = os.getpgid(process.pid)
            os.killpg(pgid, signal.SIGTERM)
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                os.killpg(pgid, signal.SIGKILL)
                process.wait()
```
Failed Extraction Quarantine
Archives that fail extraction after multiple attempts get moved to a quarantine directory with a failure log:
```python
def move_to_failed(self, archive_path, reason):
    """Move problematic archive to failed directory."""
    target_path = self.failed_dir / archive_path.name
    if target_path.exists():
        timestamp = int(time.time())
        name, ext = target_path.stem, target_path.suffix
        target_path = self.failed_dir / f"{name}_{timestamp}{ext}"
    shutil.move(str(archive_path), str(target_path))

    # Log failure reason (with proper file handle cleanup)
    failure_log = self.failed_dir / "failure_log.json"
    log_data = {}
    if failure_log.exists():
        with open(failure_log) as f:
            log_data = json.load(f)
    log_data[target_path.name] = {
        'original_path': str(archive_path),
        'reason': reason,
        'timestamp': time.time()
    }
    with open(failure_log, 'w') as f:
        json.dump(log_data, f, indent=2)
```
This allows manual inspection of problematic files and prevents the pipeline from getting stuck retrying the same corrupted archive.
The Parser: Credential Extraction and Risk Scoring
The parser transforms raw stealer log files into structured Profile Objects with risk assessments.
Credential Format Detection
Different malware families output credentials in different formats. The parser handles multiple patterns:
```python
def extract_credentials(self, content: str) -> List[Dict[str, str]]:
    """Extract credentials from various stealer log formats."""
    credentials = []

    # Primary format: SOFT, URL, USER, PASS (LummaC2, Redline, etc.)
    pattern = (
        r'SOFT:\s*([^\n]+)\s*\n'
        r'URL:\s*([^\n]+)\s*\n'
        r'USER:\s*([^\n]+)\s*\n'
        r'PASS:\s*([^\n]+)'
    )
    for match in re.finditer(pattern, content, re.IGNORECASE):
        software = match.group(1).strip()
        url = match.group(2).strip()
        username = match.group(3).strip()
        password = match.group(4).strip()

        if not (is_valid_url(url) and is_valid_username(username)):
            continue

        cred_obj = {
            "software": software,
            "url": url,
            "username": username,
            "password": password
        }

        # Extract domain from URL
        parsed = urlparse(url)
        if url.startswith('android://'):
            # Android app credential
            cred_obj["app_identifier"] = parsed.netloc
        else:
            # Web credential
            domain = parsed.netloc.lower()
            if domain.startswith('www.'):
                domain = domain[4:]
            cred_obj["domain"] = domain

        credentials.append(cred_obj)

    return credentials
```
Government and Corporate Classification
Not all credentials carry equal risk. A compromised personal Gmail is different from a compromised .gov email:
```python
def is_government_credential(credential):
    """Detect and classify government credentials"""
    domain = credential.get('domain', '').lower()
    username = credential.get('username', '').lower()

    # Direct government domains
    if domain.endswith('.gov') or domain.endswith('.mil'):
        return True, ['government_domain'], classify_government_agency(domain)

    # Government email addresses
    if '@' in username:
        email_domain = username.split('@')[1].lower()
        if email_domain.endswith('.gov') or email_domain.endswith('.mil'):
            return True, ['government_email'], classify_government_agency(email_domain)

    # Defense contractor domains
    CONTRACTOR_DOMAINS = [
        'lockheedmartin.com', 'boeing.com', 'raytheon.com',
        'northropgrumman.com', 'generaldynamics.com', 'boozallen.com'
    ]
    if any(contractor in domain for contractor in CONTRACTOR_DOMAINS):
        return True, ['defense_contractor'], 'defense_contractor'

    return False, [], None

def classify_government_agency(domain):
    """Classify the type of government agency"""
    FEDERAL_CLASSIFICATIONS = {
        'defense': ['defense.gov', 'army.mil', 'navy.mil', 'af.mil', 'dod.gov'],
        'intelligence': ['cia.gov', 'nsa.gov', 'fbi.gov', 'dhs.gov', 'dni.gov'],
        'executive': ['whitehouse.gov', 'state.gov', 'treasury.gov', 'justice.gov'],
    }
    for classification, domains in FEDERAL_CLASSIFICATIONS.items():
        if any(agency_domain in domain for agency_domain in domains):
            return f'federal_{classification}'
    if domain.endswith('.gov'):
        return 'federal_unclassified'
    if domain.endswith('.mil'):
        return 'military_unclassified'
    return 'government_unclassified'
```
Risk Scoring
Each credential receives a risk score based on multiple factors:
```python
def assess_credential_risk(credential):
    """Risk assessment for credentials"""
    domain = credential.get('domain', '').lower()
    username = credential.get('username', '').lower()
    base_risk_score = 10
    risk_indicators = []

    # Government credentials are critical
    is_gov, gov_indicators, gov_classification = is_government_credential(credential)
    if is_gov:
        return assess_government_credential_risk(credential, gov_indicators, gov_classification)

    # Critical financial/infrastructure domains
    ALWAYS_CRITICAL = [
        'chase.com', 'bankofamerica.com', 'wellsfargo.com',
        'paypal.com', 'stripe.com', 'coinbase.com',
        'console.aws.amazon.com', 'console.cloud.google.com', 'portal.azure.com'
    ]
    if any(critical in domain for critical in ALWAYS_CRITICAL):
        base_risk_score += 80
        risk_indicators.append('critical_financial_or_infrastructure')

    # Privileged usernames
    if any(priv in username for priv in ['admin', 'administrator', 'root', 'service']):
        base_risk_score += 40
        risk_indicators.append('privileged_username')

    # Weak password indicator
    password = credential.get('password', '')
    if len(password) < 8:
        base_risk_score += 15
        risk_indicators.append('weak_password')

    return {
        'risk_score': min(base_risk_score, 100),
        'risk_category': get_risk_category(base_risk_score),
        'risk_indicators': risk_indicators
    }
```
Rate Limiting
Without rate limiting, the parser would consume all available CPU and eventually crash or hang. I implemented adaptive throttling based on system resources:
```python
class RateLimiter:
    """Rate limiter with adaptive throttling based on system resources"""

    def __init__(self, max_files_per_minute=30, max_concurrent_files=3):
        self.max_files_per_minute = max_files_per_minute
        self.max_concurrent_files = max_concurrent_files
        self.processing_times = deque(maxlen=100)
        self.files_processed_minute = deque()
        self.active_processes = 0
        self.cpu_threshold = 80
        self.memory_threshold = 85

    def should_throttle(self):
        """Determine if processing should be throttled"""
        current_time = time.time()

        # Clean old entries
        while self.files_processed_minute and (current_time - self.files_processed_minute[0]) > 60:
            self.files_processed_minute.popleft()

        # Rate limit check
        if len(self.files_processed_minute) >= self.max_files_per_minute:
            return True, "Rate limit exceeded"

        # Concurrent processing limit
        if self.active_processes >= self.max_concurrent_files:
            return True, "Concurrent limit reached"

        # System resource check
        cpu_percent = psutil.cpu_percent(interval=0.1)
        if cpu_percent > self.cpu_threshold:
            return True, f"High CPU: {cpu_percent:.1f}%"

        memory = psutil.virtual_memory()
        if memory.percent > self.memory_threshold:
            return True, f"High memory: {memory.percent:.1f}%"

        return False, "OK"

    def wait_if_needed(self):
        """Wait if throttling is needed"""
        should_wait, reason = self.should_throttle()
        if should_wait:
            if "Rate limit" in reason:
                oldest_time = self.files_processed_minute[0]
                wait_time = 60 - (time.time() - oldest_time) + 1
                wait_time = max(1, min(wait_time, 60))
            else:
                wait_time = 5
            logger.info(f"Throttling: {reason}, waiting {wait_time:.1f}s")
            time.sleep(wait_time)
            return self.wait_if_needed()  # Recursive check
        return True
```
The Database: ClickHouse for Billions of Records
PostgreSQL was my first choice, but it struggled with queries over 100 million rows. ClickHouse handles billions of rows with sub-second query times due to its columnar storage and vectorized query execution.
Schema Design
The schema is optimized for the most common query patterns: filtering by domain, email, or username:
```sql
CREATE TABLE IF NOT EXISTS vault.creds
(
    ts            DateTime DEFAULT now(),
    victim_id     String,
    source_name   String,
    url           String DEFAULT '',
    domain        LowCardinality(String) DEFAULT '',
    email         String DEFAULT '',
    username      String DEFAULT '',
    password      Nullable(String),
    phone         Nullable(String),
    name          Nullable(String),
    address       Nullable(String),
    country       Nullable(String),

    -- System context
    hostname      Nullable(String),
    ip_address    String DEFAULT '',
    os_version    Nullable(String),
    hwid          Nullable(String),

    -- Risk classification
    account_type  LowCardinality(String) DEFAULT 'personal',
    risk_score    UInt8 DEFAULT 0,
    risk_category LowCardinality(String) DEFAULT 'low',
    is_privileged Bool DEFAULT false,
    breach_impact LowCardinality(String) DEFAULT 'low',

    -- Bloom filter indexes for exact match lookups
    INDEX bf_email   email      TYPE tokenbf_v1(1024, 3, 0) GRANULARITY 64,
    INDEX bf_user    username   TYPE tokenbf_v1(1024, 3, 0) GRANULARITY 64,
    INDEX bf_url     url        TYPE tokenbf_v1(1024, 3, 0) GRANULARITY 64,
    INDEX set_domain domain     TYPE set(8192) GRANULARITY 64,
    INDEX bf_ip      ip_address TYPE tokenbf_v1(1024, 3, 0) GRANULARITY 64
)
ENGINE = ReplacingMergeTree(ts)
ORDER BY (domain, email, username, victim_id)
SETTINGS index_granularity = 8192;
```
Why ClickHouse?
ClickHouse excels at this workload for several reasons:
Columnar Storage: When querying by domain, ClickHouse only reads the domain column, not the entire row. For a table with 20+ columns, this reduces I/O by 90%+.
Vectorized Execution: Operations process data in batches (vectors) rather than row-by-row, leveraging SIMD instructions for parallel processing within a single CPU core.
LowCardinality Optimization: The domain and account_type columns use LowCardinality(String), which dictionary-encodes values. Since most domains appear thousands of times, this dramatically reduces storage and speeds up filtering.
Bloom Filter Indexes: The tokenbf_v1 indexes enable fast exact-match lookups without full table scans. A query like WHERE email = 'user@example.com' uses the bloom filter to skip irrelevant data blocks.
ReplacingMergeTree: This engine automatically deduplicates records with the same primary key during background merges, handling the case where the same credential appears in multiple stealer logs.
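One caveat worth noting: merges are asynchronous, so duplicates remain visible between background merges. When a query needs fully deduplicated results, ClickHouse's `FINAL` modifier applies the ReplacingMergeTree logic at read time, at some CPU cost:

```sql
-- Deduplicated read: keeps one row per ORDER BY key (the one with the latest ts)
SELECT domain, email, username, password
FROM vault.creds FINAL
WHERE domain = 'example.com';

-- Alternatively, force a merge (expensive on tables this large)
OPTIMIZE TABLE vault.creds FINAL;
```

For most analytics here, eventual deduplication is acceptable and plain queries stay fast.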
Batch Insertion
Individual inserts are inefficient. The uploader batches records and uses async inserts:
```python
def insert_rows(self, rows):
    """Insert credential rows into ClickHouse."""
    if not rows:
        return
    self.client.insert(
        'vault.creds',
        rows,
        column_names=self.column_names,
        settings={
            'async_insert': 1,
            'wait_for_async_insert': 1
        }
    )
```
The async_insert setting buffers inserts server-side before writing to storage, reducing write amplification. A batch size of 20,000 rows strikes a good balance between memory usage and insertion efficiency.
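A minimal sketch of the batching logic in front of `insert_rows` (the `BatchBuffer` class and its wiring are illustrative, not the production uploader):

```python
class BatchBuffer:
    """Accumulate rows and flush to a sink once a size threshold is hit."""

    def __init__(self, flush_fn, batch_size=20_000):
        self.flush_fn = flush_fn      # e.g. uploader.insert_rows
        self.batch_size = batch_size
        self.rows = []

    def add(self, row):
        self.rows.append(row)
        if len(self.rows) >= self.batch_size:
            self.flush()

    def flush(self):
        """Send the current batch and reset the buffer."""
        if self.rows:
            self.flush_fn(self.rows)
            self.rows = []

# Demo with a tiny batch size and a list standing in for ClickHouse
flushed = []
buf = BatchBuffer(flush_fn=flushed.append, batch_size=3)
for i in range(7):
    buf.add({"email": f"user{i}@example.com"})
buf.flush()  # flush the remainder on shutdown
```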
Query Performance
Example query returning credentials for a specific domain across 6.1 billion records:
```sql
SELECT domain, email, username, risk_score, risk_category
FROM vault.creds
WHERE domain = 'example.com'
ORDER BY risk_score DESC
LIMIT 100;
```
This typically executes in under 500ms thanks to the set_domain index and columnar storage.
For aggregate analysis:
```sql
SELECT
    account_type,
    risk_category,
    count() AS credential_count,
    uniqExact(email) AS unique_emails,
    uniqExact(domain) AS unique_domains
FROM vault.creds
GROUP BY account_type, risk_category
ORDER BY credential_count DESC;
```
Infrastructure and Costs
Server Configuration
Processing Server (Hetzner AX102 in Helsinki):
- AMD Ryzen 9 5950X (16 cores, 32 threads)
- 128 GB DDR4 RAM
- 2x 22TB SATA Enterprise HDD (RAID 1 for redundancy)
- Cost: €156/month + €79 one-time setup
This server handles downloading, decompression, and parsing. The large HDDs store raw archives and extracted files before processing.
Database (ClickHouse Cloud Scale Plan):
- Managed ClickHouse with automatic scaling
- Stores the processed credential data
- Cost: Variable based on compute and storage
The $644 Lesson
My first production deployment included a dashboard that displayed a live record count. The implementation was naive:
```python
# DON'T DO THIS
while True:
    count = client.query("SELECT count() FROM vault.creds").result_rows[0][0]
    update_dashboard(count)
    time.sleep(5)  # Query every 5 seconds
```
A count() query on a 6+ billion row table is expensive, even in ClickHouse. Running it every 5 seconds, 24/7, accumulated massive compute costs. When the billing period ended, I owed $644.28 USD.
The fix was using an approximate count that’s nearly free:
```sql
SELECT sum(rows) FROM system.parts
WHERE database = 'vault' AND table = 'creds' AND active = 1;
```
This queries metadata rather than scanning the table. The count is eventually consistent but accurate enough for a dashboard.
Lesson learned: always consider the cost implications of queries, especially in managed cloud services where you pay for compute.
Current Monthly Costs
| Component | Cost |
|---|---|
| Hetzner AX102 | €156 (~$170) |
| ClickHouse Cloud | Variable, ~$50-100 with careful query patterns |
| Total | ~$220-270/month |
This runs a pipeline capable of processing millions of credentials daily and storing billions of records.
Data Flow Example
To illustrate the complete flow, here’s what happens when a new stealer log archive appears on Telegram:
1. Downloader detects a new message with a `.rar` attachment in a monitored channel
2. Extracts the password from the message text using regex patterns
3. Downloads the file to `/storage/obscura/raw/CHANNEL_NAME/`
4. Creates a `.meta` JSON file with password, hash, and source info
5. Records the file in SQLite with status `downloaded`
6. Decompressor queries SQLite for files with status `downloaded`
7. Checks if already extracted (hash lookup)
8. Verifies system resources are available
9. Attempts extraction with the detected password
10. On failure, retries without a password, then moves the archive to quarantine
11. Extracted files go to `/storage/obscura/processed/VICTIM_ID/`
12. Updates SQLite with status `extracted` and the output path
13. Parser queries SQLite for files with status `extracted`
14. Scans for credential files (`All Passwords.txt`, etc.)
15. Extracts credentials using format-specific patterns
16. Classifies each credential (government, corporate, personal)
17. Calculates risk scores
18. Generates a Profile Object JSON in `/storage/obscura/parsed/`
19. Updates SQLite with status `parsed` and the credential count
20. DB Uploader queries SQLite for files with status `parsed`
21. Validates the JSON file is complete (not still being written)
22. Transforms records to ClickHouse row format
23. Adds them to the batch buffer
24. Flushes the batch when the size threshold is reached (20,000 rows)
25. Updates SQLite with status `uploaded`
Total time from Telegram post to queryable database: typically under 5 minutes for a single archive, with most time spent on download and extraction. The SQLite database provides a complete audit trail of every file through the pipeline.
Monitoring and Observability
Each component logs to both file and stdout with structured formatting:
```python
import logging
from logging.handlers import RotatingFileHandler

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        RotatingFileHandler('component.log', maxBytes=10*1024*1024, backupCount=5),
        logging.StreamHandler()
    ]
)
```
Key metrics tracked:
- Files downloaded/extracted/parsed per hour
- Credential extraction rate
- Database insertion latency
- System resource utilization
- Failed extraction count and reasons
For alerting, I monitor the parsed directory size. If it grows beyond a threshold, the database uploader has fallen behind and needs attention.
Future Improvements
The current architecture works but has room for optimization:
Message Queue: Replace filesystem-based communication with Redis or Kafka for better backpressure handling and exactly-once processing guarantees.
Streaming Parser: Currently the parser loads entire files into memory. For very large stealer logs (some exceed 1GB), a streaming approach would reduce memory pressure.
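A streaming version of the SOFT/URL/USER/PASS extractor could accumulate one record at a time from a line iterator instead of regexing the whole file. The field names follow the format shown earlier; the buffering details are a sketch:

```python
def stream_credentials(lines):
    """Yield credential dicts from a line iterator without loading the file."""
    record = {}
    for line in lines:
        key, _, value = line.partition(':')
        key = key.strip().upper()
        if key in ('SOFT', 'URL', 'USER', 'PASS'):
            record[key.lower()] = value.strip()
        # PASS terminates a record in this format
        if key == 'PASS' and {'url', 'user', 'pass'} <= record.keys():
            yield record
            record = {}

log = [
    "SOFT: Chrome",
    "URL: https://example.com/login",
    "USER: alice",
    "PASS: hunter2",
]
creds = list(stream_credentials(iter(log)))
```

Because the generator holds at most one partial record, memory use stays flat even for multi-gigabyte logs.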
Distributed Processing: The parser is single-threaded by design (to avoid overwhelming the system). A proper job queue with worker processes would improve throughput while maintaining resource controls.
Better Deduplication: Current deduplication happens at the archive level (file hash) and database level (ReplacingMergeTree). Adding credential-level deduplication before database insertion would reduce storage costs.
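Credential-level deduplication could hash the identifying fields and drop repeats before they reach the batch buffer. A sketch; a production version would need a persistent or probabilistic set (e.g. a Bloom filter) rather than an in-memory `set`:

```python
import hashlib

def cred_key(cred):
    """Stable dedup key over the identifying fields of a credential."""
    raw = '\x00'.join((
        cred.get('domain', ''),
        cred.get('username', ''),
        cred.get('password', ''),
    ))
    return hashlib.sha256(raw.encode('utf-8')).hexdigest()

seen = set()

def dedupe(creds):
    """Return only credentials not seen before (across calls)."""
    out = []
    for c in creds:
        k = cred_key(c)
        if k not in seen:
            seen.add(k)
            out.append(c)
    return out

batch = [
    {'domain': 'example.com', 'username': 'alice', 'password': 'hunter2'},
    {'domain': 'example.com', 'username': 'alice', 'password': 'hunter2'},  # duplicate
]
unique = dedupe(batch)
```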
Conclusion
Building a data pipeline at this scale taught me that the boring parts matter most. Resource monitoring, rate limiting, failure handling, and cost awareness are less exciting than the credential parsing logic, but they’re what keep the system running reliably.
The complete pipeline processes millions of credentials daily on infrastructure costing under $300/month. The key is choosing the right tool for each job: Telethon for Telegram API access, 7zip/unrar for extraction, regex for parsing, ClickHouse for storage and queries.
If you have questions about the implementation or want to discuss threat intelligence infrastructure, reach out on X @frankiejavv.