Spaces:
Running
Running
File size: 4,925 Bytes
62d106f 0ce740f 62d106f 0ce740f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
"""
Common utilities.
"""
from asyncio import AbstractEventLoop
import oss2
import logging
import logging.handlers
import os
import platform
import sys
from typing import AsyncGenerator, Generator
import warnings
from pathlib import Path
import requests
from .constants import LOGDIR, LOG_SERVER_ADDR, SAVE_LOG
from .utils import save_log_str_on_log_server
handler = None
visited_loggers = set()
# Assuming LOGDIR and other necessary imports and global variables are defined
class APIHandler(logging.Handler):
"""Custom logging handler that sends logs to an API."""
def __init__(self, apiUrl, log_path, *args, **kwargs):
super(APIHandler, self).__init__(*args, **kwargs)
self.apiUrl = apiUrl
self.log_path = log_path
def emit(self, record):
log_entry = self.format(record)
try:
save_log_str_on_log_server(log_entry, self.log_path)
except requests.RequestException as e:
print(f"Error sending log to API: {e}", file=sys.stderr)
def build_logger(logger_name, logger_filename, add_remote_handler=False):
global handler
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Set the format of root handlers
if not logging.getLogger().handlers:
if sys.version_info[1] >= 9:
# This is for windows
logging.basicConfig(level=logging.INFO, encoding="utf-8")
else:
if platform.system() == "Windows":
warnings.warn(
"If you are running on Windows, "
"we recommend you use Python >= 3.9 for UTF-8 encoding."
)
logging.basicConfig(level=logging.INFO)
logging.getLogger().handlers[0].setFormatter(formatter)
# Redirect stdout and stderr to loggers
stdout_logger = logging.getLogger("stdout")
stdout_logger.setLevel(logging.INFO)
sl = StreamToLogger(stdout_logger, logging.INFO)
sys.stdout = sl
stderr_logger = logging.getLogger("stderr")
stderr_logger.setLevel(logging.ERROR)
sl = StreamToLogger(stderr_logger, logging.ERROR)
sys.stderr = sl
# Get logger
logger = logging.getLogger(logger_name)
logger.setLevel(logging.INFO)
if add_remote_handler:
# Add APIHandler to send logs to your API
api_url = f"{LOG_SERVER_ADDR}/{SAVE_LOG}"
remote_logger_filename = str(Path(logger_filename).stem + "_remote.log")
api_handler = APIHandler(apiUrl=api_url, log_path=f"{LOGDIR}/{remote_logger_filename}")
api_handler.setFormatter(formatter)
logger.addHandler(api_handler)
stdout_logger.addHandler(api_handler)
stderr_logger.addHandler(api_handler)
# if LOGDIR is empty, then don't try output log to local file
if LOGDIR != "":
os.makedirs(LOGDIR, exist_ok=True)
filename = os.path.join(LOGDIR, logger_filename)
handler = logging.handlers.TimedRotatingFileHandler(
filename, when="D", utc=True, encoding="utf-8"
)
handler.setFormatter(formatter)
for l in [stdout_logger, stderr_logger, logger]:
if l in visited_loggers:
continue
visited_loggers.add(l)
l.addHandler(handler)
return logger
class StreamToLogger(object):
"""
Fake file-like stream object that redirects writes to a logger instance.
"""
def __init__(self, logger, log_level=logging.INFO):
self.terminal = sys.stdout
self.logger = logger
self.log_level = log_level
self.linebuf = ""
def __getattr__(self, attr):
return getattr(self.terminal, attr)
def write(self, buf):
temp_linebuf = self.linebuf + buf
self.linebuf = ""
for line in temp_linebuf.splitlines(True):
# From the io.TextIOWrapper docs:
# On output, if newline is None, any '\n' characters written
# are translated to the system default line separator.
# By default sys.stdout.write() expects '\n' newlines and then
# translates them so this is still cross platform.
if line[-1] == "\n":
encoded_message = line.encode("utf-8", "ignore").decode("utf-8")
self.logger.log(self.log_level, encoded_message.rstrip())
else:
self.linebuf += line
def flush(self):
if self.linebuf != "":
encoded_message = self.linebuf.encode("utf-8", "ignore").decode("utf-8")
self.logger.log(self.log_level, encoded_message.rstrip())
self.linebuf = ""
def get_oss_bucket():
auth = oss2.Auth(os.getenv("OSS_ACCESS_KEY_ID"), os.getenv("OSS_ACCESS_KEY_SECRET"))
bucket = oss2.Bucket(auth, os.getenv("OSS_ENDPOINT"), os.getenv("OSS_BUCKET_NAME"))
return bucket |