import tensorflow as tf
from flask import Flask, send_from_directory, jsonify
import json
from transformers import pipeline
import nltk
from nltk.corpus import stopwords
import tweepy
import os
from datetime import datetime, timezone
import re
import logging
import gunicorn.app.base
import time
from functools import wraps
import pandas as pd
import torch
import random

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Initialize Flask app
app = Flask(__name__, static_url_path='')

# Download the stopwords corpus
try:
    nltk.download('stopwords', quiet=True)
    stop = set(stopwords.words('english'))
except Exception as e:
    logger.error(f"Error downloading stopwords: {str(e)}")
    stop = set()

# Twitter API credentials
TWITTER_BEARER_TOKEN = os.environ.get('TWITTER_BEARER_TOKEN')

# Configure Twitter API
client = tweepy.Client(bearer_token=TWITTER_BEARER_TOKEN)

# Load sample tweets from CSV with error handling
try:
    sample_tweets_df = pd.read_csv('train.csv')
    # Generate unique timestamps for each tweet within the last hour
    now = pd.Timestamp.now(tz=timezone.utc)
    sample_tweets_df['created_at'] = [
        now - pd.Timedelta(minutes=random.randint(1, 60))
        for _ in range(len(sample_tweets_df))
    ]
except Exception as e:
    logger.error(f"Error loading sample tweets: {str(e)}")
    sample_tweets_df = pd.DataFrame(columns=['id', 'text', 'created_at'])


def clean_text(text):
    """Clean and preprocess the input text."""
    try:
        text = str(text).lower()
        text = re.sub(r'https?://\S+|www\.\S+', '', text)
        text = re.sub(r'<.*?>', '', text)
        text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
        text = re.sub(r'\s+', ' ', text).strip()
        words = [word for word in text.split() if word not in stop]
        return " ".join(words)
    except Exception as e:
        logger.error(f"Error in clean_text: {str(e)}")
        return text


# Initialize models with error handling and caching
models = {}


def get_model(model_type):
    """Get or initialize a model with caching."""
    if model_type not in models:
        try:
            if model_type == 'disaster':
                models[model_type] = pipeline(
                    "text-classification",
                    model="distilbert-base-uncased-finetuned-sst-2-english",
                    device=-1  # Force CPU
                )
            elif model_type == 'ner':
                models[model_type] = pipeline(
                    "ner",
                    model="dbmdz/bert-large-cased-finetuned-conll03-english",
                    aggregation_strategy="simple",
                    device=-1  # Force CPU
                )
            elif model_type == 'sentiment':
                models[model_type] = pipeline(
                    "sentiment-analysis",
                    model="distilbert-base-uncased-finetuned-sst-2-english",
                    device=-1  # Force CPU
                )
        except Exception as e:
            logger.error(f"Error loading {model_type} model: {str(e)}")
            models[model_type] = None
    # Use .get() so an unknown model_type returns None instead of raising KeyError
    return models.get(model_type)


def predict_disasters(texts):
    """Predict if texts are disaster-related."""
    try:
        disaster_keywords = [
            'disaster', 'emergency', 'crisis', 'catastrophe', 'tragedy',
            'earthquake', 'flood', 'hurricane', 'tornado', 'tsunami',
            'explosion', 'fire', 'accident', 'collapse', 'crash',
            'killed', 'died', 'dead', 'injured', 'trapped',
            'evacuate', 'evacuation', 'rescue', 'damage', 'destroyed'
        ]
        predictions = []
        for text in texts:
            text_lower = text.lower()
            if any(keyword in text_lower for keyword in disaster_keywords):
                predictions.append(True)
            else:
                classifier = get_model('disaster')
                if classifier:
                    result = classifier(text)[0]
                    # SST-2 fallback: treat negative-sentiment texts as disaster-related
                    predictions.append(result['label'] == 'NEGATIVE')
                else:
                    predictions.append(False)
        return predictions
    except Exception as e:
        logger.error(f"Error in predict_disasters: {str(e)}")
        return [False] * len(texts)


def extract_locations(text):
    """Extract location entities from text."""
    try:
        ner_pipeline = get_model('ner')
        if ner_pipeline is None:
            return []
        ner_results = ner_pipeline(text)
        locations = [entity['word'] for entity in ner_results
                     if entity['entity_group'] == 'LOC']
        return locations
    except Exception as e:
        logger.error(f"Error in extract_locations: {str(e)}")
        return []


def analyze_sentiment(text):
    """Analyze sentiment of text."""
    try:
        sentiment_pipeline = get_model('sentiment')
        if sentiment_pipeline is None:
            return "Neutral"
        urgent_keywords = ['emergency', 'urgent', 'immediate', 'critical', 'severe', 'deadly']
        if any(keyword in text.lower() for keyword in urgent_keywords):
            return "Urgent"
        sentiment_results = sentiment_pipeline(text)
        sentiment = sentiment_results[0]['label']
        if sentiment == 'NEGATIVE':
            return "Urgent"
        elif sentiment == 'POSITIVE':
            return "Not Urgent"
        else:
            return "Neutral"
    except Exception as e:
        logger.error(f"Error in analyze_sentiment: {str(e)}")
        return "Neutral"


def process_tweet(tweet):
    """Process a single tweet."""
    try:
        text = tweet['text']
        cleaned_text = clean_text(text)
        is_disaster = predict_disasters([cleaned_text])[0]
        locations = extract_locations(text)
        sentiment = analyze_sentiment(text)
        return {
            'id': tweet['id'],
            'text': text,
            'isDisaster': bool(is_disaster),
            'location': ', '.join(locations) if locations else 'Unknown',
            'sentiment': sentiment,
            'timestamp': tweet['created_at'].isoformat()
        }
    except Exception as e:
        logger.error(f"Error processing tweet: {str(e)}")
        return None


class RateLimiter:
    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = []

    def __call__(self, f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            now = time.time()
            self.calls = [call for call in self.calls if call > now - self.period]
            if len(self.calls) >= self.max_calls:
                sleep_time = self.period - (now - self.calls[0])
                logger.warning(f"Rate limit reached. Sleeping for {sleep_time:.2f} seconds")
                time.sleep(sleep_time)
            # Record the actual call time (after any sleep)
            self.calls.append(time.time())
            return f(*args, **kwargs)
        return wrapped


# Implement rate limiting
twitter_limiter = RateLimiter(max_calls=50, period=15 * 60)


@twitter_limiter
def fetch_tweets():
    """Fetch sample tweets about disasters."""
    try:
        # Sample at most 10 rows so an undersized CSV doesn't raise an error
        sample_tweets = sample_tweets_df.sample(
            n=min(10, len(sample_tweets_df))
        ).to_dict('records')
        processed_tweets = []
        for tweet in sample_tweets:
            processed_tweet = process_tweet(tweet)
            if processed_tweet:
                processed_tweets.append(processed_tweet)
        return processed_tweets
    except Exception as e:
        logger.error(f"Error in fetch_tweets: {str(e)}")
        return []


# Flask routes
@app.route('/')
def root():
    """Serve the main HTML page."""
    return send_from_directory('.', 'index.html')


@app.route('/index.css')
def serve_css():
    """Serve the CSS file."""
    return send_from_directory('.', 'index.css')


@app.route('/main.js')
def serve_js():
    """Serve the JavaScript file."""
    return send_from_directory('.', 'main.js')


@app.route('/api/predict')
def get_predictions():
    """API endpoint for predictions."""
    try:
        tweets = fetch_tweets()
        return jsonify({'data': json.dumps(tweets)})
    except Exception as e:
        logger.error(f"Error in get_predictions: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/health')
def health_check():
    """Health check endpoint."""
    return jsonify({'status': 'healthy'}), 200


class StandaloneApplication(gunicorn.app.base.BaseApplication):
    def __init__(self, app, options=None):
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        config = {key: value for key, value in self.options.items()
                  if key in self.cfg.settings and value is not None}
        for key, value in config.items():
            self.cfg.set(key.lower(), value)

    def load(self):
        return self.application


if __name__ == "__main__":
    try:
        logger.info(f"TensorFlow version: {tf.__version__}")
        # Start the Flask app with Gunicorn
        options = {
            'bind': '0.0.0.0:7860',
            'workers': 1,
            'timeout': 300,  # Increased timeout
            'max_requests': 1,
            'max_requests_jitter': 5,
            'preload_app': True,  # Preload the application
        }
        StandaloneApplication(app, options).run()
    except Exception as e:
        logger.error(f"Error launching the application: {str(e)}")