import aiohttp import asyncio import random import multiprocessing from dateutil import parser import os import logging from fastapi import FastAPI import uvicorn from contextlib import asynccontextmanager # Disable all logging except for critical errors logging.basicConfig(level=logging.CRITICAL) @asynccontextmanager async def lifespan(app: FastAPI): # Startup initialize_base_ids() asyncio.create_task(start_scanner()) yield # Shutdown # Add any cleanup code here if needed app = FastAPI(lifespan=lifespan) @app.get("/") async def root(): return {"status": "running"} discord_webhook_url = os.environ['webhook'] api_batch_url = "https://epic-alligator-77.deno.dev/post" asset_info_url = "https://economy.roproxy.com/v2/assets/" assets_checked = multiprocessing.Value('i', 0) valid_assets_found = multiprocessing.Value('i', 0) last_used_ids = {} base_ids = {} async def send_to_discord(session, asset_id, name, creator_id, creator_name, creator_type, asset_type, created_date): embed = { "embeds": [ { "url": f"https://www.roblox.com/library/{asset_id}", "title": name or "Unknown Asset", "thumbnail": { "url": f"https://rbxgleaks.pythonanywhere.com/asset/{asset_id}", }, "fields": [ { "name": "Creator Type", "value": creator_type or "Unknown", "inline": True }, { "name": "Creator ID", "value": creator_id or "Unknown", "inline": True } ], "description": f"**Asset Type:** {asset_type}\n**ID:** ||{asset_id}||\n**Creator:** [{creator_name}](https://www.roblox.com/users/{creator_id}/profile)\n**Created:** {created_date}" } ], "content": "", } try: async with session.post(discord_webhook_url, json=embed) as response: await response.text() except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError): pass async def check_asset_batch(session, asset_ids): global assets_checked, valid_assets_found payload = [{"assetId": asset_id} for asset_id in asset_ids] try: async with session.post(api_batch_url, json=payload, headers={ 'Content-Type': 'application/json', 'Requester': 'Client', 'User-Agent': 'Roblox/WinInetRobloxApp' }) as response: if response.status == 200: results = await response.json() if not results or (isinstance(results, list) and all("errors" in result for result in results)): return tasks = [] for asset_id in asset_ids: if any(result.get("assetId") == asset_id and not result.get("IsCopyrightProtected", True) for result in results): tasks.append(fetch_asset_info(session, asset_id)) await asyncio.gather(*tasks) except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError): pass async def fetch_asset_info(session, asset_id): global assets_checked, valid_assets_found try: with assets_checked.get_lock(): assets_checked.value += 1 async with session.get(f"{asset_info_url}{asset_id}/details") as asset_info_response: if asset_info_response.status == 200: asset_info = await asset_info_response.json() name = asset_info.get("Name", "Unknown") asset_type = asset_info.get("AssetTypeId", "Unknown") creator = asset_info.get("Creator", {}) creator_id = creator.get("Id", "Unknown") creator_name = creator.get("Name", "Unknown") creator_type = creator.get("CreatorType", "Unknown") created_date_str = asset_info.get("Created", "Unknown") created_date = parse_iso8601(created_date_str) if created_date: created_date_formatted = created_date.strftime("%Y-%m-%d %H:%M:%S") else: created_date_formatted = "Unknown" with valid_assets_found.get_lock(): valid_assets_found.value += 1 await send_to_discord(session, asset_id, name, creator_id, creator_name, creator_type, asset_type, created_date_formatted) except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError): pass def parse_iso8601(date_str): try: return parser.isoparse(date_str) except ValueError: return None def generate_base_id(): base = random.randint(70000000000000, 140000000000000) return str(base) def initialize_base_ids(): for digit in range(7, 15): base_ids[digit] = generate_base_id() last_used_ids[digit] = int(base_ids[digit]) def generate_ids_batch(digit, batch_size=10000): last_used_id = last_used_ids[digit] offset = random.randint(-1000000, 1000000) ids_batch = [str(last_used_id + offset + i) for i in range(batch_size)] last_used_ids[digit] += batch_size return ids_batch async def run_scanner_instance(digit): while True: try: async with aiohttp.ClientSession() as session: batch = generate_ids_batch(digit) await check_asset_batch(session, batch) except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError): await asyncio.sleep(1) # Add a small delay before retrying async def print_status_periodically(): while True: print("Working") await asyncio.sleep(60) async def start_scanner(): tasks = [] instances_per_digit = 20000 for i in range(instances_per_digit): digit = 7 + (i % 8) tasks.append(run_scanner_instance(digit)) tasks.append(print_status_periodically()) await asyncio.gather(*tasks) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)