from crewai import Task from agents import * from tools import * from pydantic import BaseModel from typing import Dict, List class PlacesOutput(BaseModel): results: List[Dict[str, str]] # def create_tasks(user_query): # task_analysis = Task( # description=f"Analyze the user query: '{user_query}' and determine what type of location information they are seeking.", # agent=agents.task_picker, # expected_output="A clear determination of what the user is asking for: their current location info, recommendations near them, or info about a specific named location." # ) # task_locate = Task( # description=f"Based on the user query: '{user_query}', determine the location to focus on. Either get the user's current location or identify the location mentioned in the query.", # agent=agents.location_finder, # expected_output="Coordinates (latitude,longitude) or a location name that will be used for subsequent tasks." # ) # task_gather_info = Task( # description=f"Using the location from the previous task, gather relevant information about places based on the user query: '{user_query}'", # agent=agents.place_researcher, # expected_output="Detailed information about relevant places including details like descriptions, ratings, hours, etc. in a csv format", # context=[task_analysis, task_locate], # ) # # task_analyze_data = Task( # # description=f"Analyze the gathered place information to extract patterns and insights relevant to the user query: '{user_query}'", # # agent=agents.data_analyzer, # # expected_output="Analysis of the place data highlighting key patterns, trends, or notable information.", # # context=[task_gather_info] # # ) # # task_provide_recommendations = Task( # # description=f"Based on all the information gathered and analyzed, provide personalized recommendations that address the user query: '{user_query}'", # # agent=agents.recommendations_expert, # # expected_output="Personalized recommendations and a comprehensive response to the user's query about locations.", # # context=[task_gather_info, task_analyze_data] # # ) # return [task_analysis, task_locate, task_gather_info] def search_surrounding_places() -> List[Dict[str, str]]: """Search for places surrounding the user's location and collect their details""" task_analysis = Task( description="Analyze the user query: {user_query} and determine what type of location information they are seeking. And also check if they have defined any radius or limits or queries", agent=task_picker, expected_output="A clear determination of what the user is asking for: their current location info, recommendations near them, or info about a specific named location." ) task_locate = Task( description="Based on the user query: {user_query}, determine the location to focus on. Either get the user's current location or identify the location mentioned in the query.", agent=location_finder, expected_output="Coordinates (latitude,longitude) or a location name that will be used for subsequent tasks." ) # Task to get user's location # get_location_task = Task( # description="Determine the user's current location by getting their coordinates.", # agent=location_finder, # expected_output="Latitude and longitude coordinates of the user's current location.", # context=[task_analysis] # ) # Task to search for surrounding places search_places_task = Task( description="Using the location from the previous task, gather relevant information about places based on the user query: {user_query}", agent=place_researcher, expected_output="JSON data of surrounding places including fsq_id, name, category, and other available information.", context=[task_locate] ) # Task to get detailed information about each place get_details_task = Task( description="For each place found, gather detailed information including name, phone, email, address, and distance.", agent=place_researcher, expected_output="Complete JSON data with detailed information about each place.", context=[search_places_task] ) # Task to process the data for Excel export process_data_task = Task( description="""Process the gathered data and prepare it for Excel export. You MUST return a valid JSON array containing objects with these fields: - name: Name of the business/place - description: A general description of the FSQ Place. Typically provided by the owner/claimant of the FSQ Place and/or updated by City Guide Superusers. - distance: Distance from user's location in meters (numeric value only) - location: An object containing none, some, or all of the following fields: : address : address_extended : locality : dma : region : postcode : country : admin_region : post_town : po_box : cross_street : formatted_address - tel: Phone number if available - email: Email if available (can be empty) - website: Website URL if available Format your response as a valid JSON array inside a code block. """, agent=data_processor, expected_output="A JSON array containing normalized place details ready for Excel export.", context=[get_details_task] ) return [task_analysis, task_locate, search_places_task, get_details_task, process_data_task]