```python
import time
from typing import Optional
import requests
from dify_plugin import TextEmbeddingModel
from dify_plugin.entities import I18nObject
from dify_plugin.entities.model import EmbeddingInputType, AIModelEntity, ModelType, FetchFrom
from dify_plugin.errors.model import CredentialsValidateFailedError, InvokeError
from dify_plugin.entities.model.text_embedding import (
    TextEmbeddingResult, EmbeddingUsage,
)


class EmbeddingTextEmbeddingModel(TextEmbeddingModel):
    

    def _invoke(
            self,
            model: str,
            credentials: dict,
            texts: list[str],
            user: Optional[str] = None,
            input_type: EmbeddingInputType = EmbeddingInputType.DOCUMENT,
    ) -> TextEmbeddingResult:
        """调用文本嵌入API生成向量"""
        # 调试日志初始化
        print("开始调试\n")

        # 测试用硬编码(生产环境需从credentials读取)
        model = "XXX"
        api_key = "sk-XXXX"

        # 打印关键参数调试信息
        print(f"DEBUG: API Key: {api_key}")
        print(f"DEBUG: Model: {model}")
        print("DEBUG: 第一步完成:已验证模型与API\n")

        # 验证API密钥
        if not api_key:
            raise InvokeError("缺少API凭证:请提供api_key(格式应为sk-xxx)")

        # 准备请求参数
        inputs = texts
        url = "https://eisapi.byd.com/open-api/1.0/llm/v1/common-embeddings"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        # 记录调用开始时间
        start_time = time.perf_counter()

        # 构造请求体
        data = {
            "model": model,
            "input": inputs
        }
        print(f"DEBUG:data: {inputs}\n")
        print("DEBUG: 第二步完成:请求体构建完成\n")

        # 带重试机制的API调用
        max_retries = 20
        for attempt in range(max_retries):
            try:
                response = requests.post(
                    url=url,
                    json=data,
                    headers=headers,
                    timeout=60
                )
                print(f"DEBUG: Response status: {response.status_code}")
                print(f"DEBUG: Response headers: {response.headers}")

                # 处理限流错误(429)
                if response.status_code == 429:
                    if attempt < max_retries - 1:
                        wait_time = 2 **attempt
                        print(f"DEBUG: 触发限流,等待{wait_time}秒后重试")
                        time.sleep(wait_time)
                        continue
                    else:
                        raise InvokeError(f"API限流,状态码: {response.status_code}, 响应: {response.text}")

                # 处理非200状态码
                if response.status_code != 200:
                    raise InvokeError(f"API错误,状态码: {response.status_code}, 响应: {response.text}")

                # 验证响应格式
                content_type = response.headers.get('content-type', '')
                if 'application/json' not in content_type:
                    raise InvokeError(f"非JSON响应: {content_type}, 内容: {response.text}")

                # 验证非空响应
                if not response.text.strip():
                    raise InvokeError("API返回空响应")

                # 解析JSON响应
                try:
                    resp = response.json()
                except ValueError as e:
                    print(f"DEBUG: JSON解析失败,响应: {response.text}")
                    raise InvokeError(f"无效JSON格式: {str(e)}")

                break  # 成功获取响应,退出重试循环

            except requests.exceptions.Timeout:
                if attempt < max_retries - 1:
                    print("DEBUG: 请求超时,重试...")
                    continue
                else:
                    raise InvokeError("API请求超时")
            except requests.exceptions.ConnectionError:
                if attempt < max_retries - 1:
                    print("DEBUG: 连接错误,重试...")
                    time.sleep(1)
                    continue
                else:
                    raise InvokeError("无法连接到API服务器")
            except requests.exceptions.RequestException as e:
                if attempt < max_retries - 1:
                    print("DEBUG: 请求异常,重试...")
                    time.sleep(1)
                    continue
                else:
                    raise InvokeError(f"API请求失败: {str(e)}")

        print("DEBUG: 第三步完成:调用API成功")

        # 解析嵌入向量
        if "data" not in resp:
            raise InvokeError(f"响应缺少data字段,完整响应: {resp}")
        
        embeddings_data = resp["data"]
        embeddings = []
        for item in embeddings_data:
            if isinstance(item, dict) and "embedding" in item:
                embeddings.append(item["embedding"])
            elif isinstance(item, list):
                embeddings.append(item)
            else:
                embeddings.append(item)

        print("DEBUG: 第四步完成:Embedding成功")

        # 验证向量提取结果
        if not embeddings:
            raise InvokeError("无法从响应中提取嵌入向量")

        # 计算使用量
        usage_dict = resp.get("usage", {})
        total_tokens = usage_dict.get("total_tokens", 0)
        latency = round(time.perf_counter() - start_time, 4)
        
        usage = EmbeddingUsage(
            tokens=total_tokens,
            total_tokens=total_tokens,
            latency=latency,
            unit_price=0.0,
            price_unit=1.0,
            total_price=0.0,
            currency="USD"
        )

        print("DEBUG: 第五步完成:token已计算")
        print(f"DEBUG: Usage: {usage}")

        # 构造返回结果
        result = TextEmbeddingResult(
            model=model,
            embeddings=embeddings,
            usage=usage,
        )

        print("DEBUG: 已完成")
        return result

    def get_num_tokens(self, model: str, credentials: dict, texts: list[str]) -> list[int]:
        """估算文本token数(用字符数粗略替代)"""
        return [len(text) for text in texts]

    def validate_credentials(self, model: str, credentials: dict) -> None:
        """验证API凭证格式"""
        try:
            api_key = credentials.get("api_key")
            if not api_key:
                raise CredentialsValidateFailedError("API密钥不能为空")
            if not isinstance(api_key, str) or not api_key.startswith("sk-"):
                raise CredentialsValidateFailedError("API密钥格式不正确,应以'sk-'开头")
        except Exception as ex:
            raise CredentialsValidateFailedError(f"凭证验证失败: {str(ex)}")

    @property
    def _invoke_error_mapping(self) -> dict[type[InvokeError], list[type[Exception]]]:
        return {InvokeError: [Exception]}

    def get_customizable_model_schema(
            self, model: str, credentials: dict
    ) -> AIModelEntity:
        """返回模型元信息"""
        return AIModelEntity(
            model=model,
            label=I18nObject(zh_Hans=model, en_US=model),
            model_type=ModelType.TEXT_EMBEDDING,
            features=[],
            fetch_from=FetchFrom.CUSTOMIZABLE_MODEL,
            model_properties={},
            parameter_rules=[],
        )
```

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐