# Performance Tuning

Optimize XTools for maximum throughput while respecting rate limits.
## Caching Strategy

XTools includes a multi-level caching system to reduce redundant requests.
```python
from xtools import XTools
from xtools.storage.cache import Cache, CacheConfig

# Configure caching
cache_config = CacheConfig(
    backend="sqlite",   # or "redis", "memory"
    ttl_seconds=3600,   # 1 hour default TTL
    max_size_mb=100,
    compression=True,
)

cache = Cache(cache_config)


# Manual cache usage
async def cached_profile_fetch(username: str):
    """Fetch a profile with caching."""
    cache_key = f"profile:{username}"

    # Check cache first
    cached = await cache.get(cache_key)
    if cached:
        return cached

    # Fetch fresh data
    async with XTools() as x:
        profile = await x.scrape.profile(username)

    # Store in cache
    await cache.set(cache_key, profile, ttl=7200)
    return profile
```
**Cache Invalidation:** Cache entries are automatically invalidated when actions modify the underlying data (follow, like, etc.).
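If you modify data outside XTools' own action methods, you can evict stale entries yourself. A minimal sketch, reusing `cache` and `XTools` from the block above and assuming the `Cache` class exposes an async `delete` method (hypothetical; check your version's API):

```python
async def follow_and_refresh(username: str):
    """Follow a user, then evict their cached profile entry."""
    async with XTools() as x:
        await x.follow.user(username)

    # Hypothetical: assumes Cache provides an async delete() for explicit eviction
    await cache.delete(f"profile:{username}")
```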
## Parallel Execution

Execute independent operations concurrently using `asyncio.gather`:
```python
import asyncio

from xtools import XTools


async def parallel_scraping():
    """Scrape multiple profiles in parallel."""
    usernames = ["user1", "user2", "user3", "user4", "user5"]

    async with XTools() as x:
        # Parallel profile scraping
        tasks = [
            x.scrape.profile(username)
            for username in usernames
        ]
        profiles = await asyncio.gather(*tasks)

    return profiles


async def parallel_with_semaphore():
    """Limit concurrency with a semaphore."""
    usernames = [f"user{i}" for i in range(100)]
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent

    async def fetch_with_limit(username):
        async with semaphore:
            async with XTools() as x:
                return await x.scrape.profile(username)

    tasks = [fetch_with_limit(u) for u in usernames]
    return await asyncio.gather(*tasks)
```
**Concurrency Limits:** Too many parallel requests can trigger rate limits. Use semaphores to cap concurrency.
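Beyond capping concurrency, spacing requests with a small randomized delay keeps traffic from arriving in synchronized bursts. A sketch using only the standard library; the 0.5-2 s jitter range is an illustrative choice, not an XTools default:

```python
import asyncio
import random

from xtools import XTools

semaphore = asyncio.Semaphore(5)


async def fetch_with_jitter(username: str):
    """Cap concurrency and add a randomized delay before each request."""
    async with semaphore:
        # Illustrative 0.5-2s jitter to smooth out request bursts
        await asyncio.sleep(random.uniform(0.5, 2.0))
        async with XTools() as x:
            return await x.scrape.profile(username)
```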
## Connection Pooling

Reuse browser contexts for better performance:
```python
from xtools import XTools
from xtools.core.browser import BrowserPool


async def connection_pooling():
    """Use a browser connection pool."""
    pool = BrowserPool(
        size=5,             # Pool size
        headless=True,
        recycle_after=100,  # Recycle after N uses
    )

    async with pool:
        # Get a context from the pool
        async with pool.acquire() as browser:
            page = await browser.new_page()
            # Use page...
        # Context returned to the pool


# With XTools
async def xtools_with_pool(url: str):
    pool = BrowserPool(size=3)

    async with XTools(browser_pool=pool) as x:
        # XTools uses the pool internally
        results = await x.scrape.replies(url)
        return results
```
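Pooling also combines with the semaphore pattern from the previous section: share one pooled XTools instance across workers rather than opening a fresh browser per request. A sketch; the pool size and concurrency cap of 3 are illustrative:

```python
import asyncio

from xtools import XTools
from xtools.core.browser import BrowserPool


async def pooled_parallel_scrape(usernames):
    """Share one pooled XTools instance across concurrent workers."""
    semaphore = asyncio.Semaphore(3)  # match the pool size
    pool = BrowserPool(size=3)

    async with XTools(browser_pool=pool) as x:

        async def fetch(username):
            async with semaphore:
                return await x.scrape.profile(username)

        return await asyncio.gather(*(fetch(u) for u in usernames))
```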
## Batch Operations

Process items in batches for efficiency:
```python
from xtools import XTools
from xtools.utils import batch_processor


async def batch_follow_users():
    """Follow users in batches."""
    users_to_follow = [f"user{i}" for i in range(500)]

    async with XTools() as x:
        results = await batch_processor(
            items=users_to_follow,
            processor=x.follow.user,
            batch_size=10,
            delay_between_batches=30,  # seconds
            on_progress=lambda done, total: print(f"{done}/{total}"),
        )

    print(f"Followed: {results.success_count}")
    print(f"Failed: {results.failure_count}")
```
## Memory Optimization

Handle large datasets without exhausting memory:
```python
from xtools import XTools


async def stream_large_dataset():
    """Stream results instead of loading everything into memory."""
    async with XTools() as x:
        # Use an async generator for large result sets
        async for tweet in x.scrape.tweets_stream(
            "username",
            limit=10000,
        ):
            # Process one tweet at a time
            process_tweet(tweet)  # placeholder: your own handler
            # Optionally write to file incrementally
            append_to_csv(tweet, "output.csv")  # placeholder: your own writer


async def chunked_export():
    """Export large datasets in chunks."""
    async with XTools() as x:
        chunk_size = 1000
        offset = 0

        while True:
            tweets = await x.scrape.tweets(
                "username",
                limit=chunk_size,
                offset=offset,
            )
            if not tweets:
                break

            x.export.to_csv(
                tweets,
                f"tweets_{offset}.csv",
            )
            offset += chunk_size
```
**Generator Pattern:** Use async generators (`async for`) when processing thousands of items.
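If an endpoint only offers paged access, like `x.scrape.tweets` above, you can wrap it in your own async generator so callers keep the streaming interface. A sketch built on the paged call shown in the previous example:

```python
from xtools import XTools


async def tweets_paged(username: str, chunk_size: int = 1000):
    """Yield tweets one at a time from a paged scrape call."""
    async with XTools() as x:
        offset = 0
        while True:
            chunk = await x.scrape.tweets(
                username, limit=chunk_size, offset=offset
            )
            if not chunk:
                return
            for tweet in chunk:
                yield tweet
            offset += chunk_size
```

Callers then consume it with `async for tweet in tweets_paged("username"): ...`, keeping only one chunk in memory at a time.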
## Performance Monitoring
```python
from xtools import XTools
from xtools.monitoring import PerformanceMonitor


async def monitored_scraping():
    """Monitor scraping performance."""
    monitor = PerformanceMonitor()

    async with XTools() as x:
        with monitor.track("profile_scrape"):
            profile = await x.scrape.profile("username")

        with monitor.track("followers_scrape"):
            followers = await x.scrape.followers("username", limit=1000)

    # Get metrics
    stats = monitor.get_stats()
    print(f"Profile scrape: {stats['profile_scrape']['avg_ms']}ms avg")
    print(f"Followers scrape: {stats['followers_scrape']['avg_ms']}ms avg")

    # Export metrics
    monitor.export_prometheus("metrics.txt")
```
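For rough numbers without the monitor, `time.perf_counter` from the standard library is enough. A minimal sketch:

```python
import time

from xtools import XTools


async def timed_profile_scrape(username: str):
    """Time a single scrape with the standard library."""
    start = time.perf_counter()
    async with XTools() as x:
        profile = await x.scrape.profile(username)
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"Profile scrape took {elapsed_ms:.1f}ms")
    return profile
```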
## Configuration Presets

Presets bundle the tuning options covered above into ready-made configurations:
```python
from xtools import XTools
from xtools.config import PerformancePreset


async def high_throughput_example():
    # High-throughput preset: aggressive caching, parallel execution
    async with XTools(preset=PerformancePreset.HIGH_THROUGHPUT) as x:
        pass


async def conservative_example():
    # Conservative preset (safer for accounts): slower, more human-like behavior
    async with XTools(preset=PerformancePreset.CONSERVATIVE) as x:
        pass


async def custom_config_example():
    # Custom configuration
    config = {
        "cache_ttl": 1800,
        "max_concurrent": 3,
        "request_delay": (1, 3),
        "retry_attempts": 3,
    }
    async with XTools(config=config) as x:
        pass
```