init commit

This commit is contained in:
2025-08-15 19:50:11 +08:00
commit 766b383e52
14 changed files with 653 additions and 0 deletions

91
.gitignore vendored Normal file
View File

@@ -0,0 +1,91 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# pipenv
Pipfile.lock
# poetry
poetry.lock
# mypy
.mypy_cache/
.dmypy.json
# Pyre type checker
.pyre/
# VS Code
.vscode/
# macOS
.DS_Store
# Local env
.env
.env.*
.venv
venv/
ENV/
env.bak/
venv.bak/
# Log files
*.log
# PDF/Word/Other outputs
*.pdf
*.docx
*.xlsx
*.pptx
# Lock files
uv.lock
# Config files
*.local.json

0
README.md Normal file
View File

207
code_server/code.py Normal file
View File

@@ -0,0 +1,207 @@
import ollama
from mcp.server.fastmcp import FastMCP
import os
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
_CONFIG = None
mcp = FastMCP("code_parser")
def get_config():
"""获取配置并缓存"""
global _CONFIG
if _CONFIG is None:
try:
script_dir = os.path.dirname(os.path.abspath(__file__))
config_path = os.path.join(script_dir, '..', 'config.json')
with open(config_path, 'r', encoding='utf-8') as f:
_CONFIG = json.load(f)
logging.info("配置加载成功")
except Exception as e:
logging.error(f"配置文件加载失败: {str(e)}")
raise
return _CONFIG
def llm_parse_code(code_path, code_content):
"""使用OLLAMA处理代码注释翻译和添加"""
config = get_config()
client = ollama.Client(host=config['OLLAMA_URL'])
prompt = f'''
请严格遵循以下要求处理代码:
1. 仅添加中文注释,不要修改任何原始代码逻辑、格式和变量名
2. 对于已有的英文注释,请将其准确地翻译成中文
3. 请保留原代码的格式和结构
4. 最后输出完整且可运行的代码不要使用markdown格式
文件路径:{code_path}
代码内容:
{code_content}
'''
max_retries = 3
for attempt in range(max_retries):
try:
response = ""
chat_stream = client.generate(
model=config['OLLAMA_MODEL'],
prompt=prompt,
stream=True,
think=False
)
for chunk in chat_stream:
content = chunk.get('response', '')
response += content
if response.strip():
return response
logging.warning(f"LLM响应可能无效重试中... ({attempt+1}/{max_retries})")
except Exception as e:
logging.error(f"LLM请求失败: {str(e)},尝试重新连接...")
time.sleep(2 ** attempt)
logging.error("LLM处理失败返回原始代码")
return code_content
def detect_and_read_file(file_path):
"""尝试多种编码方式读取文件内容"""
encodings = ['utf-8', 'gbk', 'latin-1', 'cp1252', 'iso-8859-1']
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
return content
except UnicodeDecodeError:
continue
try:
with open(file_path, 'rb') as f:
raw_data = f.read()
return raw_data.decode('utf-8', errors='ignore')
except Exception as e:
logging.error(f"无法读取文件 {file_path}: {str(e)}")
return None
@mcp.tool()
def parse_code(project_path, excluded_items_path=None, save_path=None):
"""为指定目录下的代码文件添加中文注释。
Args:
project_path: 项目根目录路径
excluded_items_path: 需要排除的文件或目录列表(可选)
save_path: 解析后文件保存路径(可选,默认为项目根目录下的 'parsed_code' 目录)
"""
project_path = os.path.abspath(project_path)
if save_path is None:
save_path = os.path.join(project_path, 'parsed_code')
else:
save_path = os.path.abspath(save_path)
os.makedirs(save_path, exist_ok=True)
abs_save_path = os.path.abspath(save_path)
excluded_abs_paths = set()
if excluded_items_path is not None:
if isinstance(excluded_items_path, str):
with open(excluded_items_path, 'r', encoding='utf-8') as f:
excluded_items = [line.strip() for line in f.readlines()]
for item in excluded_items:
abs_item = os.path.abspath(os.path.join(project_path, item))
excluded_abs_paths.add(abs_item)
code_extensions = [
'.py', '.js', '.jsx', '.java', '.c', '.cpp', '.h', '.hpp',
'.cs', '.go', '.rs', '.ts', '.tsx', '.html', '.css', '.scss',
'.php', '.rb', '.swift', '.kt', '.m', '.sql', '.sh', '.bat'
]
files_to_process = []
for root, dirs, files in os.walk(project_path):
root_abs = os.path.abspath(root)
if root_abs.startswith(abs_save_path + os.sep) or root_abs == abs_save_path:
continue
dirs[:] = [d for d in dirs if os.path.join(root_abs, d) not in excluded_abs_paths]
for file in files:
file_path = os.path.join(root_abs, file)
if file_path in excluded_abs_paths:
continue
_, ext = os.path.splitext(file)
if ext.lower() not in code_extensions:
continue
relative_path = os.path.relpath(root_abs, project_path)
save_dir = os.path.join(save_path, relative_path)
target_path = os.path.join(save_dir, file)
if os.path.exists(target_path):
logging.info(f"跳过已处理文件: {file_path}")
continue
files_to_process.append((file_path, save_dir, target_path))
if not files_to_process:
return "没有找到需要处理的代码文件"
logging.info(f"发现 {len(files_to_process)} 个文件需要处理")
def process_file(file_data):
file_path, save_dir, target_path = file_data
try:
code_content = detect_and_read_file(file_path)
if code_content is None:
logging.warning(f"无法读取文件 {file_path},跳过处理")
return
MAX_LINES = 800
if code_content.count('\n') > MAX_LINES:
logging.warning(f"文件过大({file_path}{code_content.count('\n')}行),跳过处理")
return
logging.info(f"处理文件: {file_path}")
relative_file_path = os.path.relpath(file_path, project_path)
parsed_code = llm_parse_code(relative_file_path, code_content)
os.makedirs(save_dir, exist_ok=True)
with open(target_path, 'w', encoding='utf-8') as out_file:
out_file.write(parsed_code)
except Exception as e:
logging.error(f"处理文件失败 {file_path}: {str(e)}")
max_workers = min(os.cpu_count() or 1, 4)
logging.info(f"使用线程池处理,最大线程数: {max_workers}")
processed_count = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(process_file, file_data): file_data for file_data in files_to_process}
for future in as_completed(futures):
file_data = futures[future]
try:
future.result()
processed_count += 1
except Exception as e:
logging.error(f"处理文件 {file_data[0]} 出现异常: {str(e)}")
return f"代码注释添加完成: 处理了 {processed_count}/{len(files_to_process)} 个文件,保存路径: {save_path}"
if __name__ == "__main__":
mcp.run(transport='stdio')

4
config.json Normal file
View File

@@ -0,0 +1,4 @@
{
"OLLAMA_URL": "http://172.16.0.254:11434",
"OLLAMA_MODEL": "qwen3:30b"
}

39
macOS/word_opt.py Normal file
View File

@@ -0,0 +1,39 @@
from mcp.server.fastmcp import FastMCP
import os
import docx
from docx.oxml.ns import qn
mcp = FastMCP("word")
@mcp.tool()
async def create_word_doc(filename: str, content: str, filepath: str) -> str:
"""在macOS下创建一个Word文档。
Args:
filename: 文件名(如 example.docx
content: 文档正文内容
filepath: 文件保存路径(若用户未提供则默认为"~/Desktop")
"""
if docx is None:
return "python-docx 未安装,请先运行 pip install python-docx"
try:
doc = docx.Document()
para = doc.add_paragraph(content)
run = para.runs[0] if para.runs else para.add_run(content)
font = run.font
font.name = "宋体"
# 设置中英文字体
r = run._element
r.rPr.rFonts.set(qn('w:eastAsia'), '宋体')
r.rPr.rFonts.set(qn('w:ascii'), 'Times New Roman')
r.rPr.rFonts.set(qn('w:hAnsi'), 'Times New Roman')
save_path = os.path.expanduser(os.path.join(filepath, filename))
doc.save(save_path)
return f"Word文档已在指定路径创建创建路径为: {save_path}"
except Exception as e:
return f"创建Word文档失败: {e}"
if __name__ == "__main__":
mcp.run(transport='stdio')

6
main.py Normal file
View File

@@ -0,0 +1,6 @@
def main():
print("Hello from weather!")
if __name__ == "__main__":
main()

68
pdf_server/pdf.py Normal file
View File

@@ -0,0 +1,68 @@
from mcp.server.fastmcp import FastMCP
import fitz
import httpx
import json
import os
mcp = FastMCP("pdf")
with open(os.path.join(os.path.dirname(__file__), '../server_config.json'), 'r', encoding='utf-8') as f:
config = json.load(f)
OLLAMA_URL = config["OLLAMA_URL"]
OLLAMA_MODEL = config["OLLAMA_MODEL"]
async def translate_text(text: str) -> str:
prompt = f"请将以下内容翻译成流畅、准确的中文,仅输出翻译结果:{text}"
payload = {
"model": OLLAMA_MODEL,
"prompt": prompt,
}
async with httpx.AsyncClient() as client:
try:
resp = await client.post(OLLAMA_URL, json=payload, timeout=120.0)
resp.raise_for_status()
lines = resp.text.strip().splitlines()
responses = []
for line in lines:
try:
result = json.loads(line)
if "response" in result and result["response"]:
responses.append(result["response"])
except Exception:
continue
if responses:
return "".join(responses)
return "翻译失败:无有效返回内容"
except Exception as e:
return f"翻译失败: {e}"
def extract_pdf_text(pdf_path: str) -> str:
try:
doc = fitz.open(pdf_path)
text = "\n".join(page.get_text() for page in doc)
doc.close()
return text
except Exception as e:
return f"PDF解析失败: {e}"
@mcp.tool()
async def translate_pdf(pdf_path: str) -> str:
"""
读取PDF文件内容并翻译成中文。
Args:
pdf_path: PDF文件的绝对路径
"""
text = extract_pdf_text(pdf_path)
if text.startswith("PDF解析失败"):
return text
max_len = 2000
chunks = [text[i:i+max_len] for i in range(0, len(text), max_len)]
translated = []
for chunk in chunks:
zh = await translate_text(chunk)
translated.append(zh)
return "\n".join(translated)
if __name__ == "__main__":
mcp.run(transport='stdio')

15
pdf_server/test_pdf.py Normal file
View File

@@ -0,0 +1,15 @@
import asyncio
import os
from pdf import translate_pdf
def test_translate_pdf():
test_pdf_path = os.path.abspath("/Users/mengxin/Project/mcp-client/测试文档.pdf")
if not os.path.exists(test_pdf_path):
print("测试PDF文件不存在请放置测试文档.pdf 在当前目录下。")
return
result = asyncio.run(translate_pdf(test_pdf_path))
print("翻译结果:\n", result)
assert "翻译失败" not in result and "PDF解析失败" not in result, "翻译或解析失败"
if __name__ == "__main__":
test_translate_pdf()

10
pyproject.toml Normal file
View File

@@ -0,0 +1,10 @@
[project]
name = "weather"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"httpx>=0.28.1",
"mcp[cli]>=1.9.4",
]

84
search_server/search.py Normal file
View File

@@ -0,0 +1,84 @@
import httpx
import json
from mcp.server.fastmcp import FastMCP
from bs4 import BeautifulSoup
mcp = FastMCP("search")
GOOGLE_SEARCH_URL = "https://google.serper.dev/search"
GOOGLE_API_KEY = "2bc74e437bc6b48a82672b7d6ae005d0cd9f369a"
async def fetch_page_content(url: str) -> str:
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=10.0) as client:
resp = await client.get(url, headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
"Referer": url,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"cookie":"bFC1D2a8-fB14-cF61-3F50-AF98CCbcef62"
})
resp.raise_for_status()
html = resp.text
soup = BeautifulSoup(html, "lxml")
main = soup.find('main')
text = main.get_text(separator=' ', strip=True) if main else soup.body.get_text(separator=' ', strip=True)
return text[:2000]
except Exception:
return ""
async def search_google(query: str) -> list[dict[str, str]]:
headers = {
"X-API-KEY": GOOGLE_API_KEY,
"Content-Type": "application/json"
}
payload = json.dumps({
"q": query,
})
async with httpx.AsyncClient() as client:
try:
response = await client.post(GOOGLE_SEARCH_URL, headers=headers, data=payload, timeout=30.0)
response.raise_for_status()
data = response.json()
results = []
for item in data.get("organic", [])[:3]:
title = item.get("title")
link = item.get("link")
snippet = item.get("snippet", "")
if title and link:
content = await fetch_page_content(link)
results.append({
"title": title,
"link": link,
"snippet": snippet,
"content": content
})
return results
except Exception:
return []
def format_search_results(results: list[dict[str, str]]) -> str:
if not results:
return "No results found or unable to fetch results."
formatted = []
for r in results:
formatted.append(f"""
Title: {r['title']}
Link: {r['link']}
Snippet: {r['snippet']}
Content: {r['content']}
""")
return "\n---\n".join(formatted)
@mcp.tool()
async def search_web(query: str) -> str:
"""When user input unable to confirm or need search web or other tool can not use, this tool can search the web using the given query.
returens a formatted string with the title, link, snippet, and content of the top results.
Args:
query: The search query to use for the web search(Note that it is recommended to use English for the search query.).
"""
results = await search_google(query)
return format_search_results(results)
if __name__ == "__main__":
mcp.run(transport='stdio')

21
search_server/test.txt Normal file

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,10 @@
import asyncio
from search import search_web
async def test_search_web_output():
query = "曼联最新的英超排名是多少"
result = await search_web(query)
print("搜索结果输出:\n", result)
if __name__ == "__main__":
asyncio.run(test_search_web_output())

4
server_config.json Normal file
View File

@@ -0,0 +1,4 @@
{
"OLLAMA_URL": "http://localhost:11434/api/generate",
"OLLAMA_MODEL": "qwen3:14b"
}

94
weather_server/weather.py Normal file
View File

@@ -0,0 +1,94 @@
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP
# Initialize FastMCP server
mcp = FastMCP("weather")
# Constants
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
async def make_nws_request(url: str) -> dict[str, Any] | None:
"""Make a request to the NWS API with proper error handling."""
headers = {
"User-Agent": USER_AGENT,
"Accept": "application/geo+json"
}
async with httpx.AsyncClient() as client:
try:
response = await client.get(url, headers=headers, timeout=30.0)
response.raise_for_status()
return response.json()
except Exception:
return None
def format_alert(feature: dict) -> str:
"""Format an alert feature into a readable string."""
props = feature["properties"]
return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""
@mcp.tool()
async def get_alerts(state: str) -> str:
"""Get weather alerts for a US state.
Args:
state: Two-letter US state code (e.g. CA, NY)
"""
url = f"{NWS_API_BASE}/alerts/active/area/{state}"
data = await make_nws_request(url)
if not data or "features" not in data:
return "Unable to fetch alerts or no alerts found."
if not data["features"]:
return "No active alerts for this state."
alerts = [format_alert(feature) for feature in data["features"]]
return "\n---\n".join(alerts)
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
"""Get weather forecast for a location.
Args:
latitude: Latitude of the location
longitude: Longitude of the location
"""
# First get the forecast grid endpoint
points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
points_data = await make_nws_request(points_url)
if not points_data:
return "Unable to fetch forecast data for this location."
# Get the forecast URL from the points response
forecast_url = points_data["properties"]["forecast"]
forecast_data = await make_nws_request(forecast_url)
if not forecast_data:
return "Unable to fetch detailed forecast."
# Format the periods into a readable forecast
periods = forecast_data["properties"]["periods"]
forecasts = []
for period in periods[:5]: # Only show next 5 periods
forecast = f"""
{period['name']}:
Temperature: {period['temperature']}°{period['temperatureUnit']}
Wind: {period['windSpeed']} {period['windDirection']}
Forecast: {period['detailedForecast']}
"""
forecasts.append(forecast)
return "\n---\n".join(forecasts)
if __name__ == "__main__":
# Initialize and run the server
mcp.run(transport='stdio')