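# NOTE: the original first lines read "Spaces: Running Running" - apparently a hosting-page status header captured with the source, not part of the program.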
import os | |
import sys | |
import tempfile | |
import shutil | |
from urllib.parse import urlparse | |
import requests | |
from github import Github | |
from git import Repo | |
from collections import defaultdict | |
import time | |
import numpy as np | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.cluster import KMeans | |
from sklearn.metrics.pairwise import cosine_similarity | |
import subprocess | |
import json | |
from pathlib import Path | |
import traceback | |
import argparse | |
def run_semgrep(repo_path):
    """Run Semgrep with the ``auto`` config on *repo_path* and return its report.

    Args:
        repo_path: Directory handed to the ``semgrep`` command line.

    Returns:
        The JSON document Semgrep printed to stdout, decoded with
        ``json.loads``, or ``None`` when the executable cannot be started,
        the process exits with a non-zero status, or stdout is not valid JSON.
    """
    command = ["semgrep", "--config", "auto", "--json", repo_path]
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
        )
    except OSError as e:
        # Raised (typically FileNotFoundError) when semgrep is not installed.
        # Report it like any other scan failure instead of crashing the caller.
        print(f"Semgrep error: could not launch semgrep: {e}")
        return None
    except subprocess.CalledProcessError as e:
        print(f"Semgrep error: {e}")
        return None

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        print("Failed to parse Semgrep output")
        return None
def get_repo_info(input_str):
    """Extract ``(owner, repo_name)`` from a GitHub URL or ``owner/repo`` string.

    Extra path segments (``/tree/main`` ...), a trailing slash and a trailing
    ``.git`` on the repository name are ignored, so the result can be fed
    straight into a ``https://github.com/<owner>/<repo>.git`` clone URL.

    Args:
        input_str: Either an ``http(s)://`` URL or an ``owner/repo`` shorthand.

    Returns:
        A 2-tuple ``(owner, repo_name)``.

    Raises:
        ValueError: If fewer than two path segments can be found.
    """
    text = input_str.strip()
    # "https" also starts with "http", so one prefix test covers both schemes.
    path = urlparse(text).path if text.startswith("http") else text
    parts = [segment for segment in path.strip("/").split("/") if segment]

    # Tolerate scheme-less URLs such as "github.com/owner/repo".
    if len(parts) > 2 and parts[0].lower() in ("github.com", "www.github.com"):
        parts = parts[1:]

    if len(parts) < 2:
        raise ValueError(
            f"Cannot determine owner and repository from {input_str!r}; "
            "expected 'owner/repo' or a GitHub repository URL"
        )

    owner, repo_name = parts[0], parts[1]
    if repo_name.endswith(".git"):
        # Otherwise a caller appending ".git" would build "<repo>.git.git".
        repo_name = repo_name[:-len(".git")]
    return owner, repo_name
def clone_repo(owner, repo_name, temp_dir):
    """Clone ``https://github.com/<owner>/<repo_name>.git`` into *temp_dir*.

    Args:
        owner: Account or organisation part of the repository path.
        repo_name: Repository name (without the ``.git`` suffix).
        temp_dir: Destination directory passed to ``Repo.clone_from``.

    Returns:
        *temp_dir*, unchanged, for convenient chaining.
    """
    remote = "https://github.com/{}/{}.git".format(owner, repo_name)
    Repo.clone_from(remote, temp_dir)
    return temp_dir
def analyze_code(repo_path):
    """Walk *repo_path*, tally file extensions, read source files and run Semgrep.

    Args:
        repo_path: Root directory of the checked-out repository.

    Returns:
        A dict with three keys:
        ``"file_types"`` maps each extension (as returned by
        ``os.path.splitext``, i.e. including the leading dot) to its file
        count; ``"file_contents"`` maps the path of every recognised source
        file to its text; ``"semgrep_results"`` is whatever ``run_semgrep``
        returned for the repository.
    """
    # os.path.splitext yields extensions WITH the leading dot, so every entry
    # needs one to ever match.
    source_extensions = {
        '.py', '.js', '.java', '.cpp', '.cs', '.go', '.rb', '.php',
        '.ts', '.tsx', '.jsx',
    }
    file_types = defaultdict(int)
    file_contents = {}

    for root, _, files in os.walk(repo_path):
        for file_name in files:
            ext = os.path.splitext(file_name)[1]
            file_types[ext] += 1
            if ext not in source_extensions:
                continue
            file_path = os.path.join(root, file_name)
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    file_contents[file_path] = f.read()
            except OSError as e:
                # e.g. a dangling symlink; one unreadable file should not
                # abort the whole analysis.
                print(f"Skipping unreadable file {file_path}: {e}")

    semgrep_results = run_semgrep(repo_path)
    return {
        "file_types": dict(file_types),
        "file_contents": file_contents,
        "semgrep_results": semgrep_results,
    }
def analyze_issues(github_repo, max_issues):
    """Collect up to *max_issues* issues with their comments and split them by state.

    GitHub's issue listing also returns pull requests; those entries carry a
    non-``None`` ``pull_request`` attribute and are skipped here so that they
    are neither reported as issues nor counted against *max_issues*.

    Args:
        github_repo: Repository object exposing ``get_issues(state=...)``.
        max_issues: Maximum number of real issues to collect; ``None`` means
            no limit.

    Returns:
        A dict with ``'closed_issues'`` and ``'open_issues'`` (lists of issue
        dicts) and ``'filtered_closed_issues'`` (the closed issues selected by
        ``cluster_and_filter_items``, or ``[]`` when there are none).
    """
    closed_issues = []
    open_issues = []
    collected = 0

    for issue in github_repo.get_issues(state="all"):
        if max_issues is not None and collected >= max_issues:
            break
        if getattr(issue, "pull_request", None) is not None:
            # A pull request surfaced through the issues listing.
            continue
        collected += 1

        issue_data = {
            "number": issue.number,
            "title": issue.title,
            "body": issue.body,
            "state": issue.state,
            "created_at": issue.created_at.isoformat(),
            "closed_at": issue.closed_at.isoformat() if issue.closed_at else None,
            "comments": [
                {
                    "body": comment.body,
                    "created_at": comment.created_at.isoformat(),
                }
                for comment in issue.get_comments()
            ],
        }
        if issue.state == "closed":
            closed_issues.append(issue_data)
        else:
            open_issues.append(issue_data)
        time.sleep(0.5)  # Rate limiting

    # Cluster and filter closed issues
    if closed_issues:
        filtered_closed_issues = cluster_and_filter_items(
            closed_issues,
            n_clusters=min(5, len(closed_issues)),
            n_items=min(10, len(closed_issues)),
        )
    else:
        filtered_closed_issues = []

    return {
        'closed_issues': closed_issues,
        'open_issues': open_issues,
        'filtered_closed_issues': filtered_closed_issues,
    }
def analyze_pull_requests(github_repo, max_prs):
    """Collect the first *max_prs* pull requests and split them by state.

    Args:
        github_repo: Repository object exposing ``get_pulls(state=...)``.
        max_prs: Upper bound applied by slicing the pull-request listing.

    Returns:
        A dict with ``'closed_prs'`` and ``'open_prs'`` (lists of PR dicts)
        and ``'filtered_closed_prs'`` (the closed PRs selected by
        ``cluster_and_filter_items``, or ``[]`` when there are none).
    """
    closed_prs, open_prs = [], []

    selected = github_repo.get_pulls(state="all")[:max_prs]
    for pr in selected:
        closed_stamp = pr.closed_at.isoformat() if pr.closed_at else None
        pr_data = {
            "number": pr.number,
            "title": pr.title,
            "body": pr.body,
            "state": pr.state,
            "created_at": pr.created_at.isoformat(),
            "closed_at": closed_stamp,
            "comments": [],
            # Stored exactly as returned by get_files(); it is not expanded here.
            "diff": pr.get_files(),
        }
        pr_data["comments"].extend(
            {"body": note.body, "created_at": note.created_at.isoformat()}
            for note in pr.get_comments()
        )

        bucket = closed_prs if pr.state == "closed" else open_prs
        bucket.append(pr_data)
        time.sleep(0.5)  # Rate limiting

    # Cluster and filter closed PRs
    filtered_closed_prs = []
    if closed_prs:
        total_closed = len(closed_prs)
        filtered_closed_prs = cluster_and_filter_items(
            closed_prs,
            n_clusters=min(5, total_closed),
            n_items=min(10, total_closed),
        )

    return {
        'closed_prs': closed_prs,
        'open_prs': open_prs,
        'filtered_closed_prs': filtered_closed_prs,
    }
def call_llm(client, prompt, model="google/gemini-flash-1.5-exp", max_tokens=4096):
    """Send *prompt* as a single user message and return the first reply's text.

    Args:
        client: Object exposing ``chat.completions.create(...)``.
        prompt: Text placed in the only message of the request.
        model: Model identifier forwarded to the client.
        max_tokens: Completion token limit forwarded to the client.

    Returns:
        ``message.content`` of the first choice in the response.
    """
    request = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens,
    }
    completion = client.chat.completions.create(**request)
    first_choice = completion.choices[0]
    return first_choice.message.content
def safe_call_llm(client, prompt, retries=3):
    """Call the LLM and parse its reply, retrying on any exception.

    Args:
        client: Client object forwarded to ``call_llm``.
        prompt: Prompt text forwarded to ``call_llm``.
        retries: Maximum number of attempts.

    Returns:
        The value produced by ``parse_llm_response`` for the first attempt
        that does not raise, or ``[]`` once every attempt has failed.
    """
    attempt = 0
    while attempt < retries:
        attempt += 1
        try:
            return parse_llm_response(call_llm(client, prompt))
        except Exception as exc:
            print(f"Error in LLM call (attempt {attempt}/{retries}): {str(exc)}")
            if attempt == retries:
                print("All retries failed. Returning empty list.")
    return []
def parse_llm_response(response):
    """Decode the JSON payload of an LLM reply, tolerating surrounding text.

    The whole reply is tried first. If that fails, the outermost ``[...]`` and
    ``{...}`` spans are tried, starting with whichever opens first, so both a
    JSON array and a JSON object wrapped in prose or Markdown code fences are
    recovered.

    Args:
        response: Raw reply text.

    Returns:
        The decoded JSON value, or ``[]`` when no candidate span decodes.
    """
    try:
        # First, try to parse the entire response as JSON
        return json.loads(response)
    except json.JSONDecodeError:
        pass

    candidates = []
    for opener, closer in (("[", "]"), ("{", "}")):
        start = response.find(opener)
        end = response.rfind(closer)
        if start != -1 and end > start:
            candidates.append((start, end + 1))

    # The structure that opens first is the outer one; trying it first avoids
    # returning a list that is merely nested inside the intended object.
    for start, end in sorted(candidates):
        try:
            return json.loads(response[start:end])
        except json.JSONDecodeError:
            continue

    print(f"Warning: Failed to parse LLM response as JSON. Response: {response[:100]}...")
    return []
def cluster_and_filter_items(items, n_clusters=5, n_items=10):
    """Pick representative items by clustering their title/body text.

    Titles and bodies are turned into TF-IDF vectors, grouped with KMeans, and
    the items most similar (cosine) to each cluster centre are kept.

    Args:
        items: Dicts with ``'title'`` and ``'body'`` entries; a missing or
            ``None`` value is treated as empty text.
        n_clusters: Requested number of clusters; capped at ``len(items)``.
        n_items: Maximum number of items to return overall.

    Returns:
        A list of at most *n_items* items, ordered cluster by cluster and by
        descending similarity inside each cluster. Returns ``[]`` when there
        is nothing to cluster, and the first *n_items* items when the texts
        yield no usable vocabulary.
    """
    effective_clusters = min(n_clusters, len(items))
    if effective_clusters <= 0 or n_items <= 0:
        return []

    # Combine title and body for text analysis
    texts = [f"{item.get('title') or ''} {item.get('body') or ''}" for item in items]

    # Create TF-IDF vectors
    vectorizer = TfidfVectorizer(stop_words='english', max_features=1000)
    try:
        tfidf_matrix = vectorizer.fit_transform(texts)
    except ValueError:
        # Raised when no terms survive tokenisation/stop-word removal.
        return items[:n_items]

    # Perform clustering
    kmeans = KMeans(n_clusters=effective_clusters)
    kmeans.fit(tfidf_matrix)
    cluster_centers = kmeans.cluster_centers_
    labels = kmeans.labels_

    # Base the quota on the clusters actually used, and never let it drop to
    # zero, otherwise nothing would be selected when n_items < clusters.
    per_cluster = max(1, n_items // effective_clusters)

    # Find items closest to cluster centers
    filtered_items = []
    for cluster_index in range(effective_clusters):
        member_mask = labels == cluster_index
        cluster_items = [item for item, member in zip(items, member_mask) if member]
        if not cluster_items:
            # A cluster can end up without members (e.g. duplicate texts).
            continue
        cluster_vectors = tfidf_matrix[member_mask]
        similarities = cosine_similarity(
            cluster_vectors, cluster_centers[cluster_index].reshape(1, -1)
        ).flatten()
        ranked = sorted(
            zip(similarities, cluster_items),
            key=lambda pair: pair[0],
            reverse=True,
        )
        filtered_items.extend(item for _, item in ranked[:per_cluster])

    return filtered_items[:n_items]
def safe_filter_open_items(open_items, closed_patterns, n_items=10):
    """Rank open items by textual similarity to the closed-item patterns.

    Any exception raised while ranking is printed and the first *n_items*
    open items are returned instead.

    Args:
        open_items: Dicts read through ``.get('title')`` / ``.get('body')``.
        closed_patterns: Dicts read through ``.get('theme')`` /
            ``.get('description')``.
        n_items: Number of top-ranked items to return.

    Returns:
        Up to *n_items* open items, best match first; ``[]`` when either
        input is empty.
    """
    try:
        # Combine the text fields used for the comparison.
        open_texts = []
        for entry in open_items:
            open_texts.append(f"{entry.get('title', '')} {entry.get('body', '')}")
        pattern_texts = []
        for entry in closed_patterns:
            pattern_texts.append(f"{entry.get('theme', '')} {entry.get('description', '')}")

        if not (open_texts and pattern_texts):
            print("Warning: No open items or closed patterns to analyze.")
            return []

        # One shared vocabulary for both groups, then split the rows again.
        vectorizer = TfidfVectorizer(stop_words='english', max_features=1000)
        tfidf_matrix = vectorizer.fit_transform(open_texts + pattern_texts)
        n_open = len(open_items)
        open_vectors = tfidf_matrix[:n_open]
        pattern_vectors = tfidf_matrix[n_open:]

        # Mean similarity of each open item against all patterns.
        similarities = cosine_similarity(open_vectors, pattern_vectors)
        avg_similarities = np.mean(similarities, axis=1)

        # Stable descending sort on the score only, as item dicts are not orderable.
        ranking = sorted(
            zip(avg_similarities, open_items),
            key=lambda scored: scored[0],
            reverse=True,
        )
        ranked_items = [entry for _, entry in ranking]
        return ranked_items[:n_items]
    except Exception as exc:
        print(f"Error in filtering open items: {str(exc)}")
        traceback.print_exc()
        return open_items[:n_items]  # Return first n_items if filtering fails
def filter_open_items(open_items, closed_patterns, n_items=10):
    """Rank open items by textual similarity to the closed-item patterns.

    Unlike ``safe_filter_open_items`` this variant has no empty-input guard
    and lets every exception propagate.

    Args:
        open_items: Dicts indexed with ``['title']`` and ``['body']``.
        closed_patterns: Dicts read through ``.get('theme')`` /
            ``.get('description')``.
        n_items: Number of top-ranked items to return.

    Returns:
        Up to *n_items* open items, best match first.
    """
    # Combine the text fields used for the comparison.
    open_texts = []
    for entry in open_items:
        open_texts.append(f"{entry['title']} {entry['body']}")
    pattern_texts = []
    for entry in closed_patterns:
        pattern_texts.append(f"{entry.get('theme', '')} {entry.get('description', '')}")

    # One shared vocabulary for both groups, then split the rows again.
    vectorizer = TfidfVectorizer(stop_words='english', max_features=1000)
    tfidf_matrix = vectorizer.fit_transform(open_texts + pattern_texts)
    n_open = len(open_items)
    open_vectors = tfidf_matrix[:n_open]
    pattern_vectors = tfidf_matrix[n_open:]

    # Mean similarity of each open item against all patterns.
    similarities = cosine_similarity(open_vectors, pattern_vectors)
    avg_similarities = np.mean(similarities, axis=1)

    # Stable descending sort on the score only, as item dicts are not orderable.
    ranking = sorted(
        zip(avg_similarities, open_items),
        key=lambda scored: scored[0],
        reverse=True,
    )
    ranked_items = [entry for _, entry in ranking]
    return ranked_items[:n_items]
def llm_analyze_closed_items(client, items, item_type):
    """Ask the LLM for recurring themes among closed items.

    Args:
        client: LLM client forwarded to ``safe_call_llm``.
        items: Closed items, interpolated into the prompt as-is.
        item_type: Noun used in the prompt wording (e.g. ``"issues"``).

    Returns:
        Whatever ``safe_call_llm`` returns for the prompt; the prompt asks for
        a list of objects with ``theme``, ``description`` and ``llm_solution``.
    """
    request_text = f"""
    Analyze the following closed GitHub {item_type}:
    {items}
    Based on these closed {item_type}, identify:
    1. Common themes or recurring patterns
    2. Areas where automation could streamline {item_type} management
    3. Potential LLM-assisted workflows to improve the {item_type} process
    4. Do not return anything other than the expected JSON object
    For each identified pattern or theme, provide:
    - A short title or theme name
    - A brief description of the pattern
    - Potential LLM-assisted solutions or workflows
    Format your response as a list of JSON objects, like this:
    [
    {{
    "theme": "Theme name",
    "description": "Brief description of the pattern",
    "llm_solution": "Potential LLM-assisted solution or workflow"
    }},
    ...
    ]
    """
    patterns = safe_call_llm(client, request_text)
    return patterns
def llm_analyze_open_items(client, open_items, closed_patterns, item_type, repo_url):
    """Ask the LLM to match open items against the closed-item patterns.

    Args:
        client: LLM client forwarded to ``safe_call_llm``.
        open_items: Open items, interpolated into the prompt as-is.
        closed_patterns: Patterns found in closed items, interpolated as-is.
        item_type: Noun used in the prompt wording (e.g. ``"issues"``).
        repo_url: Accepted for interface compatibility; not used in the prompt.

    Returns:
        Whatever ``safe_call_llm`` returns for the prompt; the prompt asks for
        a list of objects with ``number``, ``matched_patterns``,
        ``suggested_workflow`` and ``expected_improvement``.
    """
    request_text = f"""
    Consider the following patterns identified in closed {item_type}:
    {closed_patterns}
    Now, analyze these open {item_type} in light of the above patterns:
    {open_items}
    For each open {item_type}:
    1. Identify which pattern(s) it most closely matches
    2. Suggest specific LLM-assisted workflows or automations that could be applied, based on the matched patterns
    3. Explain how the suggested workflow would improve the handling of this {item_type}
    4. Include the {item_type} number in your response
    5. Do not return anything other than the expected JSON object
    Format your response as a list of JSON objects, like this:
    [
    {{
    "number": {item_type} number,
    "matched_patterns": ["Pattern 1", "Pattern 2"],
    "suggested_workflow": "Description of the suggested LLM-assisted workflow",
    "expected_improvement": "Explanation of how this would improve the {item_type} handling"
    }},
    ...
    ]
    """
    matches = safe_call_llm(client, request_text)
    return matches
def llm_analyze_issues(client, issues_data, repo_url):
    """Run the three-step LLM analysis (patterns, open items, summary) for issues.

    Args:
        client: LLM client forwarded to the helper calls.
        issues_data: Dict with ``'filtered_closed_issues'``,
            ``'closed_issues'`` and ``'open_issues'``.
        repo_url: Forwarded to ``llm_analyze_open_items``.

    Returns:
        A dict with ``'closed_patterns'``, ``'open_issues_analysis'`` and
        ``'summary'``, each holding the corresponding LLM result.
    """
    representative_closed = issues_data['filtered_closed_issues']
    every_closed = issues_data['closed_issues']
    open_issues = issues_data['open_issues']

    # Step 1: themes in closed issues; step 2: the open issues closest to them.
    closed_patterns = llm_analyze_closed_items(client, representative_closed, "issues")
    shortlisted_open = safe_filter_open_items(open_issues, closed_patterns, n_items=10)
    open_issues_analysis = llm_analyze_open_items(
        client, shortlisted_open, closed_patterns, "issues", repo_url
    )

    closed_total = len(every_closed)
    open_total = len(open_issues)
    summary_prompt = f"""
    Summarize the analysis of closed and open issues:
    Closed Issues Patterns:
    {closed_patterns}
    Open Issues Analysis:
    {open_issues_analysis}
    Provide a concise summary of:
    1. Key patterns identified in closed issues
    2. Most promising LLM-assisted workflows for handling open issues
    3. Overall recommendations for improving issue management in this repository
    4. For each suggested workflow, include the number of an open issue where it could be applied
    5. Do not return anything other than the expected JSON object
    Format your response as a JSON object with the following structure:
    {{
    "key_patterns": ["pattern1", "pattern2", ...],
    "promising_workflows": [
    {{
    "workflow": "Description of the workflow",
    "applicable_issue": issue_number
    }},
    ...
    ],
    "overall_recommendations": ["recommendation1", "recommendation2", ...]
    }}
    Total number of closed issues analyzed: {closed_total}
    Total number of open issues: {open_total}
    """
    # Step 3: condensed summary of both previous results.
    summary = safe_call_llm(client, summary_prompt)

    outcome = {}
    outcome['closed_patterns'] = closed_patterns
    outcome['open_issues_analysis'] = open_issues_analysis
    outcome['summary'] = summary
    return outcome
def llm_analyze_prs(client, prs_data, repo_url):
    """Run the three-step LLM analysis (patterns, open items, summary) for PRs.

    Args:
        client: LLM client forwarded to the helper calls.
        prs_data: Dict with ``'filtered_closed_prs'``, ``'closed_prs'`` and
            ``'open_prs'``.
        repo_url: Forwarded to ``llm_analyze_open_items``.

    Returns:
        A dict with ``'closed_patterns'``, ``'open_prs_analysis'`` and
        ``'summary'``, each holding the corresponding LLM result.
    """
    representative_closed = prs_data['filtered_closed_prs']
    every_closed = prs_data['closed_prs']
    open_prs = prs_data['open_prs']

    # Step 1: themes in closed PRs; step 2: the open PRs closest to them.
    closed_patterns = llm_analyze_closed_items(client, representative_closed, "pull requests")
    shortlisted_open = safe_filter_open_items(open_prs, closed_patterns, n_items=10)
    open_prs_analysis = llm_analyze_open_items(
        client, shortlisted_open, closed_patterns, "pull requests", repo_url
    )

    closed_total = len(every_closed)
    open_total = len(open_prs)
    summary_prompt = f"""
    Summarize the analysis of closed and open pull requests:
    Closed PRs Patterns:
    {closed_patterns}
    Open PRs Analysis:
    {open_prs_analysis}
    Provide a concise summary of:
    1. Key patterns identified in closed pull requests
    2. Most promising LLM-assisted workflows for handling open pull requests
    3. Overall recommendations for improving the PR process in this repository
    4. For each suggested workflow, include the number of an open PR where it could be applied
    5. Do not return anything other than the expected JSON object
    Format your response as a JSON object with the following structure:
    {{
    "key_patterns": ["pattern1", "pattern2", ...],
    "promising_workflows": [
    {{
    "workflow": "Description of the workflow",
    "applicable_pr": pr_number
    }},
    ...
    ],
    "overall_recommendations": ["recommendation1", "recommendation2", ...]
    }}
    Total number of closed pull requests analyzed: {closed_total}
    Total number of open pull requests: {open_total}
    """
    # Step 3: condensed summary of both previous results.
    summary = safe_call_llm(client, summary_prompt)

    outcome = {}
    outcome['closed_patterns'] = closed_patterns
    outcome['open_prs_analysis'] = open_prs_analysis
    outcome['summary'] = summary
    return outcome
def llm_analyze_code(client, code_analysis):
    """Ask the LLM to review the code structure, contents and Semgrep findings.

    Args:
        client: LLM client forwarded to ``safe_call_llm``.
        code_analysis: Dict with ``'semgrep_results'``, ``'file_contents'``
            and ``'file_types'``.

    Returns:
        Whatever ``safe_call_llm`` returns for the prompt; the prompt asks for
        a JSON object with ``patterns``, ``best_practices``,
        ``areas_for_improvement``, ``potential_vulnerabilities`` and
        ``llm_opportunities``.
    """
    semgrep_report = code_analysis['semgrep_results']
    if semgrep_report:
        findings = semgrep_report.get('results', [])
        # Limit to 10 findings to avoid token limits
        finding_lines = "".join(
            f"\n- {finding['check_id']} in {finding['path']}: {finding['extra']['message']}"
            for finding in findings[:10]
        )
        semgrep_summary = f"Semgrep found {len(findings)} potential issues:" + finding_lines
    else:
        semgrep_summary = "No Semgrep results available."

    # Only the first 1000 characters of each file are included to avoid token limits.
    file_contents_summary = "".join(
        f"\n\nFile: {file_path}\nContent:\n{content[:1000]}..."
        for file_path, content in code_analysis['file_contents'].items()
    )

    request_text = f"""
    Analyze the following code structure, content, and Semgrep results:
    File types: {code_analysis['file_types']}
    Semgrep Analysis:
    {semgrep_summary}
    File Contents Summary:
    {file_contents_summary}
    Based on this information, provide an analysis covering:
    1. Patterns in the codebase
    2. Best practices being followed or missing
    3. Areas for improvement
    4. Potential security vulnerabilities or bugs (based on Semgrep results)
    5. Opportunities for LLM-assisted automation in coding tasks
    For LLM-assisted opportunities, consider tasks like code review, bug fixing, test generation, or documentation.
    Respond ONLY with a JSON object in the following format:
    {{
    "patterns": ["pattern1", "pattern2", ...],
    "best_practices": {{
    "followed": ["practice1", "practice2", ...],
    "missing": ["practice1", "practice2", ...]
    }},
    "areas_for_improvement": ["area1", "area2", ...],
    "potential_vulnerabilities": [
    {{
    "description": "Description of the vulnerability",
    "file_path": "Path to the affected file",
    "severity": "High/Medium/Low"
    }},
    ...
    ],
    "llm_opportunities": [
    {{
    "task": "Description of the LLM-assisted task",
    "file_path": "Path to the relevant file",
    "improvement": "How LLM assistance would help"
    }},
    ...
    ]
    }}
    Ensure your response is a valid JSON object and nothing else.
    """
    return safe_call_llm(client, request_text)
def llm_synthesize_findings(client, code_analysis, issues_analysis, pr_analysis):
    """Ask the LLM for a free-text synthesis of the three analyses.

    The reply is returned verbatim from ``call_llm``: it is not parsed as JSON
    and the call is not wrapped in the retry helper, so exceptions propagate.

    Args:
        client: LLM client forwarded to ``call_llm``.
        code_analysis: Code analysis result, interpolated into the prompt.
        issues_analysis: Issues analysis result, interpolated into the prompt.
        pr_analysis: Pull-request analysis result, interpolated into the prompt.

    Returns:
        The text returned by ``call_llm`` (requested with ``max_tokens=8192``).
    """
    request_text = f"""
    Synthesize the following analyses of a GitHub repository:
    Code Analysis:
    {code_analysis}
    Issues Analysis:
    {issues_analysis}
    Pull Requests Analysis:
    {pr_analysis}
    Based on these analyses:
    1. Summarize the key findings across all areas (code, issues, and PRs)
    2. Identify the top 3-5 most promising opportunities for LLM-assisted workflows
    3. For each opportunity, provide a specific example of how it could be implemented and the potential benefits
    4. Suggest any additional areas of investigation or analysis that could provide further insights
    """
    synthesis = call_llm(client, request_text, max_tokens=8192)
    return synthesis
def _report_list(mapping, key, default=()):
    """Return ``mapping[key]`` when it is a list/tuple, otherwise *default*."""
    value = mapping.get(key, default)
    return value if isinstance(value, (list, tuple)) else default


def _report_field(entry, key):
    """Return ``entry[key]``, or ``'N/A'`` when it is missing, null, or *entry* is not a dict."""
    if isinstance(entry, dict) and entry.get(key) is not None:
        return entry[key]
    return 'N/A'


def _render_workflow_summary(summary, repo_url, subject, management_title,
                             example_label, number_key, url_segment):
    """Render the patterns / workflows / recommendations part of one summary dict."""
    lines = [f"### Key Patterns in {subject}\n"]
    for pattern in _report_list(summary, 'key_patterns', ['No key patterns identified.']):
        lines.append(f"- {pattern}\n")

    lines.append(f"\n### Promising LLM-Assisted Workflows for {subject}\n")
    for workflow in _report_list(summary, 'promising_workflows'):
        lines.append(f"- **Workflow:** {_report_field(workflow, 'workflow')}\n")
        number = workflow.get(number_key) if isinstance(workflow, dict) else None
        if number is None:
            # Without a number there is nothing meaningful to link to.
            lines.append(f" - **{example_label}:** N/A\n\n")
        else:
            lines.append(
                f" - **{example_label}:** [{number}]({repo_url}/{url_segment}/{number})\n\n"
            )

    lines.append(f"### Overall Recommendations for {management_title}\n")
    for rec in _report_list(summary, 'overall_recommendations', ['No recommendations available.']):
        lines.append(f"- {rec}\n")
    return "".join(lines)


def generate_report(repo_info, code_analysis, issues_analysis, pr_analysis, final_analysis):
    """Build the Markdown report from the individual analysis results.

    The analysis dicts come from LLM replies, so entries may lack keys or hold
    unexpected types. Missing fields are rendered as ``N/A`` and non-list
    collections are treated as absent instead of raising.

    Args:
        repo_info: Dict with ``'owner'`` and ``'repo_name'``.
        code_analysis: Dict that may hold ``'file_types'`` and, when a
            structured result is available, a dict under ``'llm_analysis'``.
        issues_analysis: Dict whose ``'summary'`` is rendered when it is a dict.
        pr_analysis: Dict whose ``'summary'`` is rendered when it is a dict.
        final_analysis: Text appended under "Synthesis and Recommendations".

    Returns:
        The complete report as one Markdown string.
    """
    repo_url = f"https://github.com/{repo_info['owner']}/{repo_info['repo_name']}"
    parts = [f"""# LLM-Assisted Workflow Analysis for {repo_info['owner']}/{repo_info['repo_name']}
## Repository Overview
- Owner: {repo_info['owner']}
- Repository: {repo_info['repo_name']}
- URL: {repo_url}
- File types: {code_analysis.get('file_types', 'N/A')}
## Code Analysis
"""]

    code_llm_analysis = code_analysis.get('llm_analysis')
    if isinstance(code_llm_analysis, dict):
        best_practices = code_llm_analysis.get('best_practices')
        if not isinstance(best_practices, dict):
            best_practices = {}

        parts.append("### Patterns Identified\n")
        parts.extend(f"- {pattern}\n" for pattern in _report_list(code_llm_analysis, 'patterns'))
        parts.append("\n### Best Practices\n")
        parts.append("#### Followed:\n")
        parts.extend(f"- {practice}\n" for practice in _report_list(best_practices, 'followed'))
        parts.append("\n#### Missing:\n")
        parts.extend(f"- {practice}\n" for practice in _report_list(best_practices, 'missing'))
        parts.append("\n### Areas for Improvement\n")
        parts.extend(
            f"- {area}\n" for area in _report_list(code_llm_analysis, 'areas_for_improvement')
        )
        parts.append("\n### Potential Vulnerabilities\n")
        for vuln in _report_list(code_llm_analysis, 'potential_vulnerabilities'):
            parts.append(
                f"- {_report_field(vuln, 'description')} in "
                f"`{_report_field(vuln, 'file_path')}` "
                f"(Severity: {_report_field(vuln, 'severity')})\n"
            )
        parts.append("\n### LLM-Assisted Coding Opportunities\n")
        for opp in _report_list(code_llm_analysis, 'llm_opportunities'):
            parts.append(f"- **Task:** {_report_field(opp, 'task')}\n")
            parts.append(f" - **File:** `{_report_field(opp, 'file_path')}`\n")
            parts.append(f" - **Improvement:** {_report_field(opp, 'improvement')}\n\n")
    else:
        parts.append("No structured code analysis available.\n")

    parts.append("\n## Issues Analysis\n")
    issues_summary = issues_analysis.get('summary')
    if isinstance(issues_summary, dict):
        parts.append(_render_workflow_summary(
            issues_summary, repo_url,
            subject="Issues",
            management_title="Issue Management",
            example_label="Example Issue",
            number_key='applicable_issue',
            url_segment="issues",
        ))
    else:
        parts.append("No structured issues analysis available.\n")

    parts.append("\n## Pull Requests Analysis\n")
    pr_summary = pr_analysis.get('summary')
    if isinstance(pr_summary, dict):
        parts.append(_render_workflow_summary(
            pr_summary, repo_url,
            subject="Pull Requests",
            management_title="PR Process",
            example_label="Example PR",
            number_key='applicable_pr',
            url_segment="pull",
        ))
    else:
        parts.append("No structured pull requests analysis available.\n")

    parts.append(f"\n## Synthesis and Recommendations\n{final_analysis}\n")
    return "".join(parts)