Mass Operations¶
Execute large-scale operations safely and efficiently.
Overview¶
Mass operations require careful planning to avoid account restrictions:
| Operation | Safe Daily Limit | Aggressive Limit | Recovery Time |
|---|---|---|---|
| Follows | 100-200 | 400 | 24-48 hours |
| Unfollows | 100-200 | 400 | 24-48 hours |
| Likes | 200-500 | 1000 | 12-24 hours |
| DMs | 50-100 | 200 | 48-72 hours |
Safe Mass Following¶
Follow thousands over multiple days:
from xeepy import Xeepy
from xeepy.actions.base import MassOperationConfig
import asyncio
config = MassOperationConfig(
total_target=1000,
daily_limit=150,
hourly_limit=25,
batch_size=25,
batch_delay=300, # 5 min between batches
session_duration_hours=8,
active_hours=(9, 22),
)
async def mass_follow_campaign():
"""Execute a large-scale follow campaign over multiple days"""
async with Xeepy() as x:
keywords = ["python developer", "tech founder", "startup"]
completed = 0
while completed < config.total_target:
# Daily session
daily_count = 0
while daily_count < config.daily_limit:
result = await x.follow.by_keyword(
keywords=keywords,
max_follows=config.batch_size,
filters=x.config.default_follow_filters,
)
completed += result.success_count
daily_count += result.success_count
print(f"Batch complete: +{result.success_count}")
print(f"Daily: {daily_count}/{config.daily_limit}")
print(f"Total: {completed}/{config.total_target}")
if daily_count >= config.daily_limit:
break
# Batch delay
print(f"Waiting {config.batch_delay}s...")
await asyncio.sleep(config.batch_delay)
# Wait until tomorrow
if completed < config.total_target:
print("\n📅 Daily limit reached. Resuming tomorrow...")
await asyncio.sleep(24 * 60 * 60)
asyncio.run(mass_follow_campaign())
Safe Mass Unfollowing¶
Clean up thousands of non-followers:
from xeepy import Xeepy
async def mass_unfollow_cleanup():
"""Unfollow non-followers over multiple days"""
async with Xeepy() as x:
# First, get total count
preview = await x.unfollow.non_followers(
max_unfollows=10000,
dry_run=True
)
total = preview.success_count
daily_limit = 150
days = (total // daily_limit) + 1
print(f"📊 Found {total} non-followers")
print(f"📅 Will take ~{days} days at {daily_limit}/day")
if input("\nStart campaign? (y/n): ").lower() != 'y':
return
completed = 0
day = 0
while completed < total:
day += 1
print(f"\n🗓️ Day {day}")
result = await x.unfollow.non_followers(
max_unfollows=daily_limit,
whitelist=["important1", "important2"],
min_following_days=7,
exclude_verified=True,
on_progress=lambda c, t, m: print(f" [{c}/{t}] {m}")
)
completed += result.success_count
print(f" ✅ Unfollowed: {result.success_count}")
print(f" 📈 Total progress: {completed}/{total}")
if completed < total:
print(" ⏳ Waiting until tomorrow...")
await asyncio.sleep(24 * 60 * 60)
print(f"\n🎉 Campaign complete! Unfollowed {completed} users")
asyncio.run(mass_unfollow_cleanup())
Resumable Operations¶
Handle interruptions gracefully:
from xeepy import Xeepy
from xeepy.storage import FollowTracker
async def resumable_follow_campaign(campaign_name: str):
"""Campaign that can be resumed after interruption"""
tracker = FollowTracker("xeepy.db")
# Check for existing session
session = tracker.get_pending_session(campaign_name)
if session:
print(f"📂 Found existing session: {session['id']}")
pending = tracker.get_pending_session_items(session['id'])
print(f" {len(pending)} items remaining")
if input("Resume? (y/n): ").lower() != 'y':
tracker.complete_session(session['id'])
session = None
async with Xeepy() as x:
if not session:
# Create new campaign
print("🆕 Creating new campaign...")
# Get target users
targets = await x.scrape.search_users(
"python developer",
limit=500
)
usernames = [t.username for t in targets]
# Create session
session_id = tracker.create_session(
campaign_name,
usernames
)
pending = usernames
print(f" Created session with {len(pending)} targets")
else:
session_id = session['id']
# Process pending items
for username in pending:
try:
result = await x.follow.user(username)
status = "success" if result.success else "failed"
except Exception as e:
status = f"error: {str(e)}"
tracker.update_session_item(
session_id,
username,
status
)
# Rate limiting
await asyncio.sleep(random.uniform(3, 8))
# Complete session
tracker.complete_session(session_id)
print("🎉 Campaign complete!")
Parallel Processing¶
Speed up read-only operations:
from xeepy import Xeepy
import asyncio
async def parallel_scrape(usernames: list, max_concurrent: int = 5):
"""Scrape multiple profiles in parallel"""
semaphore = asyncio.Semaphore(max_concurrent)
results = []
async def scrape_one(x, username):
async with semaphore:
try:
profile = await x.scrape.profile(username)
return {"username": username, "profile": profile}
except Exception as e:
return {"username": username, "error": str(e)}
async with Xeepy() as x:
tasks = [scrape_one(x, u) for u in usernames]
results = await asyncio.gather(*tasks)
return results
# Scrape 100 profiles with 5 concurrent requests
usernames = [f"user{i}" for i in range(100)]
results = asyncio.run(parallel_scrape(usernames, max_concurrent=5))
Parallel Limits
Only use parallel processing for read operations (scraping).
Never parallelize: - Follow/unfollow - Like/retweet - Comments/DMs
Batch Processing¶
Process in controlled batches:
from xeepy import Xeepy
async def batch_operation(items: list, batch_size: int = 25):
"""Process items in batches with delays"""
async with Xeepy() as x:
total = len(items)
for i in range(0, total, batch_size):
batch = items[i:i + batch_size]
batch_num = (i // batch_size) + 1
total_batches = (total // batch_size) + 1
print(f"\n📦 Batch {batch_num}/{total_batches}")
for item in batch:
# Process item
await x.follow.user(item)
await asyncio.sleep(random.uniform(2, 5))
# Longer pause between batches
if i + batch_size < total:
pause = 60 * 5 # 5 minutes
print(f"⏸️ Pausing {pause}s before next batch...")
await asyncio.sleep(pause)
Progress Tracking¶
Track and display progress:
from xeepy import Xeepy
from tqdm import tqdm
import time
async def tracked_mass_operation():
"""Mass operation with progress bar"""
async with Xeepy() as x:
# Get targets
targets = await x.scrape.followers("competitor", limit=500)
with tqdm(total=len(targets), desc="Following") as pbar:
success = 0
failed = 0
for user in targets:
result = await x.follow.user(user.username)
if result.success:
success += 1
pbar.set_postfix({"✓": success, "✗": failed})
else:
failed += 1
pbar.update(1)
await asyncio.sleep(random.uniform(3, 8))
print(f"\n✅ Complete: {success} followed, {failed} failed")
Error Recovery¶
Handle errors without losing progress:
from xeepy import Xeepy
import json
async def resilient_mass_operation():
"""Mass operation with error recovery"""
progress_file = "mass_op_progress.json"
# Load progress
try:
with open(progress_file) as f:
progress = json.load(f)
except FileNotFoundError:
progress = {"completed": [], "failed": [], "pending": []}
async with Xeepy() as x:
# Get targets (or use pending from progress)
if not progress["pending"]:
targets = await x.scrape.followers("competitor", limit=500)
progress["pending"] = [t.username for t in targets]
while progress["pending"]:
username = progress["pending"][0]
try:
result = await x.follow.user(username)
if result.success:
progress["completed"].append(username)
else:
progress["failed"].append(username)
except Exception as e:
print(f"Error: {e}")
progress["failed"].append(username)
finally:
# Remove from pending regardless of result
progress["pending"].pop(0)
# Save progress
with open(progress_file, "w") as f:
json.dump(progress, f)
await asyncio.sleep(random.uniform(3, 8))
print(f"✅ Complete!")
print(f" Succeeded: {len(progress['completed'])}")
print(f" Failed: {len(progress['failed'])}")
Rate Limit Handling¶
Automatically handle rate limits:
from xeepy import Xeepy
from xeepy.actions.base import RateLimitHandler
handler = RateLimitHandler(
initial_delay=3,
max_delay=300,
backoff_multiplier=2,
max_retries=3,
)
async def rate_limited_operation():
async with Xeepy() as x:
for username in usernames:
success = False
retries = 0
while not success and retries < handler.max_retries:
try:
result = await x.follow.user(username)
success = result.success
handler.reset()
except RateLimitError:
delay = handler.get_backoff_delay()
print(f"⏳ Rate limited. Waiting {delay}s...")
await asyncio.sleep(delay)
retries += 1
await asyncio.sleep(handler.get_delay())
Multi-Account Operations¶
Distribute load across accounts:
from xeepy import Xeepy, AccountRotator
async def multi_account_mass_follow():
"""Distribute follows across multiple accounts"""
accounts = ["account1", "account2", "account3"]
targets_per_account = 50
rotator = AccountRotator(
profiles=accounts,
strategy="round-robin",
cooldown_minutes=60
)
targets = ["target1", "target2", ...] # 150 targets
for i, target in enumerate(targets):
# Get next available account
profile = rotator.get_next()
async with Xeepy(profile=profile) as x:
await x.follow.user(target)
print(f"[{profile}] Followed @{target}")
await asyncio.sleep(random.uniform(3, 8))
Campaign Templates¶
Follower Growth Campaign¶
async def growth_campaign(days: int = 7):
"""7-day follower growth campaign"""
daily_follows = 100
daily_unfollows = 50
async with Xeepy() as x:
for day in range(days):
print(f"\n📅 Day {day + 1}/{days}")
# Morning: Follow new users
print("🌅 Morning session: Following...")
await x.follow.by_keyword(
keywords=["your niche"],
max_follows=daily_follows // 2
)
# Afternoon: Engage with existing
print("☀️ Afternoon session: Engaging...")
await x.engage.auto_like(
keywords=["your niche"],
max_likes=50
)
# Evening: Unfollow non-followers
print("🌙 Evening session: Cleanup...")
await x.unfollow.non_followers(
max_unfollows=daily_unfollows,
min_following_days=7
)
# Report
me = await x.scrape.profile("me")
print(f"📊 Stats: {me.followers_count} followers, {me.following_count} following")
if day < days - 1:
await asyncio.sleep(24 * 60 * 60)
Cleanup Campaign¶
async def cleanup_campaign():
"""Aggressive cleanup of low-quality follows"""
async with Xeepy() as x:
# Phase 1: Non-followers
print("Phase 1: Non-followers")
await x.unfollow.non_followers(max_unfollows=200)
# Phase 2: Inactive accounts
print("Phase 2: Inactive accounts")
await x.unfollow.by_criteria(
criteria=UnfollowCriteria(inactive_days=180),
max_unfollows=100
)
# Phase 3: Spam accounts
print("Phase 3: Spam accounts")
await x.unfollow.by_criteria(
criteria=UnfollowCriteria(
bio_contains=["follow back", "f4f"],
max_followers=50
),
max_unfollows=100
)
print("✅ Cleanup complete!")
Best Practices¶
Safety Checklist
- ✅ Always test with small batches first
- ✅ Use session tracking for resumability
- ✅ Implement exponential backoff
- ✅ Spread operations over multiple days
- ✅ Monitor account for warnings
- ✅ Keep detailed logs
Avoid
- ❌ Running multiple mass operations simultaneously
- ❌ Exceeding daily limits
- ❌ Ignoring rate limit warnings
- ❌ Operations without progress tracking
- ❌ Running during account cooldowns
Monitoring¶
Watch for account restrictions:
async def check_account_health():
"""Monitor account for restriction signs"""
async with Xeepy() as x:
health = await x.account.health_check()
if health.warnings:
print("⚠️ Warnings detected:")
for warning in health.warnings:
print(f" - {warning}")
return False
if health.rate_limited:
print(f"⏳ Rate limited until {health.reset_time}")
return False
print("✅ Account healthy")
return True
Next Steps¶
Rate Limiting - Configure safety limits
Scheduling - Automate campaigns
Multi-Account - Scale with multiple accounts