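# Repository-viewer header captured together with this file (not part of the script):
#   Author: Xianbao QIAN
#   Commit c80b461: "use client side rendering for the homepage."
#   Viewer links: raw / history / blame; file size: 6.96 kB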
import duckdb
import yaml
import time
import logging
# Route INFO-and-above records through the root logger, formatted as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Single DuckDB connection shared by every query in this script. No database
# path is given, so nothing is attached to an on-disk database file here.
con = duckdb.connect()
def execute_with_timing(query, description):
    """Run *query* on the shared connection and log its wall-clock duration.

    Args:
        query: SQL text handed unchanged to ``con.execute``.
        description: Label interpolated into the "Completed ..." log line.

    Returns:
        None. Any result set produced by the query is discarded.
    """
    # perf_counter is a high-resolution clock intended for measuring intervals.
    started = time.perf_counter()
    con.execute(query)
    elapsed = time.perf_counter() - started
    logging.info(f"Completed {description} in {elapsed:.6f} seconds.")
# Reference point for the end-to-end duration reported when the script ends.
total_start_time = time.perf_counter()

# Copy the parquet file into a DuckDB table so the following steps query the
# table instead of re-scanning the file.
load_parents_query = """
CREATE TABLE parents_in_memory AS
SELECT * FROM parquet_scan('public/parents.parquet')
"""

# Step 1: pair every row's id with a numeric surrogate key (tmp_id).
unique_id_query = """
CREATE TABLE unique_ids AS
SELECT
id,
ROW_NUMBER() OVER () AS tmp_id
FROM parents_in_memory
"""

# Step 2: expand each model's base_models list into one row per
# (child tmp_id, base_model) pair; rows with a NULL base_models are skipped.
unnest_query = """
CREATE TABLE unnested_models AS
SELECT
u.tmp_id AS child_tmp_id, -- Numerical ID for the child model
UNNEST(p.base_models) AS base_model
FROM parents_in_memory p
JOIN unique_ids u ON p.id = u.id
WHERE p.base_models IS NOT NULL -- Filter out models without base models
"""

# Step 3: resolve each base_model to its tmp_id, giving (child, parent) edges.
# The inner join drops base models that have no row in unique_ids.
parent_level_query = """
CREATE TABLE parent_level AS
SELECT
u.child_tmp_id, -- Numerical ID for the child model
b.tmp_id AS base_tmp_id -- Numerical ID for the base model (parent)
FROM unnested_models u
JOIN unique_ids b ON u.base_model = b.id
"""

# Each table depends on the one before it, so run the statements strictly in
# this order, logging how long each takes.
for _statement, _label in (
    (load_parents_query, "Loaded parents.parquet into RAM"),
    (unique_id_query, "Step 1: Created unique_ids table"),
    (unnest_query, "Step 2: Created unnested_models table"),
    (parent_level_query, "Step 3: Created parent_level table"),
):
    execute_with_timing(_statement, _label)
# Step 4: Follow the parent -> child edges transitively to collect, for every
# ancestor, the distinct set of all its descendants (as original ids).
#
# The recursion is bounded by MAX_ANCESTOR_DEPTH so it terminates even if the
# base_model graph contains a cycle. The anchor rows have depth 1 and the
# recursive step only extends rows whose depth is below the cap, so the CTE
# holds rows with depth 1..MAX_ANCESTOR_DEPTH. The old inline comment claimed
# a limit of 10 while the predicate used 20; the constant keeps both in sync.
MAX_ANCESTOR_DEPTH = 20
ancestor_children_query = f"""
CREATE TABLE ancestor_children AS
WITH RECURSIVE ancestor_children_cte AS (
    SELECT
        base_tmp_id AS ancestor_tmp_id, -- Start with direct parent as ancestor
        child_tmp_id AS child_tmp_id, -- Direct child
        1 AS depth -- Initialize depth counter
    FROM parent_level
    UNION ALL
    SELECT
        ac.ancestor_tmp_id, -- Propagate ancestor
        pl.child_tmp_id, -- Find new child in the chain
        ac.depth + 1 -- Increment depth counter
    FROM parent_level pl
    JOIN ancestor_children_cte ac ON pl.base_tmp_id = ac.child_tmp_id
    WHERE ac.depth < {MAX_ANCESTOR_DEPTH} -- Limit recursion to {MAX_ANCESTOR_DEPTH} levels
)
SELECT
    a.id AS ancestor,
    LIST(DISTINCT c.id) AS all_children
FROM ancestor_children_cte ac
JOIN unique_ids a ON ac.ancestor_tmp_id = a.tmp_id
JOIN unique_ids c ON c.tmp_id = ac.child_tmp_id
GROUP BY ancestor
"""
execute_with_timing(ancestor_children_query, "Step 4: Created ancestor_children table with string IDs")
# Build one row per model in parents_in_memory with the list of models that
# name it directly in their base_models.
#
# The LEFT JOINs keep parents that nobody references; for those rows u.id is
# NULL. DuckDB's LIST aggregate does not skip NULLs, so without the FILTER a
# childless parent would get a one-element list holding NULL (and a length of
# 1). With the FILTER such parents get a NULL direct_children instead, while
# parents that do have children are unaffected.
direct_children_mapping_query = """
CREATE TABLE direct_children_mapping AS
SELECT
    p.id AS parent,
    LIST(DISTINCT u.id) FILTER (WHERE u.id IS NOT NULL) AS direct_children
FROM parents_in_memory p
LEFT JOIN unnested_models um ON p.id = um.base_model
LEFT JOIN unique_ids u ON um.child_tmp_id = u.tmp_id
GROUP BY p.id
"""
execute_with_timing(direct_children_mapping_query, "Created direct_children_mapping table")
# Export one row per ancestor to parquet: its direct children (taken from
# direct_children_mapping), all of its descendants, and the size of both
# lists, ordered by descendant count, largest first.
final_output_query = """
COPY (
SELECT
ac.ancestor as ancestor,
dcm.direct_children as direct_children,
ac.all_children as all_children,
CAST(array_length(ac.all_children) AS INTEGER) as all_children_count,
CAST(array_length(dcm.direct_children) AS INTEGER) as direct_children_count
FROM ancestor_children ac
LEFT JOIN direct_children_mapping dcm ON ac.ancestor = dcm.parent
ORDER BY all_children_count DESC
) TO 'public/ancestor_children.parquet' (FORMAT 'parquet')
"""
# Timed by hand (not via execute_with_timing) so the log line keeps its own wording.
export_started = time.perf_counter()
con.execute(final_output_query)
export_elapsed = time.perf_counter() - export_started
logging.info(f"Written ancestor_children to parquet file in {export_elapsed:.6f} seconds.")
# Dump up to 10 ancestors that have at least one descendant to a YAML file for
# manual inspection. The query uses LIMIT without ORDER BY or sampling, so the
# rows are simply the first ones DuckDB returns, not a randomised sample.
sample_query = """
SELECT ac.ancestor, dcm.direct_children, ac.all_children
FROM ancestor_children ac
LEFT JOIN direct_children_mapping dcm ON ac.ancestor = dcm.parent
WHERE array_length(ac.all_children) > 0
LIMIT 10
"""
sample_started = time.perf_counter()
sample_data = con.execute(sample_query).fetchall()
# fetchall() yields a list of row tuples, which is dumped to the file in block style.
with open("public/ancestor_children.example.yaml", "w") as yaml_file:
    yaml.safe_dump(sample_data, yaml_file, default_flow_style=False)
sample_elapsed = time.perf_counter() - sample_started
logging.info(f"Written sample data to YAML file in {sample_elapsed:.6f} seconds.")
# Log up to 10 models that have no children at all (direct or indirect).
#
# ancestor_children only contains models that appear as an ancestor of at
# least one child, so filtering that table on an empty all_children list can
# never match anything. Start from parents_in_memory instead and keep the
# models that either have no ancestor_children row (all_children is NULL after
# the LEFT JOIN) or an empty list.
# LIMIT is applied without ORDER BY, so these are arbitrary examples rather
# than a randomised sample; nothing is written to disk here, only logged.
start_time = time.perf_counter()
no_children_query = """
SELECT p.id AS ancestor, dcm.direct_children, ac.all_children
FROM parents_in_memory p
LEFT JOIN ancestor_children ac ON p.id = ac.ancestor
LEFT JOIN direct_children_mapping dcm ON p.id = dcm.parent
WHERE COALESCE(array_length(ac.all_children), 0) = 0
LIMIT 10
"""
no_children_data = con.execute(no_children_query).fetchall()
end_time = time.perf_counter()
logging.info(f"Fetched sample data of models with no children in {end_time - start_time:.6f} seconds.")
logging.info("Examples of models with no children (direct or indirect):")
for model in no_children_data:
    logging.info(model)
# Report the 10 ancestors with the largest number of descendants, alongside
# how many of those are direct children.
top_ancestors_query = """
SELECT
ac.ancestor,
array_length(ac.all_children) AS num_all_children,
array_length(dcm.direct_children) AS num_direct_children
FROM ancestor_children ac
LEFT JOIN direct_children_mapping dcm ON ac.ancestor = dcm.parent
ORDER BY num_all_children DESC
LIMIT 10
"""
ranking_started = time.perf_counter()
top_ancestors = con.execute(top_ancestors_query).fetchall()
ranking_elapsed = time.perf_counter() - ranking_started
logging.info(f"Listed top 10 ancestors with the most children in {ranking_elapsed:.6f} seconds.")
logging.info("Top 10 ancestors with the most children and their number of direct children:")
# Each row is logged as-is: (ancestor, num_all_children, num_direct_children).
for ranked_row in top_ancestors:
    logging.info(ranked_row)

# End-to-end duration, measured from total_start_time; printed rather than logged.
total_execution_time = time.perf_counter() - total_start_time
print(f"Total processing time: {total_execution_time:.6f} seconds")