import { Logger } from './utils/logger'; import { ClientMCPConfig, ApiResponse } from './types'; import OpenAI from "openai"; import { Tool as AnthropicTool } from "@anthropic-ai/sdk/resources/messages/messages.mjs"; import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; import { convertContext } from "./utils/convertContext"; import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; import { ChatChunk } from "./types"; /** * Main client class for interacting with the Gemini MCP API */ export class ClientMCP { private readonly config: Required> & { apiKey: string, model: string }; private readonly logger: Logger; private abortController: AbortController; private readonly ai: OpenAI; private mcp: Client; private transport: StdioClientTransport | SSEClientTransport | null = null; private tools: AnthropicTool[] = []; public encoder = new TextEncoder(); public functionDeclarations: OpenAI.Chat.Completions.ChatCompletionTool[] = []; public messages: OpenAI.Chat.Completions.ChatCompletionMessageParam[] = []; public knowledgeBase: string[] = []; /** * Create a new MCP client instance * @param config Configuration options */ constructor(config: ClientMCPConfig) { if (!config.apiKey) { throw new Error('API key is required'); } this.config = { timeout: config.timeout || 30000, debug: config.debug || false, apiKey: config.apiKey, model: config.model || "gemini-2.0-flash", }; const openai = new OpenAI({ apiKey: config.apiKey, baseURL: config.baseUrl }); this.ai = openai; this.mcp = new Client({ name: "mcp-client-cli", version: "1.0.0" }, { capabilities: { tools: {} } }); this.encoder = new TextEncoder(); this.addToMessages(config.systemMessages || "You are a helpful assistant.", "system"); this.logger = Logger.getInstance(this.config.debug); this.abortController = new AbortController(); } /** * chatCompletion */ public async chatCompletion({ messages, reasoning_effort, tools, tool_choice }: { messages: OpenAI.Chat.Completions.ChatCompletionMessageParam[] reasoning_effort?: "low" | "medium" | "high" tools?: OpenAI.Chat.Completions.ChatCompletionCreateParams['tools'], tool_choice?: OpenAI.Chat.Completions.ChatCompletionCreateParams['tool_choice'] }): Promise> { try { const response = await this.ai.chat.completions.create({ model: this.config.model, messages, reasoning_effort: reasoning_effort as OpenAI.Chat.Completions.ChatCompletionCreateParams['reasoning_effort'], tools, tool_choice }) return { data: response } } catch (error) { this.logger.error('Error in chatCompletion:', error); throw error; } } /** * chatCompletionStream by yield */ public async *chatCompletionStream({ messages, reasoning_effort, tools, tool_choice }: { messages: OpenAI.Chat.Completions.ChatCompletionMessageParam[] reasoning_effort?: "low" | "medium" | "high" tools?: OpenAI.Chat.Completions.ChatCompletionCreateParams['tools'], tool_choice?: OpenAI.Chat.Completions.ChatCompletionCreateParams['tool_choice'] }): AsyncGenerator { try { const stream = await this.ai.chat.completions.create({ model: this.config.model, messages, reasoning_effort: reasoning_effort as OpenAI.Chat.Completions.ChatCompletionCreateParams['reasoning_effort'], tools, tool_choice, stream: true }) for await (const chunk of stream) { yield chunk } } catch (error) { this.logger.error('Error in chatCompletionStream:', error); throw error; } } /** * Connect to mcp server */ public async connectToServer(serverScriptPathOrUrl: string | URL, session_id?: string): Promise { try { if (typeof serverScriptPathOrUrl === 'string') { const isJs = serverScriptPathOrUrl.endsWith(".js"); const isPy = serverScriptPathOrUrl.endsWith(".py"); if (!isJs && !isPy) { throw new Error("Server script must be a .js or .py file"); } const command = isPy ? process.platform === "win32" ? "python" : "python3" : process.execPath; this.transport = new StdioClientTransport({ command, args: [serverScriptPathOrUrl], }); this.mcp.connect(this.transport); } else { // Create a new SSE transport try { const newTransport = new SSEClientTransport( // new URL("/mcp", "http://localhost:3003"), serverScriptPathOrUrl, { requestInit: { headers: { Accept: "text/event-stream", 'mcp-session-id': session_id || "" } } } ); // console.log("Trying SSE MCP...", newTransport) this.transport = newTransport await this.mcp.connect(newTransport); } catch (error) { this.logger.error("Error connecting to MCP server:", error); throw error; } } const toolsResult = await this.mcp.listTools(); this.tools = toolsResult.tools.map((tool) => { this.logger.info("Tool:", JSON.stringify(tool, null, 2)); return { name: tool.name, description: tool.description, input_schema: tool.inputSchema, }; }); // log the name, description of the tools this.logger.info("Tools:", this.tools.map((tool) => `${tool.name}: ${tool.description}`).join("\n")); try { const functionDeclarations = convertContext(this.tools); this.functionDeclarations = functionDeclarations; } catch (error) { this.logger.error("Error converting tools to function declarations:", error); } } catch (e) { this.logger.error("Failed to connect to MCP server: ", e); throw e; } } public async *chat( content: string, depth: number = 0, maxDepth: number = 5 ): AsyncGenerator { // Prevent infinite recursion if (depth >= maxDepth) { this.logger.error(`Maximum chat depth (${maxDepth}) reached. Stopping recursion.`); let fullContent = ''; for await (const chunk of this.chatCompletionStream({ messages: this.messages.concat([ { role: "user", content: `You have reached the maximum chat depth (${maxDepth}). The main goal was: "${content}". \n` + `By utilising all the available tools, AI gets all this conversation context and try to complete the main goal but if it fails to complete the main goal. \n` + `if this content is enough to complete the main goal, then just return the result. or ask for help.` } ]), })) { const content = chunk.choices[0]?.delta?.content || ''; if (content) { fullContent += content; yield chunk; continue; } } this.addToMessages(fullContent, "assistant"); return; } this.addToMessages(content, "user"); let fullContent = ''; for await (const chunk of this.chatCompletionStream({ messages: this.messages, tools: this.functionDeclarations, })) { const content = chunk.choices[0]?.delta?.content || ''; if (content) { fullContent += content; yield chunk; continue; } // Handle tool calls const toolCalls = chunk.choices[0]?.delta?.tool_calls; if (toolCalls?.length) { // Process tool calls in parallel const toolPromises = toolCalls.map(toolCall => this.handleToolCall(toolCall) ); // Wait for all tool calls to complete const toolResults = await Promise.all(toolPromises); // Process and yield tool results for (const result of toolResults) { if (result) { yield result; // Add tool result to messages this.addToMessages(result.choices[0].delta.content!, 'assistant'); // Get explanation for the tool result // const explanation = await this.getToolExplanation(result.choices[0].delta.content!); // if (explanation) { // yield explanation; // this.addToMessages(explanation.choices[0].delta.content!, 'assistant'); // } } } } } // Add the complete assistant's response to messages const trimmedContent = fullContent.trim(); if (trimmedContent) { this.addToMessages(trimmedContent, 'assistant'); } const isExpected = await this.isThisResultExpected(); this.logger.info("isExpected", JSON.stringify(isExpected, null, 2)); if (isExpected.nextUserMessage) { // send it to *chat for await (const nextChunk of this.chat(isExpected.nextUserMessage, depth + 1, maxDepth)) { yield nextChunk; } } else { // is there any need for the Summarize the conversation? const isSummaryNeeded = await this.isSummaryNeeded(); if (isSummaryNeeded) { // summarize the conversation for await (const chunk of this.chatCompletionStream({ messages: this.messages.concat([ { role: "user", content: "Summarize the conversation" } ]), })) { yield chunk; } } } } private async handleToolCall(toolCall: OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta.ToolCall) { if (!toolCall.function?.name || !toolCall.function?.arguments) { return null; } let toolsArguments: Record; try { toolsArguments = JSON.parse(toolCall.function.arguments || "{}"); } catch (error) { this.logger.error("Error parsing tool arguments:", error); return null; } try { const toolResult = await this.mcp.callTool({ name: toolCall.function.name, arguments: toolsArguments }); const toolResultString = `ToolCall: ${toolCall.function.name}\n` + `Arguments: ${JSON.stringify(toolsArguments, null, 2)}\n` + `Result: ${JSON.stringify(toolResult.content, null, 2)}`; return { choices: [{ delta: { content: toolResultString, tool_calls: [{ function: { name: toolCall.function.name, arguments: toolCall.function.arguments } }] } }] }; } catch (error) { this.logger.error(`Error executing tool ${toolCall.function.name}:`, error); return { choices: [{ delta: { content: `Error executing tool ${toolCall.function.name}: ${error}`, tool_calls: [{ function: { name: toolCall.function.name, arguments: toolCall.function.arguments } }] } }] }; } } public async getToolExplanation(toolResult: string) { try { let explanation = ''; for await (const chunk of this.chatCompletionStream({ messages: [{ role: "user", content: `Explain this tool result: ${toolResult}` }], })) { const content = chunk.choices[0]?.delta?.content || ''; if (content) { explanation += content; } } return { choices: [{ delta: { content: `\nExplanation: ${explanation}\n` } }] }; } catch (error) { this.logger.error("Error getting tool explanation:", error); return null; } } public async isThisResultExpected(): Promise<{ expected: boolean; nextUserMessage?: string; }> { let expected = false; let nextUserMessage: string | undefined; try { const tools = [{ "type": "function", "function": { "name": "is_this_result_expected", "description": "Check if the result is expected, or there is more to be done", "parameters": { "type": "object", "properties": { "expected": { "type": "boolean", "description": "Is the result expected?", }, }, "required": ["expected"], }, } }] as OpenAI.Chat.Completions.ChatCompletionCreateParams['tools']; let functionArguments = ''; // First check if the result is expected for await (const chunk of this.chatCompletionStream({ messages: this.messages.concat([ { role: "user", content: `This is an internal call. You will return by is_this_result_expected function.` + `is this result expected or there is more to be done by user for this conversation?` } ]), tools, })) { const toolCall = chunk.choices[0]?.delta?.tool_calls?.[0]; if (toolCall?.function?.name === "is_this_result_expected" && toolCall.function.arguments) { functionArguments += toolCall.function.arguments; } } // Parse the function arguments try { this.logger.info("functionArguments", functionArguments); const args = JSON.parse(functionArguments || "{}"); if (args.expected === true) { expected = true; } } catch (error) { this.logger.error("Error parsing function arguments:", error); expected = false; } if (expected === false) { // If not expected, get the next user message const nextMessageTools = [{ "type": "function", "function": { "name": "get_user_next_message", "description": "Get the user's next message", "parameters": { "type": "object", "properties": { "message": { "type": "string", "description": "The user's next message", }, }, "required": ["message"], }, }, }] as OpenAI.Chat.Completions.ChatCompletionCreateParams['tools']; let nextMessageArgs = ''; for await (const chunk of this.chatCompletionStream({ messages: this.messages.concat([ { role: "user", content: `This is an internal call. You will return by get_user_next_message function.` + `What would be the user's next message to get the expected result or to do more?` } ]), tools: nextMessageTools, })) { const toolCall = chunk.choices[0]?.delta?.tool_calls?.[0]; if (toolCall?.function?.name === "get_user_next_message" && toolCall.function.arguments) { nextMessageArgs += toolCall.function.arguments; } } // Parse the next message arguments try { const args = JSON.parse(nextMessageArgs || "{}"); nextUserMessage = args.message; } catch (error) { this.logger.error("Error parsing next message arguments:", error); nextUserMessage = undefined; } } return { expected, nextUserMessage }; } catch (error) { this.logger.error("Error in isThisResultExpected:", error); return { expected: false, nextUserMessage: undefined }; } } private async isSummaryNeeded(): Promise { try { const summaryNeededTools = [{ "type": "function", "function": { "name": "is_summary_needed", "description": "Check if the summary is needed", "parameters": { "type": "object", "properties": { "summary_needed": { "type": "boolean", "description": "Is the summary needed?", }, }, "required": ["summary_needed"], }, }, }] as OpenAI.Chat.Completions.ChatCompletionCreateParams['tools']; let summaryNeeded = false; for await (const chunk of this.chatCompletionStream({ messages: this.messages.concat([ { role: "user", content: `This is an internal call. You will return by is_summary_needed function.` + `Is there any need for the Summarize the conversation?` } ]), tools: summaryNeededTools, })) { const toolCall = chunk.choices[0]?.delta?.tool_calls?.[0]; if (toolCall?.function?.name === "is_summary_needed" && toolCall.function.arguments) { try { const args = JSON.parse(toolCall.function.arguments || "{}"); summaryNeeded = args.summary_needed; } catch (error) { this.logger.error("Error parsing summary needed arguments:", error); summaryNeeded = false; } } } return summaryNeeded; } catch (error) { this.logger.error("Error in isSummaryNeeded:", error); return false; } } private addToMessages(content: string, role: "user" | "assistant" | "system") { this.messages.push({ role, content, }); } /** * Cancel any ongoing requests */ public cancelRequests(): void { this.abortController.abort(); this.abortController = new AbortController(); } }