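"""Live dashboard of Bluesky posts per second.

Counts `app.bsky.feed.post` create events from the AT Protocol firehose and
plots them as a rolling 24-hour window using streamz, hvPlot, and Panel.
The commented-out sections sketch optional persistence to the Hugging Face Hub.
"""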
import hvplot.streamz  # noqa: F401 -- registers the streamz plotting interface
import pandas as pd
from streamz import Stream
from streamz.dataframe import DataFrame
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message
import threading
import time
import os

# json and the huggingface_hub imports support the (currently disabled)
# Hub-persistence code further below
import json
from huggingface_hub import CommitScheduler, HfApi, hf_hub_download
import uuid
from pathlib import Path
import panel as pn

pn.extension(design="material")
# Counter for posts, shared between the firehose thread and the emitter thread
post_count = 0
post_count_lock = threading.Lock()

# Create a streaming DataFrame; streamz needs an example frame to infer the schema
stream = Stream()
example = pd.DataFrame(
    {"timestamp": [pd.Timestamp.now()], "post_count": [post_count]}, index=[0]
)
df = DataFrame(stream, example=example)

# Calculate backlog for 24 hours
DAY_IN_SECONDS = 24 * 60 * 60  # 24 hours * 60 minutes * 60 seconds

# Add environment variable support for configuration
REPO_ID = os.getenv("HF_REPO_ID", "davanstrien/bluesky-counts")
REPO_TYPE = os.getenv("HF_REPO_TYPE", "dataset")
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")  # Required for HuggingFace API access
DATA_FOLDER = Path("bluesky_data")
DATA_FILE = f"bluesky_counts_{uuid.uuid4()}.json"
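# A per-run UUID gives each process its own file, so the (disabled) scheduled
# uploads below should not collide across restarts or replicas.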


# def load_hub_data():
#     """Load the most recent data from the Hub"""
#     try:
#         api = HfApi(token=HF_TOKEN)
#         # List files in the repository
#         files = api.list_repo_files(REPO_ID, repo_type=REPO_TYPE)
#         data_files = [f for f in files if f.startswith("data/bluesky_counts_")]

#         if not data_files:
#             return []

#         # Get the most recent file
#         latest_file = sorted(data_files)[-1]
#         # Download the file
#         local_path = hf_hub_download(
#             repo_id=REPO_ID, filename=latest_file, repo_type=REPO_TYPE, token=HF_TOKEN
#         )

#         # Load and parse the data
#         data = []
#         with open(local_path, "r") as f:
#             data.extend(json.loads(line.strip()) for line in f)
#         # Keep only last 24 hours of data
#         return data[-DAY_IN_SECONDS:]
#     except Exception as e:
#         print(f"Error loading data from Hub: {e}")
#         return []


# Initialize storage and Hub connection
# DATA_FOLDER.mkdir(exist_ok=True)
# scheduler = CommitScheduler(
#     repo_id=REPO_ID,
#     repo_type=REPO_TYPE,
#     folder_path=DATA_FOLDER,
#     path_in_repo="data",
#     every=600,  # Upload every 10 minutes
#     token=HF_TOKEN,  # Add token for authentication
# )


def on_message_handler(message):
    """Count newly created posts arriving on the firehose."""
    global post_count
    commit = parse_subscribe_repos_message(message)
    # Only count new posts (not likes, reposts, etc.)
    if hasattr(commit, "ops"):
        for op in commit.ops:
            if op.action == "create" and "app.bsky.feed.post" in op.path:
                with post_count_lock:
                    post_count += 1


def emit_counts():
    """Emit post counts every second"""
    global post_count

    # if saved_data := load_hub_data():
    #     print(f"Loaded {len(saved_data)} historical data points from Hub")
    #     # Emit historical data
    #     for point in saved_data[-100:]:  # Emit last 100 points to initialize plot
    #         df = pd.DataFrame(
    #             {
    #                 "timestamp": [pd.Timestamp(point["timestamp"])],
    #                 "post_count": [point["post_count"]],
    #             }
    #         )
    #         stream.emit(df)

    # Wait one second so the first emitted count covers a full interval
    time.sleep(1)

    while True:
        # Atomically read and reset the counter so no increments are lost
        with post_count_lock:
            count, post_count = post_count, 0
        # Emit the count with the current timestamp
        now = pd.Timestamp.now()
        stream.emit(pd.DataFrame({"timestamp": [now], "post_count": [count]}))
        # Wait 1 second before the next sample
        time.sleep(1)


# Create the plot with 24-hour backlog
plot = df.hvplot.line(
    "timestamp",
    "post_count",
    title="Bluesky Posts per Second (Last 24 Hours)",
    width=800,
    height=400,
    backlog=DAY_IN_SECONDS,  # Keep last 24 hours of points
)


# Start Firehose client in a separate thread
def run_firehose():
    client = FirehoseSubscribeReposClient()
    client.start(on_message_handler)


firehose_thread = threading.Thread(target=run_firehose, daemon=True)
firehose_thread.start()

# Start emitting counts in another thread
emit_thread = threading.Thread(target=emit_counts, daemon=True)
emit_thread.start()

# When run as a script, serve the dashboard with Panel
if __name__ == "__main__":
    dashboard = pn.Column(pn.pane.HoloViews(plot))
    # Bind to all interfaces on port 7860 so the app is reachable from Docker
    pn.serve(
        dashboard,
        address="0.0.0.0",
        port=7860,
        allow_websocket_origin=["*"],
        show=False,
    )
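
# To try this locally (assuming atproto, streamz, hvplot, panel, and
# huggingface_hub are installed): `python app.py`, then open
# http://localhost:7860 in a browser.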