File size: 6,032 Bytes
2aa4e5a ca649b3 881d825 ca649b3 2aa4e5a 881d825 2aa4e5a ca649b3 892834b 2aa4e5a ca649b3 892834b 2aa4e5a 78891bd ca649b3 78891bd 2aa4e5a 892834b 2aa4e5a 78891bd ca649b3 2aa4e5a 410992d 2aa4e5a a5ce025 2aa4e5a 881d825 2aa4e5a 881d825 2aa4e5a a5ce025 2aa4e5a 881d825 a5ce025 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
import aiohttp
import asyncio
import random
import multiprocessing
from dateutil import parser
import os
import logging
from fastapi import FastAPI
import uvicorn
# Disable all logging except for critical errors
logging.basicConfig(level=logging.CRITICAL)
# FastAPI application object; the HTTP route and the startup hook defined
# later in this file are registered on it.
app = FastAPI()
@app.get("/")
async def root():
    """Liveness endpoint: always answers with a fixed "running" status."""
    status_payload = {"status": "running"}
    return status_payload
# Discord webhook that receives one embed per reported asset. Read from the
# `webhook` environment variable; a missing variable raises KeyError on import.
discord_webhook_url = os.environ['webhook']
# Endpoint that receives a JSON list of {"assetId": ...} objects in one POST.
api_batch_url = "https://epic-alligator-77.deno.dev/post"
# Base URL for per-asset detail lookups; "<asset_id>/details" is appended.
asset_info_url = "https://economy.roproxy.com/v2/assets/"
# Integer counters; every update in this file happens under the value's lock.
assets_checked = multiprocessing.Value('i', 0)
valid_assets_found = multiprocessing.Value('i', 0)
# Both dicts are keyed by "digit" (7..14) and filled by initialize_base_ids():
# last_used_ids holds the advancing integer cursor, base_ids the initial
# base ID as a string.
last_used_ids = {}
base_ids = {}
async def send_to_discord(session, asset_id, name, creator_id, creator_name, creator_type, asset_type, created_date):
    """Post a Discord embed describing one asset to the configured webhook.

    Args:
        session: aiohttp-style client session used for the POST.
        asset_id: Asset identifier; used in the embed URL, thumbnail and text.
        name: Asset name; falls back to "Unknown Asset" when empty.
        creator_id: Creator identifier shown in a field and in the link.
        creator_name: Display text for the creator link.
        creator_type: Creator kind as reported by the asset API.
        asset_type: Asset type identifier shown in the description.
        created_date: Already-formatted creation date string.

    Delivery is best-effort: network errors and HTTP error statuses are
    logged and never raised to the caller.
    """
    log = logging.getLogger(__name__)

    # A group-owned asset should link to the group page, not a user profile.
    # NOTE(review): assumes the API reports group creators as "Group" - confirm.
    if creator_type == "Group":
        creator_url = f"https://www.roblox.com/groups/{creator_id}"
    else:
        creator_url = f"https://www.roblox.com/users/{creator_id}/profile"

    embed = {
        "embeds": [
            {
                "url": f"https://www.roblox.com/library/{asset_id}",
                "title": name or "Unknown Asset",
                "thumbnail": {
                    "url": f"https://rbxgleaks.pythonanywhere.com/asset/{asset_id}",
                },
                "fields": [
                    {
                        "name": "Creator Type",
                        # Discord documents field values as strings, so
                        # numeric inputs are converted explicitly.
                        "value": str(creator_type or "Unknown"),
                        "inline": True,
                    },
                    {
                        "name": "Creator ID",
                        "value": str(creator_id or "Unknown"),
                        "inline": True,
                    },
                ],
                "description": (
                    f"**Asset Type:** {asset_type}\n"
                    f"**ID:** ||{asset_id}||\n"
                    f"**Creator:** [{creator_name}]({creator_url})\n"
                    f"**Created:** {created_date}"
                ),
            }
        ],
        "content": "",
    }

    try:
        async with session.post(discord_webhook_url, json=embed) as response:
            # Drain the body so the connection can be released cleanly.
            await response.text()
            if response.status >= 400:
                # Rejections (e.g. rate limiting) used to vanish silently.
                log.warning(
                    "Discord webhook returned HTTP %s for asset %s",
                    response.status,
                    asset_id,
                )
    except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError) as exc:
        # Still best-effort, but the failure is now visible once the log
        # level is lowered from CRITICAL.
        log.warning("Discord webhook post failed for asset %s: %s", asset_id, exc)
async def check_asset_batch(session, asset_ids):
    """Submit a batch of asset IDs and follow up on the unprotected ones.

    Posts ``[{"assetId": id}, ...]`` to the batch endpoint. For every
    requested ID whose result entry has a falsy ``IsCopyrightProtected``
    (a missing flag counts as protected), ``fetch_asset_info`` is run; all
    follow-ups are awaited concurrently.

    Args:
        session: aiohttp-style client session.
        asset_ids: Iterable of asset IDs to check (re-iterated, so it must
            not be a one-shot iterator).

    Network errors, non-200 responses and unusable response bodies end the
    batch quietly.
    """
    payload = [{"assetId": asset_id} for asset_id in asset_ids]
    headers = {
        'Content-Type': 'application/json',
        'Requester': 'Client',
        'User-Agent': 'Roblox/WinInetRobloxApp'
    }
    try:
        async with session.post(api_batch_url, json=payload, headers=headers) as response:
            if response.status != 200:
                return
            results = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError, ValueError):
        # ValueError covers a 200 response whose body is not valid JSON.
        return

    # Only a non-empty list of objects is usable; anything else previously
    # crashed on `.get` with an uncaught AttributeError.
    if not isinstance(results, list) or not results:
        return
    entries = [entry for entry in results if isinstance(entry, dict)]
    if all("errors" in entry for entry in entries):
        return

    # Build the set once instead of rescanning every result per ID. IDs are
    # compared as strings because the request IDs may be strings while the
    # response may carry numbers; a plain `==` would then never match.
    unprotected_ids = {
        str(entry.get("assetId"))
        for entry in entries
        if not entry.get("IsCopyrightProtected", True)
    }
    tasks = [
        fetch_asset_info(session, asset_id)
        for asset_id in asset_ids
        if str(asset_id) in unprotected_ids
    ]
    await asyncio.gather(*tasks)
async def fetch_asset_info(session, asset_id):
    """Fetch details for one asset and report it to Discord.

    Increments ``assets_checked`` for every call. On a 200 response with a
    JSON object body, increments ``valid_assets_found`` and forwards the
    extracted fields to ``send_to_discord``.

    Args:
        session: aiohttp-style client session.
        asset_id: Asset identifier appended to the detail URL.

    Network errors, non-200 responses and unusable bodies are skipped
    without raising.
    """
    with assets_checked.get_lock():
        assets_checked.value += 1

    try:
        async with session.get(f"{asset_info_url}{asset_id}/details") as asset_info_response:
            if asset_info_response.status != 200:
                return
            asset_info = await asset_info_response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError, ValueError) as exc:
        # ValueError covers a 200 response whose body is not valid JSON.
        logging.getLogger(__name__).debug(
            "Asset detail lookup failed for %s: %s", asset_id, exc
        )
        return

    if not isinstance(asset_info, dict):
        return

    name = asset_info.get("Name", "Unknown")
    asset_type = asset_info.get("AssetTypeId", "Unknown")

    # A JSON null "Creator" arrives as None, which would break `.get`.
    creator = asset_info.get("Creator")
    if not isinstance(creator, dict):
        creator = {}
    creator_id = creator.get("Id", "Unknown")
    creator_name = creator.get("Name", "Unknown")
    creator_type = creator.get("CreatorType", "Unknown")

    # Only hand text to the parser; a null "Created" is reported as Unknown.
    created_date_str = asset_info.get("Created", "Unknown")
    created_date = (
        parse_iso8601(created_date_str)
        if isinstance(created_date_str, str)
        else None
    )
    if created_date:
        created_date_formatted = created_date.strftime("%Y-%m-%d %H:%M:%S")
    else:
        created_date_formatted = "Unknown"

    with valid_assets_found.get_lock():
        valid_assets_found.value += 1

    # Sent after the detail response is released so its connection is not
    # held open for the duration of the webhook call.
    await send_to_discord(
        session,
        asset_id,
        name,
        creator_id,
        creator_name,
        creator_type,
        asset_type,
        created_date_formatted,
    )
def parse_iso8601(date_str):
    """Parse an ISO-8601 timestamp, returning None when it cannot be parsed.

    Args:
        date_str: Timestamp text to parse. ``None`` is accepted and yields
            ``None``.

    Returns:
        The value produced by ``dateutil.parser.isoparse``, or ``None`` for
        missing, malformed or non-text input.
    """
    if date_str is None:
        return None
    try:
        return parser.isoparse(date_str)
    except (ValueError, TypeError, AttributeError):
        # Non-text input fails inside the parser with TypeError or
        # AttributeError rather than ValueError; treat it as unparseable.
        return None
def generate_base_id(low=70000000000000, high=140000000000000):
    """Return a random base asset ID as a decimal string.

    Args:
        low: Smallest ID that may be returned (inclusive).
        high: Largest ID that may be returned (inclusive).

    Returns:
        A uniformly chosen integer in ``[low, high]``, rendered with ``str``.
        The defaults reproduce the previously hard-coded range.
    """
    return str(random.randint(low, high))
def initialize_base_ids():
    """Seed ``base_ids`` and ``last_used_ids`` for each digit key 7 through 14.

    Each key gets its own freshly generated base ID: stored as a string in
    ``base_ids`` and as the starting integer cursor in ``last_used_ids``.
    """
    for digit_key in range(7, 15):
        seed = generate_base_id()
        base_ids[digit_key] = seed
        last_used_ids[digit_key] = int(seed)
def generate_ids_batch(digit, batch_size=10000):
    """Produce ``batch_size`` consecutive asset IDs (as strings) for ``digit``.

    The run starts at the digit's current cursor shifted by a random offset
    in [-1000000, 1000000]; afterwards the cursor moves forward by
    ``batch_size``.
    """
    cursor = last_used_ids[digit]
    jitter = random.randint(-1000000, 1000000)
    first_id = cursor + jitter
    batch = []
    for step in range(batch_size):
        batch.append(str(first_id + step))
    last_used_ids[digit] = last_used_ids[digit] + batch_size
    return batch
async def run_scanner_instance(digit):
    """Run one endless scan loop for the given digit key.

    Repeatedly generates a batch of IDs for ``digit`` and checks it. A
    client or connection error pauses the loop for one second before the
    next batch.

    Args:
        digit: Key into the module's ID cursors selecting which ID
            sequence this instance advances.
    """
    # One session for the lifetime of the instance: connections are pooled
    # and reused instead of being built and torn down for every batch.
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                batch = generate_ids_batch(digit)
                await check_asset_batch(session, batch)
            except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError):
                await asyncio.sleep(1)  # Add a small delay before retrying
async def print_status_periodically():
    """Print a "Working" heartbeat to stdout every 60 seconds, forever."""
    heartbeat_interval = 60
    while True:
        print("Working")
        await asyncio.sleep(heartbeat_interval)
async def start_scanner():
    """Seed the ID cursors, then run all scanner loops plus the heartbeat.

    20000 scanner coroutines are created in total, assigned round-robin to
    the digit keys 7..14, and awaited together with the status printer.
    """
    initialize_base_ids()
    total_instances = 20000
    coroutines = [
        run_scanner_instance(7 + (index % 8))
        for index in range(total_instances)
    ]
    coroutines.append(print_status_periodically())
    await asyncio.gather(*coroutines)
@app.on_event("startup")
async def startup_event():
    """Launch the scanner as a background task when the app starts."""
    # The event loop keeps only a weak reference to tasks, so an
    # unreferenced task may be garbage-collected mid-run. Storing it on the
    # application state keeps it alive for the app's lifetime.
    app.state.scanner_task = asyncio.create_task(start_scanner())
if __name__ == "__main__":
    # Serve the app on all interfaces, port 7860, when run as a script.
    # (A stray trailing "|" after this call was removed; it was a syntax error.)
    uvicorn.run(app, host="0.0.0.0", port=7860)