import hvplot.streamz | |
import pandas as pd | |
import numpy as np | |
from streamz import Stream | |
from streamz.dataframe import DataFrame | |
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message | |
import datetime | |
import queue | |
import threading | |
import time | |
import os | |
import json | |
from huggingface_hub import CommitScheduler, HfApi, hf_hub_download | |
import uuid | |
from pathlib import Path | |
import panel as pn | |
# Initialise Panel once for the whole module, using the Material design.
pn.extension(design="material")

# Queue intended for communication between threads (not used by the code
# visible in this file).
post_queue = queue.Queue()

# Shared counter that on_message_handler increments for every new post.
post_count = 0

# Source stream; emit_counts pushes one-row DataFrames into it.
stream = Stream()

# One-row sample frame handed to streamz as the ``example`` of what the
# emitted frames look like (columns "timestamp" and "post_count").
# No start-up delay is needed before building it: nothing produces data until
# the worker threads are started further down the file.
example = pd.DataFrame(
    {"timestamp": [pd.Timestamp.now()], "post_count": [post_count]}, index=[0]
)

# Streaming DataFrame fed by ``stream``; the plot is built from it.
df = DataFrame(stream, example=example)
# Number of seconds in one day; used as the plot backlog (one sample per second).
DAY_IN_SECONDS = 86_400  # 24 h x 60 min x 60 s

# Hub configuration, overridable through environment variables.
REPO_ID = os.environ.get("HF_REPO_ID", "davanstrien/bluesky-counts")
REPO_TYPE = os.environ.get("HF_REPO_TYPE", "dataset")
# Token for HuggingFace API access; None when the variable is not set.
HF_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")

# Local folder and per-process unique file name for the collected counts.
DATA_FOLDER = Path("bluesky_data")
DATA_FILE = "bluesky_counts_{}.json".format(uuid.uuid4())
# def load_hub_data(): | |
# """Load the most recent data from the Hub""" | |
# try: | |
# api = HfApi(token=HF_TOKEN) | |
# # List files in the repository | |
# files = api.list_repo_files(REPO_ID, repo_type=REPO_TYPE) | |
# data_files = [f for f in files if f.startswith("data/bluesky_counts_")] | |
# if not data_files: | |
# return [] | |
# # Get the most recent file | |
# latest_file = sorted(data_files)[-1] | |
# # Download the file | |
# local_path = hf_hub_download( | |
# repo_id=REPO_ID, filename=latest_file, repo_type=REPO_TYPE, token=HF_TOKEN | |
# ) | |
# # Load and parse the data | |
# data = [] | |
# with open(local_path, "r") as f: | |
# data.extend(json.loads(line.strip()) for line in f) | |
# # Keep only last 24 hours of data | |
# return data[-DAY_IN_SECONDS:] | |
# except Exception as e: | |
# print(f"Error loading data from Hub: {e}") | |
# return [] | |
# Initialize storage and Hub connection | |
# DATA_FOLDER.mkdir(exist_ok=True) | |
# scheduler = CommitScheduler( | |
# repo_id=REPO_ID, | |
# repo_type=REPO_TYPE, | |
# folder_path=DATA_FOLDER, | |
# path_in_repo="data", | |
# every=600, # Upload every 10 minutes | |
# token=HF_TOKEN, # Add token for authentication | |
# ) | |
def on_message_handler(message):
    """Count the newly created posts contained in one firehose message.

    The message is decoded with ``parse_subscribe_repos_message``. Every
    operation whose action is ``"create"`` and whose path lies in the
    ``app.bsky.feed.post`` collection increments the module-level
    ``post_count``. Likes, reposts and other record types are ignored.

    Args:
        message: Raw firehose message, as passed in by the firehose client.
    """
    global post_count

    # Match the collection segment exactly. A plain substring test would also
    # accept other collections that merely start with the same text, such as
    # "app.bsky.feed.postgate/<rkey>", and count them as posts.
    post_prefix = "app.bsky.feed.post/"

    commit = parse_subscribe_repos_message(message)

    # Messages without operations (no ``ops`` attribute, or an empty/None
    # value) contribute nothing.
    ops = getattr(commit, "ops", None)
    if not ops:
        return

    for op in ops:
        if op.action == "create" and op.path.startswith(post_prefix):
            post_count += 1
def emit_counts():
    """Emit the number of new posts to ``stream`` once per second.

    Runs forever, so it is meant to be used as a thread target. On every tick
    a one-row DataFrame with the columns ``timestamp`` and ``post_count`` is
    pushed into ``stream``.

    The shared ``post_count`` counter is only read here, never reset. Each
    sample is the difference between the current counter value and the value
    seen on the previous tick. Resetting the counter after emitting would
    silently discard every post that the firehose thread counted while the
    sample was being built and emitted. This assumes the counter only ever
    increases.
    """
    # Disabled: replay of historical data from the Hub (depends on the
    # commented-out ``load_hub_data``).
    # if saved_data := load_hub_data():
    #     print(f"Loaded {len(saved_data)} historical data points from Hub")
    #     # Emit historical data
    #     for point in saved_data[-100:]:  # Emit last 100 points to initialize plot
    #         df = pd.DataFrame(
    #             {
    #                 "timestamp": [pd.Timestamp(point["timestamp"])],
    #                 "post_count": [point["post_count"]],
    #             }
    #         )
    #         stream.emit(df)

    interval = 1.0  # seconds between samples
    previous_total = post_count
    # Schedule ticks against a monotonic deadline so that the time spent
    # building and emitting a sample does not stretch the sampling period.
    next_tick = time.monotonic() + interval

    while True:
        remaining = next_tick - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)
        else:
            # The previous emit overran the interval: restart the schedule
            # from now instead of firing a burst of catch-up samples.
            next_tick = time.monotonic()
        next_tick += interval

        # Take one snapshot so the emitted value and the new baseline agree
        # even if the counter changes while the sample is being built.
        current_total = post_count
        sample = pd.DataFrame(
            {
                "timestamp": [pd.Timestamp.now()],
                "post_count": [current_total - previous_total],
            }
        )
        previous_total = current_total
        stream.emit(sample)
# Live line chart of post_count over timestamp, built from the streaming
# DataFrame. The backlog is one day's worth of points (one sample per second).
plot = df.hvplot.line(
    x="timestamp",
    y="post_count",
    backlog=DAY_IN_SECONDS,
    height=400,
    width=800,
    title="Bluesky Posts per Second (Last 24 Hours)",
)
# Thread target that runs the firehose subscription.
def run_firehose():
    """Create a firehose client and start it with ``on_message_handler``.

    Used as the target of the firehose worker thread.
    """
    FirehoseSubscribeReposClient().start(on_message_handler)
# Consume the firehose in a background daemon thread.
firehose_thread = threading.Thread(target=run_firehose, daemon=True)
firehose_thread.start()

# Publish the per-second counts from a second daemon thread.
emit_thread = threading.Thread(target=emit_counts, daemon=True)
emit_thread.start()
# When executed as a script, serve the plot as a standalone Panel application.
# Panel is already imported and initialised at the top of the module, so it is
# not imported or initialised again here.
if __name__ == "__main__":
    dashboard = pn.Column(pn.pane.HoloViews(plot))

    # Server configuration for Docker: listen on all interfaces on port 7860
    # and do not try to open a browser window.
    # NOTE(review): allow_websocket_origin=["*"] accepts websocket connections
    # from any origin; restrict it if the deployment host name is known.
    pn.serve(
        dashboard,
        address="0.0.0.0",
        port=7860,
        allow_websocket_origin=["*"],
        show=False,
    )