Spaces:
Sleeping
Sleeping
import sqlite3 | |
import os | |
from src.schema import Skill | |
class SkillsDB:
    """SQLite-backed store for skills and their tags.

    Two tables are used: ``skills`` holds one row per skill and ``tags``
    holds every distinct tag once.  A skill's tags are stored in its
    ``skill_tags`` column as a comma-separated list of ``tags.id`` values,
    in the order the tags were supplied to :meth:`add_skill`.
    """

    def __init__(self, db_name="skills.db"):
        """Open the database at *db_name* and make sure the schema exists.

        ``sqlite3.connect`` creates the file when it is missing.  The schema
        is ensured on every open (the statements use ``IF NOT EXISTS``), so
        a file that already exists but was never initialised still works.
        """
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        self.create_db()

    def __enter__(self):
        """Return ``self`` so the instance can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving a ``with`` block."""
        self.close()

    def create_db(self):
        """Create the ``skills`` and ``tags`` tables if they do not exist."""
        # Create skill table
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS skills (
                id INTEGER PRIMARY KEY,
                repo_id TEXT NOT NULL,
                skill_name TEXT NOT NULL,
                skill_description TEXT,
                author TEXT,
                created_at TEXT,
                skill_usage_example TEXT,
                skill_program_language TEXT,
                skill_tags TEXT
            );
        ''')
        # Create tags table
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS tags (
                id INTEGER PRIMARY KEY,
                tag TEXT NOT NULL UNIQUE
            );
        ''')
        self.conn.commit()

    def _get_or_create_tag_id(self, tag):
        """Return the id of *tag*, inserting it into ``tags`` if missing."""
        self.cursor.execute('SELECT id FROM tags WHERE tag = ?;', (tag,))
        row = self.cursor.fetchone()
        if row is not None:
            return row[0]
        self.cursor.execute('INSERT INTO tags (tag) VALUES (?);', (tag,))
        return self.cursor.lastrowid

    @staticmethod
    def _resolve_tags(tags_str, tag_by_id):
        """Translate a stored comma-separated id list into tag names.

        Names are returned in the order their ids appear in *tags_str*.
        Empty or non-numeric tokens, ids missing from *tag_by_id*, and
        repeated ids are skipped.  ``None`` or ``""`` yields ``[]``.
        """
        tags = []
        seen = set()
        for token in (tags_str or "").split(","):
            try:
                tag_id = int(token)
            except ValueError:
                # "" (a skill without tags) or malformed data.
                continue
            if tag_id in seen or tag_id not in tag_by_id:
                continue
            seen.add(tag_id)
            tags.append(tag_by_id[tag_id])
        return tags

    def add_skill(self, skill):
        """Insert *skill* unless one with the same name and author exists.

        *skill* must expose the attributes ``repo_id``, ``skill_name``,
        ``skill_description``, ``author``, ``created_at``,
        ``skill_usage_example``, ``skill_program_language`` and
        ``skill_tags`` (an iterable of tag strings, or ``None``).

        Returns ``"ok"`` on success, or a human-readable message when the
        skill already exists.  A ``sqlite3.Error`` raised by a write is
        propagated after the pending changes have been rolled back.
        """
        # ``IS`` instead of ``=``: with ``=`` a NULL author never compares
        # equal, so skills without an author could be inserted repeatedly.
        self.cursor.execute(
            'SELECT id FROM skills WHERE skill_name = ? AND author IS ?;',
            (skill.skill_name, skill.author),
        )
        if self.cursor.fetchone() is not None:
            return f"Skill with name '{skill.skill_name}' by author '{skill.author}' already exists!"

        # The connection context manager commits on success and rolls back
        # on an exception, so a failed skill insert does not leave the tag
        # rows created for it pending in the transaction.
        with self.conn:
            tag_ids = [
                str(self._get_or_create_tag_id(tag))
                for tag in (skill.skill_tags or ())
            ]
            self.cursor.execute(
                '''
                INSERT INTO skills (repo_id, skill_name, skill_description, author, created_at, skill_usage_example, skill_program_language, skill_tags)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?);
                ''',
                (
                    skill.repo_id,
                    skill.skill_name,
                    skill.skill_description,
                    skill.author,
                    skill.created_at,
                    skill.skill_usage_example,
                    skill.skill_program_language,
                    ",".join(tag_ids),
                ),
            )
        return "ok"

    def get_skills(self):
        """Return every stored skill as a list of ``Skill`` objects.

        Each skill's tag ids are resolved back to tag names, in the order
        in which they were stored.
        """
        # Load the tag table once instead of issuing one query per skill.
        self.cursor.execute('SELECT id, tag FROM tags;')
        tag_by_id = dict(self.cursor.fetchall())

        self.cursor.execute('SELECT * FROM skills;')
        col_names = [col[0] for col in self.cursor.description]

        skills = []
        for row in self.cursor.fetchall():
            record = dict(zip(col_names, row))
            skills.append(
                Skill(
                    record['repo_id'],
                    record['skill_name'],
                    record['skill_description'],
                    record['author'],
                    record['created_at'],
                    record['skill_usage_example'],
                    record['skill_program_language'],
                    self._resolve_tags(record['skill_tags'], tag_by_id),
                )
            )
        return skills

    def get_tags(self):
        """Return all tag names stored in the ``tags`` table."""
        self.cursor.execute('SELECT tag FROM tags;')
        # fetchall() returns a (possibly empty) list, never None.
        return [row[0] for row in self.cursor.fetchall()]

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()