File size: 4,814 Bytes
10e329b c77eb65 10e329b ab39a72 c77eb65 ab39a72 c77eb65 ab39a72 10e329b ab39a72 10e329b c77eb65 10e329b c77eb65 10e329b c77eb65 10e329b c77eb65 10e329b c77eb65 |
|
import hvplot.streamz
import pandas as pd
import numpy as np
from streamz import Stream
from streamz.dataframe import DataFrame
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message
import datetime
import queue
import threading
import time
import os
import json
from huggingface_hub import CommitScheduler, HfApi, hf_hub_download
import uuid
from pathlib import Path
import panel as pn
pn.extension(design="material")
# Queue intended for passing data between threads.
# NOTE(review): nothing in this file reads or writes post_queue; the threads
# share the global post_count instead.
post_queue = queue.Queue()
# Number of new posts seen since the last emit. Incremented by the firehose
# thread (on_message_handler) and drained by the emitter thread (emit_counts).
post_count = 0
# Source stream that emit_counts() pushes one-row DataFrames into.
stream = Stream()
# Streaming DataFrame over `stream`; `example` declares its columns
# ("timestamp", "post_count") and their dtypes.
example = pd.DataFrame(
    {"timestamp": [pd.Timestamp.now()], "post_count": [post_count]}, index=[0]
)
df = DataFrame(stream, example=example)
# Plot backlog: one sample per second for 24 hours.
DAY_IN_SECONDS = 24 * 60 * 60  # 24 hours * 60 minutes * 60 seconds
# Hugging Face Hub configuration, overridable via environment variables.
# NOTE(review): these are currently referenced only by the commented-out
# Hub load/upload code below.
REPO_ID = os.getenv("HF_REPO_ID", "davanstrien/bluesky-counts")
REPO_TYPE = os.getenv("HF_REPO_TYPE", "dataset")
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")  # Required for HuggingFace API access
DATA_FOLDER = Path("bluesky_data")
# Unique per-process file name so concurrent writers never collide.
DATA_FILE = f"bluesky_counts_{uuid.uuid4()}.json"
# def load_hub_data():
# """Load the most recent data from the Hub"""
# try:
# api = HfApi(token=HF_TOKEN)
# # List files in the repository
# files = api.list_repo_files(REPO_ID, repo_type=REPO_TYPE)
# data_files = [f for f in files if f.startswith("data/bluesky_counts_")]
# if not data_files:
# return []
# # Get the most recent file
# latest_file = sorted(data_files)[-1]
# # Download the file
# local_path = hf_hub_download(
# repo_id=REPO_ID, filename=latest_file, repo_type=REPO_TYPE, token=HF_TOKEN
# )
# # Load and parse the data
# data = []
# with open(local_path, "r") as f:
# data.extend(json.loads(line.strip()) for line in f)
# # Keep only last 24 hours of data
# return data[-DAY_IN_SECONDS:]
# except Exception as e:
# print(f"Error loading data from Hub: {e}")
# return []
# Initialize storage and Hub connection
# DATA_FOLDER.mkdir(exist_ok=True)
# scheduler = CommitScheduler(
# repo_id=REPO_ID,
# repo_type=REPO_TYPE,
# folder_path=DATA_FOLDER,
# path_in_repo="data",
# every=600, # Upload every 10 minutes
# token=HF_TOKEN, # Add token for authentication
# )
def on_message_handler(message):
    """Count newly created Bluesky posts contained in one firehose message.

    Parses ``message`` with ``parse_subscribe_repos_message``. If the result
    carries ``ops``, every ``create`` operation whose record lives in the
    ``app.bsky.feed.post`` collection is added to the global ``post_count``.
    Likes, reposts, deletes and all other collections are ignored.

    Args:
        message: A raw firehose message as passed by
            ``FirehoseSubscribeReposClient.start``.
    """
    global post_count
    commit = parse_subscribe_repos_message(message)
    # Only messages that carry ``ops`` (repo commits) can contain new posts.
    ops = getattr(commit, "ops", None)
    if not ops:
        return
    # A repo op path is "<collection>/<record-key>". Compare the collection
    # exactly: a substring test would also match sibling collections such as
    # "app.bsky.feed.postgate", which are not posts.
    new_posts = sum(
        1
        for op in ops
        if op.action == "create"
        and op.path.partition("/")[0] == "app.bsky.feed.post"
    )
    if new_posts:
        # A single read-modify-write per message (rather than one per op)
        # keeps the window for racing with emit_counts() as small as possible.
        post_count += new_posts
def emit_counts(interval=1.0):
    """Forever push the number of new posts seen per interval into ``stream``.

    Intended as the target of a daemon thread. After an initial wait of one
    interval, each tick snapshots the global ``post_count``, removes that
    snapshot from the counter, and emits a one-row DataFrame with columns
    ``timestamp`` (``pd.Timestamp.now()``) and ``post_count`` (the snapshot).

    Args:
        interval: Seconds between emits. The default of 1.0 yields the
            "posts per second" series the plot expects.
    """
    global post_count
    # Disabled: replay historical points from the Hub (see load_hub_data).
    # if saved_data := load_hub_data():
    #     print(f"Loaded {len(saved_data)} historical data points from Hub")
    #     # Emit historical data
    #     for point in saved_data[-100:]:  # Emit last 100 points to initialize plot
    #         df = pd.DataFrame(
    #             {
    #                 "timestamp": [pd.Timestamp(point["timestamp"])],
    #                 "post_count": [point["post_count"]],
    #             }
    #         )
    #         stream.emit(df)
    # Schedule ticks on the monotonic clock so time spent in stream.emit()
    # does not stretch every interval beyond `interval`.
    next_tick = time.monotonic() + interval
    # Wait one interval so the first point covers a full interval of posts.
    time.sleep(interval)
    while True:
        now = pd.Timestamp.now()
        # Take the count *before* the comparatively slow DataFrame build and
        # emit, and subtract it instead of zeroing the counter. Posts recorded
        # by the firehose thread in the meantime then roll over into the next
        # interval instead of being overwritten. The `-=` itself is still not
        # atomic; a lock shared with on_message_handler would close that gap.
        count = post_count
        post_count -= count
        batch = pd.DataFrame({"timestamp": [now], "post_count": [count]})
        try:
            stream.emit(batch)
        except Exception as e:
            # An uncaught error would silently end this daemon thread and
            # freeze the chart, so report it and keep ticking.
            print(f"Error emitting post counts: {e}")
        next_tick += interval
        delay = next_tick - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        else:
            # Fell behind (e.g. a slow emit): resynchronise rather than
            # emitting a burst of near-empty catch-up points.
            next_tick = time.monotonic()
# Live line chart of posts per interval. `backlog` caps the retained rows at
# DAY_IN_SECONDS, i.e. 24 hours of one-second samples.
plot = df.hvplot.line(
    x="timestamp",
    y="post_count",
    backlog=DAY_IN_SECONDS,
    height=400,
    width=800,
    title="Bluesky Posts per Second (Last 24 Hours)",
)
# Firehose consumer; started on its own thread below.
def run_firehose():
    """Subscribe to the Bluesky firehose, routing each message to on_message_handler.

    A fresh ``FirehoseSubscribeReposClient`` is created and ``start`` is
    called on it. This function returns only when that call returns.
    """
    FirehoseSubscribeReposClient().start(on_message_handler)
# Background workers: the firehose consumer and the per-second emitter.
# Both are daemon threads, so they do not keep the process alive by
# themselves.
firehose_thread = threading.Thread(target=run_firehose, daemon=True)
firehose_thread.start()
emit_thread = threading.Thread(target=emit_counts, daemon=True)
emit_thread.start()
# When executed as a script, serve the chart as a Panel web app.
# (`pn` is imported and `pn.extension(design="material")` is already called
# at module level, so neither is repeated here.)
if __name__ == "__main__":
    dashboard = pn.Column(pn.pane.HoloViews(plot))
    # Bind to all interfaces on port 7860 so the app is reachable from
    # outside a Docker container; don't try to open a local browser.
    # NOTE(review): allow_websocket_origin=["*"] accepts websocket
    # connections from any origin; narrow it if the app should only be
    # embedded from known hosts.
    pn.serve(
        dashboard,
        address="0.0.0.0",
        port=7860,
        allow_websocket_origin=["*"],
        show=False,
    )