pppppz / ccc.py
coollsd's picture
Update ccc.py
0987a64 verified
import aiohttp
import asyncio
import random
from dateutil import parser
import os
import logging
import time
# Disable all logging except for critical errors
logging.basicConfig(level=logging.CRITICAL)
discord_webhook_url = os.environ['webhook']
api_batch_url = "https://epic-alligator-77.deno.dev/post"
asset_info_url = "https://economy.roproxy.com/v2/assets/"
last_working_time = time.time()
async def send_to_discord(session, asset_id, name, creator_id, creator_name, creator_type, asset_type, created_date):
embed = {
"embeds": [
{
"url": f"https://www.roblox.com/library/{asset_id}",
"title": name or "Unknown Asset",
"thumbnail": {
"url": f"https://rbxgleaks.pythonanywhere.com/asset/{asset_id}",
},
"fields": [
{
"name": "Creator Type",
"value": creator_type or "Unknown",
"inline": True
},
{
"name": "Creator ID",
"value": creator_id or "Unknown",
"inline": True
}
],
"description": f"**Asset Type:** {asset_type}\n**ID:** ||{asset_id}||\n**Creator:** [{creator_name}](https://www.roblox.com/users/{creator_id}/profile)\n**Created:** {created_date}"
}
],
"content": "",
}
try:
async with session.post(discord_webhook_url, json=embed) as response:
await response.text()
except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError):
pass
async def check_asset_batch(session, asset_ids):
payload = [{"assetId": asset_id} for asset_id in asset_ids]
try:
async with session.post(api_batch_url, json=payload, headers={
'Content-Type': 'application/json',
'Requester': 'Client',
'User-Agent': 'Roblox/WinInetRobloxApp'
}) as response:
if response.status == 200:
results = await response.json()
if not results or (isinstance(results, list) and all("errors" in result for result in results)):
return
tasks = []
for asset_id in asset_ids:
if any(result.get("assetId") == asset_id and not result.get("IsCopyrightProtected", True) for result in results):
tasks.append(fetch_asset_info(session, asset_id))
await asyncio.gather(*tasks)
except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError):
pass
async def fetch_asset_info(session, asset_id):
try:
async with session.get(f"{asset_info_url}{asset_id}/details") as asset_info_response:
if asset_info_response.status == 200:
asset_info = await asset_info_response.json()
name = asset_info.get("Name", "Unknown")
asset_type = asset_info.get("AssetTypeId", "Unknown")
creator = asset_info.get("Creator", {})
creator_id = creator.get("Id", "Unknown")
creator_name = creator.get("Name", "Unknown")
creator_type = creator.get("CreatorType", "Unknown")
created_date_str = asset_info.get("Created", "Unknown")
created_date = parse_iso8601(created_date_str)
created_date_formatted = created_date.strftime("%Y-%m-%d %H:%M:%S") if created_date else "Unknown"
await send_to_discord(session, asset_id, name, creator_id, creator_name, creator_type, asset_type, created_date_formatted)
except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError):
pass
def parse_iso8601(date_str):
try:
return parser.isoparse(date_str)
except ValueError:
return None
def generate_ids_batch(digit, batch_size=10000):
base = random.randint(70000000000000, 140000000000000)
for i in range(batch_size):
yield str(base + i)
async def run_scanner_instance(digit, semaphore):
async with aiohttp.ClientSession() as session:
while True:
try:
async with semaphore:
batch = list(generate_ids_batch(digit))
await check_asset_batch(session, batch)
except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionError):
await asyncio.sleep(1)
async def print_status_periodically():
global last_working_time
while True:
print("Working")
last_working_time = time.time()
await asyncio.sleep(60)
async def check_restart():
global last_working_time
while True:
if time.time() - last_working_time > 180: # 3 minutes
print("Restarting due to inactivity")
os._exit(1) # Force restart
await asyncio.sleep(10)
async def main():
semaphore = asyncio.Semaphore(100) # Limit concurrent tasks
tasks = []
for i in range(160000): # 20000 instances per digit * 8 digits
digit = 7 + (i % 8)
tasks.append(run_scanner_instance(digit, semaphore))
tasks.append(print_status_periodically())
tasks.append(check_restart())
await asyncio.gather(*tasks)
if __name__ == "__main__":
while True:
try:
asyncio.run(main())
except Exception as e:
print(f"Error occurred: {e}")
print("Restarting...")
time.sleep(5) # Wait 5 seconds before restarting