Spaces:
Build error
Build error
import os | |
from pathlib import Path | |
from typing import Literal, Optional | |
from fastapi import BackgroundTasks, FastAPI, Header, HTTPException | |
from fastapi.responses import FileResponse | |
from huggingface_hub import ( | |
CommitOperationAdd, | |
CommitOperationDelete, | |
comment_discussion, | |
create_commit, | |
create_repo, | |
delete_repo, | |
snapshot_download, | |
) | |
from pydantic import BaseModel | |
from requests import HTTPError | |
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET") | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
class WebhookPayloadEvent(BaseModel): | |
action: Literal["create", "update", "delete"] | |
scope: str | |
class WebhookPayloadRepo(BaseModel): | |
type: Literal["dataset", "model", "space"] | |
name: str | |
private: bool | |
class WebhookPayloadDiscussion(BaseModel): | |
num: int | |
isPullRequest: bool | |
class WebhookPayload(BaseModel): | |
event: WebhookPayloadEvent | |
repo: WebhookPayloadRepo | |
discussion: Optional[WebhookPayloadDiscussion] | |
app = FastAPI() | |
async def home(): | |
return FileResponse("home.html") | |
async def post_webhook( | |
payload: WebhookPayload, | |
task_queue: BackgroundTasks, | |
x_webhook_secret: Optional[str] = Header(default=None), | |
): | |
# Taken from https://huggingface.co/spaces/huggingface-projects/auto-retrain | |
if x_webhook_secret is None: | |
raise HTTPException(401) | |
if x_webhook_secret != WEBHOOK_SECRET: | |
raise HTTPException(403) | |
if payload.repo.type != "space": | |
raise HTTPException(400, f"Must be a Space, not {payload.repo.type}") | |
if not payload.event.scope.startswith("discussion"): | |
return "Not a discussion" | |
if payload.discussion is None: | |
return "Couldn't parse 'payload.discussion'" | |
if not payload.discussion.isPullRequest: | |
return "Not a Pull Request" | |
if payload.event.action == "create" or payload.event.action == "update": | |
task_queue.add_task( | |
sync_ci_space, | |
space_id=payload.repo.name, | |
pr_num=payload.discussion.num, | |
private=payload.repo.private, | |
) | |
elif payload.event.action == "delete": | |
task_queue.add_task( | |
delete_ci_space, | |
space_id=payload.repo.name, | |
pr_num=payload.discussion.num, | |
) | |
else: | |
return f"Couldn't handle action {payload.event.action}" | |
return "Processed" | |
def sync_ci_space(space_id: str, pr_num: int, private=bool) -> None: | |
# Create a temporary space for CI if didn't exist | |
ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num) | |
try: | |
create_repo(ci_space_id, repo_type="space", private=private, token=HF_TOKEN) | |
is_new = True | |
except HTTPError as err: | |
if err.response.status_code == 409: # already exists | |
is_new = False | |
else: | |
raise | |
# Download space codebase from PR revision | |
snapshot_path = snapshot_download( | |
repo_id=space_id, | |
revision=f"refs/pr/{pr_num}", | |
repo_type="space", | |
token=HF_TOKEN, | |
) | |
# Sync space codebase with PR revision | |
operations = [CommitOperationDelete("/")] # little aggressive but works | |
operations += [ | |
CommitOperationAdd( | |
path_in_repo=str(filepath.relative_to(snapshot_path)), | |
path_or_fileobj=filepath, | |
) | |
for filepath in Path(snapshot_path).glob("*/**") | |
if filepath.is_file() | |
] | |
create_commit( | |
repo_id=ci_space_id, | |
repo_type="space", | |
operations=operations, | |
commit_message=f"Sync CI Space with PR {pr_num}.", | |
token=HF_TOKEN, | |
) | |
# Post a comment on the PR | |
notify_pr(space_id=space_id, pr_num=pr_num, action="create" if is_new else "update") | |
def delete_ci_space(space_id: str, pr_num: int) -> None: | |
# Delete | |
ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num) | |
delete_repo(repo_id=ci_space_id, repo_type="space", token=HF_TOKEN) | |
# Notify about deletion | |
notify_pr(space_id=space_id, pr_num=pr_num, action="delete") | |
def notify_pr( | |
space_id: str, pr_num: int, action: Literal["create", "update", "delete"] | |
) -> None: | |
ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num) | |
if action == "create": | |
comment = NOTIFICATION_TEMPLATE_CREATE.format(ci_space_id=ci_space_id) | |
elif action == "update": | |
comment = NOTIFICATION_TEMPLATE_UPDATE.format(ci_space_id=ci_space_id) | |
elif action == "delete": | |
comment = NOTIFICATION_TEMPLATE_DELETE | |
else: | |
raise ValueError(f"Status {action} not handled.") | |
comment_discussion( | |
repo_id=space_id, | |
repo_type="space", | |
discussion_num=pr_num, | |
comment=comment, | |
token=HF_TOKEN, | |
) | |
def _get_ci_space_id(space_id: str, pr_num: int) -> str: | |
return f"{space_id}-ci-pr-{pr_num}" | |
NOTIFICATION_TEMPLATE_CREATE = """\ | |
Hey there! | |
Following the creation of this PR, a temporary test Space [{ci_space_id}}](https://huggingface.co/spaces/{ci_space_id}) has been launched. | |
Any changes pushed to this PR will be synced with the test Space. | |
(This is an automated message) | |
""" | |
NOTIFICATION_TEMPLATE_UPDATE = """\ | |
Hey there! | |
Following new commits that happened in this PR, the temporary test Space [{ci_space_id}}](https://huggingface.co/spaces/{ci_space_id}) has been updated. | |
(This is an automated message) | |
""" | |
NOTIFICATION_TEMPLATE_DELETE = """\ | |
Hey there! | |
PR is now merged. The temporary test Space has been deleted. | |
(This is an automated message) | |
""" | |