Files
2025-08-15 19:50:11 +08:00

208 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import ollama
from mcp.server.fastmcp import FastMCP
import os
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
# Configure root logging at import time: INFO level, "timestamp - level - message" lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Cache for the parsed config.json; stays None until get_config() loads it.
_CONFIG = None
# FastMCP server instance named "code_parser"; tools are registered on it with @mcp.tool().
mcp = FastMCP("code_parser")
def get_config():
    """Return the parsed ``config.json``, reading it from disk only once.

    The file is looked up one directory above this script. The parsed result
    is kept in the module-level ``_CONFIG`` so later calls return it directly.

    Returns:
        The object produced by ``json.load`` for the config file.

    Raises:
        Exception: Whatever locating, opening or parsing the file raised; the
            error is logged first and then re-raised unchanged.
    """
    global _CONFIG
    # Fast path: already loaded by an earlier call.
    if _CONFIG is not None:
        return _CONFIG

    try:
        here = os.path.dirname(os.path.abspath(__file__))
        config_file = os.path.join(here, '..', 'config.json')
        with open(config_file, 'r', encoding='utf-8') as handle:
            _CONFIG = json.load(handle)
        logging.info("配置加载成功")
    except Exception as e:
        logging.error(f"配置文件加载失败: {str(e)}")
        raise
    return _CONFIG
def llm_parse_code(code_path, code_content):
    """Ask the configured Ollama model to add/translate comments for one file.

    The prompt (in Chinese) instructs the model to add Chinese comments,
    translate existing English comments, keep code and formatting untouched,
    and return plain code without markdown.

    Args:
        code_path: Path of the file; embedded in the prompt for context.
        code_content: Full text of the file to annotate.

    Returns:
        The model output if any attempt yields a non-blank response;
        otherwise ``code_content`` unchanged once all attempts are used up.
    """
    config = get_config()
    client = ollama.Client(host=config['OLLAMA_URL'])
    prompt = f'''
请严格遵循以下要求处理代码:
1. 仅添加中文注释,不要修改任何原始代码逻辑、格式和变量名
2. 对于已有的英文注释,请将其准确地翻译成中文
3. 请保留原代码的格式和结构
4. 最后输出完整且可运行的代码不要使用markdown格式
文件路径:{code_path}
代码内容:
{code_content}
'''
    max_retries = 3
    for attempt in range(max_retries):
        try:
            chat_stream = client.generate(
                model=config['OLLAMA_MODEL'],
                prompt=prompt,
                stream=True,
                think=False,
            )
            # Join the streamed chunks once instead of growing a string with +=;
            # `or ''` guards against a chunk whose 'response' is None.
            response = "".join(
                chunk.get('response', '') or '' for chunk in chat_stream
            )
            if response.strip():
                return response
            logging.warning(f"LLM响应可能无效重试中... ({attempt+1}/{max_retries})")
        except Exception as e:
            # Best effort: any client/transport failure just triggers a retry.
            logging.error(f"LLM请求失败: {str(e)},尝试重新连接...")
        # Exponential backoff (1s, 2s) only when another attempt follows;
        # sleeping after the last attempt would merely delay the fallback.
        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)
    logging.error("LLM处理失败返回原始代码")
    return code_content
def detect_and_read_file(file_path):
    """Read a text file, trying several encodings in turn.

    Encodings are tried in order: UTF-8, GBK, then Latin-1. Latin-1 maps every
    possible byte to a character, so it always succeeds and acts as the
    catch-all; listing further encodings after it would be unreachable.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded file content (opened in text mode, so newlines are
        normalised by Python), or ``None`` if the file cannot be opened or
        read (missing file, permission error, path is a directory, ...).
    """
    encodings = ('utf-8', 'gbk', 'latin-1')
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            # Wrong guess: fall through to the next candidate encoding.
            continue
        except OSError as e:
            # An I/O failure will not be cured by another encoding; report
            # it the same way as any other unreadable file.
            logging.error(f"无法读取文件 {file_path}: {str(e)}")
            return None
    # Not reachable while 'latin-1' is in the list; kept as an explicit
    # failure value should the list ever be changed.
    return None
# File suffixes treated as source code (matched case-insensitively).
_CODE_EXTENSIONS = frozenset({
    '.py', '.js', '.jsx', '.java', '.c', '.cpp', '.h', '.hpp',
    '.cs', '.go', '.rs', '.ts', '.tsx', '.html', '.css', '.scss',
    '.php', '.rb', '.swift', '.kt', '.m', '.sql', '.sh', '.bat',
})
# Files containing more than this many newline characters are skipped.
_MAX_LINES = 800


def _load_excluded_paths(project_path, excluded_items_path):
    """Turn the exclusion argument into a set of absolute paths.

    ``excluded_items_path`` may be ``None`` (nothing excluded), a string
    naming a UTF-8 text file with one entry per line, or an iterable of
    entries. Entries are resolved relative to ``project_path``; blank
    entries are dropped.
    """
    if excluded_items_path is None:
        return set()
    if isinstance(excluded_items_path, str):
        with open(excluded_items_path, 'r', encoding='utf-8') as f:
            entries = [line.strip() for line in f]
    else:
        entries = [str(item).strip() for item in excluded_items_path]
    return {
        os.path.abspath(os.path.join(project_path, entry))
        for entry in entries
        if entry
    }


def _collect_pending_files(project_path, save_path, excluded_abs_paths):
    """List ``(source file, output dir, output file)`` tuples still to process.

    Walks ``project_path``, skipping the output tree, excluded paths,
    files without a known code suffix and files whose output already exists.
    """
    pending = []
    for root, dirs, files in os.walk(project_path):
        root_abs = os.path.abspath(root)
        if root_abs == save_path or root_abs.startswith(save_path + os.sep):
            # Never feed our own output back in; emptying ``dirs`` also
            # stops os.walk from descending into the rest of that tree.
            dirs[:] = []
            continue
        # In-place edit so os.walk does not enter excluded directories.
        dirs[:] = [d for d in dirs if os.path.join(root_abs, d) not in excluded_abs_paths]
        # Mirror the source directory layout below save_path.
        save_dir = os.path.join(save_path, os.path.relpath(root_abs, project_path))
        for file in files:
            file_path = os.path.join(root_abs, file)
            if file_path in excluded_abs_paths:
                continue
            _, ext = os.path.splitext(file)
            if ext.lower() not in _CODE_EXTENSIONS:
                continue
            target_path = os.path.join(save_dir, file)
            if os.path.exists(target_path):
                logging.info(f"跳过已处理文件: {file_path}")
                continue
            pending.append((file_path, save_dir, target_path))
    return pending


# English summary of the tool: annotate every code file under ``project_path``
# with Chinese comments via llm_parse_code and write the results, mirroring the
# directory layout, under ``save_path`` (default: <project_path>/parsed_code).
# Returns a human-readable status string.
# NOTE(review): the docstring below is deliberately left in its original
# wording - FastMCP is documented to publish a tool's docstring as its
# description, so it is text clients see at runtime; confirm before editing.
@mcp.tool()
def parse_code(project_path, excluded_items_path=None, save_path=None):
    """为指定目录下的代码文件添加中文注释。

    Args:
        project_path: 项目根目录路径
        excluded_items_path: 需要排除的文件或目录列表(可选)
        save_path: 解析后文件保存路径(可选,默认为项目根目录下的 'parsed_code' 目录)
    """
    project_path = os.path.abspath(project_path)
    if save_path is None:
        save_path = os.path.join(project_path, 'parsed_code')
    else:
        save_path = os.path.abspath(save_path)
    os.makedirs(save_path, exist_ok=True)

    excluded_abs_paths = _load_excluded_paths(project_path, excluded_items_path)
    files_to_process = _collect_pending_files(project_path, save_path, excluded_abs_paths)
    if not files_to_process:
        return "没有找到需要处理的代码文件"
    logging.info(f"发现 {len(files_to_process)} 个文件需要处理")

    def process_file(file_data):
        """Annotate one file; return True only when its output was written."""
        file_path, save_dir, target_path = file_data
        try:
            code_content = detect_and_read_file(file_path)
            if code_content is None:
                logging.warning(f"无法读取文件 {file_path},跳过处理")
                return False
            # Computed outside the f-string: a backslash inside a replacement
            # field is a SyntaxError on Python versions before 3.12.
            line_count = code_content.count('\n')
            if line_count > _MAX_LINES:
                logging.warning(f"文件过大({file_path}, {line_count}行),跳过处理")
                return False
            logging.info(f"处理文件: {file_path}")
            relative_file_path = os.path.relpath(file_path, project_path)
            parsed_code = llm_parse_code(relative_file_path, code_content)
            os.makedirs(save_dir, exist_ok=True)
            with open(target_path, 'w', encoding='utf-8') as out_file:
                out_file.write(parsed_code)
            return True
        except Exception as e:
            # Best effort per file: one failure must not abort the whole run.
            logging.error(f"处理文件失败 {file_path}: {str(e)}")
            return False

    max_workers = min(os.cpu_count() or 1, 4)
    logging.info(f"使用线程池处理,最大线程数: {max_workers}")
    processed_count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_file, file_data): file_data
            for file_data in files_to_process
        }
        for future in as_completed(futures):
            file_data = futures[future]
            try:
                # Count only files that were actually written, so skipped or
                # failed files no longer inflate the reported total.
                if future.result():
                    processed_count += 1
            except Exception as e:
                logging.error(f"处理文件 {file_data[0]} 出现异常: {str(e)}")
    return f"代码注释添加完成: 处理了 {processed_count}/{len(files_to_process)} 个文件,保存路径: {save_path}"
# Script entry point: when executed directly (not imported), run the MCP
# server using the stdio transport.
if __name__ == "__main__":
    mcp.run(transport="stdio")