import datetime
import json
import os
import queue
import threading
import time
import uuid
from pathlib import Path

import hvplot.streamz  # noqa: F401  (registers the .hvplot accessor on streamz objects)
import numpy as np
import pandas as pd
import panel as pn
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message
from huggingface_hub import CommitScheduler, HfApi, hf_hub_download
from streamz import Stream
from streamz.dataframe import DataFrame

pn.extension(design="material")

# Queue intended for passing data between threads.
# NOTE(review): not referenced anywhere else in this file - candidate for removal.
post_queue = queue.Queue()

# Number of new posts seen since the last emitted data point.
post_count = 0

# Streaming source; every pandas DataFrame emitted into it becomes new plot rows.
stream = Stream()

# Schema template for the streaming dataframe: one (timestamp, post_count) row.
example = pd.DataFrame(
    {"timestamp": [pd.Timestamp.now()], "post_count": [post_count]}, index=[0]
)
df = DataFrame(stream, example=example)

# One month (31 days) of data at one point per second.
MONTH_IN_SECONDS = 31 * 24 * 60 * 60  # 31 days * 24 hours * 60 minutes * 60 seconds

# Configuration, overridable through environment variables.
REPO_ID = os.getenv("HF_REPO_ID", "davanstrien/bluesky-counts")
REPO_TYPE = os.getenv("HF_REPO_TYPE", "dataset")
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")  # Required for HuggingFace API access

DATA_FOLDER = Path("bluesky_data")

# Path prefix of the count files inside the Hub repo; must match the
# path_in_repo the uploader uses ("data") plus the local file-name prefix.
HUB_DATA_PREFIX = "data/bluesky_counts_"

# UTC timestamp format embedded in data file names. It is fixed-width and
# most-significant-first, so names sort chronologically as plain strings.
_FILE_TIMESTAMP_FORMAT = "%Y%m%dT%H%M%SZ"

# Each process writes its own file: the leading start time makes the newest
# file discoverable by sorting, and the UUID keeps concurrent runs unique.
DATA_FILE = (
    "bluesky_counts_"
    f"{datetime.datetime.now(datetime.timezone.utc).strftime(_FILE_TIMESTAMP_FORMAT)}"
    f"_{uuid.uuid4()}.json"
)


def _hub_file_sort_key(path):
    """Return a sort key under which the newest data file compares greatest.

    Timestamped names (``bluesky_counts_<UTC stamp>_<uuid>.json``) are ranked
    above legacy ``bluesky_counts_<uuid>.json`` names. A UUID carries no
    ordering information, so legacy files are only chosen when no timestamped
    file exists.
    """
    stamp = path[len(HUB_DATA_PREFIX):].split("_", 1)[0]
    try:
        datetime.datetime.strptime(stamp, _FILE_TIMESTAMP_FORMAT)
    except ValueError:
        return (0, path)
    return (1, path)


def load_hub_data():
    """Load the most recent count file from the Hub.

    Returns:
        A list of records parsed from the newest JSON-lines data file. Each
        record is expected to carry "timestamp" and "post_count" keys. The list
        is trimmed to the last ``MONTH_IN_SECONDS`` entries. Returns ``[]``
        when the repo has no data file. Loading history is best-effort, so any
        error is printed rather than raised, and ``[]`` is returned.
    """
    try:
        api = HfApi(token=HF_TOKEN)
        files = api.list_repo_files(REPO_ID, repo_type=REPO_TYPE)
        data_files = [f for f in files if f.startswith(HUB_DATA_PREFIX)]
        if not data_files:
            return []

        # Plain lexicographic sorting of UUID names is effectively random;
        # rank by the embedded start timestamp instead.
        latest_file = max(data_files, key=_hub_file_sort_key)

        local_path = hf_hub_download(
            repo_id=REPO_ID, filename=latest_file, repo_type=REPO_TYPE, token=HF_TOKEN
        )

        with open(local_path, "r", encoding="utf-8") as f:
            # Skip blank lines instead of letting one abort the whole load.
            data = [json.loads(line) for line in f if line.strip()]

        # Keep only the last month of one-per-second data.
        return data[-MONTH_IN_SECONDS:]
    except Exception as e:
        print(f"Error loading data from Hub: {e}")
        return []
# Initialize storage and Hub connection
DATA_FOLDER.mkdir(exist_ok=True)
scheduler = CommitScheduler(
    repo_id=REPO_ID,
    repo_type=REPO_TYPE,
    folder_path=DATA_FOLDER,
    path_in_repo="data",
    # CommitScheduler's `every` is measured in MINUTES: 10 => upload every
    # 10 minutes (the former value 600 meant once every 10 hours).
    every=10,
    token=HF_TOKEN,  # Token for authentication
)

# Guards post_count. It is incremented by the firehose thread and read+reset
# by the emitter thread. Neither `+=` nor read-then-reset is atomic, so
# without the lock posts arriving between the two steps were lost.
post_count_lock = threading.Lock()

# Record-path prefix of new posts. The trailing slash keeps other collections
# that merely start with the same text (e.g. "app.bsky.feed.postgate") out.
POST_PATH_PREFIX = "app.bsky.feed.post/"


def on_message_handler(message):
    """Add the number of newly created posts in one firehose message to post_count.

    Only ``create`` ops on records in the ``app.bsky.feed.post`` collection
    are counted. Likes, reposts and other record types are ignored, as are
    messages that carry no ops.
    """
    global post_count
    commit = parse_subscribe_repos_message(message)
    ops = getattr(commit, "ops", None)
    if not ops:
        return
    new_posts = sum(
        1
        for op in ops
        if op.action == "create" and op.path.startswith(POST_PATH_PREFIX)
    )
    if new_posts:
        with post_count_lock:
            post_count += new_posts


def _take_post_count():
    """Return the posts counted since the last call and reset the counter, atomically."""
    global post_count
    with post_count_lock:
        count, post_count = post_count, 0
    return count


def _emit_point(timestamp, count):
    """Push one (timestamp, post_count) row into the streaming dataframe."""
    frame = pd.DataFrame({"timestamp": [timestamp], "post_count": [count]})
    stream.emit(frame)


def _persist_point(timestamp, count):
    """Append one JSON-lines record to the local data file for scheduled upload.

    Persistence is best-effort: a failed write is reported and skipped so the
    live plot keeps updating.
    """
    record = {"timestamp": timestamp.isoformat(), "post_count": count}
    try:
        # Hold the scheduler's lock so a background commit never uploads a
        # half-written line.
        with scheduler.lock:
            with (DATA_FOLDER / DATA_FILE).open("a", encoding="utf-8") as f:
                f.write(json.dumps(record) + "\n")
    except OSError as e:
        print(f"Error writing data point: {e}")


def emit_counts():
    """Replay recent Hub history, then emit and persist post counts every second."""
    if saved_data := load_hub_data():
        print(f"Loaded {len(saved_data)} historical data points from Hub")
        # Emit the last 100 points to initialize the plot.
        for point in saved_data[-100:]:
            _emit_point(pd.Timestamp(point["timestamp"]), point["post_count"])

    # Schedule ticks against a monotonic clock so the per-second buckets do not
    # drift by the time spent emitting and writing in each iteration.
    next_tick = time.monotonic()
    while True:
        next_tick += 1.0
        delay = next_tick - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        else:
            # Fell behind (e.g. a slow write): resync instead of emitting a
            # burst of back-to-back points.
            next_tick = time.monotonic()

        now = pd.Timestamp.now()
        count = _take_post_count()
        _emit_point(now, count)
        _persist_point(now, count)


# Create the plot with a month-long backlog.
plot = df.hvplot.line(
    "timestamp",
    "post_count",
    title="Bluesky Posts per Second",
    width=800,
    height=400,
    backlog=MONTH_IN_SECONDS,  # Keep the last month of one-per-second points
)


def run_firehose():
    """Subscribe to the Bluesky firehose and feed every message to on_message_handler.

    Runs in its own daemon thread; ``client.start`` is presumably blocking.
    """
    client = FirehoseSubscribeReposClient()
    client.start(on_message_handler)


# Background workers: firehose consumer and per-second emitter.
firehose_thread = threading.Thread(target=run_firehose, daemon=True)
firehose_thread.start()

emit_thread = threading.Thread(target=emit_counts, daemon=True)
emit_thread.start()

# When executed as a script (e.g. in the Docker container), serve the dashboard.
# `pn` and its extension were already set up at import time.
if __name__ == "__main__":
    dashboard = pn.Column(pn.pane.HoloViews(plot))
    pn.serve(
        dashboard,
        address="0.0.0.0",
        port=7860,
        allow_websocket_origin=["*"],  # Accept websocket connections from any origin
        show=False,
    )