import sqlite3
import os

from src.schema import Skill


class SkillsDB:
    """SQLite-backed store for skills and the tags attached to them.

    Two tables are used: ``skills`` holds one row per skill and ``tags``
    holds each distinct tag once. A skill row refers to its tags through the
    ``skill_tags`` column, a comma-separated string of ``tags.id`` values.

    Instances can be used as context managers; the connection is closed on
    exit.
    """

    def __init__(self, db_name="skills.db"):
        """Open the database at *db_name* and make sure its tables exist.

        Args:
            db_name: Database path passed straight to ``sqlite3.connect``.
        """
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        # The schema statements use IF NOT EXISTS, so running them on every
        # open is harmless and also covers a file that already exists but
        # has no tables yet (for example an empty temporary file).
        self.create_db()

    def __enter__(self):
        """Return this instance for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the body are not suppressed."""
        self.close()
        return False

    def create_db(self) -> None:
        """Create the ``skills`` and ``tags`` tables if they are missing."""
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS skills (
                id INTEGER PRIMARY KEY,
                repo_id TEXT NOT NULL,
                skill_name TEXT NOT NULL,
                skill_description TEXT,
                author TEXT,
                created_at TEXT,
                skill_usage_example TEXT,
                skill_program_language TEXT,
                skill_tags TEXT
            );
        ''')
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS tags (
                id INTEGER PRIMARY KEY,
                tag TEXT NOT NULL UNIQUE
            );
        ''')
        self.conn.commit()

    def _get_or_create_tag_id(self, tag):
        """Return the ``tags.id`` for *tag*, inserting the tag if it is new."""
        self.cursor.execute('SELECT id FROM tags WHERE tag = ?;', (tag,))
        row = self.cursor.fetchone()
        if row is not None:
            return row[0]
        self.cursor.execute('INSERT INTO tags (tag) VALUES (?);', (tag,))
        return self.cursor.lastrowid

    def add_skill(self, skill) -> str:
        """Store *skill* unless one with the same name and author exists.

        Args:
            skill: Object exposing ``repo_id``, ``skill_name``,
                ``skill_description``, ``author``, ``created_at``,
                ``skill_usage_example``, ``skill_program_language`` and
                ``skill_tags`` (an iterable of tag strings, or ``None``).

        Returns:
            ``"ok"`` when the skill was inserted, otherwise a message saying
            that the skill already exists.

        Raises:
            sqlite3.Error: If a statement fails; the tag and skill inserts
                made by this call are rolled back before the error
                propagates.
        """
        self.cursor.execute(
            'SELECT id FROM skills WHERE skill_name = ? AND author = ?;',
            (skill.skill_name, skill.author),
        )
        if self.cursor.fetchone() is not None:
            return f"Skill with name '{skill.skill_name}' by author '{skill.author}' already exists!"

        # Drop repeated tags but keep first-seen order, so each tag id is
        # stored once; a missing tag list is treated as "no tags".
        unique_tags = list(dict.fromkeys(skill.skill_tags or ()))

        # The connection context manager commits on success and rolls back
        # on error, so a failed skill insert cannot leave new tag rows
        # pending in the open transaction.
        with self.conn:
            tag_ids = [str(self._get_or_create_tag_id(tag)) for tag in unique_tags]
            self.cursor.execute('''
                INSERT INTO skills (repo_id, skill_name, skill_description, author,
                                    created_at, skill_usage_example,
                                    skill_program_language, skill_tags)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?);
            ''', (
                skill.repo_id,
                skill.skill_name,
                skill.skill_description,
                skill.author,
                skill.created_at,
                skill.skill_usage_example,
                skill.skill_program_language,
                ",".join(tag_ids),
            ))
        return "ok"

    def get_skills(self) -> list:
        """Return every stored skill as a ``Skill`` object.

        Each ``Skill`` is built positionally from ``repo_id``,
        ``skill_name``, ``skill_description``, ``author``, ``created_at``,
        ``skill_usage_example``, ``skill_program_language`` and the list of
        tag names. Tag names follow the order of the ids stored in
        ``skill_tags``; ids with no matching tag row are skipped, and an
        empty or NULL ``skill_tags`` yields an empty list.
        """
        # Load the id -> name mapping once instead of querying per skill.
        # Keys are strings because skill_tags stores ids as text.
        self.cursor.execute('SELECT id, tag FROM tags;')
        tag_by_id = {str(tag_id): tag for tag_id, tag in self.cursor.fetchall()}

        self.cursor.execute('''
            SELECT repo_id, skill_name, skill_description, author, created_at,
                   skill_usage_example, skill_program_language, skill_tags
            FROM skills;
        ''')
        skills = []
        for *fields, tags_str in self.cursor.fetchall():
            stored_ids = dict.fromkeys(
                part.strip() for part in (tags_str or '').split(',')
            )
            tags = [tag_by_id[tag_id] for tag_id in stored_ids if tag_id in tag_by_id]
            skills.append(Skill(*fields, tags))
        return skills

    def get_tags(self) -> list:
        """Return the names of all known tags (empty list when none exist)."""
        self.cursor.execute('SELECT tag FROM tags;')
        return [row[0] for row in self.cursor.fetchall()]

    def close(self) -> None:
        """Close the underlying database connection."""
        self.conn.close()