Account Change Monitoring¶
Monitor profile changes, tweet deletions, and following behavior across multiple accounts.
Overview¶
Account monitoring helps you:
- Detect bio, name, or avatar changes
- Track tweet deletions
- Monitor following/unfollowing activity
- Watch multiple accounts simultaneously
- Store change history for analysis
Bio/Name/Avatar Change Detection¶
Basic Profile Change Monitor¶
import asyncio
from datetime import datetime
from xtools import Xtools
class ProfileChangeMonitor:
def __init__(self, username: str):
self.username = username
self.last_profile = None
async def check_changes(self) -> list:
"""Check for profile changes."""
async with Xtools() as x:
profile = await x.scrape.profile(self.username)
changes = []
if self.last_profile:
# Check name change
if profile.name != self.last_profile.get("name"):
changes.append({
"field": "name",
"old": self.last_profile.get("name"),
"new": profile.name
})
# Check bio change
if profile.bio != self.last_profile.get("bio"):
changes.append({
"field": "bio",
"old": self.last_profile.get("bio"),
"new": profile.bio
})
# Check avatar change
if profile.profile_image_url != self.last_profile.get("avatar"):
changes.append({
"field": "avatar",
"old": self.last_profile.get("avatar"),
"new": profile.profile_image_url
})
# Check location change
if profile.location != self.last_profile.get("location"):
changes.append({
"field": "location",
"old": self.last_profile.get("location"),
"new": profile.location
})
# Check website change
if profile.url != self.last_profile.get("url"):
changes.append({
"field": "website",
"old": self.last_profile.get("url"),
"new": profile.url
})
# Update stored profile
self.last_profile = {
"name": profile.name,
"bio": profile.bio,
"avatar": profile.profile_image_url,
"location": profile.location,
"url": profile.url
}
return changes
# Usage
async def main():
monitor = ProfileChangeMonitor("elonmusk")
# Initial load
await monitor.check_changes()
print("Profile loaded. Monitoring for changes...")
while True:
await asyncio.sleep(3600) # Check hourly
changes = await monitor.check_changes()
for change in changes:
print(f"\n🔔 @{monitor.username} changed their {change['field']}:")
print(f" Old: {change['old']}")
print(f" New: {change['new']}")
asyncio.run(main())
Comprehensive Profile Tracker¶
import asyncio
import json
from datetime import datetime
from pathlib import Path
from xtools import Xtools
class ComprehensiveProfileTracker:
TRACKED_FIELDS = [
"name", "username", "bio", "location", "url",
"profile_image_url", "profile_banner_url",
"protected", "verified"
]
def __init__(self, username: str, storage_dir: str = "profile_history"):
self.username = username
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(exist_ok=True)
self.history_file = self.storage_dir / f"{username}_history.json"
self.history = self._load_history()
def _load_history(self) -> list:
if self.history_file.exists():
return json.loads(self.history_file.read_text())
return []
def _save_history(self):
self.history_file.write_text(json.dumps(self.history, indent=2))
async def snapshot(self) -> dict:
"""Take a profile snapshot and detect changes."""
async with Xtools() as x:
profile = await x.scrape.profile(self.username)
current = {
"timestamp": datetime.now().isoformat(),
"name": profile.name,
"username": profile.username,
"bio": profile.bio,
"location": profile.location,
"url": profile.url,
"profile_image_url": profile.profile_image_url,
"profile_banner_url": profile.profile_banner_url,
"protected": profile.protected,
"verified": profile.verified,
"followers_count": profile.followers_count,
"following_count": profile.following_count
}
changes = []
if self.history:
last = self.history[-1]
for field in self.TRACKED_FIELDS:
if current.get(field) != last.get(field):
changes.append({
"field": field,
"old": last.get(field),
"new": current.get(field),
"timestamp": current["timestamp"]
})
# Always save snapshot
self.history.append(current)
self._save_history()
return {
"snapshot": current,
"changes": changes
}
def get_field_history(self, field: str) -> list:
"""Get history of changes for a specific field."""
changes = []
last_value = None
for snapshot in self.history:
value = snapshot.get(field)
if value != last_value and last_value is not None:
changes.append({
"timestamp": snapshot["timestamp"],
"old": last_value,
"new": value
})
last_value = value
return changes
# Usage
async def main():
tracker = ComprehensiveProfileTracker("elonmusk")
result = await tracker.snapshot()
if result["changes"]:
print("🔔 Profile changes detected:")
for change in result["changes"]:
print(f" {change['field']}: {change['old']} → {change['new']}")
else:
print("No changes detected")
# View bio history
bio_history = tracker.get_field_history("bio")
print(f"\nBio changed {len(bio_history)} times")
asyncio.run(main())
Tweet Deletion Monitoring¶
Track Deleted Tweets¶
import asyncio
import json
from datetime import datetime
from pathlib import Path
from xtools import Xtools
class TweetDeletionMonitor:
def __init__(self, username: str, storage_dir: str = "tweet_archive"):
self.username = username
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(exist_ok=True)
self.archive_file = self.storage_dir / f"{username}_tweets.json"
self.deleted_file = self.storage_dir / f"{username}_deleted.json"
self.known_tweets = self._load_json(self.archive_file)
self.deleted_tweets = self._load_json(self.deleted_file)
def _load_json(self, path: Path) -> dict:
if path.exists():
return json.loads(path.read_text())
return {}
def _save_json(self, path: Path, data: dict):
path.write_text(json.dumps(data, indent=2))
async def check_for_deletions(self) -> list:
"""Check for deleted tweets."""
async with Xtools() as x:
# Get current tweets
result = await x.scrape.tweets(self.username, limit=200)
current_ids = {t.id for t in result.tweets}
# Find deleted tweets
deleted = []
for tweet_id, tweet_data in list(self.known_tweets.items()):
if tweet_id not in current_ids:
# Tweet was deleted
tweet_data["deleted_at"] = datetime.now().isoformat()
deleted.append(tweet_data)
self.deleted_tweets[tweet_id] = tweet_data
del self.known_tweets[tweet_id]
# Add new tweets to archive
for tweet in result.tweets:
if tweet.id not in self.known_tweets:
self.known_tweets[tweet.id] = {
"id": tweet.id,
"text": tweet.text,
"created_at": tweet.created_at.isoformat() if tweet.created_at else None,
"archived_at": datetime.now().isoformat(),
"likes": tweet.like_count,
"retweets": tweet.retweet_count
}
# Save state
self._save_json(self.archive_file, self.known_tweets)
self._save_json(self.deleted_file, self.deleted_tweets)
return deleted
def get_deleted_tweets(self) -> list:
"""Get all recorded deleted tweets."""
return list(self.deleted_tweets.values())
# Usage
async def main():
monitor = TweetDeletionMonitor("target_username")
# Check for deletions
deleted = await monitor.check_for_deletions()
if deleted:
print(f"🗑️ {len(deleted)} tweets deleted:")
for tweet in deleted:
print(f"\n [{tweet['id']}]")
print(f" {tweet['text'][:100]}...")
print(f" Originally posted: {tweet['created_at']}")
else:
print("No deletions detected")
# View all deleted tweets
all_deleted = monitor.get_deleted_tweets()
print(f"\nTotal archived deletions: {len(all_deleted)}")
asyncio.run(main())
Following Changes Monitoring¶
Track Who User Follows/Unfollows¶
import asyncio
import json
from datetime import datetime
from pathlib import Path
from xtools import Xtools
class FollowingChangeMonitor:
def __init__(self, username: str, storage_dir: str = "following_data"):
self.username = username
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(exist_ok=True)
self.state_file = self.storage_dir / f"{username}_following.json"
self.history_file = self.storage_dir / f"{username}_follow_history.json"
self.known_following = set(self._load_state())
self.history = self._load_history()
def _load_state(self) -> list:
if self.state_file.exists():
return json.loads(self.state_file.read_text())
return []
def _load_history(self) -> list:
if self.history_file.exists():
return json.loads(self.history_file.read_text())
return []
def _save_state(self, following: set):
self.state_file.write_text(json.dumps(list(following)))
def _save_history(self):
self.history_file.write_text(json.dumps(self.history, indent=2))
async def check_changes(self) -> dict:
"""Check for following changes."""
async with Xtools() as x:
result = await x.scrape.following(self.username, limit=5000)
current_following = {u.username for u in result.users}
changes = {
"new_follows": [],
"unfollows": [],
"timestamp": datetime.now().isoformat()
}
if self.known_following:
# New follows
new_follows = current_following - self.known_following
changes["new_follows"] = list(new_follows)
# Unfollows
unfollows = self.known_following - current_following
changes["unfollows"] = list(unfollows)
# Record history
if new_follows or unfollows:
self.history.append(changes)
self._save_history()
# Update state
self.known_following = current_following
self._save_state(current_following)
return changes
def get_follow_history(self, username: str) -> list:
"""Get follow/unfollow history for specific user."""
events = []
for record in self.history:
if username in record.get("new_follows", []):
events.append({"action": "followed", "timestamp": record["timestamp"]})
if username in record.get("unfollows", []):
events.append({"action": "unfollowed", "timestamp": record["timestamp"]})
return events
# Usage
async def main():
monitor = FollowingChangeMonitor("target_username")
changes = await monitor.check_changes()
if changes["new_follows"]:
print(f"➕ Started following ({len(changes['new_follows'])}):")
for user in changes["new_follows"][:10]:
print(f" @{user}")
if changes["unfollows"]:
print(f"➖ Stopped following ({len(changes['unfollows'])}):")
for user in changes["unfollows"][:10]:
print(f" @{user}")
asyncio.run(main())
Multiple Account Tracking¶
Multi-Account Monitor¶
import asyncio
import json
from datetime import datetime
from pathlib import Path
from xtools import Xtools
class MultiAccountMonitor:
def __init__(self, usernames: list, storage_dir: str = "multi_monitor"):
self.usernames = usernames
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(exist_ok=True)
self.state = self._load_state()
def _load_state(self) -> dict:
state_file = self.storage_dir / "state.json"
if state_file.exists():
return json.loads(state_file.read_text())
return {}
def _save_state(self):
state_file = self.storage_dir / "state.json"
state_file.write_text(json.dumps(self.state, indent=2))
async def check_all(self) -> dict:
"""Check all accounts for changes."""
async with Xtools() as x:
results = {}
for username in self.usernames:
profile = await x.scrape.profile(username)
current = {
"name": profile.name,
"bio": profile.bio,
"followers": profile.followers_count,
"following": profile.following_count,
"avatar": profile.profile_image_url,
"verified": profile.verified
}
changes = []
last = self.state.get(username, {})
for field in ["name", "bio", "avatar", "verified"]:
if field in last and current[field] != last[field]:
changes.append({
"field": field,
"old": last[field],
"new": current[field]
})
results[username] = {
"current": current,
"changes": changes
}
self.state[username] = current
await asyncio.sleep(1) # Rate limiting
self._save_state()
return results
async def monitor_loop(self, interval_minutes: int = 60):
"""Continuous monitoring loop."""
print(f"🔍 Monitoring {len(self.usernames)} accounts...")
while True:
results = await self.check_all()
for username, data in results.items():
if data["changes"]:
print(f"\n🔔 @{username} changes:")
for change in data["changes"]:
print(f" {change['field']}: {change['old']} → {change['new']}")
await asyncio.sleep(interval_minutes * 60)
# Usage
async def main():
accounts = [
"elonmusk",
"naval",
"paulg",
"sama",
"vaborsh"
]
monitor = MultiAccountMonitor(accounts)
await monitor.monitor_loop(interval_minutes=30)
asyncio.run(main())
Change History Storage¶
Structured Change Log¶
import asyncio
import sqlite3
from datetime import datetime
from xtools import Xtools
class ChangeHistoryDB:
def __init__(self, db_path: str = "account_changes.db"):
self.conn = sqlite3.connect(db_path)
self._init_db()
def _init_db(self):
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS profile_changes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
field TEXT NOT NULL,
old_value TEXT,
new_value TEXT,
detected_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS tweet_deletions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
tweet_id TEXT NOT NULL,
tweet_text TEXT,
original_date TEXT,
deleted_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS follow_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
target_username TEXT NOT NULL,
action TEXT NOT NULL,
detected_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_profile_user ON profile_changes(username);
CREATE INDEX IF NOT EXISTS idx_deletions_user ON tweet_deletions(username);
CREATE INDEX IF NOT EXISTS idx_follow_user ON follow_events(username);
""")
self.conn.commit()
def log_profile_change(self, username: str, field: str, old_value, new_value):
self.conn.execute("""
INSERT INTO profile_changes (username, field, old_value, new_value)
VALUES (?, ?, ?, ?)
""", (username, field, str(old_value), str(new_value)))
self.conn.commit()
def log_deletion(self, username: str, tweet_id: str, text: str, original_date: str):
self.conn.execute("""
INSERT INTO tweet_deletions (username, tweet_id, tweet_text, original_date)
VALUES (?, ?, ?, ?)
""", (username, tweet_id, text, original_date))
self.conn.commit()
def log_follow_event(self, username: str, target: str, action: str):
self.conn.execute("""
INSERT INTO follow_events (username, target_username, action)
VALUES (?, ?, ?)
""", (username, target, action))
self.conn.commit()
def get_user_history(self, username: str, days: int = 30) -> dict:
"""Get all changes for a user."""
from datetime import timedelta
since = (datetime.now() - timedelta(days=days)).isoformat()
profile_changes = self.conn.execute("""
SELECT field, old_value, new_value, detected_at
FROM profile_changes
WHERE username = ? AND detected_at >= ?
ORDER BY detected_at DESC
""", (username, since)).fetchall()
deletions = self.conn.execute("""
SELECT tweet_id, tweet_text, deleted_at
FROM tweet_deletions
WHERE username = ? AND deleted_at >= ?
ORDER BY deleted_at DESC
""", (username, since)).fetchall()
follow_events = self.conn.execute("""
SELECT target_username, action, detected_at
FROM follow_events
WHERE username = ? AND detected_at >= ?
ORDER BY detected_at DESC
""", (username, since)).fetchall()
return {
"profile_changes": profile_changes,
"deletions": deletions,
"follow_events": follow_events
}
# Usage
db = ChangeHistoryDB()
# Log changes
db.log_profile_change("elonmusk", "bio", "Old bio text", "New bio text")
db.log_deletion("user", "123456", "Deleted tweet text", "2024-01-01")
db.log_follow_event("user", "target", "followed")
# Get history
history = db.get_user_history("elonmusk", days=7)
print(f"Profile changes: {len(history['profile_changes'])}")
print(f"Deletions: {len(history['deletions'])}")
print(f"Follow events: {len(history['follow_events'])}")
Alert Configuration¶
Configurable Alert System¶
import asyncio
from dataclasses import dataclass
from typing import Callable, Optional
from xtools import Xtools
from xtools.notifications import DiscordNotifier, TelegramNotifier
@dataclass
class AlertConfig:
profile_changes: bool = True
bio_only: bool = False
deletions: bool = True
min_deletion_likes: int = 0
follow_events: bool = True
follower_threshold: Optional[int] = None
class ConfigurableMonitor:
def __init__(
self,
username: str,
config: AlertConfig,
discord_webhook: str = None,
telegram_token: str = None,
telegram_chat: str = None
):
self.username = username
self.config = config
self.notifiers = []
if discord_webhook:
self.notifiers.append(DiscordNotifier(discord_webhook))
if telegram_token and telegram_chat:
self.notifiers.append(TelegramNotifier(telegram_token, telegram_chat))
async def send_alert(self, title: str, message: str, priority: str = "normal"):
"""Send alert to all configured notifiers."""
for notifier in self.notifiers:
await notifier.send(
title=title,
message=message,
color=0xFF0000 if priority == "high" else 0x00FF00
)
async def handle_profile_change(self, changes: list):
if not self.config.profile_changes:
return
for change in changes:
if self.config.bio_only and change["field"] != "bio":
continue
await self.send_alert(
title=f"@{self.username} Profile Change",
message=f"{change['field']}: {change['new']}"
)
async def handle_deletion(self, tweet: dict):
if not self.config.deletions:
return
if tweet.get("likes", 0) < self.config.min_deletion_likes:
return
await self.send_alert(
title=f"@{self.username} Deleted Tweet",
message=tweet["text"][:200],
priority="high" if tweet.get("likes", 0) > 1000 else "normal"
)
async def handle_follow_event(self, target: str, action: str):
if not self.config.follow_events:
return
await self.send_alert(
title=f"@{self.username} {action.title()}",
message=f"@{target}"
)
# Usage
config = AlertConfig(
profile_changes=True,
bio_only=False,
deletions=True,
min_deletion_likes=100, # Only alert for popular deletions
follow_events=True
)
monitor = ConfigurableMonitor(
"target_user",
config,
discord_webhook="https://discord.com/api/webhooks/..."
)
Next Steps¶
- Growth Monitoring - Track follower changes
- Keyword Monitoring - Monitor mentions and topics
- Engagement Monitoring - Track interactions