下载地址:https://www.pan38.com/yun/share.php?code=JCnzE   提取密码:1133

这个工具包含三个主要模块,支持多账号批量登录、媒体文件处理和自动发布功能。使用时需要先配置accounts.json文件,包含加密后的账号信息。媒体处理器可以自动压缩视频和图片以适应小红书平台要求。

源码部分:【别人开发的模块,我们只是用工具调用而已】


import json
import time
import requests
from flask import Flask, request, jsonify

app = Flask(__name__)

class WeChatWorkRobot:
    def __init__(self, corp_id, corp_secret, agent_id):
        self.corp_id = corp_id
        self.corp_secret = corp_secret
        self.agent_id = agent_id
        self.access_token = None
        self.token_expire_time = 0
        
    def get_access_token(self):
        if time.time() < self.token_expire_time and self.access_token:
            return self.access_token
            
        url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={self.corp_id}&corpsecret={self.corp_secret}"
        response = requests.get(url)
        data = response.json()
        
        if data.get('errcode') == 0:
            self.access_token = data['access_token']
            self.token_expire_time = time.time() + data['expires_in'] - 300
            return self.access_token
        else:
            raise Exception(f"获取access_token失败: {data}")
            
    def send_message(self, user_id, content):
        token = self.get_access_token()
        url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}"
        
        payload = {
            "touser": user_id,
            "msgtype": "text",
            "agentid": self.agent_id,
            "text": {"content": content},
            "safe": 0
        }
        
        response = requests.post(url, json=payload)
        return response.json()
        
    def handle_message(self, msg_data):
        msg_type = msg_data.get('MsgType')
        user_id = msg_data.get('FromUserName')
        
        if msg_type == 'text':
            content = msg_data.get('Content', '').strip()
            if content == '帮助':
                return self.send_message(user_id, "输入关键词获取帮助信息")
            else:
                return self.send_message(user_id, f"已收到您的消息: {content}")
        return None

robot = WeChatWorkRobot(
    corp_id="YOUR_CORP_ID",
    corp_secret="YOUR_CORP_SECRET",
    agent_id=YOUR_AGENT_ID
)

@app.route('/callback', methods=['POST'])
def callback():
    data = request.json
    if data.get('Event') == 'subscribe':
        user_id = data.get('FromUserName')
        robot.send_message(user_id, "欢迎关注企业微信机器人")
    else:
        robot.handle_message(data)
    return jsonify({"status": "success"})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
 企业微信配置
WECHAT_CONFIG = {
    "corp_id": "YOUR_CORP_ID",
    "corp_secret": "YOUR_CORP_SECRET",
    "agent_id": 1000002,
    "token": "YOUR_TOKEN",
    "encoding_aes_key": "YOUR_ENCODING_AES_KEY"
}

# 自动回复规则
REPLY_RULES = {
    "帮助": "输入以下关键词获取帮助:\n1. 天气\n2. 新闻\n3. 会议",
    "天气": "今日天气晴朗,气温25-32℃",
    "新闻": "最新新闻请访问公司内网",
    "会议": "今日会议安排:\n10:00 部门例会\n15:00 项目评审"
}
import re
from config import REPLY_RULES

class MessageHandler:
    @staticmethod
    def process_text_message(content):
        content = content.strip().lower()
        
        # 精确匹配
        if content in REPLY_RULES:
            return REPLY_RULES[content]
            
        # 模糊匹配
        for keyword, reply in REPLY_RULES.items():
            if keyword in content:
                return reply
                
        # 正则匹配
        if re.search(r'时间|几点', content):
            from datetime import datetime
            return f"当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            
        return "抱歉,我不理解您的请求。输入'帮助'获取支持信息。"

    @staticmethod
    def process_event_message(event):
        if event == 'subscribe':
            return "欢迎关注企业微信机器人!"
        elif event == 'unsubscribe':
            return ""
        return None
 hashlib
import time
import random
import string
from Crypto.Cipher import AES
import base64
from config import WECHAT_CONFIG

class WeChatUtils:
    @staticmethod
    def verify_signature(signature, timestamp, nonce):
        token = WECHAT_CONFIG['token']
        tmp_list = sorted([token, timestamp, nonce])
        tmp_str = ''.join(tmp_list).encode('utf-8')
        hashcode = hashlib.sha1(tmp_str).hexdigest()
        return hashcode == signature

    @staticmethod
    def decrypt_message(encrypt_msg):
        aes_key = base64.b64decode(WECHAT_CONFIG['encoding_aes_key'] + "=")
        cipher = AES.new(aes_key, AES.MODE_CBC, aes_key[:16])
        decrypted = cipher.decrypt(base64.b64decode(encrypt_msg))
        
        pad = ord(decrypted[-1:])
        content = decrypted[16:-pad].decode('utf-8')
        return content

    @staticmethod
    def encrypt_message(reply_msg):
        aes_key = base64.b64decode(WECHAT_CONFIG['encoding_aes_key'] + "=")
        cipher = AES.new(aes_key, AES.MODE_CBC, aes_key[:16])
        
        random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
        msg_len = len(reply_msg.encode('utf-8'))
        pad_len = 32 - (msg_len % 32)
        pad_char = chr(pad_len)
        
        plaintext = random_str + reply_msg + pad_char * pad_len
        encrypted = cipher.encrypt(plaintext.encode('utf-8'))
        return base64.b64encode(encrypted).decode('utf-8')

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐