client-mcp/src_old/client.ts
2025-06-11 10:42:40 +05:30

595 lines
21 KiB
TypeScript

import { Logger } from './utils/logger';
import {
ClientMCPConfig,
ApiResponse
} from './types';
import OpenAI from "openai";
import {
Tool as AnthropicTool
} from "@anthropic-ai/sdk/resources/messages/messages.mjs";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { convertContext } from "./utils/convertContext";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { ChatChunk } from "./types";
/**
* Main client class for interacting with the Gemini MCP API
*/
export class ClientMCP {
private readonly config: Required<Pick<ClientMCPConfig, 'timeout' | 'debug'>> & {
apiKey: string,
model: string
};
private readonly logger: Logger;
private abortController: AbortController;
private readonly ai: OpenAI;
private mcp: Client;
private transport: StdioClientTransport | SSEClientTransport | null = null;
private tools: AnthropicTool[] = [];
public encoder = new TextEncoder();
public functionDeclarations: OpenAI.Chat.Completions.ChatCompletionTool[] = [];
public messages: OpenAI.Chat.Completions.ChatCompletionMessageParam[] = [];
public knowledgeBase: string[] = [];
/**
* Create a new MCP client instance
* @param config Configuration options
*/
constructor(config: ClientMCPConfig) {
if (!config.apiKey) {
throw new Error('API key is required');
}
this.config = {
timeout: config.timeout || 30000,
debug: config.debug || false,
apiKey: config.apiKey,
model: config.model || "gemini-2.0-flash",
};
const openai = new OpenAI({
apiKey: config.apiKey,
baseURL: config.baseUrl
});
this.ai = openai;
this.mcp = new Client({ name: "mcp-client-cli", version: "1.0.0" }, { capabilities: { tools: {} } });
this.encoder = new TextEncoder();
this.addToMessages(config.systemMessages || "You are a helpful assistant.", "system");
this.logger = Logger.getInstance(this.config.debug);
this.abortController = new AbortController();
}
/**
* chatCompletion
*/
public async chatCompletion({
messages,
reasoning_effort,
tools,
tool_choice
}: {
messages: OpenAI.Chat.Completions.ChatCompletionMessageParam[]
reasoning_effort?: "low" | "medium" | "high"
tools?: OpenAI.Chat.Completions.ChatCompletionCreateParams['tools'],
tool_choice?: OpenAI.Chat.Completions.ChatCompletionCreateParams['tool_choice']
}): Promise<ApiResponse<OpenAI.Chat.Completions.ChatCompletion & {
_request_id?: string | null;
}>> {
try {
const response = await this.ai.chat.completions.create({
model: this.config.model,
messages,
reasoning_effort: reasoning_effort as OpenAI.Chat.Completions.ChatCompletionCreateParams['reasoning_effort'],
tools,
tool_choice
})
return {
data: response
}
} catch (error) {
this.logger.error('Error in chatCompletion:', error);
throw error;
}
}
/**
* chatCompletionStream by yield
*/
public async *chatCompletionStream({
messages,
reasoning_effort,
tools,
tool_choice
}: {
messages: OpenAI.Chat.Completions.ChatCompletionMessageParam[]
reasoning_effort?: "low" | "medium" | "high"
tools?: OpenAI.Chat.Completions.ChatCompletionCreateParams['tools'],
tool_choice?: OpenAI.Chat.Completions.ChatCompletionCreateParams['tool_choice']
}): AsyncGenerator<OpenAI.Chat.Completions.ChatCompletionChunk> {
try {
const stream = await this.ai.chat.completions.create({
model: this.config.model,
messages,
reasoning_effort: reasoning_effort as OpenAI.Chat.Completions.ChatCompletionCreateParams['reasoning_effort'],
tools,
tool_choice,
stream: true
})
for await (const chunk of stream) {
yield chunk
}
} catch (error) {
this.logger.error('Error in chatCompletionStream:', error);
throw error;
}
}
/**
* Connect to mcp server
*/
public async connectToServer(serverScriptPathOrUrl: string | URL, session_id?: string): Promise<void> {
try {
if (typeof serverScriptPathOrUrl === 'string') {
const isJs = serverScriptPathOrUrl.endsWith(".js");
const isPy = serverScriptPathOrUrl.endsWith(".py");
if (!isJs && !isPy) {
throw new Error("Server script must be a .js or .py file");
}
const command = isPy
? process.platform === "win32"
? "python"
: "python3"
: process.execPath;
this.transport = new StdioClientTransport({
command,
args: [serverScriptPathOrUrl],
});
this.mcp.connect(this.transport);
} else {
// Create a new SSE transport
try {
const newTransport = new SSEClientTransport(
// new URL("/mcp", "http://localhost:3003"),
serverScriptPathOrUrl,
{
requestInit: {
headers: {
Accept: "text/event-stream",
'mcp-session-id': session_id || ""
}
}
}
);
// console.log("Trying SSE MCP...", newTransport)
this.transport = newTransport
await this.mcp.connect(newTransport);
} catch (error) {
this.logger.error("Error connecting to MCP server:", error);
throw error;
}
}
const toolsResult = await this.mcp.listTools();
this.tools = toolsResult.tools.map((tool) => {
this.logger.info("Tool:", JSON.stringify(tool, null, 2));
return {
name: tool.name,
description: tool.description,
input_schema: tool.inputSchema,
};
});
// log the name, description of the tools
this.logger.info("Tools:", this.tools.map((tool) => `${tool.name}: ${tool.description}`).join("\n"));
try {
const functionDeclarations = convertContext(this.tools);
this.functionDeclarations = functionDeclarations;
} catch (error) {
this.logger.error("Error converting tools to function declarations:", error);
}
} catch (e) {
this.logger.error("Failed to connect to MCP server: ", e);
throw e;
}
}
public async *chat(
content: string,
depth: number = 0,
maxDepth: number = 5
): AsyncGenerator<OpenAI.Chat.Completions.ChatCompletionChunk | ChatChunk> {
// Prevent infinite recursion
if (depth >= maxDepth) {
this.logger.error(`Maximum chat depth (${maxDepth}) reached. Stopping recursion.`);
let fullContent = '';
for await (const chunk of this.chatCompletionStream({
messages: this.messages.concat([
{
role: "user",
content: `You have reached the maximum chat depth (${maxDepth}). The main goal was: "${content}". \n` +
`By utilising all the available tools, AI gets all this conversation context and try to complete the main goal but if it fails to complete the main goal. \n` +
`if this content is enough to complete the main goal, then just return the result. or ask for help.`
}
]),
})) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
fullContent += content;
yield chunk;
continue;
}
}
this.addToMessages(fullContent, "assistant");
return;
}
this.addToMessages(content, "user");
let fullContent = '';
for await (const chunk of this.chatCompletionStream({
messages: this.messages,
tools: this.functionDeclarations,
})) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
fullContent += content;
yield chunk;
continue;
}
// Handle tool calls
const toolCalls = chunk.choices[0]?.delta?.tool_calls;
if (toolCalls?.length) {
// Process tool calls in parallel
const toolPromises = toolCalls.map(toolCall =>
this.handleToolCall(toolCall)
);
// Wait for all tool calls to complete
const toolResults = await Promise.all(toolPromises);
// Process and yield tool results
for (const result of toolResults) {
if (result) {
yield result;
// Add tool result to messages
this.addToMessages(result.choices[0].delta.content!, 'assistant');
// Get explanation for the tool result
// const explanation = await this.getToolExplanation(result.choices[0].delta.content!);
// if (explanation) {
// yield explanation;
// this.addToMessages(explanation.choices[0].delta.content!, 'assistant');
// }
}
}
}
}
// Add the complete assistant's response to messages
const trimmedContent = fullContent.trim();
if (trimmedContent) {
this.addToMessages(trimmedContent, 'assistant');
}
const isExpected = await this.isThisResultExpected();
this.logger.info("isExpected", JSON.stringify(isExpected, null, 2));
if (isExpected.nextUserMessage) {
// send it to *chat
for await (const nextChunk of this.chat(isExpected.nextUserMessage, depth + 1, maxDepth)) {
yield nextChunk;
}
} else {
// is there any need for the Summarize the conversation?
const isSummaryNeeded = await this.isSummaryNeeded();
if (isSummaryNeeded) {
// summarize the conversation
for await (const chunk of this.chatCompletionStream({
messages: this.messages.concat([
{
role: "user",
content: "Summarize the conversation"
}
]),
})) {
yield chunk;
}
}
}
}
private async handleToolCall(toolCall: OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta.ToolCall) {
if (!toolCall.function?.name || !toolCall.function?.arguments) {
return null;
}
let toolsArguments: Record<string, unknown>;
try {
toolsArguments = JSON.parse(toolCall.function.arguments || "{}");
} catch (error) {
this.logger.error("Error parsing tool arguments:", error);
return null;
}
try {
const toolResult = await this.mcp.callTool({
name: toolCall.function.name,
arguments: toolsArguments
});
const toolResultString = `ToolCall: ${toolCall.function.name}\n` +
`Arguments: ${JSON.stringify(toolsArguments, null, 2)}\n` +
`Result: ${JSON.stringify(toolResult.content, null, 2)}`;
return {
choices: [{
delta: {
content: toolResultString,
tool_calls: [{
function: {
name: toolCall.function.name,
arguments: toolCall.function.arguments
}
}]
}
}]
};
} catch (error) {
this.logger.error(`Error executing tool ${toolCall.function.name}:`, error);
return {
choices: [{
delta: {
content: `Error executing tool ${toolCall.function.name}: ${error}`,
tool_calls: [{
function: {
name: toolCall.function.name,
arguments: toolCall.function.arguments
}
}]
}
}]
};
}
}
public async getToolExplanation(toolResult: string) {
try {
let explanation = '';
for await (const chunk of this.chatCompletionStream({
messages: [{
role: "user",
content: `Explain this tool result: ${toolResult}`
}],
})) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
explanation += content;
}
}
return {
choices: [{
delta: {
content: `\nExplanation: ${explanation}\n`
}
}]
};
} catch (error) {
this.logger.error("Error getting tool explanation:", error);
return null;
}
}
public async isThisResultExpected(): Promise<{
expected: boolean;
nextUserMessage?: string;
}> {
let expected = false;
let nextUserMessage: string | undefined;
try {
const tools = [{
"type": "function",
"function": {
"name": "is_this_result_expected",
"description": "Check if the result is expected, or there is more to be done",
"parameters": {
"type": "object",
"properties": {
"expected": {
"type": "boolean",
"description": "Is the result expected?",
},
},
"required": ["expected"],
},
}
}] as OpenAI.Chat.Completions.ChatCompletionCreateParams['tools'];
let functionArguments = '';
// First check if the result is expected
for await (const chunk of this.chatCompletionStream({
messages: this.messages.concat([
{
role: "user",
content: `This is an internal call. You will return by is_this_result_expected function.` +
`is this result expected or there is more to be done by user for this conversation?`
}
]),
tools,
})) {
const toolCall = chunk.choices[0]?.delta?.tool_calls?.[0];
if (toolCall?.function?.name === "is_this_result_expected" && toolCall.function.arguments) {
functionArguments += toolCall.function.arguments;
}
}
// Parse the function arguments
try {
this.logger.info("functionArguments", functionArguments);
const args = JSON.parse(functionArguments || "{}");
if (args.expected === true) {
expected = true;
}
} catch (error) {
this.logger.error("Error parsing function arguments:", error);
expected = false;
}
if (expected === false) {
// If not expected, get the next user message
const nextMessageTools = [{
"type": "function",
"function": {
"name": "get_user_next_message",
"description": "Get the user's next message",
"parameters": {
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "The user's next message",
},
},
"required": ["message"],
},
},
}] as OpenAI.Chat.Completions.ChatCompletionCreateParams['tools'];
let nextMessageArgs = '';
for await (const chunk of this.chatCompletionStream({
messages: this.messages.concat([
{
role: "user",
content: `This is an internal call. You will return by get_user_next_message function.` +
`What would be the user's next message to get the expected result or to do more?`
}
]),
tools: nextMessageTools,
})) {
const toolCall = chunk.choices[0]?.delta?.tool_calls?.[0];
if (toolCall?.function?.name === "get_user_next_message" && toolCall.function.arguments) {
nextMessageArgs += toolCall.function.arguments;
}
}
// Parse the next message arguments
try {
const args = JSON.parse(nextMessageArgs || "{}");
nextUserMessage = args.message;
} catch (error) {
this.logger.error("Error parsing next message arguments:", error);
nextUserMessage = undefined;
}
}
return {
expected,
nextUserMessage
};
} catch (error) {
this.logger.error("Error in isThisResultExpected:", error);
return { expected: false, nextUserMessage: undefined };
}
}
private async isSummaryNeeded(): Promise<boolean> {
try {
const summaryNeededTools = [{
"type": "function",
"function": {
"name": "is_summary_needed",
"description": "Check if the summary is needed",
"parameters": {
"type": "object",
"properties": {
"summary_needed": {
"type": "boolean",
"description": "Is the summary needed?",
},
},
"required": ["summary_needed"],
},
},
}] as OpenAI.Chat.Completions.ChatCompletionCreateParams['tools'];
let summaryNeeded = false;
for await (const chunk of this.chatCompletionStream({
messages: this.messages.concat([
{
role: "user",
content: `This is an internal call. You will return by is_summary_needed function.` +
`Is there any need for the Summarize the conversation?`
}
]),
tools: summaryNeededTools,
})) {
const toolCall = chunk.choices[0]?.delta?.tool_calls?.[0];
if (toolCall?.function?.name === "is_summary_needed" && toolCall.function.arguments) {
try {
const args = JSON.parse(toolCall.function.arguments || "{}");
summaryNeeded = args.summary_needed;
} catch (error) {
this.logger.error("Error parsing summary needed arguments:", error);
summaryNeeded = false;
}
}
}
return summaryNeeded;
} catch (error) {
this.logger.error("Error in isSummaryNeeded:", error);
return false;
}
}
private addToMessages(content: string, role: "user" | "assistant" | "system") {
this.messages.push({
role,
content,
});
}
/**
* Cancel any ongoing requests
*/
public cancelRequests(): void {
this.abortController.abort();
this.abortController = new AbortController();
}
}