import ast import uuid from typing import Dict, List, Union import argilla as rg import gradio as gr import pandas as pd from datasets import Dataset from distilabel.distiset import Distiset from huggingface_hub import HfApi from src.distilabel_dataset_generator.apps.base import ( get_argilla_client, get_pipeline_code_ui, hide_success_message, show_success_message_hub, validate_argilla_user_workspace_dataset, validate_push_to_hub, ) from src.distilabel_dataset_generator.pipelines.base import ( DEFAULT_BATCH_SIZE, ) from src.distilabel_dataset_generator.pipelines.embeddings import ( get_embeddings, get_sentence_embedding_dimensions, ) from src.distilabel_dataset_generator.pipelines.sft import ( DEFAULT_DATASET_DESCRIPTIONS, PROMPT_CREATION_PROMPT, generate_pipeline_code, get_magpie_generator, get_prompt_generator, get_response_generator, ) from src.distilabel_dataset_generator.utils import ( get_org_dropdown, ) def convert_dataframe_messages(dataframe: pd.DataFrame) -> pd.DataFrame: def convert_to_list_of_dicts(messages: str) -> List[Dict[str, str]]: return ast.literal_eval( messages.replace("'user'}", "'user'},") .replace("'system'}", "'system'},") .replace("'assistant'}", "'assistant'},") ) if "messages" in dataframe.columns: dataframe["messages"] = dataframe["messages"].apply( lambda x: convert_to_list_of_dicts(x) if isinstance(x, str) else x ) return dataframe def generate_system_prompt(dataset_description, progress=gr.Progress()): progress(0.0, desc="Generating system prompt") progress(0.3, desc="Initializing text generation") generate_description = get_prompt_generator() progress(0.7, desc="Generating system prompt") result = next( generate_description.process( [ { "system_prompt": PROMPT_CREATION_PROMPT, "instruction": dataset_description, } ] ) )[0]["generation"] progress(1.0, desc="System prompt generated") return result, pd.DataFrame() def generate_sample_dataset(system_prompt, progress=gr.Progress()): df = generate_dataset( system_prompt=system_prompt, num_turns=1, num_rows=10, progress=progress, is_sample=True, ) return df def generate_dataset( system_prompt: str, num_turns: int = 1, num_rows: int = 10, is_sample: bool = False, progress=gr.Progress(), ) -> pd.DataFrame: progress(0.0, desc="(1/2) Generating instructions") magpie_generator = get_magpie_generator( num_turns, num_rows, system_prompt, is_sample ) response_generator = get_response_generator(num_turns, system_prompt, is_sample) total_steps: int = num_rows * 2 batch_size = DEFAULT_BATCH_SIZE # create instructions n_processed = 0 magpie_results = [] while n_processed < num_rows: progress( 0.5 * n_processed / num_rows, total=total_steps, desc="(1/2) Generating instructions", ) remaining_rows = num_rows - n_processed batch_size = min(batch_size, remaining_rows) inputs = [{"system_prompt": system_prompt} for _ in range(batch_size)] batch = list(magpie_generator.process(inputs=inputs)) magpie_results.extend(batch[0]) n_processed += batch_size progress(0.5, desc="(1/2) Generating instructions") # generate responses n_processed = 0 response_results = [] if num_turns == 1: while n_processed < num_rows: progress( 0.5 + 0.5 * n_processed / num_rows, total=total_steps, desc="(2/2) Generating responses", ) batch = magpie_results[n_processed : n_processed + batch_size] responses = list(response_generator.process(inputs=batch)) response_results.extend(responses[0]) n_processed += batch_size for result in response_results: result["prompt"] = result["instruction"] result["completion"] = result["generation"] result["system_prompt"] = system_prompt else: for result in magpie_results: result["conversation"].insert( 0, {"role": "system", "content": system_prompt} ) result["messages"] = result["conversation"] while n_processed < num_rows: progress( 0.5 + 0.5 * n_processed / num_rows, total=total_steps, desc="(2/2) Generating responses", ) batch = magpie_results[n_processed : n_processed + batch_size] responses = list(response_generator.process(inputs=batch)) response_results.extend(responses[0]) n_processed += batch_size for result in response_results: result["messages"].append( {"role": "assistant", "content": result["generation"]} ) progress( 1, total=total_steps, desc="(2/2) Creating dataset", ) # create distiset distiset_results = [] for result in response_results: record = {} for relevant_keys in [ "messages", "prompt", "completion", "model_name", "system_prompt", ]: if relevant_keys in result: record[relevant_keys] = result[relevant_keys] distiset_results.append(record) distiset = Distiset( { "default": Dataset.from_list(distiset_results), } ) # If not pushing to hub generate the dataset directly distiset = distiset["default"] if num_turns == 1: outputs = distiset.to_pandas()[["prompt", "completion", "system_prompt"]] else: outputs = distiset.to_pandas()[["messages"]] dataframe = pd.DataFrame(outputs) progress(1.0, desc="Dataset generation completed") return dataframe def push_dataset_to_hub(dataframe, org_name, repo_name, oauth_token, private): repo_id = validate_push_to_hub(org_name, repo_name) original_dataframe = dataframe.copy(deep=True) dataframe = convert_dataframe_messages(dataframe) distiset = Distiset({"default": Dataset.from_pandas(dataframe)}) distiset.push_to_hub( repo_id=repo_id, private=private, include_script=False, token=oauth_token.token, create_pr=False, ) return original_dataframe def push_dataset_to_argilla( org_name: str, repo_name: str, system_prompt: str, num_turns: int = 1, n_rows: int = 10, private: bool = False, oauth_token: Union[gr.OAuthToken, None] = None, progress=gr.Progress(), ) -> pd.DataFrame: dataframe = generate_dataset( system_prompt=system_prompt, num_turns=num_turns, num_rows=n_rows, ) push_dataset_to_hub(dataframe, org_name, repo_name, oauth_token, private) try: progress(0.1, desc="Setting up user and workspace") client = get_argilla_client() hf_user = HfApi().whoami(token=oauth_token.token)["name"] if "messages" in dataframe.columns: settings = rg.Settings( fields=[ rg.ChatField( name="messages", description="The messages in the conversation", title="Messages", ), ], questions=[ rg.RatingQuestion( name="rating", title="Rating", description="The rating of the conversation", values=list(range(1, 6)), ), ], metadata=[ rg.IntegerMetadataProperty( name="user_message_length", title="User Message Length" ), rg.IntegerMetadataProperty( name="assistant_message_length", title="Assistant Message Length", ), ], vectors=[ rg.VectorField( name="messages_embeddings", dimensions=get_sentence_embedding_dimensions(), ) ], guidelines="Please review the conversation and provide a score for the assistant's response.", ) dataframe["user_message_length"] = dataframe["messages"].apply( lambda x: sum([len(y["content"]) for y in x if y["role"] == "user"]) ) dataframe["assistant_message_length"] = dataframe["messages"].apply( lambda x: sum( [len(y["content"]) for y in x if y["role"] == "assistant"] ) ) dataframe["messages_embeddings"] = get_embeddings( dataframe["messages"].apply( lambda x: " ".join([y["content"] for y in x]) ) ) else: settings = rg.Settings( fields=[ rg.TextField( name="system_prompt", title="System Prompt", description="The system prompt used for the conversation", required=False, ), rg.TextField( name="prompt", title="Prompt", description="The prompt used for the conversation", ), rg.TextField( name="completion", title="Completion", description="The completion from the assistant", ), ], questions=[ rg.RatingQuestion( name="rating", title="Rating", description="The rating of the conversation", values=list(range(1, 6)), ), ], metadata=[ rg.IntegerMetadataProperty( name="prompt_length", title="Prompt Length" ), rg.IntegerMetadataProperty( name="completion_length", title="Completion Length" ), ], vectors=[ rg.VectorField( name="prompt_embeddings", dimensions=get_sentence_embedding_dimensions(), ) ], guidelines="Please review the conversation and correct the prompt and completion where needed.", ) dataframe["prompt_length"] = dataframe["prompt"].apply(len) dataframe["completion_length"] = dataframe["completion"].apply(len) dataframe["prompt_embeddings"] = get_embeddings(dataframe["prompt"]) progress(0.5, desc="Creating dataset") rg_dataset = client.datasets(name=repo_name, workspace=hf_user) if rg_dataset is None: rg_dataset = rg.Dataset( name=repo_name, workspace=hf_user, settings=settings, client=client, ) rg_dataset = rg_dataset.create() progress(0.7, desc="Pushing dataset to Argilla") hf_dataset = Dataset.from_pandas(dataframe) rg_dataset.records.log(records=hf_dataset) progress(1.0, desc="Dataset pushed to Argilla") except Exception as e: raise gr.Error(f"Error pushing dataset to Argilla: {e}") return "" with gr.Blocks() as app: gr.Markdown("## Describe the dataset you want") gr.HTML("