Research & Analysis Cookbook¶
Academic and professional research techniques using X/Twitter data.
🔬 Academic Research Framework¶
Build a research-grade data collection system.
"""
Academic Research Framework
Ethical, reproducible Twitter research methodology
"""
import asyncio
import hashlib
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Optional
@dataclass
class ResearchMetadata:
"""Metadata for research dataset."""
study_id: str
researcher: str
institution: str
irb_approval: Optional[str]
research_question: str
data_collection_start: datetime
data_collection_end: Optional[datetime]
sampling_method: str
ethical_considerations: list
data_processing_notes: list
@dataclass
class DataCollectionLog:
"""Audit log for data collection."""
timestamp: datetime
operation: str
parameters: dict
records_collected: int
errors: list
checksum: str
class AcademicResearcher:
"""Framework for ethical Twitter research."""
def __init__(self, xeepy, metadata: ResearchMetadata):
self.x = xeepy
self.metadata = metadata
self.collection_logs: list[DataCollectionLog] = []
# Setup directories
self.data_dir = Path(f"research_{metadata.study_id}")
self.data_dir.mkdir(exist_ok=True)
(self.data_dir / "raw").mkdir(exist_ok=True)
(self.data_dir / "processed").mkdir(exist_ok=True)
(self.data_dir / "logs").mkdir(exist_ok=True)
def _anonymize_user(self, user: dict) -> dict:
"""Anonymize user data for privacy."""
# Create consistent pseudonym
user_hash = hashlib.sha256(
user.get("username", "").encode()
).hexdigest()[:12]
return {
"id_hash": user_hash,
"follower_range": self._bucket_followers(user.get("followers", 0)),
"account_age_category": self._categorize_age(user.get("created_at")),
"verified": user.get("verified", False),
"bio_length": len(user.get("bio", "") or ""),
# Remove: username, name, profile_image, etc.
}
def _bucket_followers(self, count: int) -> str:
"""Bucket follower count for privacy."""
if count < 100:
return "<100"
elif count < 1000:
return "100-999"
elif count < 10000:
return "1K-10K"
elif count < 100000:
return "10K-100K"
else:
return ">100K"
def _categorize_age(self, created_at) -> str:
"""Categorize account age."""
if not created_at:
return "unknown"
age_days = (datetime.now() - created_at).days
if age_days < 90:
return "new (<3mo)"
elif age_days < 365:
return "recent (3mo-1y)"
elif age_days < 365 * 3:
return "established (1-3y)"
else:
return "veteran (>3y)"
async def collect_conversation_network(
self,
seed_tweets: list[str],
depth: int = 2,
anonymize: bool = True
) -> dict:
"""Collect conversation network from seed tweets."""
network = {
"nodes": [],
"edges": [],
"tweets": []
}
visited = set()
queue = [(url, 0) for url in seed_tweets]
while queue:
tweet_url, current_depth = queue.pop(0)
if tweet_url in visited or current_depth > depth:
continue
visited.add(tweet_url)
try:
replies = await self.x.scrape.replies(tweet_url, limit=100)
for reply in replies.items:
# Process tweet
tweet_data = {
"id_hash": hashlib.sha256(reply.id.encode()).hexdigest()[:12],
"text": reply.text if not anonymize else self._anonymize_text(reply.text),
"created_at": reply.created_at.isoformat(),
"engagement": {
"likes": reply.likes or 0,
"retweets": reply.retweets or 0,
"replies": reply.replies or 0
}
}
if anonymize:
tweet_data["author"] = self._anonymize_user(asdict(reply.author))
network["tweets"].append(tweet_data)
# Add to queue for further exploration
if current_depth < depth:
queue.append((reply.url, current_depth + 1))
except Exception as e:
self._log_error("collect_conversation", str(e))
self._log_collection("conversation_network", {
"seed_tweets": len(seed_tweets),
"depth": depth
}, len(network["tweets"]))
return network
def _anonymize_text(self, text: str) -> str:
"""Basic text anonymization."""
import re
# Remove @mentions
text = re.sub(r'@\w+', '@[USER]', text)
# Remove URLs
text = re.sub(r'https?://\S+', '[URL]', text)
# Remove potential names (basic)
# Note: More sophisticated NER would be needed for production
return text
async def collect_hashtag_sample(
self,
hashtag: str,
sample_size: int,
sampling_method: str = "chronological"
) -> list[dict]:
"""Collect stratified sample from hashtag."""
tweets = await self.x.scrape.hashtag(
f"#{hashtag}",
limit=sample_size * 2 # Oversample for filtering
)
# Apply sampling method
if sampling_method == "chronological":
sample = tweets.items[:sample_size]
elif sampling_method == "random":
import random
sample = random.sample(
tweets.items,
min(sample_size, len(tweets.items))
)
elif sampling_method == "stratified_engagement":
# Stratify by engagement level
sorted_tweets = sorted(
tweets.items,
key=lambda t: (t.likes or 0) + (t.retweets or 0)
)
# Take from each quartile
quartile_size = len(sorted_tweets) // 4
sample = []
for i in range(4):
start = i * quartile_size
end = start + (sample_size // 4)
sample.extend(sorted_tweets[start:end])
else:
sample = tweets.items[:sample_size]
self._log_collection("hashtag_sample", {
"hashtag": hashtag,
"method": sampling_method
}, len(sample))
return [self._process_tweet_for_research(t) for t in sample]
def _process_tweet_for_research(self, tweet) -> dict:
"""Process tweet for research use."""
return {
"id_hash": hashlib.sha256(tweet.id.encode()).hexdigest()[:12],
"text": tweet.text,
"created_at": tweet.created_at.isoformat() if tweet.created_at else None,
"likes": tweet.likes or 0,
"retweets": tweet.retweets or 0,
"replies": tweet.replies or 0,
"has_media": bool(getattr(tweet, 'media', None)),
"has_url": 'http' in tweet.text,
"has_hashtags": '#' in tweet.text,
"tweet_length": len(tweet.text),
"author_anonymized": self._anonymize_user(
asdict(tweet.author) if hasattr(tweet.author, '__dict__') else {}
)
}
def _log_collection(self, operation: str, params: dict, count: int):
"""Log data collection operation."""
log = DataCollectionLog(
timestamp=datetime.now(),
operation=operation,
parameters=params,
records_collected=count,
errors=[],
checksum=hashlib.sha256(
json.dumps(params, sort_keys=True).encode()
).hexdigest()[:16]
)
self.collection_logs.append(log)
def _log_error(self, operation: str, error: str):
"""Log collection error."""
if self.collection_logs:
self.collection_logs[-1].errors.append(error)
def export_dataset(self, data: list, filename: str):
"""Export dataset with metadata."""
export = {
"metadata": asdict(self.metadata),
"collection_logs": [
{
"timestamp": log.timestamp.isoformat(),
"operation": log.operation,
"parameters": log.parameters,
"records_collected": log.records_collected,
"errors": log.errors,
"checksum": log.checksum
}
for log in self.collection_logs
],
"data": data
}
filepath = self.data_dir / "processed" / filename
with open(filepath, 'w') as f:
json.dump(export, f, indent=2, default=str)
print(f"✅ Dataset exported to {filepath}")
print(f" Records: {len(data)}")
print(f" Checksum: {hashlib.sha256(json.dumps(data, sort_keys=True, default=str).encode()).hexdigest()[:16]}")
# Usage
async def run_research():
from xeepy import Xeepy
metadata = ResearchMetadata(
study_id="twitter_discourse_2024",
researcher="Dr. Jane Smith",
institution="University Example",
irb_approval="IRB-2024-001",
research_question="How do conversation dynamics differ across topic communities?",
data_collection_start=datetime.now(),
data_collection_end=None,
sampling_method="stratified_engagement",
ethical_considerations=[
"Data anonymized before analysis",
"No individual identification attempted",
"Public tweets only"
],
data_processing_notes=[]
)
async with Xeepy() as x:
researcher = AcademicResearcher(x, metadata)
# Collect data
sample = await researcher.collect_hashtag_sample(
"academictwitter",
sample_size=500,
sampling_method="stratified_engagement"
)
# Export
researcher.export_dataset(sample, "hashtag_sample.json")
asyncio.run(run_research())
📊 Discourse Analysis Toolkit¶
Analyze public discourse and narratives.
"""
Discourse Analysis Toolkit
Study narratives, framing, and public discourse on Twitter
"""
import asyncio
from collections import Counter, defaultdict
from datetime import datetime
import re
class DiscourseAnalyzer:
"""Analyze Twitter discourse patterns."""
def __init__(self, xeepy):
self.x = xeepy
async def analyze_framing(
self,
topic: str,
sample_size: int = 500
) -> dict:
"""Analyze how a topic is framed in discourse."""
tweets = await self.x.scrape.search(topic, limit=sample_size)
analysis = {
"topic": topic,
"sample_size": len(tweets.items),
"frames": defaultdict(list),
"sentiment_distribution": {"positive": 0, "negative": 0, "neutral": 0},
"key_terms": Counter(),
"hashtag_clusters": defaultdict(int),
"source_types": Counter()
}
for tweet in tweets.items:
# Extract frames
frames = self._identify_frames(tweet.text)
for frame in frames:
analysis["frames"][frame].append(tweet.text[:100])
# Sentiment
sentiment = self._simple_sentiment(tweet.text)
analysis["sentiment_distribution"][sentiment] += 1
# Key terms (bigrams)
terms = self._extract_key_terms(tweet.text)
analysis["key_terms"].update(terms)
# Hashtag clusters
hashtags = re.findall(r'#\w+', tweet.text.lower())
for tag in hashtags:
analysis["hashtag_clusters"][tag] += 1
# Source type
source = self._categorize_source(tweet.author)
analysis["source_types"][source] += 1
# Convert to serializable
analysis["frames"] = dict(analysis["frames"])
analysis["key_terms"] = dict(analysis["key_terms"].most_common(50))
analysis["hashtag_clusters"] = dict(analysis["hashtag_clusters"])
analysis["source_types"] = dict(analysis["source_types"])
return analysis
def _identify_frames(self, text: str) -> list[str]:
"""Identify framing patterns in text."""
frames = []
text_lower = text.lower()
frame_patterns = {
"economic": ["cost", "price", "money", "economy", "jobs", "business", "market"],
"moral": ["right", "wrong", "should", "must", "duty", "responsible"],
"conflict": ["fight", "battle", "war", "attack", "defend", "opposition"],
"human_interest": ["story", "people", "family", "children", "personal"],
"scientific": ["study", "research", "data", "evidence", "expert", "science"],
"political": ["government", "policy", "politician", "vote", "election"],
"security": ["safe", "danger", "threat", "protect", "risk", "security"]
}
for frame, keywords in frame_patterns.items():
if any(kw in text_lower for kw in keywords):
frames.append(frame)
return frames if frames else ["unclassified"]
def _simple_sentiment(self, text: str) -> str:
"""Simple sentiment classification."""
positive = ["good", "great", "excellent", "amazing", "love", "best", "happy", "wonderful"]
negative = ["bad", "terrible", "awful", "hate", "worst", "sad", "angry", "disappointed"]
text_lower = text.lower()
pos = sum(1 for w in positive if w in text_lower)
neg = sum(1 for w in negative if w in text_lower)
if pos > neg:
return "positive"
elif neg > pos:
return "negative"
return "neutral"
def _extract_key_terms(self, text: str) -> list[str]:
"""Extract meaningful terms."""
# Remove URLs, mentions
text = re.sub(r'https?://\S+', '', text)
text = re.sub(r'@\w+', '', text)
text = re.sub(r'#', '', text)
# Simple tokenization
words = re.findall(r'\b[a-zA-Z]{3,}\b', text.lower())
# Remove common stopwords
stopwords = {'the', 'and', 'for', 'that', 'this', 'with', 'are', 'was', 'but', 'have', 'has'}
words = [w for w in words if w not in stopwords]
# Create bigrams
bigrams = [f"{words[i]} {words[i+1]}" for i in range(len(words)-1)]
return bigrams
def _categorize_source(self, author) -> str:
"""Categorize tweet author type."""
if hasattr(author, 'verified') and author.verified:
return "verified"
elif hasattr(author, 'followers_count'):
if author.followers_count > 100000:
return "influencer"
elif author.followers_count > 10000:
return "micro_influencer"
elif author.followers_count > 1000:
return "active_user"
return "regular_user"
async def track_narrative_evolution(
self,
topic: str,
days: int = 7,
samples_per_day: int = 100
) -> dict:
"""Track how narratives evolve over time."""
from datetime import timedelta
evolution = {
"topic": topic,
"period_days": days,
"daily_analysis": []
}
# Note: Would need date-filtered search
# This is a simplified version
for day_offset in range(days):
day_data = {
"day": day_offset,
"frames": Counter(),
"sentiment": Counter(),
"top_terms": []
}
# In practice, you'd filter by date
tweets = await self.x.scrape.search(
topic,
limit=samples_per_day
)
for tweet in tweets.items:
frames = self._identify_frames(tweet.text)
day_data["frames"].update(frames)
day_data["sentiment"][self._simple_sentiment(tweet.text)] += 1
day_data["frames"] = dict(day_data["frames"])
day_data["sentiment"] = dict(day_data["sentiment"])
evolution["daily_analysis"].append(day_data)
return evolution
# Usage
async def analyze_discourse():
from xeepy import Xeepy
async with Xeepy() as x:
analyzer = DiscourseAnalyzer(x)
# Analyze framing
analysis = await analyzer.analyze_framing(
"climate change",
sample_size=500
)
print("📊 Discourse Analysis Results")
print(f"\nSentiment Distribution:")
for sent, count in analysis["sentiment_distribution"].items():
pct = (count / analysis["sample_size"]) * 100
print(f" {sent}: {pct:.1f}%")
print(f"\nTop Frames:")
for frame, examples in list(analysis["frames"].items())[:5]:
print(f" {frame}: {len(examples)} tweets")
print(f"\nTop Terms:")
for term, count in list(analysis["key_terms"].items())[:10]:
print(f" '{term}': {count}")
asyncio.run(analyze_discourse())
🌐 Network Analysis¶
Study social network structures and influence.
"""
Social Network Analysis
Study network structures, influence, and information flow
"""
import asyncio
from collections import defaultdict
from dataclasses import dataclass
import json
@dataclass
class NetworkNode:
"""Node in social network."""
id: str
label: str
followers: int = 0
following: int = 0
degree: int = 0
in_degree: int = 0
out_degree: int = 0
betweenness: float = 0.0
community: int = -1
class NetworkAnalyzer:
"""Analyze Twitter social networks."""
def __init__(self, xeepy):
self.x = xeepy
self.nodes: dict[str, NetworkNode] = {}
self.edges: list[tuple] = []
async def build_follower_network(
self,
seed_users: list[str],
depth: int = 1,
sample_per_user: int = 100
) -> dict:
"""Build follower network from seed users."""
visited = set()
queue = [(user, 0) for user in seed_users]
while queue:
username, current_depth = queue.pop(0)
if username in visited or current_depth > depth:
continue
visited.add(username)
try:
# Get profile
profile = await self.x.scrape.profile(username)
# Add node
self.nodes[username] = NetworkNode(
id=username,
label=profile.name or username,
followers=profile.followers_count,
following=profile.following_count
)
# Get followers
followers = await self.x.scrape.followers(
username,
limit=sample_per_user
)
for follower in followers.items:
# Add edge (follower -> user)
self.edges.append((follower.username, username))
# Add follower node if not exists
if follower.username not in self.nodes:
self.nodes[follower.username] = NetworkNode(
id=follower.username,
label=follower.name or follower.username,
followers=getattr(follower, 'followers_count', 0),
following=getattr(follower, 'following_count', 0)
)
# Queue for expansion
if current_depth < depth:
queue.append((follower.username, current_depth + 1))
except Exception as e:
print(f"Error processing @{username}: {e}")
# Calculate network metrics
self._calculate_degrees()
return {
"nodes": [vars(n) for n in self.nodes.values()],
"edges": [{"source": e[0], "target": e[1]} for e in self.edges],
"stats": self._network_stats()
}
def _calculate_degrees(self):
"""Calculate node degrees."""
in_degrees = defaultdict(int)
out_degrees = defaultdict(int)
for source, target in self.edges:
out_degrees[source] += 1
in_degrees[target] += 1
for username, node in self.nodes.items():
node.in_degree = in_degrees.get(username, 0)
node.out_degree = out_degrees.get(username, 0)
node.degree = node.in_degree + node.out_degree
def _network_stats(self) -> dict:
"""Calculate network-level statistics."""
if not self.nodes:
return {}
degrees = [n.degree for n in self.nodes.values()]
return {
"num_nodes": len(self.nodes),
"num_edges": len(self.edges),
"avg_degree": sum(degrees) / len(degrees),
"max_degree": max(degrees),
"density": len(self.edges) / (len(self.nodes) * (len(self.nodes) - 1)) if len(self.nodes) > 1 else 0
}
def find_key_influencers(self, top_n: int = 10) -> list[dict]:
"""Find most influential nodes."""
# Combine metrics for influence score
scored = []
for username, node in self.nodes.items():
score = (
node.in_degree * 0.4 +
(node.followers / 1000) * 0.3 +
node.degree * 0.3
)
scored.append({
"username": username,
"influence_score": score,
"in_degree": node.in_degree,
"followers": node.followers
})
return sorted(scored, key=lambda x: x["influence_score"], reverse=True)[:top_n]
def find_bridges(self) -> list[str]:
"""Find bridge nodes connecting communities."""
# Simple heuristic: nodes with high degree but connecting to many unique nodes
bridges = []
for username, node in self.nodes.items():
if node.out_degree > 5 and node.in_degree > 5:
# Check if connects to diverse set
connections = set()
for src, tgt in self.edges:
if src == username:
connections.add(tgt)
if tgt == username:
connections.add(src)
if len(connections) > node.degree * 0.7:
bridges.append(username)
return bridges
def export_for_gephi(self, filename: str):
"""Export network for Gephi visualization."""
# Export nodes
nodes_csv = "Id,Label,Followers,Following,Degree\n"
for username, node in self.nodes.items():
nodes_csv += f"{username},{node.label},{node.followers},{node.following},{node.degree}\n"
with open(f"{filename}_nodes.csv", "w") as f:
f.write(nodes_csv)
# Export edges
edges_csv = "Source,Target\n"
for source, target in self.edges:
edges_csv += f"{source},{target}\n"
with open(f"{filename}_edges.csv", "w") as f:
f.write(edges_csv)
print(f"✅ Exported to {filename}_nodes.csv and {filename}_edges.csv")
# Usage
async def analyze_network():
from xeepy import Xeepy
async with Xeepy() as x:
analyzer = NetworkAnalyzer(x)
# Build network
network = await analyzer.build_follower_network(
["user1", "user2"],
depth=1,
sample_per_user=50
)
print(f"📊 Network Statistics:")
print(f" Nodes: {network['stats']['num_nodes']}")
print(f" Edges: {network['stats']['num_edges']}")
print(f" Density: {network['stats']['density']:.4f}")
# Find influencers
influencers = analyzer.find_key_influencers(10)
print(f"\n🌟 Top Influencers:")
for inf in influencers:
print(f" @{inf['username']}: score={inf['influence_score']:.2f}")
# Export for visualization
analyzer.export_for_gephi("twitter_network")
asyncio.run(analyze_network())
📈 Trend Detection & Analysis¶
Detect and analyze emerging trends.
"""
Trend Detection System
Identify and analyze emerging trends in real-time
"""
import asyncio
from collections import Counter, defaultdict
from datetime import datetime, timedelta
import re
class TrendDetector:
"""Detect emerging trends on Twitter."""
def __init__(self, xeepy):
self.x = xeepy
self.baseline_terms = Counter()
self.current_terms = Counter()
self.trend_history = []
async def capture_baseline(
self,
topics: list[str],
sample_size: int = 500
):
"""Capture baseline term frequencies."""
all_terms = Counter()
for topic in topics:
tweets = await self.x.scrape.search(topic, limit=sample_size)
for tweet in tweets.items:
terms = self._extract_terms(tweet.text)
all_terms.update(terms)
self.baseline_terms = all_terms
print(f"📊 Baseline captured: {len(all_terms)} unique terms")
async def detect_emerging(
self,
topics: list[str],
sample_size: int = 500,
threshold: float = 2.0
) -> list[dict]:
"""Detect terms emerging above baseline."""
current = Counter()
for topic in topics:
tweets = await self.x.scrape.search(topic, limit=sample_size)
for tweet in tweets.items:
terms = self._extract_terms(tweet.text)
current.update(terms)
self.current_terms = current
# Find emerging terms
emerging = []
for term, count in current.most_common(100):
baseline_count = self.baseline_terms.get(term, 1)
ratio = count / baseline_count
if ratio >= threshold and count >= 5:
emerging.append({
"term": term,
"current_count": count,
"baseline_count": baseline_count,
"emergence_ratio": ratio
})
# Sort by emergence ratio
emerging.sort(key=lambda x: x["emergence_ratio"], reverse=True)
return emerging[:20]
def _extract_terms(self, text: str) -> list[str]:
"""Extract meaningful terms from text."""
# Clean
text = re.sub(r'https?://\S+', '', text)
text = re.sub(r'@\w+', '', text)
# Extract hashtags
hashtags = re.findall(r'#(\w+)', text.lower())
# Extract words (3+ chars)
words = re.findall(r'\b[a-zA-Z]{3,}\b', text.lower())
# Simple stopword filter
stopwords = {'the', 'and', 'for', 'that', 'this', 'with', 'are', 'was', 'but', 'have', 'has', 'you', 'your'}
words = [w for w in words if w not in stopwords]
# Combine
return hashtags + words
async def analyze_trend(self, trend_term: str) -> dict:
"""Deep analysis of a specific trend."""
tweets = await self.x.scrape.search(trend_term, limit=200)
analysis = {
"term": trend_term,
"sample_size": len(tweets.items),
"sentiment": {"positive": 0, "negative": 0, "neutral": 0},
"co_occurring_hashtags": Counter(),
"top_users": Counter(),
"engagement_stats": {
"total_likes": 0,
"total_retweets": 0,
"avg_engagement": 0
},
"sample_tweets": []
}
total_engagement = 0
for tweet in tweets.items:
# Sentiment
sentiment = self._simple_sentiment(tweet.text)
analysis["sentiment"][sentiment] += 1
# Co-occurring hashtags
hashtags = re.findall(r'#\w+', tweet.text.lower())
for tag in hashtags:
if tag.lower() != f"#{trend_term.lower()}":
analysis["co_occurring_hashtags"][tag] += 1
# Top users
analysis["top_users"][tweet.author.username] += 1
# Engagement
engagement = (tweet.likes or 0) + (tweet.retweets or 0)
total_engagement += engagement
analysis["engagement_stats"]["total_likes"] += tweet.likes or 0
analysis["engagement_stats"]["total_retweets"] += tweet.retweets or 0
if tweets.items:
analysis["engagement_stats"]["avg_engagement"] = total_engagement / len(tweets.items)
# Get top examples
sorted_tweets = sorted(
tweets.items,
key=lambda t: (t.likes or 0) + (t.retweets or 0),
reverse=True
)
analysis["sample_tweets"] = [
{"text": t.text[:200], "engagement": (t.likes or 0) + (t.retweets or 0)}
for t in sorted_tweets[:5]
]
# Convert counters
analysis["co_occurring_hashtags"] = dict(
analysis["co_occurring_hashtags"].most_common(10)
)
analysis["top_users"] = dict(
analysis["top_users"].most_common(10)
)
return analysis
def _simple_sentiment(self, text: str) -> str:
"""Simple sentiment classification."""
positive = ["good", "great", "love", "amazing", "best", "happy", "excited"]
negative = ["bad", "terrible", "hate", "worst", "sad", "angry", "disappointed"]
text_lower = text.lower()
pos = sum(1 for w in positive if w in text_lower)
neg = sum(1 for w in negative if w in text_lower)
if pos > neg:
return "positive"
elif neg > pos:
return "negative"
return "neutral"
# Usage
async def detect_trends():
from xeepy import Xeepy
async with Xeepy() as x:
detector = TrendDetector(x)
# Capture baseline
topics = ["technology", "AI", "startup"]
await detector.capture_baseline(topics, sample_size=200)
# Wait and detect emerging
await asyncio.sleep(5)
emerging = await detector.detect_emerging(topics, threshold=1.5)
print("📈 Emerging Trends:")
for trend in emerging[:10]:
print(f" '{trend['term']}': {trend['emergence_ratio']:.1f}x increase")
# Analyze top trend
if emerging:
analysis = await detector.analyze_trend(emerging[0]["term"])
print(f"\n🔍 Trend Analysis: {analysis['term']}")
print(f" Sentiment: {analysis['sentiment']}")
print(f" Avg Engagement: {analysis['engagement_stats']['avg_engagement']:.1f}")
asyncio.run(detect_trends())
Next Steps¶
- Business Recipes - Business applications
- Data Science Recipes - ML and analytics
- Academic Resources - Community support