import aiohttp |
import asyncio |
import random |
from dateutil import parser |
import os |
import logging |
import time |
logging.basicConfig(level=logging.CRITICAL) |
discord_webhook_url = os.environ['webhook'] |
api_batch_url = "https://epic-alligator-77.deno.dev/post" |
asset_info_url = "https://economy.roproxy.com/v2/assets/" |
last_working_time = time.time() |
async def send_to_discord(session, asset_id, name, creator_id, creator_name, creator_type, asset_type, created_date): |
embed = { |
"embeds": [ |
{ |
"url": f"https://www.roblox.com/library/{asset_id}", |
"title": name or "Unknown Asset", |
"thumbnail": { |
"url": f"https://rbxgleaks.pythonanywhere.com/asset/{asset_id}", |
}, |
"fields": [ |
{ |
"name": "Creator Type", |
"value": creator_type or "Unknown", |
"inline": True |
}, |
{ |
"name": "Creator ID", |
"value": creator_id or "Unknown", |
"inline": True |
} |
], |
"description": f"**Asset Type:** {asset_type}\n**ID:** ||{asset_id}||\n**Creator:** [{creator_name}](https://www.roblox.com/users/{creator_id}/profile)\n**Created:** {created_date}" |
} |
], |
"content": "", |
} |
try: |
async with session.post(discord_webhook_url, json=embed) as response: |
await response.text() |
except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError): |
pass |
async def check_asset_batch(session, asset_ids): |
payload = [{"assetId": asset_id} for asset_id in asset_ids] |
try: |
async with session.post(api_batch_url, json=payload, headers={ |
'Content-Type': 'application/json', |
'Requester': 'Client', |
'User-Agent': 'Roblox/WinInetRobloxApp' |
}) as response: |
if response.status == 200: |
results = await response.json() |
if not results or (isinstance(results, list) and all("errors" in result for result in results)): |
return |
tasks = [] |
for asset_id in asset_ids: |
if any(result.get("assetId") == asset_id and not result.get("IsCopyrightProtected", True) for result in results): |
tasks.append(fetch_asset_info(session, asset_id)) |
await asyncio.gather(*tasks) |
except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError): |
pass |
async def fetch_asset_info(session, asset_id): |
try: |
async with session.get(f"{asset_info_url}{asset_id}/details") as asset_info_response: |
if asset_info_response.status == 200: |
asset_info = await asset_info_response.json() |
name = asset_info.get("Name", "Unknown") |
asset_type = asset_info.get("AssetTypeId", "Unknown") |
creator = asset_info.get("Creator", {}) |
creator_id = creator.get("Id", "Unknown") |
creator_name = creator.get("Name", "Unknown") |
creator_type = creator.get("CreatorType", "Unknown") |
created_date_str = asset_info.get("Created", "Unknown") |
created_date = parse_iso8601(created_date_str) |
created_date_formatted = created_date.strftime("%Y-%m-%d %H:%M:%S") if created_date else "Unknown" |
await send_to_discord(session, asset_id, name, creator_id, creator_name, creator_type, asset_type, created_date_formatted) |
except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError): |
pass |
def parse_iso8601(date_str): |
try: |
return parser.isoparse(date_str) |
except ValueError: |
return None |
def generate_ids_batch(digit, batch_size=10000): |
base = random.randint(70000000000000, 140000000000000) |
for i in range(batch_size): |
yield str(base + i) |
async def run_scanner_instance(digit, semaphore): |
async with aiohttp.ClientSession() as session: |
while True: |
try: |
async with semaphore: |
batch = list(generate_ids_batch(digit)) |
await check_asset_batch(session, batch) |
except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError): |
await asyncio.sleep(1) |
async def print_status_periodically(): |
global last_working_time |
while True: |
print("Working") |
last_working_time = time.time() |
await asyncio.sleep(60) |
async def check_restart(): |
global last_working_time |
while True: |
if time.time() - last_working_time > 180: |
print("Restarting due to inactivity") |
os._exit(1) |
await asyncio.sleep(10) |
async def main(): |
semaphore = asyncio.Semaphore(100) |
tasks = [] |
for i in range(160000): |
digit = 7 + (i % 8) |
tasks.append(run_scanner_instance(digit, semaphore)) |
tasks.append(print_status_periodically()) |
tasks.append(check_restart()) |
await asyncio.gather(*tasks) |
if __name__ == "__main__": |
while True: |
try: |
asyncio.run(main()) |
except Exception as e: |
print(f"Error occurred: {e}") |
print("Restarting...") |
time.sleep(5) |