LeadGen/backend/app.py
2025-06-06 21:22:18 +05:30

185 lines
6.4 KiB
Python

from flask import Flask, render_template, request, jsonify, send_file
from flask_cors import CORS
import pandas as pd
import json
import re
from typing import List, Dict, Any
from tasks import search_surrounding_places
from crewai import Crew
from agents import *
import os
import tempfile
import traceback
app = Flask(__name__)
CORS(app)
class JsonExtractor:
"""Helper class to extract JSON from agent output text"""
@staticmethod
def extract_json(text):
"""Extract JSON data from text that might contain markdown code blocks"""
# Try to find JSON in code blocks
json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text)
if json_match:
json_str = json_match.group(1)
else:
# If no code blocks, try to find anything that looks like a JSON array or object
json_match = re.search(r'(\[[\s\S]*\]|\{[\s\S]*\})', text)
if json_match:
json_str = json_match.group(1)
else:
return None
try:
return json.loads(json_str)
except json.JSONDecodeError:
try:
# Sometimes the JSON might have trailing commas which are invalid
# Try to clean it up by removing trailing commas
cleaned_json = re.sub(r',\s*([}\]])', r'\1', json_str)
return json.loads(cleaned_json)
except:
return None
def export_to_excel(place_data: List[Dict[str, str]], filename: str = "surrounding_places.xlsx"):
"""Export the place data to an Excel file"""
if not place_data:
print("No place data to export")
return False
try:
df = pd.DataFrame(place_data)
# Reorder columns for better readability
column_order = ["name", "description", "location", "distance", "tel", "email", "website"]
available_columns = [col for col in column_order if col in df.columns]
# Add any columns that might be in the data but not in our order list
available_columns.extend([col for col in df.columns if col not in column_order])
df = df[available_columns]
df.to_excel(filename, index=False)
print(f"Successfully exported {len(place_data)} places to {filename}")
return True
except Exception as e:
print(f"Error exporting to Excel: {e}")
return False
def main(prompt):
tasks = search_surrounding_places()
crew = Crew(
agents=[location_finder, place_researcher, data_processor],
tasks=tasks,
# verbose=2
)
result = crew.kickoff(inputs={"user_query": prompt})
place_data = JsonExtractor.extract_json(result.raw)
# If we got valid data from the agent's output, return it
if place_data and isinstance(place_data, list) and len(place_data) > 0:
# Ensure the data is standardized
standardized_data = []
for place in place_data:
standardized_place = {
"name": place.get("name", ""),
"description": place.get("description", ""),
"fsq_id": place.get("fsq_id", ""),
"distance": place.get("distance", ""),
"location": place.get("location", ""),
"tel": place.get("tel", ""),
"email": place.get("email", ""),
"website": place.get("website", "")
}
standardized_data.append(standardized_place)
else:
place_data = []
if place_data:
# Generate a unique filename with timestamp
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"surrounding_places_{timestamp}.xlsx"
# Create the file in a temp directory or app directory
filepath = os.path.join(tempfile.gettempdir(), filename)
success = export_to_excel(place_data, filepath)
if success:
return {
"success": True,
"message": f"Found {len(place_data)} places. File ready for download.",
"filename": filename,
"filepath": filepath,
"places_count": len(place_data)
}
else:
return {
"success": False,
"message": "Error occurred while exporting to Excel"
}
else:
return {
"success": False,
"message": "No places found or error occurred during search."
}
generated_files = {}
@app.route("/v1/generate", methods=["POST"])
def run_agent():
try:
data = request.get_json()
user_query = data.get('user_query', '')
# Remove JSON.stringify wrapping if present
if user_query.startswith('"') and user_query.endswith('"'):
user_query = json.loads(user_query)
print(f"Processing query: {user_query}")
# Call the main function
result = main(user_query)
# If successful, store the file path for download
if result.get('success') and 'filepath' in result:
file_id = result['filename'].replace('.xlsx', '')
generated_files[file_id] = result['filepath']
result['file_id'] = file_id
# Remove filepath from response (don't expose server paths)
del result['filepath']
return jsonify(result)
except Exception as e:
print(f"Error in run_agent: {str(e)}")
print(traceback.format_exc())
return jsonify({
"success": False,
"message": f"An error occurred: {str(e)}"
}), 500
@app.route("/v1/download/<file_id>", methods=["GET"])
def download_file(file_id):
try:
if file_id not in generated_files:
return jsonify({"error": "File not found"}), 404
filepath = generated_files[file_id]
if not os.path.exists(filepath):
return jsonify({"error": "File no longer exists"}), 404
return send_file(
filepath,
as_attachment=True,
download_name=f"{file_id}.xlsx",
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
except Exception as e:
print(f"Error in download_file: {str(e)}")
return jsonify({"error": "Download failed"}), 500
if __name__ == "__main__":
app.run(debug=True, port=3000, host="localhost")