import ollama
from mcp.server.fastmcp import FastMCP
import os
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Parsed contents of config.json; filled in lazily by get_config().
_CONFIG = None

mcp = FastMCP("code_parser")


def get_config():
    """Load ``config.json`` on first use and return the cached result.

    The file is looked up as ``../config.json`` relative to the directory
    containing this script and is read as UTF-8 JSON.

    Returns:
        The parsed configuration object.

    Raises:
        Exception: Any error raised while opening or parsing the file is
            logged and re-raised unchanged.
    """
    global _CONFIG
    if _CONFIG is None:
        try:
            script_dir = os.path.dirname(os.path.abspath(__file__))
            config_path = os.path.join(script_dir, '..', 'config.json')
            with open(config_path, 'r', encoding='utf-8') as f:
                _CONFIG = json.load(f)
            logging.info("配置加载成功")
        except Exception as e:
            logging.error(f"配置文件加载失败: {str(e)}")
            raise
    return _CONFIG


def llm_parse_code(code_path, code_content):
    """Ask the Ollama model to add/translate comments in one source file.

    A client is created for ``config['OLLAMA_URL']`` and the model named by
    ``config['OLLAMA_MODEL']`` is queried in streaming mode. The request is
    attempted up to three times; a failed or blank attempt is followed by an
    exponential backoff (1 s, then 2 s) before the next one.

    Args:
        code_path: Path of the file, embedded in the prompt for context.
        code_content: Full text of the file.

    Returns:
        The model's response text, or ``code_content`` unchanged when every
        attempt failed or produced only whitespace.
    """
    config = get_config()
    client = ollama.Client(host=config['OLLAMA_URL'])
    # NOTE(review): the line breaks/indentation inside this prompt were
    # reconstructed from a whitespace-collapsed source; confirm against the
    # original layout.
    prompt = f'''
    请严格遵循以下要求处理代码:
    1. 仅添加中文注释,不要修改任何原始代码逻辑、格式和变量名
    2. 对于已有的英文注释,请将其准确地翻译成中文
    3. 请保留原代码的格式和结构
    4. 最后输出完整且可运行的代码(不要使用markdown格式)

    文件路径:{code_path}
    代码内容:
    {code_content}
    '''
    max_retries = 3
    for attempt in range(max_retries):
        try:
            chat_stream = client.generate(
                model=config['OLLAMA_MODEL'],
                prompt=prompt,
                stream=True,
                think=False
            )
            # Concatenate the streamed fragments into the full answer.
            response = "".join(chunk.get('response', '') for chunk in chat_stream)
            if response.strip():
                return response
            logging.warning(f"LLM响应可能无效,重试中... ({attempt+1}/{max_retries})")
        except Exception as e:
            logging.error(f"LLM请求失败: {str(e)},尝试重新连接...")
        # Back off only when another attempt will follow; sleeping after the
        # last attempt would just delay the fallback below.
        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)
    logging.error("LLM处理失败,返回原始代码")
    return code_content


def detect_and_read_file(file_path):
    """Read a text file, trying several encodings in order.

    ``utf-8`` and ``gbk`` are tried first; ``latin-1`` is the last resort and
    accepts any byte sequence, so a readable file always yields a string.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded file content, or ``None`` when the file cannot be opened
        or read (the error is logged).
    """
    # latin-1 maps every byte to a code point, so any encoding listed after it
    # could never be reached.
    for encoding in ('utf-8', 'gbk', 'latin-1'):
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            continue
        except OSError as e:
            # Missing file, permission problem, etc.: report and give up
            # instead of letting the error escape to the caller.
            logging.error(f"无法读取文件 {file_path}: {str(e)}")
            return None
    return None


# Walks ``project_path``, sends every recognised source file (at most 800
# lines) to the LLM and writes the answer to the mirrored location under
# ``save_path``. Files whose target already exists are skipped, which makes
# an interrupted run resumable. ``excluded_items_path`` is either the path of
# a UTF-8 text file with one project-relative entry per line, or a
# list/tuple/set of such entries.
# NOTE: FastMCP publishes a tool function's docstring as the tool description
# and derives the input schema from its signature, so the docstring text and
# the un-annotated signature below are intentionally left as they were.
@mcp.tool()
def parse_code(project_path, excluded_items_path=None, save_path=None):
    """为指定目录下的代码文件添加中文注释。

    Args:
        project_path: 项目根目录路径
        excluded_items_path: 需要排除的文件或目录列表(可选)
        save_path: 解析后文件保存路径(可选,默认为项目根目录下的 'parsed_code' 目录)
    """
    project_path = os.path.abspath(project_path)
    if save_path is None:
        save_path = os.path.join(project_path, 'parsed_code')
    else:
        save_path = os.path.abspath(save_path)
    os.makedirs(save_path, exist_ok=True)
    abs_save_path = os.path.abspath(save_path)

    # Collect the absolute paths of everything the caller wants left out.
    excluded_items = []
    if isinstance(excluded_items_path, str):
        with open(excluded_items_path, 'r', encoding='utf-8') as f:
            excluded_items = [line.strip() for line in f]
    elif isinstance(excluded_items_path, (list, tuple, set, frozenset)):
        excluded_items = [str(item).strip() for item in excluded_items_path]
    excluded_abs_paths = {
        os.path.abspath(os.path.join(project_path, item))
        for item in excluded_items
        if item  # ignore blank lines / empty entries
    }

    code_extensions = {
        '.py', '.js', '.jsx', '.java', '.c', '.cpp', '.h', '.hpp', '.cs', '.go',
        '.rs', '.ts', '.tsx', '.html', '.css', '.scss', '.php', '.rb', '.swift',
        '.kt', '.m', '.sql', '.sh', '.bat',
    }
    max_lines = 800

    files_to_process = []
    for root, dirs, files in os.walk(project_path):
        root_abs = os.path.abspath(root)
        if root_abs == abs_save_path or root_abs.startswith(abs_save_path + os.sep):
            # Never re-process our own output; pruning ``dirs`` stops the
            # walk from descending any further into it.
            dirs[:] = []
            continue
        dirs[:] = [d for d in dirs if os.path.join(root_abs, d) not in excluded_abs_paths]
        for file in files:
            file_path = os.path.join(root_abs, file)
            if file_path in excluded_abs_paths:
                continue
            _, ext = os.path.splitext(file)
            if ext.lower() not in code_extensions:
                continue
            relative_path = os.path.relpath(root_abs, project_path)
            save_dir = os.path.join(save_path, relative_path)
            target_path = os.path.join(save_dir, file)
            if os.path.exists(target_path):
                logging.info(f"跳过已处理文件: {file_path}")
                continue
            files_to_process.append((file_path, save_dir, target_path))

    if not files_to_process:
        return "没有找到需要处理的代码文件"
    logging.info(f"发现 {len(files_to_process)} 个文件需要处理")

    def process_file(file_data):
        """Annotate one file; return True only if the output was written."""
        file_path, save_dir, target_path = file_data
        try:
            code_content = detect_and_read_file(file_path)
            if code_content is None:
                logging.warning(f"无法读取文件 {file_path},跳过处理")
                return False
            # Computed outside the f-string: a backslash inside an f-string
            # expression is a SyntaxError before Python 3.12.
            line_count = code_content.count('\n')
            if line_count > max_lines:
                logging.warning(f"文件过大({file_path},{line_count}行),跳过处理")
                return False
            logging.info(f"处理文件: {file_path}")
            relative_file_path = os.path.relpath(file_path, project_path)
            parsed_code = llm_parse_code(relative_file_path, code_content)
            os.makedirs(save_dir, exist_ok=True)
            with open(target_path, 'w', encoding='utf-8') as out_file:
                out_file.write(parsed_code)
            return True
        except Exception as e:
            logging.error(f"处理文件失败 {file_path}: {str(e)}")
            return False

    max_workers = min(os.cpu_count() or 1, 4)
    logging.info(f"使用线程池处理,最大线程数: {max_workers}")
    processed_count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_file, file_data): file_data
            for file_data in files_to_process
        }
        for future in as_completed(futures):
            file_data = futures[future]
            try:
                # Count only files that were actually written; skipped and
                # failed files report False.
                if future.result():
                    processed_count += 1
            except Exception as e:
                logging.error(f"处理文件 {file_data[0]} 出现异常: {str(e)}")

    return f"代码注释添加完成: 处理了 {processed_count}/{len(files_to_process)} 个文件,保存路径: {save_path}"


if __name__ == "__main__":
    mcp.run(transport='stdio')