Dify-plugin插件开发篇之 text-embedding模型-python
【代码】Dify-plugin插件开发篇之 text-embedding模型-python。
·
```python
import time
from typing import Optional
import requests
from dify_plugin import TextEmbeddingModel
from dify_plugin.entities import I18nObject
from dify_plugin.entities.model import EmbeddingInputType, AIModelEntity, ModelType, FetchFrom
from dify_plugin.errors.model import CredentialsValidateFailedError, InvokeError
from dify_plugin.entities.model.text_embedding import (
TextEmbeddingResult, EmbeddingUsage,
)
class EmbeddingTextEmbeddingModel(TextEmbeddingModel):
def _invoke(
self,
model: str,
credentials: dict,
texts: list[str],
user: Optional[str] = None,
input_type: EmbeddingInputType = EmbeddingInputType.DOCUMENT,
) -> TextEmbeddingResult:
"""调用文本嵌入API生成向量"""
# 调试日志初始化
print("开始调试\n")
# 测试用硬编码(生产环境需从credentials读取)
model = "XXX"
api_key = "sk-XXXX"
# 打印关键参数调试信息
print(f"DEBUG: API Key: {api_key}")
print(f"DEBUG: Model: {model}")
print("DEBUG: 第一步完成:已验证模型与API\n")
# 验证API密钥
if not api_key:
raise InvokeError("缺少API凭证:请提供api_key(格式应为sk-xxx)")
# 准备请求参数
inputs = texts
url = "https://eisapi.byd.com/open-api/1.0/llm/v1/common-embeddings"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# 记录调用开始时间
start_time = time.perf_counter()
# 构造请求体
data = {
"model": model,
"input": inputs
}
print(f"DEBUG:data: {inputs}\n")
print("DEBUG: 第二步完成:请求体构建完成\n")
# 带重试机制的API调用
max_retries = 20
for attempt in range(max_retries):
try:
response = requests.post(
url=url,
json=data,
headers=headers,
timeout=60
)
print(f"DEBUG: Response status: {response.status_code}")
print(f"DEBUG: Response headers: {response.headers}")
# 处理限流错误(429)
if response.status_code == 429:
if attempt < max_retries - 1:
wait_time = 2 **attempt
print(f"DEBUG: 触发限流,等待{wait_time}秒后重试")
time.sleep(wait_time)
continue
else:
raise InvokeError(f"API限流,状态码: {response.status_code}, 响应: {response.text}")
# 处理非200状态码
if response.status_code != 200:
raise InvokeError(f"API错误,状态码: {response.status_code}, 响应: {response.text}")
# 验证响应格式
content_type = response.headers.get('content-type', '')
if 'application/json' not in content_type:
raise InvokeError(f"非JSON响应: {content_type}, 内容: {response.text}")
# 验证非空响应
if not response.text.strip():
raise InvokeError("API返回空响应")
# 解析JSON响应
try:
resp = response.json()
except ValueError as e:
print(f"DEBUG: JSON解析失败,响应: {response.text}")
raise InvokeError(f"无效JSON格式: {str(e)}")
break # 成功获取响应,退出重试循环
except requests.exceptions.Timeout:
if attempt < max_retries - 1:
print("DEBUG: 请求超时,重试...")
continue
else:
raise InvokeError("API请求超时")
except requests.exceptions.ConnectionError:
if attempt < max_retries - 1:
print("DEBUG: 连接错误,重试...")
time.sleep(1)
continue
else:
raise InvokeError("无法连接到API服务器")
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
print("DEBUG: 请求异常,重试...")
time.sleep(1)
continue
else:
raise InvokeError(f"API请求失败: {str(e)}")
print("DEBUG: 第三步完成:调用API成功")
# 解析嵌入向量
if "data" not in resp:
raise InvokeError(f"响应缺少data字段,完整响应: {resp}")
embeddings_data = resp["data"]
embeddings = []
for item in embeddings_data:
if isinstance(item, dict) and "embedding" in item:
embeddings.append(item["embedding"])
elif isinstance(item, list):
embeddings.append(item)
else:
embeddings.append(item)
print("DEBUG: 第四步完成:Embedding成功")
# 验证向量提取结果
if not embeddings:
raise InvokeError("无法从响应中提取嵌入向量")
# 计算使用量
usage_dict = resp.get("usage", {})
total_tokens = usage_dict.get("total_tokens", 0)
latency = round(time.perf_counter() - start_time, 4)
usage = EmbeddingUsage(
tokens=total_tokens,
total_tokens=total_tokens,
latency=latency,
unit_price=0.0,
price_unit=1.0,
total_price=0.0,
currency="USD"
)
print("DEBUG: 第五步完成:token已计算")
print(f"DEBUG: Usage: {usage}")
# 构造返回结果
result = TextEmbeddingResult(
model=model,
embeddings=embeddings,
usage=usage,
)
print("DEBUG: 已完成")
return result
def get_num_tokens(self, model: str, credentials: dict, texts: list[str]) -> list[int]:
"""估算文本token数(用字符数粗略替代)"""
return [len(text) for text in texts]
def validate_credentials(self, model: str, credentials: dict) -> None:
"""验证API凭证格式"""
try:
api_key = credentials.get("api_key")
if not api_key:
raise CredentialsValidateFailedError("API密钥不能为空")
if not isinstance(api_key, str) or not api_key.startswith("sk-"):
raise CredentialsValidateFailedError("API密钥格式不正确,应以'sk-'开头")
except Exception as ex:
raise CredentialsValidateFailedError(f"凭证验证失败: {str(ex)}")
@property
def _invoke_error_mapping(self) -> dict[type[InvokeError], list[type[Exception]]]:
return {InvokeError: [Exception]}
def get_customizable_model_schema(
self, model: str, credentials: dict
) -> AIModelEntity:
"""返回模型元信息"""
return AIModelEntity(
model=model,
label=I18nObject(zh_Hans=model, en_US=model),
model_type=ModelType.TEXT_EMBEDDING,
features=[],
fetch_from=FetchFrom.CUSTOMIZABLE_MODEL,
model_properties={},
parameter_rules=[],
)
```
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)