import hvplot.streamz  # registers .hvplot on streamz DataFrames
import pandas as pd
from streamz import Stream
from streamz.dataframe import DataFrame
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message
import threading
import time
import os
import json
from huggingface_hub import CommitScheduler, HfApi, hf_hub_download
import uuid
from pathlib import Path
import panel as pn

pn.extension(design="material")

# Running count of posts seen since the last emit; shared between threads
post_count = 0

# Source stream that the emitter thread pushes one-row DataFrames into
stream = Stream()

# Seed the streaming DataFrame with an example row so streamz knows the schema
example = pd.DataFrame(
    {"timestamp": [pd.Timestamp.now()], "post_count": [post_count]}, index=[0]
)
sdf = DataFrame(stream, example=example)

# Backlog size: one row is emitted per second, so this keeps 24 hours of points
DAY_IN_SECONDS = 24 * 60 * 60  # 24 hours * 60 minutes * 60 seconds

# Configuration via environment variables
REPO_ID = os.getenv("HF_REPO_ID", "davanstrien/bluesky-counts")
REPO_TYPE = os.getenv("HF_REPO_TYPE", "dataset")
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")  # Required for Hugging Face API access
DATA_FOLDER = Path("bluesky_data")
DATA_FILE = f"bluesky_counts_{uuid.uuid4()}.json"

# Hub persistence is currently disabled; uncomment load_hub_data() and the
# CommitScheduler below to restore history on startup and upload snapshots.
# def load_hub_data():
#     """Load the most recent data from the Hub"""
#     try:
#         api = HfApi(token=HF_TOKEN)
#         # List files in the repository
#         files = api.list_repo_files(REPO_ID, repo_type=REPO_TYPE)
#         data_files = [f for f in files if f.startswith("data/bluesky_counts_")]
#         if not data_files:
#             return []
#         # Get the most recent file
#         latest_file = sorted(data_files)[-1]
#         # Download the file
#         local_path = hf_hub_download(
#             repo_id=REPO_ID, filename=latest_file, repo_type=REPO_TYPE, token=HF_TOKEN
#         )
#         # Load and parse the data (one JSON object per line)
#         data = []
#         with open(local_path, "r") as f:
#             data.extend(json.loads(line.strip()) for line in f)
#         # Keep only the last 24 hours of data
#         return data[-DAY_IN_SECONDS:]
#     except Exception as e:
#         print(f"Error loading data from Hub: {e}")
#         return []

# Initialize storage and Hub connection
# DATA_FOLDER.mkdir(exist_ok=True)
# scheduler = CommitScheduler(
#     repo_id=REPO_ID,
#     repo_type=REPO_TYPE,
#     folder_path=DATA_FOLDER,
#     path_in_repo="data",
#     every=600,  # Upload every 10 minutes
#     token=HF_TOKEN,  # Token for authentication
# )


def on_message_handler(message):
    """Count post-creation events from the Bluesky firehose."""
    global post_count
    commit = parse_subscribe_repos_message(message)
    # Only count new posts (not likes, reposts, etc.)
    if hasattr(commit, "ops"):
        for op in commit.ops:
            if op.action == "create" and "app.bsky.feed.post" in op.path:
                post_count += 1


def emit_counts():
    """Emit the per-second post count into the stream, then reset it."""
    global post_count
    # if saved_data := load_hub_data():
    #     print(f"Loaded {len(saved_data)} historical data points from Hub")
    #     # Emit historical data
    #     for point in saved_data[-100:]:  # Emit last 100 points to initialize plot
    #         df = pd.DataFrame(
    #             {
    #                 "timestamp": [pd.Timestamp(point["timestamp"])],
    #                 "post_count": [point["post_count"]],
    #             }
    #         )
    #         stream.emit(df)

    # Wait one second so the firehose thread can collect some initial data
    time.sleep(1)
    while True:
        # Emit a one-row DataFrame with the current timestamp and count
        now = pd.Timestamp.now()
        df = pd.DataFrame({"timestamp": [now], "post_count": [post_count]})
        stream.emit(df)
        # Reset the counter for the next one-second window
        post_count = 0
        time.sleep(1)


# Create the live plot with a 24-hour backlog
plot = sdf.hvplot.line(
    "timestamp",
    "post_count",
    title="Bluesky Posts per Second (Last 24 Hours)",
    width=800,
    height=400,
    backlog=DAY_IN_SECONDS,  # Keep the last 24 hours of points
)
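# Illustrative smoke test (an assumption for debugging, not part of the original
# app): a one-row frame emitted by hand flows through the same pipeline the
# threads use and should append a point to the live plot:
#
#     stream.emit(pd.DataFrame({"timestamp": [pd.Timestamp.now()], "post_count": [42]}))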
# Start the Firehose client in a background thread
def run_firehose():
    client = FirehoseSubscribeReposClient()
    client.start(on_message_handler)


firehose_thread = threading.Thread(target=run_firehose)
firehose_thread.daemon = True
firehose_thread.start()

# Start emitting counts in another thread
emit_thread = threading.Thread(target=emit_counts)
emit_thread.daemon = True
emit_thread.start()

# Serve the dashboard when run as a script (e.g. inside the Docker container)
if __name__ == "__main__":
    dashboard = pn.Column(pn.pane.HoloViews(plot))
    # Bind to all interfaces so the server is reachable from outside Docker
    pn.serve(
        dashboard,
        address="0.0.0.0",
        port=7860,
        allow_websocket_origin=["*"],
        show=False,
    )
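# Usage sketch (assumptions: the file is saved as app.py; HUGGINGFACE_TOKEN is
# only needed if the commented-out Hub persistence above is re-enabled):
#
#     HF_REPO_ID=your-username/bluesky-counts python app.py
#     # then open http://localhost:7860 in a browser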