"""Streamlit dashboard showing current weather and a 24-hour forecast.

Data comes from the Seoul open-data "citydata" endpoint; a short natural
language forecast is generated through an OpenAI-compatible chat API.
"""

import os
import time
from datetime import datetime, timedelta, timezone

import httpx
import pandas as pd
import plotly.express as px
import plotly.io as pio
import requests
import streamlit as st
import streamlit.components.v1 as components
import xmltodict
from openai import OpenAI

# Use the standard-library json module as Plotly's JSON serialization engine.
pio.json.config.default_engine = 'json'

# Page configuration.
st.set_page_config(
    page_title="우리집 날씨 정보",
    page_icon="🌤️",
    layout="wide",
    menu_items={
        'Get Help': None,
        'Report a bug': None,
        'About': None
    }
)

# CSS style overrides.
# NOTE(review): the stylesheet body (and the markup in most templates below)
# appears to have been stripped from this file; restore it from the original.
st.markdown(""" """, unsafe_allow_html=True)

# Fixed UTC+9 offset used for every "now" computed in this module.
KST = timezone(timedelta(hours=9))
# Layout of the FCST_DT field as parsed throughout this module.
FCST_DT_FORMAT = '%Y%m%d%H%M'

# SECURITY: these credentials were committed in source. They are kept only as
# fallbacks so existing deployments keep working; rotate them and supply the
# real values through the environment.
SEOUL_API_KEY = os.environ.get("SEOUL_OPENAPI_KEY", "77544e69764a414d363647424a655a")
GLHF_API_KEY = os.environ.get("GLHF_API_KEY", "glhf_9ea0e0babe1e45353dd03b44cb979e22")
SEOUL_CITYDATA_URL = (
    f"http://openapi.seoul.go.kr:8088/{SEOUL_API_KEY}/xml/citydata/1/5/신림역"
)

# Icon lookup used on the main page. Precipitation takes precedence over sky.
# NOTE(review): the snow and overcast icons are empty strings in the source;
# presumably markup/emoji was stripped - confirm the intended glyphs.
_PRECIP_ICONS = {"비": "☔", "빗방울": "☔", "비/눈": "☔", "눈": ''}
_SKY_ICONS = {"맑음": "🌞", "구름": "⛅", "구름많음": "⛅", "흐림": ''}

# (icon, label) lookup used on the temperature graph.
# NOTE(review): the "구름많음" label contains a line break in the source; Plotly
# annotations break lines on "<br>", so a stripped "<br>" is likely - confirm.
_GRAPH_PRECIP = {
    "비": ("☔", "비"),
    "눈": ('', "눈"),
    "비/눈": ('☔', "비/눈"),
}
_GRAPH_SKY = {
    "맑음": ("🌞", "맑음"),
    "구름": ("⛅", "구름"),
    "구름많음": ("⛅", "구름\n많음"),
    "흐림": ('', "흐림"),
}
_GRAPH_DEFAULT = ("☀️", "정보없음")


def _now_kst():
    """Return the current KST wall-clock time as a naive datetime.

    Naive values are used so they can be compared directly with the naive
    datetimes parsed from FCST_DT.
    """
    return datetime.now(KST).replace(tzinfo=None)


def _forecast_list(data):
    """Return the hourly forecast entries of *data* as a list.

    A single entry (not wrapped in a list) is wrapped; a missing or empty
    forecast section yields an empty list.
    """
    container = data.get('FCST24HOURS') or {}
    forecasts = container.get('FCST24HOURS') or []
    return forecasts if isinstance(forecasts, list) else [forecasts]


def _forecast_time(forecast):
    """Parse the FCST_DT field of one forecast entry into a naive datetime."""
    return datetime.strptime(forecast['FCST_DT'], FCST_DT_FORMAT)


def _closest_forecast(forecasts, moment):
    """Return the forecast entry nearest in time to *moment*, or None if empty."""
    return min(
        forecasts,
        key=lambda f: abs((_forecast_time(f) - moment).total_seconds()),
        default=None,
    )


def _weather_icon(precip_type, sky_status):
    """Return the main-page icon for a precipitation type / sky status pair."""
    if precip_type in _PRECIP_ICONS:
        return _PRECIP_ICONS[precip_type]
    return _SKY_ICONS.get(sky_status, "")


def _dust_grade(pm10_value):
    """Return (status label, colour) for a PM10 reading; neutral if unparsable."""
    try:
        pm10 = float(pm10_value)
    except (TypeError, ValueError):
        return "정보없음", "#FFFFFF"
    if pm10 <= 30:
        return "좋음", "#87CEEB"  # blue
    if pm10 <= 80:
        return "보통", "#90EE90"  # green
    if pm10 <= 150:
        return "나쁨", "#FFD700"  # yellow
    return "매우나쁨", "#FF6B6B"  # red


def _format_forecast_line(fcst_dt, temp, description):
    """Format one forecast entry as a line of the LLM prompt."""
    return (
        f"[{fcst_dt[:4]}년 {fcst_dt[4:6]}월 {fcst_dt[6:8]}일 {fcst_dt[8:10]}시] "
        f"{temp}도, {description}"
    )


def _go_to_section(section):
    """Return a button callback that switches the displayed section."""
    return lambda: st.session_state.update({'current_section': section})


def get_korean_weekday(date):
    """Return the one-character Korean weekday name for *date*.

    Uses ``date.weekday()`` (Monday == 0) so the result does not depend on
    the process locale.
    """
    return "월화수목금토일"[date.weekday()]


def check_network_status():
    """Return True when a plain HTTP request to google.com answers with 200."""
    try:
        response = httpx.get("http://www.google.com", timeout=5)
        return response.status_code == 200
    except httpx.RequestError:
        return False


def check_api_status():
    """Return True when the citydata endpoint reports a successful result."""
    try:
        response = requests.get(SEOUL_CITYDATA_URL, timeout=5)
        if response.status_code != 200:
            return False
        data = xmltodict.parse(response.text)
        message = data.get('SeoulRtd.citydata', {}).get('RESULT', {}).get('MESSAGE')
        return message == "정상 처리되었습니다."
    except Exception:
        # Best-effort probe: any transport or parsing problem counts as "down".
        return False


@st.cache_data(ttl=300)
def _fetch_weather_data():
    """Fetch and parse the weather section of the citydata response.

    Raises on any failure so that only successful results enter the cache;
    a cached failure would otherwise block the one-minute retry.
    """
    response = requests.get(SEOUL_CITYDATA_URL, timeout=30)
    response.raise_for_status()
    if not response.text.strip():
        raise ValueError("empty response body")
    data = xmltodict.parse(response.text)
    result = data['SeoulRtd.citydata']['CITYDATA']['WEATHER_STTS']['WEATHER_STTS']
    if not result:
        raise ValueError("response contained no weather status")
    return result


def get_weather_data():
    """Return the current weather record, or None when it cannot be fetched."""
    try:
        return _fetch_weather_data()
    except Exception:
        # Deliberate best-effort: callers treat None as "API failed".
        return None


def get_background_color(pm10_value):
    """Return the background colour for a PM10 reading (white if unparsable)."""
    try:
        pm10 = float(pm10_value)
    except (TypeError, ValueError):
        return "#FFFFFF"  # default: white
    if pm10 <= 30:
        return "#87CEEB"  # blue (good)
    if pm10 <= 80:
        return "#90EE90"  # green (moderate)
    if pm10 <= 150:
        return "#FFD700"  # yellow (bad)
    return "#FF6B6B"  # red (very bad)


def get_current_sky_status(data):
    """Return the SKY_STTS of the forecast nearest to now, or "정보없음".

    Full datetimes are compared so that entries across midnight are ranked
    by real distance rather than by hour-of-day alone.
    """
    closest = _closest_forecast(_forecast_list(data), _now_kst())
    return closest['SKY_STTS'] if closest else "정보없음"


def format_news_message(news_list):
    """Join weather warnings into a single ' | '-separated string.

    *news_list* may be a single dict or a list; non-dict items are skipped.
    """
    if not isinstance(news_list, list):
        news_list = [news_list]

    current_warnings = []
    for news in news_list:
        if not isinstance(news, dict):
            continue
        warn_val = news.get('WARN_VAL', '')
        warn_stress = news.get('WARN_STRESS', '')
        command = news.get('COMMAND', '')
        warn_msg = news.get('WARN_MSG', '')
        announce_time = news.get('ANNOUNCE_TIME', '')

        # ANNOUNCE_TIME is only formatted when it is exactly 12 characters
        # (sliced as YYYYMMDDHHMM).
        if announce_time and len(announce_time) == 12:
            year = announce_time[0:4]
            month = announce_time[4:6]
            day = announce_time[6:8]
            hour = announce_time[8:10]
            minute = announce_time[10:12]
            formatted_time = f"({year}년{month}월{day}일{hour}시{minute}분)"
        else:
            formatted_time = ""

        if command == '해제':
            warning_text = f"✅ {warn_val}{warn_stress} 해제 {formatted_time} {warn_msg}"
        else:
            warning_text = f"⚠️ {warn_val}{warn_stress} 발령 {formatted_time} {warn_msg}"
        current_warnings.append(warning_text)

    return ' | '.join(current_warnings)


def _update_forecast_text(forecast_data_str, current_time_str):
    """Refresh the stored forecast text, keeping the old text on failure."""
    if 'weather_forecast' not in st.session_state:
        st.session_state.weather_forecast = ""
    try:
        st.session_state.weather_forecast = get_weather_forecast(
            forecast_data_str, current_time_str
        )
    except Exception:
        # The LLM call is optional decoration; a failure must not take the
        # whole page down, so the previously stored text stays on screen.
        pass


def show_weather_info(data):
    """Render the main page: current weather, clock, controls and API status."""
    st.markdown('\n', unsafe_allow_html=True)

    # Show when the data was last refreshed. last_api_call is stored as the
    # timestamp of a naive KST datetime, so fromtimestamp() round-trips it.
    last_call = st.session_state.last_api_call
    refresh_time = datetime.fromtimestamp(last_call) if last_call else _now_kst()
    refreshed_label = refresh_time.strftime('%Y-%m-%d %H:%M:%S')
    st.markdown(f'''
Data refreshed at: {refreshed_label}
''', unsafe_allow_html=True)

    current_time = _now_kst()
    weekday = get_korean_weekday(current_time)
    formatted_date = f"{current_time.strftime('%Y-%m-%d')}({weekday})"

    # NOTE(review): not referenced by the visible templates; presumably used by
    # the stripped markup - confirm before removing.
    dust_status, dust_color = _dust_grade(data.get('PM10'))

    try:
        temp = f"{float(data.get('TEMP', '정보없음')):.1f}°C"
    except (TypeError, ValueError):
        temp = "정보없음"

    forecast_data = _forecast_list(data)

    # Find the next 06:00 forecast that is still in the future.
    morning_six_data = None
    for fcst in forecast_data:
        if int(fcst['FCST_DT'][8:10]) == 6 and _forecast_time(fcst) > current_time:
            morning_six_data = fcst
            break

    tomorrow_morning_weather = "없음"
    if morning_six_data:
        morning_icon = _weather_icon(
            morning_six_data['PRECPT_TYPE'], morning_six_data['SKY_STTS']
        )
        tomorrow_morning_weather = f"{morning_six_data['TEMP']}°C {morning_icon}"

    # Current icon: observed precipitation wins; otherwise fall back to the
    # forecast entry nearest to now.
    current_precip = data.get('PRECPT_TYPE')
    if current_precip in _PRECIP_ICONS:
        weather_icon = _PRECIP_ICONS[current_precip]
    else:
        nearest_forecast = _closest_forecast(forecast_data, current_time)
        weather_icon = (
            _weather_icon(nearest_forecast['PRECPT_TYPE'], nearest_forecast['SKY_STTS'])
            if nearest_forecast else ""
        )
    precip_mark = weather_icon

    st.markdown(f'''
{temp}{precip_mark}                 {tomorrow_morning_weather}
{formatted_date}
''', unsafe_allow_html=True)

    # NOTE(review): the clock markup/script appears to have been stripped.
    clock_html = """
"""
    components.html(clock_html, height=300)

    col1, col2, col3, col4 = st.columns([1, 1, 1, 2])

    with col1:
        if st.button("날씨 예보 스크롤", key="toggle_scroll"):
            st.session_state.scroll_visible = not st.session_state.scroll_visible

        # Build the prompt input and (re)generate the forecast text.
        forecast_data_str = "\n".join(
            _format_forecast_line(f['FCST_DT'], f['TEMP'], f['SKY_STTS'])
            for f in forecast_data
        )
        current_time_str = _now_kst().strftime('%H시 %M분')
        _update_forecast_text(forecast_data_str, current_time_str)

        # Styles for the scrolling container.
        # NOTE(review): none of these are interpolated into the visible
        # template; presumably the stripped markup referenced them - confirm.
        background_color = get_background_color(data.get('PM10'))
        display_style = "block" if st.session_state.scroll_visible else "none"
        scroll_style = f"""
            background-color: rgba(255, 255, 255, 0.9);
            color: #333;
            display: {display_style};
            position: fixed;
            bottom: 20px;
            left: 0;
            width: 100%;
            overflow: hidden;
            padding: 10px 0;
            z-index: 1000;
        """
        text_style = """
            white-space: nowrap;
            animation: scroll-text 30s linear infinite;
            display: inline-block;
            font-size: 2.5em;
            font-weight: bold;
        """
        st.markdown(f'''
{st.session_state.weather_forecast}
''', unsafe_allow_html=True)

    with col2:
        st.button("시간대별 온도 보기", on_click=_go_to_section('temperature'))

    with col3:
        if st.button("API 응답 체크"):
            if check_api_status():
                st.session_state.api_failed = False
                new_data = get_weather_data()
                if new_data:
                    st.session_state.weather_data = new_data
                    # Same KST-based clock as main(); mixing in a plain UTC
                    # timestamp here skews the elapsed-time check by 9 hours.
                    st.session_state.last_api_call = _now_kst().timestamp()
                    st.rerun()

    # Session slot for the time of an API failure.
    if 'api_failed_time' not in st.session_state:
        st.session_state.api_failed_time = None

    with col4:
        if not check_network_status():
            status_color = "#FF0000"
            status_text = "네트워크 연결 없음"
        else:
            status_now = _now_kst()
            if not st.session_state.api_failed:
                status_color = "#00AA00"
                st.session_state.api_status_time = status_now
                status_time = st.session_state.api_status_time.strftime('%Y-%m-%d %H:%M')
                status_text = f"API 정상({status_time} 성공)"
            else:
                status_color = "#FF0000"
                if st.session_state.api_status_time is None:
                    st.session_state.api_status_time = status_now
                status_time = st.session_state.api_status_time.strftime('%Y-%m-%d %H:%M')
                status_text = f"API 응답 없음({status_time} 발생)"

        # The source template had one placeholder for two arguments (a
        # TypeError at runtime); the colour is applied through a minimal
        # wrapper element.
        # NOTE(review): the original markup/class name is unknown - confirm.
        st.markdown("""
<div style="color: %s;">
%s
</div>
""" % (status_color, status_text), unsafe_allow_html=True)

    # Second scroll container showing the stored forecast.
    # NOTE(review): as above, these styles are presumably consumed by stripped
    # markup.
    background_color = get_background_color(data.get('PM10'))
    display_style = "block" if st.session_state.scroll_visible else "none"
    scroll_style = f"""
        background-color: rgba(255, 255, 255, 0.9);
        color: #333;
        display: {display_style};
    """
    st.markdown(f'''
{st.session_state.weather_forecast}
''', unsafe_allow_html=True)
    st.markdown('\n', unsafe_allow_html=True)


def show_temperature_graph(data):
    """Render the hourly temperature chart with weather icons and markers."""
    st.markdown('\n', unsafe_allow_html=True)
    st.markdown('\n\n시간대별 온도\n\n', unsafe_allow_html=True)

    # Sort by FCST_DT so the entries are in chronological order.
    forecast_data = sorted(_forecast_list(data), key=lambda x: x['FCST_DT'])

    current_time = _now_kst()
    current_date = current_time.strftime('%Y%m%d')
    next_date = (current_time + timedelta(days=1)).strftime('%Y%m%d')

    # Keep today's entries from the current hour on, plus all of tomorrow's.
    valid_forecast_data = [
        fcst for fcst in forecast_data
        if (fcst['FCST_DT'][:8] == current_date
            and int(fcst['FCST_DT'][8:10]) >= current_time.hour)
        or fcst['FCST_DT'][:8] == next_date
    ]
    # Fall back to everything when the filter leaves nothing.
    if not valid_forecast_data:
        valid_forecast_data = forecast_data

    if not valid_forecast_data:
        # Nothing to plot: min()/max() below would fail on empty lists.
        st.warning("예보 데이터가 없습니다.")
        st.button("우리집 날씨 정보로 돌아가기", on_click=_go_to_section('weather'))
        st.markdown('\n', unsafe_allow_html=True)
        return

    # Rotate the list so it starts at the entry nearest to now; the "현재"
    # marker is therefore always drawn on the first data point.
    current_index = min(
        range(len(valid_forecast_data)),
        key=lambda i: abs(
            (_forecast_time(valid_forecast_data[i]) - current_time).total_seconds()
        ),
    )
    valid_forecast_data = (
        valid_forecast_data[current_index:] + valid_forecast_data[:current_index]
    )

    times = []
    temps = []
    weather_icons = []
    weather_descriptions = []
    date_changes = []  # indices where the day-of-month differs from the previous entry

    for i, forecast in enumerate(valid_forecast_data):
        time_str = forecast['FCST_DT']
        if i > 0 and valid_forecast_data[i - 1]['FCST_DT'][6:8] != time_str[6:8]:
            date_changes.append(i)

        times.append(f"{time_str[8:10]}시")
        temps.append(float(forecast['TEMP']))

        # Precipitation takes precedence over sky status.
        precip_type = forecast['PRECPT_TYPE']
        if precip_type in _GRAPH_PRECIP:
            icon, description = _GRAPH_PRECIP[precip_type]
        else:
            icon, description = _GRAPH_SKY.get(forecast['SKY_STTS'], _GRAPH_DEFAULT)
        weather_icons.append(icon)
        weather_descriptions.append(description)

    df = pd.DataFrame({
        '시간': times,
        '기온': temps,
        '날씨': weather_icons,
        '설명': weather_descriptions,
        'FCST_DT': [f['FCST_DT'] for f in valid_forecast_data]
    })

    fig = px.line(df, x='시간', y='기온', markers=True)

    top = max(temps)
    bottom = min(temps)

    # Shade night-time slots (18:00-06:00).
    last_index = len(times) - 1
    for i, label in enumerate(times):
        hour = int(label.replace('시', ''))
        if hour >= 18 or hour < 6:
            fig.add_vrect(
                x0=label,
                x1=times[i + 1] if i < last_index else times[-1],
                fillcolor='rgba(0, 0, 0, 0.1)',
                layer='below',
                line_width=0,
                annotation_text="",
                annotation_position="top left"
            )

    # Green dashed line and "현재" label on the first point.
    fig.add_vline(x=times[0], line_width=2, line_dash="dash", line_color="green")
    fig.add_annotation(
        x=times[0],
        y=top + 4,
        text="현재",
        showarrow=True,
        arrowhead=2,
    )

    # Large labels under the key hours that are present on the axis.
    for label in ["00시", "06시", "12시", "18시", "24시"]:
        if label in times:
            fig.add_annotation(
                x=label,
                y=bottom - 3,
                text=label,
                showarrow=False,
                font=dict(size=30, color="black", family="Arial")
            )

    fig.add_vline(x='12시', line_width=2, line_dash="dash", line_color="rgba(0,0,0,0.5)")

    # AM/PM labels only when the matching slot exists.
    time_set = set(times)
    if '11시' in time_set:
        fig.add_annotation(x='11시', y=top + 4, text="오전", showarrow=False, font=dict(size=24))
    if '13시' in time_set:
        fig.add_annotation(x='13시', y=top + 4, text="오후", showarrow=False, font=dict(size=24))

    # "내일" on the 01시 slot that directly follows 00시. (The source also had
    # an "오늘" branch requiring times[0] to be both '23시' and '00시'; it could
    # never run and was dropped.)
    for i, label in enumerate(times):
        hour = int(label.replace('시', ''))
        if hour == 1 and i > 0 and times[i - 1] == '00시':
            fig.add_annotation(x=label, y=top + 4, text="내일", showarrow=False, font=dict(size=24))

    fig.update_traces(
        line_color='#FF6B6B',
        marker=dict(size=10, color='#FF6B6B'),
        textposition="top center",
        mode='lines+markers+text',
        text=[f"{int(round(temp))}°" for temp in df['기온']],
        textfont=dict(size=24)
    )

    # Icon and description above every data point.
    for label, icon, description in zip(times, weather_icons, weather_descriptions):
        fig.add_annotation(
            x=label,
            y=top + 3,
            text=f"{icon}",
            showarrow=False,
            font=dict(size=30)
        )
        fig.add_annotation(
            x=label,
            y=top + 2,
            text=f"{description}",
            showarrow=False,
            font=dict(size=16),
            textangle=0
        )

    # Red dashed line wherever the date changes.
    for date_change in date_changes:
        fig.add_vline(
            x=times[date_change],
            line_width=2,
            line_dash="dash",
            line_color="rgba(255, 0, 0, 0.7)"
        )

    hour_labels = [f"{i:02d}시" for i in range(24)]
    fig.update_layout(
        title=None,
        xaxis_title='',
        yaxis_title=None,  # '기온 (°C)'
        height=600,
        width=7200,
        showlegend=False,
        plot_bgcolor='rgba(255,255,255,0.9)',
        paper_bgcolor='rgba(0,0,0,0)',
        margin=dict(l=50, r=50, t=0, b=0),
        xaxis=dict(
            tickangle=0,
            tickfont=dict(size=14),
            gridcolor='rgba(0,0,0,0.1)',
            dtick=1,
            tickmode='array',
            ticktext=hour_labels,
            tickvals=hour_labels
        ),
        yaxis=dict(
            tickfont=dict(size=14),
            gridcolor='rgba(0,0,0,0.1)',
            showticklabels=True,
            tickformat='d',
            ticksuffix='°C',
            automargin=True,
            rangemode='tozero'
        )
    )

    st.plotly_chart(fig, use_container_width=True)

    # Generate the forecast text only when none is stored in the session.
    if 'weather_forecast' not in st.session_state:
        forecast_data_str = "\n".join(
            _format_forecast_line(f['FCST_DT'], temp, description)
            for f, temp, description in zip(valid_forecast_data, temps, weather_descriptions)
        )
        _update_forecast_text(forecast_data_str, current_time.strftime('%H시 %M분'))

    # Show the stored forecast text.
    st.markdown(f'''
{st.session_state.weather_forecast}
''', unsafe_allow_html=True)

    # Spacer so the button sits above the scrolling text.
    st.markdown('''
''', unsafe_allow_html=True)

    # Button back to the main page.
    st.button("우리집 날씨 정보로 돌아가기", on_click=_go_to_section('weather'))
    st.markdown('\n', unsafe_allow_html=True)


@st.cache_data(ttl=300)  # cache for 5 minutes
def get_weather_forecast(forecast_data_str, current_time_str):
    """Ask the chat model for a short natural-language forecast.

    Args:
        forecast_data_str: newline-separated hourly forecast lines.
        current_time_str: current time formatted for the prompt.

    Returns:
        The text content of the first completion choice.

    Raises:
        Whatever the OpenAI client or HTTP layer raises on failure.

    The cache key includes *current_time_str*, so a cached answer is only
    reused for identical arguments.
    """
    user_prompt = (
        f"현재 시각은 {current_time_str}입니다. "
        "다음 FCST_DT의 시간대별 날씨 데이터를 보고 실제 날씨 상황에 맞는 정확한 날씨 예보를 "
        "200자의 자연스러운 문장으로 만들어주세요. "
        "비나 눈 예보가 있는 경우에만 우산을 준비하도록 안내해주세요. "
        "옷차림은 다음을 참고하세요. "
        "27°C이상: 반팔티, 반바지, 민소매 "
        "23°C~26°C: 얇은 셔츠, 반팔티, 반바지, 면바지 "
        "20°C~22°C: 얇은 가디건, 긴팔티, 긴바지 "
        "17°C~19°C: 얇은 니트, 가디건, 맨투맨, 얇은 자켓, 긴바지 "
        "12°C~16°C: 자켓, 가디건, 야상, 맨투맨, 니트, 스타킹, 긴바지 "
        "9°C~11°C: 트렌치코트, 야상, 가죽 자켓, 스타킹, 긴바지 "
        "5°C~8°C: 코트, 히트텍, 니트, 긴바지 "
        "4°C이하: 패딩, 두꺼운 코트, 목도리, 기모제품 "
        f"시간대별 날씨 데이터: {forecast_data_str}"
    )
    # The context manager closes the HTTP client after each call.
    with httpx.Client(follow_redirects=True, timeout=30.0) as http_client:
        client = OpenAI(
            api_key=GLHF_API_KEY,
            base_url="https://glhf.chat/api/openai/v1",
            http_client=http_client,
        )
        response = client.chat.completions.create(
            model="hf:Nexusflow/Athene-V2-Chat",
            messages=[
                {"role": "system", "content": "당신은 날씨 예보관입니다. 주어진 시간대별 날씨 데이터를 바탕으로 정확한 날씨 예보를 생성해주세요."},
                {"role": "user", "content": user_prompt},
            ]
        )
    return response.choices[0].message.content


def _refresh_weather_data(current_time):
    """Fetch fresh data and record the outcome in session state.

    Returns True when new data was stored, False when the fetch failed.
    """
    new_data = get_weather_data()
    if new_data:
        st.session_state.weather_data = new_data
        st.session_state.last_api_call = current_time.timestamp()
        st.session_state.api_failed = False
        return True
    st.session_state.api_failed = True
    st.session_state.api_status_time = current_time
    return False


def main():
    """Initialise session state, refresh data when due, render, then re-run."""
    if 'api_status_time' not in st.session_state:
        st.session_state.api_status_time = None

    if 'current_section' not in st.session_state:
        st.session_state.current_section = 'weather'
        st.session_state.last_api_call = 0
        st.session_state.weather_data = None
        st.session_state.api_failed = False
        st.session_state.scroll_visible = False
        st.session_state.weather_forecast = ""

    current_time = _now_kst()
    current_timestamp = current_time.timestamp()

    if 'last_api_call' not in st.session_state:
        st.session_state.last_api_call = 0

    time_since_last_call = current_timestamp - st.session_state.last_api_call
    # Retry after 1 minute when the API failed, otherwise every 5 minutes.
    retry_interval = 60 if st.session_state.api_failed else 300

    refresh_placeholder = st.empty()

    # Refresh when there is no data yet or the interval has elapsed.
    if not st.session_state.weather_data or time_since_last_call >= retry_interval:
        if check_network_status():
            refreshed = False
            try:
                refreshed = _refresh_weather_data(current_time)
            except Exception as e:
                st.session_state.api_failed = True
                st.session_state.api_status_time = current_time
                st.error(f"Failed to refresh data: {str(e)}")
            # st.rerun() signals through an exception, so it is called outside
            # the try block where no handler can intercept it.
            if refreshed:
                st.rerun()
        else:
            st.warning("현재 네트워크에 문제가 발생했습니다. 데이터 갱신이 불가능합니다.")

    data = st.session_state.weather_data
    if data:
        # NOTE(review): the style block that consumed background_color appears
        # to have been stripped from this template.
        background_color = get_background_color(data.get('PM10'))
        st.markdown(f""" """, unsafe_allow_html=True)

        if st.session_state.current_section == 'weather':
            show_weather_info(data)
        else:
            show_temperature_graph(data)

    # Auto-refresh timer: sleep a minute, then re-run the script. The refresh
    # check at the top of the next run fetches new data when it is due.
    with refresh_placeholder:
        time.sleep(60)
        st.rerun()


if __name__ == "__main__":
    main()