Business Intelligence Cookbook¶
Transform X/Twitter data into actionable business intelligence.
🎯 Lead Generation Engine¶
Build a sophisticated lead generation system that finds and qualifies prospects.
"""
Lead Generation Engine
Identify, qualify, and track potential customers from Twitter
"""
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum
class LeadScore(Enum):
"""Lead qualification scores."""
HOT = 90 # High intent, good fit
WARM = 70 # Shows interest
COOL = 50 # Potential fit
COLD = 30 # Low priority
UNQUALIFIED = 0
@dataclass
class Lead:
"""Qualified lead data."""
username: str
user_id: str
name: str
bio: str
followers: int
following: int
location: Optional[str]
website: Optional[str]
# Qualification
score: int = 0
score_reasons: list = field(default_factory=list)
intent_signals: list = field(default_factory=list)
# Tracking
source: str = "" # How we found them
first_seen: datetime = field(default_factory=datetime.now)
last_activity: Optional[datetime] = None
# Engagement history
engaged: bool = False
engagement_type: Optional[str] = None
response_received: bool = False
class LeadGenerator:
"""Generate and qualify leads from Twitter."""
def __init__(self, xeepy, config: dict):
self.x = xeepy
self.config = config
# Define your Ideal Customer Profile (ICP)
self.icp = config.get("icp", {
"keywords": ["startup", "founder", "ceo", "cto", "saas"],
"min_followers": 500,
"max_followers": 50000,
"must_have_website": True,
"industries": ["tech", "software", "ai"],
"exclude_keywords": ["parody", "fan account", "not affiliated"]
})
# Intent signals to watch for
self.intent_keywords = config.get("intent_keywords", [
"looking for",
"need help with",
"anyone recommend",
"struggling with",
"wish there was",
"tired of",
"alternative to"
])
self.leads: list[Lead] = []
async def find_leads_from_competitors(
self,
competitor_usernames: list[str],
limit_per_competitor: int = 100
) -> list[Lead]:
"""Find leads from competitor followers/engagers."""
leads = []
for competitor in competitor_usernames:
print(f"🔍 Analyzing @{competitor}'s audience...")
# Get recent engagers (people who reply/like their content)
tweets = await self.x.scrape.tweets(competitor, limit=20)
for tweet in tweets.items:
# Get people who replied
replies = await self.x.scrape.replies(tweet.url, limit=50)
for reply in replies.items:
lead = await self._qualify_user(
reply.author,
source=f"competitor:{competitor}"
)
if lead and lead.score >= LeadScore.COOL.value:
leads.append(lead)
# Sample of followers
followers = await self.x.scrape.followers(
competitor,
limit=limit_per_competitor
)
for user in followers.items:
lead = await self._qualify_user(
user,
source=f"competitor_follower:{competitor}"
)
if lead and lead.score >= LeadScore.COOL.value:
leads.append(lead)
# Deduplicate
seen = set()
unique_leads = []
for lead in leads:
if lead.username not in seen:
seen.add(lead.username)
unique_leads.append(lead)
self.leads.extend(unique_leads)
return unique_leads
async def find_leads_from_intent(
self,
search_queries: list[str],
limit_per_query: int = 50
) -> list[Lead]:
"""Find leads showing purchase intent."""
leads = []
for query in search_queries:
print(f"🔍 Searching for: {query}")
results = await self.x.scrape.search(query, limit=limit_per_query)
for tweet in results.items:
# Check for intent signals
intent_signals = self._detect_intent(tweet.text)
if intent_signals:
lead = await self._qualify_user(
tweet.author,
source=f"intent_search:{query}"
)
if lead:
lead.intent_signals = intent_signals
lead.score += 20 # Bonus for showing intent
leads.append(lead)
self.leads.extend(leads)
return leads
async def find_leads_from_hashtags(
self,
hashtags: list[str],
limit_per_hashtag: int = 100
) -> list[Lead]:
"""Find leads from relevant hashtags."""
leads = []
for hashtag in hashtags:
print(f"🔍 Scanning #{hashtag}...")
tweets = await self.x.scrape.hashtag(
f"#{hashtag}",
limit=limit_per_hashtag
)
for tweet in tweets.items:
lead = await self._qualify_user(
tweet.author,
source=f"hashtag:{hashtag}"
)
if lead and lead.score >= LeadScore.COOL.value:
leads.append(lead)
self.leads.extend(leads)
return leads
async def _qualify_user(self, user, source: str) -> Optional[Lead]:
"""Qualify a user against ICP criteria."""
# Skip if missing key data
if not user or not user.username:
return None
# Get full profile if needed
if not hasattr(user, 'bio') or not user.bio:
try:
user = await self.x.scrape.profile(user.username)
except:
return None
# Start scoring
score = 50 # Base score
reasons = []
bio_lower = (user.bio or "").lower()
# Check ICP keywords in bio
for keyword in self.icp.get("keywords", []):
if keyword.lower() in bio_lower:
score += 10
reasons.append(f"Bio contains '{keyword}'")
# Check exclusion keywords
for exclude in self.icp.get("exclude_keywords", []):
if exclude.lower() in bio_lower:
return None # Disqualify
# Follower range check
min_followers = self.icp.get("min_followers", 0)
max_followers = self.icp.get("max_followers", float("inf"))
if user.followers_count < min_followers:
score -= 20
reasons.append("Below minimum followers")
elif user.followers_count > max_followers:
score -= 10
reasons.append("Above maximum followers")
else:
score += 10
reasons.append("Follower count in range")
# Website check
if self.icp.get("must_have_website") and user.website:
score += 15
reasons.append("Has website")
elif self.icp.get("must_have_website"):
score -= 10
reasons.append("No website")
# Engagement ratio (active account)
if user.followers_count > 0:
ratio = user.following_count / user.followers_count
if 0.5 <= ratio <= 2.0:
score += 5
reasons.append("Healthy follow ratio")
# Create lead
lead = Lead(
username=user.username,
user_id=user.id,
name=user.name or user.username,
bio=user.bio or "",
followers=user.followers_count,
following=user.following_count,
location=getattr(user, 'location', None),
website=getattr(user, 'website', None),
score=max(0, min(100, score)),
score_reasons=reasons,
source=source
)
return lead
def _detect_intent(self, text: str) -> list[str]:
"""Detect purchase intent signals in text."""
text_lower = text.lower()
signals = []
for signal in self.intent_keywords:
if signal.lower() in text_lower:
signals.append(signal)
return signals
def export_leads(self, filepath: str, min_score: int = 50):
"""Export qualified leads to CSV."""
qualified = [l for l in self.leads if l.score >= min_score]
self.x.export.to_csv([
{
"username": l.username,
"name": l.name,
"bio": l.bio,
"followers": l.followers,
"website": l.website,
"score": l.score,
"score_reasons": "; ".join(l.score_reasons),
"intent_signals": "; ".join(l.intent_signals),
"source": l.source,
"twitter_url": f"https://x.com/{l.username}"
}
for l in qualified
], filepath)
print(f"✅ Exported {len(qualified)} leads to {filepath}")
# Usage
async def main():
from xeepy import Xeepy
async with Xeepy() as x:
generator = LeadGenerator(x, {
"icp": {
"keywords": ["founder", "ceo", "startup", "saas", "building"],
"min_followers": 500,
"max_followers": 100000,
"must_have_website": True,
"exclude_keywords": ["parody", "meme"]
},
"intent_keywords": [
"looking for a tool",
"need help with",
"any recommendations for",
"alternative to"
]
})
# Find leads from multiple sources
await generator.find_leads_from_competitors(
["competitor1", "competitor2"],
limit_per_competitor=50
)
await generator.find_leads_from_intent([
'"looking for" automation tool',
'"need help with" twitter growth'
])
await generator.find_leads_from_hashtags([
"buildinpublic",
"indiehackers"
])
# Export qualified leads
generator.export_leads("qualified_leads.csv", min_score=60)
asyncio.run(main())
📊 Competitor Intelligence Dashboard¶
Monitor competitors in real-time.
"""
Competitor Intelligence Dashboard
Real-time competitive monitoring and analysis
"""
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from collections import defaultdict
@dataclass
class CompetitorProfile:
"""Tracked competitor data."""
username: str
name: str
bio: str
followers: int
following: int
tweet_count: int
# Historical tracking
followers_history: list = field(default_factory=list)
engagement_history: list = field(default_factory=list)
# Content analysis
top_tweets: list = field(default_factory=list)
posting_frequency: float = 0.0
avg_engagement: float = 0.0
content_themes: dict = field(default_factory=dict)
# Alerts
alerts: list = field(default_factory=list)
class CompetitorIntelligence:
"""Monitor and analyze competitors."""
def __init__(self, xeepy, competitors: list[str]):
self.x = xeepy
self.competitors = competitors
self.profiles: dict[str, CompetitorProfile] = {}
self.baseline: dict = {}
async def initialize(self):
"""Capture baseline data for all competitors."""
print("📊 Capturing competitor baselines...")
for username in self.competitors:
profile = await self.x.scrape.profile(username)
self.profiles[username] = CompetitorProfile(
username=username,
name=profile.name,
bio=profile.bio,
followers=profile.followers_count,
following=profile.following_count,
tweet_count=profile.tweets_count
)
# Capture initial metrics
await self._analyze_content(username)
print(f"✅ Tracking {len(self.profiles)} competitors")
async def check_for_changes(self) -> list[dict]:
"""Check for significant competitor changes."""
alerts = []
for username, profile in self.profiles.items():
current = await self.x.scrape.profile(username)
# Follower spike/drop
follower_change = current.followers_count - profile.followers
change_pct = (follower_change / profile.followers) * 100 if profile.followers > 0 else 0
if abs(change_pct) >= 5:
alert = {
"type": "follower_change",
"competitor": username,
"change": follower_change,
"change_pct": change_pct,
"direction": "gained" if follower_change > 0 else "lost",
"timestamp": datetime.now()
}
alerts.append(alert)
profile.alerts.append(alert)
# Bio change
if current.bio != profile.bio:
alerts.append({
"type": "bio_change",
"competitor": username,
"old_bio": profile.bio,
"new_bio": current.bio,
"timestamp": datetime.now()
})
# Update tracking
profile.followers = current.followers_count
profile.followers_history.append({
"timestamp": datetime.now(),
"count": current.followers_count
})
profile.bio = current.bio
return alerts
async def _analyze_content(self, username: str):
"""Analyze competitor content strategy."""
profile = self.profiles[username]
# Get recent tweets
tweets = await self.x.scrape.tweets(username, limit=100)
if not tweets.items:
return
# Calculate metrics
total_engagement = sum(
(t.likes or 0) + (t.retweets or 0) + (t.replies or 0)
for t in tweets.items
)
profile.avg_engagement = total_engagement / len(tweets.items)
# Find top performers
sorted_tweets = sorted(
tweets.items,
key=lambda t: (t.likes or 0) + (t.retweets or 0),
reverse=True
)
profile.top_tweets = sorted_tweets[:5]
# Posting frequency
if len(tweets.items) >= 2:
time_span = tweets.items[0].created_at - tweets.items[-1].created_at
days = max(time_span.days, 1)
profile.posting_frequency = len(tweets.items) / days
# Content themes (simple keyword analysis)
themes = defaultdict(int)
theme_keywords = {
"product": ["launch", "ship", "built", "feature", "update"],
"growth": ["grow", "scale", "revenue", "mrr", "arb"],
"education": ["learn", "tip", "how to", "thread", "guide"],
"engagement": ["what do you", "question", "thoughts", "agree"],
"personal": ["excited", "grateful", "journey", "milestone"]
}
for tweet in tweets.items:
text_lower = tweet.text.lower()
for theme, keywords in theme_keywords.items():
if any(kw in text_lower for kw in keywords):
themes[theme] += 1
profile.content_themes = dict(themes)
async def get_content_gaps(self) -> dict:
"""Find content opportunities competitors are missing."""
all_themes = defaultdict(int)
competitor_themes = {}
for username, profile in self.profiles.items():
await self._analyze_content(username)
competitor_themes[username] = profile.content_themes
for theme, count in profile.content_themes.items():
all_themes[theme] += count
# Find themes with low coverage
gaps = {}
for theme, total in all_themes.items():
avg = total / len(self.competitors)
gaps[theme] = {
"total_posts": total,
"avg_per_competitor": avg,
"opportunity": "high" if avg < 5 else "medium" if avg < 15 else "low"
}
return gaps
async def benchmark_report(self) -> str:
"""Generate competitive benchmark report."""
report = ["# Competitive Intelligence Report", ""]
report.append(f"*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}*")
report.append("")
# Summary table
report.append("## Overview")
report.append("")
report.append("| Competitor | Followers | Avg Engagement | Posts/Day |")
report.append("|------------|-----------|----------------|-----------|")
for username, profile in self.profiles.items():
report.append(
f"| @{username} | {profile.followers:,} | "
f"{profile.avg_engagement:.1f} | {profile.posting_frequency:.1f} |"
)
report.append("")
# Top performing content
report.append("## Top Performing Content")
for username, profile in self.profiles.items():
report.append(f"\n### @{username}")
for i, tweet in enumerate(profile.top_tweets[:3], 1):
engagement = (tweet.likes or 0) + (tweet.retweets or 0)
report.append(f"{i}. [{tweet.text[:50]}...]({tweet.url}) - {engagement:,} engagements")
# Content themes
report.append("\n## Content Strategy Analysis")
for username, profile in self.profiles.items():
report.append(f"\n### @{username}")
for theme, count in sorted(profile.content_themes.items(), key=lambda x: -x[1]):
report.append(f"- {theme.title()}: {count} posts")
# Alerts
all_alerts = []
for profile in self.profiles.values():
all_alerts.extend(profile.alerts)
if all_alerts:
report.append("\n## Recent Alerts")
for alert in sorted(all_alerts, key=lambda x: x["timestamp"], reverse=True)[:10]:
report.append(f"- **{alert['type']}**: @{alert['competitor']} - {alert.get('change', 'N/A')}")
return "\n".join(report)
# Usage
async def run_competitor_monitoring():
from xeepy import Xeepy
async with Xeepy() as x:
intel = CompetitorIntelligence(x, [
"competitor1",
"competitor2",
"competitor3"
])
await intel.initialize()
# Check for changes
alerts = await intel.check_for_changes()
for alert in alerts:
print(f"🚨 {alert['type']}: @{alert['competitor']}")
# Find content gaps
gaps = await intel.get_content_gaps()
print("\n📊 Content Opportunities:")
for theme, data in gaps.items():
if data['opportunity'] == 'high':
print(f" - {theme}: HIGH opportunity")
# Generate report
report = await intel.benchmark_report()
with open("competitor_report.md", "w") as f:
f.write(report)
print("\n✅ Report saved to competitor_report.md")
asyncio.run(run_competitor_monitoring())
💰 ROI Tracking System¶
Measure the business impact of your Twitter presence.
"""
Twitter ROI Tracking System
Measure business outcomes from Twitter activities
"""
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import json
@dataclass
class Conversion:
"""A tracked conversion event."""
event_id: str
event_type: str # "signup", "demo", "sale", "lead"
source_tweet: Optional[str]
source_campaign: Optional[str]
username: Optional[str]
value: float
timestamp: datetime
attribution: str # "direct", "assisted", "organic"
class TwitterROI:
"""Track and measure Twitter marketing ROI."""
def __init__(self, xeepy, config: dict):
self.x = xeepy
self.config = config
self.conversions: list[Conversion] = []
self.campaigns: dict = {}
self.baseline_metrics: dict = {}
async def start_campaign(
self,
campaign_name: str,
campaign_type: str,
budget: float = 0,
goals: dict = None
) -> str:
"""Start tracking a new campaign."""
campaign_id = f"camp_{datetime.now().strftime('%Y%m%d%H%M%S')}"
# Capture baseline
my_profile = await self.x.scrape.profile(self.config["username"])
self.campaigns[campaign_id] = {
"name": campaign_name,
"type": campaign_type, # "content", "engagement", "follower", "lead_gen"
"budget": budget,
"goals": goals or {},
"start_date": datetime.now(),
"end_date": None,
"status": "active",
"baseline": {
"followers": my_profile.followers_count,
"following": my_profile.following_count
},
"tweets": [],
"activities": [],
"metrics": {}
}
print(f"🚀 Campaign '{campaign_name}' started (ID: {campaign_id})")
return campaign_id
async def track_tweet(self, campaign_id: str, tweet_url: str):
"""Track a tweet as part of a campaign."""
if campaign_id not in self.campaigns:
raise ValueError(f"Campaign {campaign_id} not found")
# Get tweet metrics
replies = await self.x.scrape.replies(tweet_url, limit=1)
tweet_data = {
"url": tweet_url,
"tracked_at": datetime.now().isoformat(),
"initial_metrics": {} # Would capture likes, RTs, etc.
}
self.campaigns[campaign_id]["tweets"].append(tweet_data)
async def record_conversion(
self,
event_type: str,
value: float,
campaign_id: Optional[str] = None,
source_tweet: Optional[str] = None,
username: Optional[str] = None,
metadata: dict = None
):
"""Record a conversion event."""
conversion = Conversion(
event_id=f"conv_{datetime.now().strftime('%Y%m%d%H%M%S%f')}",
event_type=event_type,
source_tweet=source_tweet,
source_campaign=campaign_id,
username=username,
value=value,
timestamp=datetime.now(),
attribution=self._determine_attribution(campaign_id, source_tweet)
)
self.conversions.append(conversion)
# Update campaign metrics
if campaign_id and campaign_id in self.campaigns:
camp = self.campaigns[campaign_id]
camp["activities"].append({
"type": "conversion",
"event_type": event_type,
"value": value,
"timestamp": datetime.now().isoformat()
})
print(f"💰 Conversion recorded: {event_type} - ${value}")
def _determine_attribution(
self,
campaign_id: Optional[str],
source_tweet: Optional[str]
) -> str:
"""Determine attribution model for conversion."""
if source_tweet:
return "direct"
elif campaign_id:
return "assisted"
else:
return "organic"
async def end_campaign(self, campaign_id: str) -> dict:
"""End a campaign and calculate final metrics."""
if campaign_id not in self.campaigns:
raise ValueError(f"Campaign {campaign_id} not found")
camp = self.campaigns[campaign_id]
camp["end_date"] = datetime.now()
camp["status"] = "completed"
# Capture final metrics
my_profile = await self.x.scrape.profile(self.config["username"])
# Calculate results
baseline = camp["baseline"]
results = {
"duration_days": (camp["end_date"] - camp["start_date"]).days,
"followers_gained": my_profile.followers_count - baseline["followers"],
"tweets_sent": len(camp["tweets"]),
"total_conversions": sum(
1 for c in self.conversions
if c.source_campaign == campaign_id
),
"total_revenue": sum(
c.value for c in self.conversions
if c.source_campaign == campaign_id
),
"budget": camp["budget"]
}
# Calculate ROI
if camp["budget"] > 0:
results["roi"] = ((results["total_revenue"] - camp["budget"]) / camp["budget"]) * 100
else:
results["roi"] = float("inf") if results["total_revenue"] > 0 else 0
# Cost per metrics
if results["followers_gained"] > 0 and camp["budget"] > 0:
results["cost_per_follower"] = camp["budget"] / results["followers_gained"]
if results["total_conversions"] > 0 and camp["budget"] > 0:
results["cost_per_conversion"] = camp["budget"] / results["total_conversions"]
camp["results"] = results
return results
def calculate_lifetime_value(self) -> dict:
"""Calculate customer lifetime value from Twitter."""
# Group conversions by user
user_values = {}
for conv in self.conversions:
if conv.username:
if conv.username not in user_values:
user_values[conv.username] = {"total": 0, "events": 0}
user_values[conv.username]["total"] += conv.value
user_values[conv.username]["events"] += 1
if not user_values:
return {"avg_ltv": 0, "total_customers": 0}
total_value = sum(u["total"] for u in user_values.values())
return {
"avg_ltv": total_value / len(user_values),
"total_customers": len(user_values),
"total_revenue": total_value,
"top_customers": sorted(
user_values.items(),
key=lambda x: x[1]["total"],
reverse=True
)[:10]
}
def generate_roi_report(self) -> str:
"""Generate comprehensive ROI report."""
report = ["# Twitter Marketing ROI Report", ""]
report.append(f"*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}*")
report.append("")
# Overall metrics
total_conversions = len(self.conversions)
total_revenue = sum(c.value for c in self.conversions)
total_budget = sum(c["budget"] for c in self.campaigns.values())
report.append("## Executive Summary")
report.append("")
report.append(f"- **Total Campaigns**: {len(self.campaigns)}")
report.append(f"- **Total Conversions**: {total_conversions}")
report.append(f"- **Total Revenue**: ${total_revenue:,.2f}")
report.append(f"- **Total Budget**: ${total_budget:,.2f}")
if total_budget > 0:
overall_roi = ((total_revenue - total_budget) / total_budget) * 100
report.append(f"- **Overall ROI**: {overall_roi:.1f}%")
report.append("")
# Campaign breakdown
report.append("## Campaign Performance")
report.append("")
report.append("| Campaign | Duration | Followers | Conversions | Revenue | ROI |")
report.append("|----------|----------|-----------|-------------|---------|-----|")
for camp_id, camp in self.campaigns.items():
if "results" in camp:
r = camp["results"]
report.append(
f"| {camp['name']} | {r.get('duration_days', 'N/A')}d | "
f"+{r.get('followers_gained', 0)} | {r.get('total_conversions', 0)} | "
f"${r.get('total_revenue', 0):,.0f} | {r.get('roi', 0):.1f}% |"
)
report.append("")
# Attribution
report.append("## Attribution Analysis")
attribution_counts = {"direct": 0, "assisted": 0, "organic": 0}
attribution_values = {"direct": 0, "assisted": 0, "organic": 0}
for conv in self.conversions:
attribution_counts[conv.attribution] += 1
attribution_values[conv.attribution] += conv.value
report.append("")
for attr in ["direct", "assisted", "organic"]:
report.append(
f"- **{attr.title()}**: {attribution_counts[attr]} conversions, "
f"${attribution_values[attr]:,.2f} revenue"
)
# LTV
ltv = self.calculate_lifetime_value()
report.append("")
report.append("## Customer Value")
report.append(f"- **Average LTV**: ${ltv['avg_ltv']:,.2f}")
report.append(f"- **Total Customers**: {ltv['total_customers']}")
return "\n".join(report)
# Usage
async def track_marketing_roi():
from xeepy import Xeepy
async with Xeepy() as x:
roi = TwitterROI(x, {"username": "your_account"})
# Start a campaign
campaign_id = await roi.start_campaign(
"Q1 Product Launch",
"content",
budget=500,
goals={"followers": 1000, "conversions": 50}
)
# Track activities
await roi.track_tweet(campaign_id, "https://x.com/you/status/123")
# Record conversions
await roi.record_conversion(
"signup",
value=0,
campaign_id=campaign_id,
source_tweet="https://x.com/you/status/123",
username="new_user"
)
await roi.record_conversion(
"sale",
value=99.00,
campaign_id=campaign_id,
username="new_user"
)
# End campaign and get results
results = await roi.end_campaign(campaign_id)
print(f"\n📊 Campaign Results:")
print(f" ROI: {results.get('roi', 0):.1f}%")
print(f" Revenue: ${results.get('total_revenue', 0):,.2f}")
# Generate report
report = roi.generate_roi_report()
with open("roi_report.md", "w") as f:
f.write(report)
asyncio.run(track_marketing_roi())
🏢 Customer Support Monitoring¶
Monitor brand mentions for support opportunities.
"""
Customer Support Monitor
Track and respond to support requests on Twitter
"""
import asyncio
from datetime import datetime
from enum import Enum
from dataclasses import dataclass
class TicketPriority(Enum):
URGENT = "urgent"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class TicketStatus(Enum):
NEW = "new"
IN_PROGRESS = "in_progress"
WAITING = "waiting"
RESOLVED = "resolved"
@dataclass
class SupportTicket:
"""Support ticket from Twitter."""
id: str
tweet_url: str
username: str
text: str
priority: TicketPriority
status: TicketStatus
sentiment: str
keywords: list
created_at: datetime
assigned_to: str = None
notes: list = None
class SupportMonitor:
"""Monitor Twitter for support requests."""
def __init__(self, xeepy, config: dict):
self.x = xeepy
self.config = config
self.tickets: dict[str, SupportTicket] = {}
# Keywords indicating support needs
self.support_keywords = config.get("support_keywords", [
"help", "support", "broken", "issue", "bug",
"doesn't work", "not working", "can't", "error",
"problem", "frustrated", "disappointed"
])
self.urgent_keywords = config.get("urgent_keywords", [
"urgent", "asap", "immediately", "critical",
"down", "outage", "broken"
])
async def scan_mentions(self, limit: int = 50) -> list[SupportTicket]:
"""Scan mentions for support requests."""
brand = self.config["brand_username"]
mentions = await self.x.scrape.mentions(brand, limit=limit)
new_tickets = []
for mention in mentions.items:
if mention.id in self.tickets:
continue
# Check if it's a support request
text_lower = mention.text.lower()
keywords_found = [
kw for kw in self.support_keywords
if kw in text_lower
]
if not keywords_found:
continue
# Determine priority
priority = TicketPriority.MEDIUM
if any(kw in text_lower for kw in self.urgent_keywords):
priority = TicketPriority.URGENT
elif "?" in mention.text and len(keywords_found) == 1:
priority = TicketPriority.LOW
ticket = SupportTicket(
id=mention.id,
tweet_url=mention.url,
username=mention.author.username,
text=mention.text,
priority=priority,
status=TicketStatus.NEW,
sentiment=self._analyze_sentiment(mention.text),
keywords=keywords_found,
created_at=mention.created_at,
notes=[]
)
self.tickets[mention.id] = ticket
new_tickets.append(ticket)
return new_tickets
def _analyze_sentiment(self, text: str) -> str:
"""Simple sentiment analysis."""
negative = ["frustrated", "angry", "disappointed", "hate", "terrible", "worst"]
positive = ["thanks", "appreciate", "love", "great", "awesome"]
text_lower = text.lower()
neg_count = sum(1 for w in negative if w in text_lower)
pos_count = sum(1 for w in positive if w in text_lower)
if neg_count > pos_count:
return "negative"
elif pos_count > neg_count:
return "positive"
return "neutral"
def prioritize_queue(self) -> list[SupportTicket]:
"""Get prioritized queue of open tickets."""
open_tickets = [
t for t in self.tickets.values()
if t.status in [TicketStatus.NEW, TicketStatus.IN_PROGRESS]
]
# Sort by priority, then sentiment, then age
priority_order = {
TicketPriority.URGENT: 0,
TicketPriority.HIGH: 1,
TicketPriority.MEDIUM: 2,
TicketPriority.LOW: 3
}
sentiment_order = {"negative": 0, "neutral": 1, "positive": 2}
return sorted(open_tickets, key=lambda t: (
priority_order[t.priority],
sentiment_order[t.sentiment],
t.created_at
))
# Usage
async def monitor_support():
from xeepy import Xeepy
async with Xeepy() as x:
monitor = SupportMonitor(x, {
"brand_username": "your_brand",
"support_keywords": ["help", "broken", "issue", "bug", "not working"],
"urgent_keywords": ["urgent", "down", "critical"]
})
# Scan for new tickets
new_tickets = await monitor.scan_mentions(limit=100)
print(f"🎫 Found {len(new_tickets)} new support requests")
# Get prioritized queue
queue = monitor.prioritize_queue()
print("\n📋 Support Queue:")
for ticket in queue[:10]:
print(f" [{ticket.priority.value}] @{ticket.username}: {ticket.text[:50]}...")
asyncio.run(monitor_support())
📈 Sales Pipeline Integration¶
Connect Twitter engagement to your CRM.
"""
Twitter to CRM Pipeline
Sync Twitter interactions with your sales pipeline
"""
import asyncio
from datetime import datetime
import httpx # For API calls
class TwitterCRMSync:
"""Sync Twitter interactions to CRM."""
def __init__(self, xeepy, crm_config: dict):
self.x = xeepy
self.crm_config = crm_config
self.crm_api = crm_config.get("api_url")
self.api_key = crm_config.get("api_key")
async def sync_follower_as_lead(self, username: str):
"""Create CRM lead from Twitter follower."""
# Get profile
profile = await self.x.scrape.profile(username)
# Map to CRM fields
lead_data = {
"source": "twitter",
"source_id": profile.id,
"name": profile.name,
"twitter_handle": f"@{username}",
"twitter_url": f"https://x.com/{username}",
"bio": profile.bio,
"website": profile.website,
"followers": profile.followers_count,
"location": getattr(profile, 'location', None),
"created_at": datetime.now().isoformat()
}
# Push to CRM (example with generic REST API)
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.crm_api}/leads",
json=lead_data,
headers={"Authorization": f"Bearer {self.api_key}"}
)
return response.json()
async def log_interaction(
self,
username: str,
interaction_type: str, # "dm", "reply", "like", "follow"
content: str = None,
tweet_url: str = None
):
"""Log Twitter interaction to CRM."""
interaction_data = {
"contact_identifier": f"twitter:@{username}",
"channel": "twitter",
"type": interaction_type,
"content": content,
"reference_url": tweet_url,
"timestamp": datetime.now().isoformat()
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.crm_api}/interactions",
json=interaction_data,
headers={"Authorization": f"Bearer {self.api_key}"}
)
return response.json()
# Usage example
async def sync_to_crm():
from xeepy import Xeepy
async with Xeepy() as x:
crm = TwitterCRMSync(x, {
"api_url": "https://api.yourcrm.com/v1",
"api_key": "your_api_key"
})
# Sync new followers as leads
new_followers = await x.monitor.new_followers()
for follower in new_followers[:10]:
await crm.sync_follower_as_lead(follower.username)
print(f"✅ Synced @{follower.username} to CRM")
asyncio.run(sync_to_crm())
Next Steps¶
- Growth Recipes - Scale your audience
- Automation Recipes - Automate workflows
- Data Science Recipes - Analyze your data