Spaces:
Runtime error
Runtime error
import discord | |
from discord import Option, commands | |
import redis | |
import os | |
import random | |
import time | |
from threading import Thread | |
import gradio as gr | |
from gradio import utils | |
from pathlib import Path | |
# Function to generate auto-incremented IDs | |
def generate_id(key): | |
return r.incr(key) | |
def generate_progress_bar(num_labels, width=20, n=300): | |
print(num_labels) | |
percentage = int(int(num_labels) / n * 100) | |
completed_blocks = int(percentage * width / 100) | |
remaining_blocks = width - completed_blocks | |
progress_bar = 'β' * completed_blocks + 'β' * remaining_blocks | |
percentage_text = f' {num_labels}/{n} identified π¦π± ' | |
return f'π {progress_bar} {percentage_text}' | |
# Redis client setup | |
redis_url = 'redis://default:[email protected]:7369' | |
r = redis.from_url(redis_url) | |
r.set('species_identified', 0) | |
# def improve_player_stats_image(ctx): | |
# # add 1 to the number of species identified | |
# value = r.get(b'species_identified').decode('utf-8') | |
# r.set(b'species_identified', int(value) + 1) | |
# xp_key = f'{ctx.author.name}:XP'.encode('utf-8') | |
# if not r.exists(xp_key): | |
# r.set(xp_key, 0) | |
# role_key = f'{ctx.author.name}:role'.encode('utf-8') | |
# if not r.exists(role_key): | |
# r.set(role_key, 'Naturalist') | |
# img_oc_key = f'{ctx.author.name}:image'.encode('utf-8') | |
# if not r.exists(img_oc_key): | |
# r.set(img_oc_key, 0) | |
# r.set(xp_key, int(r.get(xp_key)) + 10) | |
# r.set(img_oc_key, int(r.get(img_oc_key)) + 1) | |
bot = discord.Bot() | |
async def on_ready(): | |
print(f"{bot.user} is ready and online!") | |
async def predict_image(ctx, id): | |
# Fetch the image data from Redis | |
image_data_bytes = r.hgetall(f'image:{id}'.encode('utf-8')) | |
image_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in image_data_bytes.items()} | |
path = f'https://gainforest-transparency-dashboard.s3.amazonaws.com/{image_data["awsCID"]}' | |
print(image_data) | |
message = f"**Image Observation: {id} by _{image_data['sensor']}_**" | |
message += "\n_Below are Top 10 predictions of AI_" | |
emoji_unicode_list = [chr(0x30 + i) + '\uFE0F\u20E3' for i in range(11)] | |
prediction_data_bytes = r.hgetall(f'prediction:{id}:1'.encode('utf-8')) | |
prediction_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in prediction_data_bytes.items()} | |
embed = discord.Embed( | |
title="", | |
description=f"Label: {prediction_data['label']}\nPredictions of our AI algorithms", | |
color=discord.Colour.blurple(), # Pycord provides a class with default colors you can choose from | |
) | |
embed.set_author(name=f"Image ID: {id} - AI Predictions", icon_url=ctx.author.display_avatar.url) | |
for i in range(1, 10): | |
prediction_data_bytes = r.hgetall(f'prediction:{id}:{i}'.encode('utf-8')) | |
prediction_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in prediction_data_bytes.items()} | |
species = prediction_data['label'] | |
score = prediction_data['confidence'] | |
# message = message + f'\n> {emoji_unicode_list[i-1]} {species} with confidence {score}' | |
embed.add_field(name=f'{species}', value=f'Confidence: {float(score):.4f}', inline=True) | |
embed.set_thumbnail(url=path) | |
# Send the image | |
await ctx.respond("Hint: You can use [Singapore Biodiversity Online](https://singapore.biodiversity.online/), [NParks Flora & Fauna Database](https://www.nparks.gov.sg/florafaunaweb), and [Singapore Birds](https://singaporebirds.com/singapore-bird-list/) to verify", embed=embed) | |
async def image(ctx, id): | |
# Fetch the image data from Redis | |
image_data_bytes = r.hgetall(f'image:{id}'.encode('utf-8')) | |
image_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in image_data_bytes.items()} | |
print(image_data) | |
# Create an embed | |
embed = discord.Embed(color=discord.Colour.blurple()) | |
embed.set_author(name=f"Image ID: {id}", icon_url=ctx.author.display_avatar.url) | |
embed.add_field(name="Sensor", value=image_data['sensor'], inline=True) | |
embed.add_field(name="Recorded at", value=image_data['timestamp'], inline=True) | |
embed.add_field(name="Recent Label", value=image_data['label'], inline=True) | |
embed.add_field(name="Proposed by", value=image_data['author'], inline=True) | |
path = f'https://gainforest-transparency-dashboard.s3.amazonaws.com/{image_data["awsCID"]}' | |
embed.set_image(url=path) | |
await ctx.respond(f"You are looking at Image ID **{id}**\n!Hint: You can use [Singapore Biodiversity Online](https://singapore.biodiversity.online/), [NParks Flora & Fauna Database](https://www.nparks.gov.sg/florafaunaweb), and [Singapore Birds](https://singaporebirds.com/singapore-bird-list/) to verify", embed=embed) | |
def confAutocomplete(self: discord.AutocompleteContext): | |
return ['low', 'medium', 'high'] | |
def get_conf_score(confidence: str): | |
match confidence: | |
case "low": | |
return 0 | |
case "medium": | |
return 1 | |
case "high": | |
return 2 | |
case _: | |
return 0 | |
async def identify_image(ctx, id:int, label:str, confidence: str): | |
timestamp = str(time.time()) | |
author_id = str(ctx.author_id) | |
role = "Hobbyist" | |
role_names = [role.name for role in ctx.author.roles] | |
if "Expert" in role_names: | |
role = "Expert" | |
label_cnt = generate_id(f'cnt:label:image:{id}') | |
r.hset(f'label:image:{id}:{label_cnt}'.encode('utf-8'), b'label', label.encode('utf-8')) | |
r.hset(f'label:image:{id}:{label_cnt}'.encode('utf-8'), b'author', ctx.author.name.encode('utf-8')) | |
conf_score = get_conf_score(confidence) | |
r.hset(f'label:image:{id}:{label_cnt}'.encode('utf-8'), b'confidence', conf_score) | |
r.hset(f'label:image:{id}:{label_cnt}'.encode('utf-8'), b'role', role) | |
r.hset(f'label:image:{id}:{label_cnt}'.encode('utf-8'), b'authorid', author_id) | |
r.hset(f'label:image:{id}:{label_cnt}'.encode('utf-8'), b'timestamp', timestamp) | |
r.hset(f'image:{id}'.encode('utf-8'), b'label', label.encode('utf-8')) | |
r.hset(f'image:{id}'.encode('utf-8'), b'author', ctx.author.mention.encode('utf-8')) | |
# Fetch the image data from Redis | |
image_data_bytes = r.hgetall(f'image:{id}'.encode('utf-8')) | |
image_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in image_data_bytes.items()} | |
print(image_data) | |
# Create an embed | |
embed = discord.Embed(color=discord.Colour.blurple()) | |
embed.set_author(name=f"Image Observation {id} - New Label π", icon_url=ctx.author.display_avatar.url) | |
embed.add_field(name="Sensor", value=image_data['sensor'], inline=True) | |
embed.add_field(name="Recorded at", value=image_data['timestamp'], inline=True) | |
embed.add_field(name="Recent Label", value=image_data['label'], inline=True) | |
embed.add_field(name="Proposed by", value=image_data['author'], inline=True) | |
path = f'https://gainforest-transparency-dashboard.s3.amazonaws.com/{image_data["awsCID"]}' | |
embed.set_image(url=path) | |
await ctx.respond(f"{ctx.author.mention} labeled observation **{id}** as **{label}** π (Earned 10 XP)", embed=embed) | |
async def progress(ctx): | |
value = r.get(b'species_identified').decode('utf-8') | |
progress_bar = generate_progress_bar(value) | |
await ctx.respond(progress_bar) # this decorator makes a slash command | |
# this decorator makes a slash command | |
async def sound(ctx, id): # a slash command will be created with the name "ping" | |
# Fetch the sound data from Redis | |
sound_data_bytes = r.hgetall(f'sound:{id}'.encode('utf-8')) | |
sound_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in sound_data_bytes.items()} | |
# Create an embed | |
embed = discord.Embed(color=discord.Colour.blurple()) | |
embed.set_author(name=f"Sound Observation {id} - Task", icon_url=ctx.author.display_avatar.url) | |
embed.add_field(name="Sensor", value=sound_data['sensor'], inline=True) | |
embed.add_field(name="Recorded at", value=sound_data['timestamp'], inline=True) | |
embed.add_field(name="Recent Label", value=sound_data['label'], inline=True) | |
embed.add_field(name="Proposed by", value=sound_data['author'], inline=True) | |
embed.add_field(name="Detected at Timestamp", value=sound_data['label_at'], inline=True) | |
await ctx.respond(f'Please listen to Sound ID **{id}** here:\nhttps://gainforest.app/observations/{sound_data["uuid"]} π¦π΅', embed=embed) | |
async def identify_sound(ctx, id:int, label:str, timestamp:str, confidence: str): | |
timestamp = str(time.time()) | |
author_id = str(ctx.author_id) | |
role = "Hobbyist" | |
role_names = [role.name for role in ctx.author.roles] | |
if "Expert" in role_names: | |
role = "Expert" | |
label_cnt = generate_id(f'cnt:label:sound:{id}') | |
r.hset(f'label:sound:{id}:{label_cnt}'.encode('utf-8'), b'label', label.encode('utf-8')) | |
r.hset(f'label:sound:{id}:{label_cnt}'.encode('utf-8'), b'author', ctx.author.name.encode('utf-8')) | |
r.hset(f'label:sound:{id}:{label_cnt}'.encode('utf-8'), b'role', role) | |
r.hset(f'label:sound:{id}:{label_cnt}'.encode('utf-8'), b'label_at', timestamp.encode('utf-8')) | |
conf_score = get_conf_score(confidence) | |
r.hset(f'label:sound:{id}:{label_cnt}'.encode('utf-8'), b'confidence', conf_score) | |
r.hset(f'label:sound:{id}:{label_cnt}'.encode('utf-8'), b'authorid', author_id) | |
r.hset(f'label:sound:{id}:{label_cnt}'.encode('utf-8'), b'timestamp', timestamp) | |
r.hset(f'sound:{id}'.encode('utf-8'), b'label', label.encode('utf-8')) | |
r.hset(f'sound:{id}'.encode('utf-8'), b'label_at', timestamp.encode('utf-8')) | |
r.hset(f'sound:{id}'.encode('utf-8'), b'author', ctx.author.mention.encode('utf-8')) | |
# Fetch the sound data from Redis | |
sound_data_bytes = r.hgetall(f'sound:{id}'.encode('utf-8')) | |
sound_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in sound_data_bytes.items()} | |
print(sound_data) | |
# Create an embed | |
embed = discord.Embed(color=discord.Colour.blurple()) | |
embed.set_author(name=f"Sound Observation {id} - New Label π", icon_url=ctx.author.display_avatar.url) | |
embed.add_field(name="Sensor", value=sound_data['sensor'], inline=True) | |
embed.add_field(name="Recorded at", value=sound_data['timestamp'], inline=True) | |
embed.add_field(name="Recent Label", value=sound_data['label'], inline=True) | |
embed.add_field(name="Proposed by", value=sound_data['author'], inline=True) | |
embed.add_field(name="Detected at Timestamp", value=sound_data['label_at'], inline=True) | |
await ctx.respond(f'{ctx.author.mention} labeled sound observation **{id}** as **{label}** at **{timestamp}** π (Earned 10 XP).\nListen here:\nhttps://gainforest.app/observations/{sound_data["uuid"]} π¦π΅', embed=embed) | |
async def next_image(ctx): | |
total_images = r.get(b'cnt:image').decode('utf-8') | |
numbers = list(range(1, int(total_images))) | |
exclude_redis = r.lrange('image:accept', 0, -1) | |
exclude = set(int(num.decode('utf-8')) for num in exclude_redis) | |
numbers = [num for num in numbers if num not in exclude] | |
id = random.choice(numbers) | |
await image(ctx, id) | |
async def next_sound(ctx): | |
total_sound = r.get(b'cnt:sound').decode('utf-8') | |
numbers = list(range(1, int(total_sound))) | |
exclude_redis = r.lrange('image:accept', 0, -1) | |
exclude = set(int(num.decode('utf-8')) for num in exclude_redis) | |
numbers = [num for num in numbers if num not in exclude] | |
id = random.choice(numbers) | |
await sound(ctx, id) | |
async def profile(ctx): | |
# xp_key = f'{ctx.author.name}:XP'.encode('utf-8') | |
# if not r.exists(xp_key): | |
# r.set(xp_key, 0) | |
# img_oc_key = f'{ctx.author.name}:image'.encode('utf-8') | |
# if not r.exists(img_oc_key): | |
# r.set(img_oc_key, 0) | |
# s_oc_key = f'{ctx.author.name}:sound'.encode('utf-8') | |
# if not r.exists(s_oc_key): | |
# r.set(s_oc_key, 0) | |
# Create an embed | |
role = "Hobbyist" | |
role_names = [role.name for role in ctx.author.roles] | |
if "Expert" in role_names: | |
role = "Expert" | |
embed = discord.Embed(color=discord.Colour.blurple()) | |
embed.set_author(name=f"{ctx.author.name} - Profile", icon_url=ctx.author.display_avatar.url) | |
# embed.add_field(name="**PROGRESS**", value=f"**Level**: 1\n**XP**: {r.get(xp_key).decode()}/1000\n", inline=False) | |
# embed.add_field(name="**STATS**", value=f"**π± Images labeled**: {r.get(img_oc_key).decode()}", inline=False) | |
embed.set_thumbnail(url=ctx.author.display_avatar.url) | |
embed.set_footer(text=f"π Role: {r.get(role).decode()}") | |
await ctx.respond(embed=embed) | |
t = Thread(target=bot.run, daemon=True, args=('MTA5NzMzMDI2MDI4NDAzNTEyMg.GNFiQP.hZC_2HLTvVAROlKKUmnVQjviT0G4wHeQq23-rs', )) | |
t.start() | |
import gradio as gr | |
with gr.Blocks() as demo: | |
gr.Markdown(Path('landing.md').read_text()) | |
demo.launch() |