【python 获取邮箱验证码】模拟登录并获取163邮箱验证码,仅供学习!仅供测试!仅供交流!
·
文章日期:2025.07.30
使用工具:python3.12.7、node.js v22.17.1
浏览器:Brave 138版本以上
本章知识:163邮箱模拟登录并获取某平台的邮箱验证码
文章难度:中等偏上
文章全程已做去敏处理!!!
AES解密处理(直接解密即可)(crypto-js.js 标准算法):在线AES加解密工具
注意:仅供学习!!仅供交流!!仅供测试!!!
声明:本章仅科普、学习、交流!!!
由于内容较多,本章将会适当的压缩省略一部分内容,小白可能会看不懂,懂基础有实战经验的可以看看,小白请谅解一下
本章将不提供完整源码,但会提供部分拆解的代码(非扣代码),供大家参考学习(都是为了保护本博主,防止你们以后看不到我了😶🌫️😶🌫️😶🌫️)
先看个小视频


【主要使用的 python 模块】
# -*- coding: utf-8 -*-
import uuid # 生成 deviceId 的指纹
import json5 # 格式化邮件数据普通的json无法格式化
import re # 正则匹配邮件和提取邮件内的验证码
import requests # 模拟发送请求
from urllib.parse import quote # 编码
from datetime import datetime # 处理邮件内的时间
from typing import Union, Literal # 类型提示
from functools import wraps # 装饰器
import json # 格式化普通数据
import time # 用于延迟执行、生成随机值
import hashlib # 用于解决邮箱的自加密键值(可用可不用)、解决 NTES_WEB_FP 指纹的生成
import random # 用于随机值生成
【主要使用的 node.js 模块】
window = global;
// npm install sm-crypto --save
// SM4国密对称算法 替代AES 每次加密出的结果都是一样
const sm4 = require('./sm4.js');
// 引入官方源码 生成 args 参数专用
const args_s = require('./args_s.js')
// RSA为非对称加密算法,每次的加密结果都将不一样
const RSAencrypt2 = require('./rsa');
// 提供接口服务
const express = require('express');
注意:选择一个合适的浏览器,中途不可更换,防止环境的改变影响实际的参数,推荐使用google浏览器
1、打开某某网站(使用文章开头的AES在线工具解密): 02DJH/gHvouB6PZDzFucSLNWMWI/iO0K9bnAJp5+P0Y=
2、打开网站后,我们需要先打开控制台里的应用,然后把所有的缓存数据和cookie信息都清除掉,防止对我们接下来的操作会有影响

3、【encParams参数加密】打开网络,打开保留日志并启用停用缓存,防止走缓存,然后清除日志并刷新网页

4、【encParams参数加密】发现有一个密文参数,我们直接添加xhr断点,然后刷新页面


5、【encParams参数加密】刷新页面后发现断住了,根据堆栈分析查看发现他的加密位置,并测试出了他是一个sm4国密对称算法,类似与AES加密。好,我们打上断点,并刷新页面,查看他是加密了什么内容

6、【encParams参数加密】刷新页面后,断住了,可以看到他加密前的明文参数,然后我们只需要略施小计,就让他的密钥浮出水面,我们单步跟进去,看看密钥是多少


7、【encParams参数加密】发现密钥后,我们直接本地模拟,看看对不对
8、【encParams参数加密】本地模拟一下,嘿了个嘿,竟然一毛一样,(本SM4源码含密钥将在文章末尾附上)
文章末尾附上源码

9、【PPPP参数加密】,我们在框框里随便输入一下内容,然后点击确定,我们要点一下放行,过一下断点,直到出现【/zj/mail/l】接口,你会发现有两个参数是是需要我们模拟加密的,一个是【PPPP】参数,另一个是【pVParam】参数,我们先把【PPPP】参数搞定

10、【PPPP参数加密】嘿了个嘿,在堆栈里苦苦寻找,终于找到了加密方法,我们我们直接打上断点,重新填写表单,看看是不是这个地方

11、【PPPP参数加密】没毛病,找到这个参数的加密位置了,是RSA非对称加密,接下来让我们找一下他的密钥

12、【PPPP参数加密】我们直接进入函数内,然后打上断点,单步进来,嘿,这密钥不就出来了吗


13、【PPPP参数加密】本地直接模拟,由于非对称加密的结果每次都是不一样的,所以表面无法判断是否正确,只能通过技术手段验证,比如用hook劫持技术,当他执行这个的时候,你可以把结果替换为你本地加密的结果,然后让他的服务器进行验证,这样就可以直到我们的加密有没有问题,由于本人已经测试过了,这个就不在演示了,以后有案例会给大家演示的。好了接下来该搞【pVParam】参数了
文章末尾附上源码

14、【pVParam参数加密】这个参数有点东西,给大家讲一下原理,首先网站会请求一个接口【dl/zj/mail/powGetP】,这个接口会返回一个字典数据,然后网站内有一个js文件,是由字符串类型的js拼接而成的,然后会生成一个【blob:】开头的虚拟链接,这个链接是所访问的内容是本地的资源,属于【本地资源引用技术 Blob URL(Object URL)】,然后网站会访问这个链接里的js内容,并用js内的函数把字典数据加密并转换成另一种字段数据,加密并转换后的数据称之为【pVParam】
15、【pVParam参数加密】我们直接全局搜索【new Blob】,然后打上断点,仔细的你肯定能发现这个东西,看下面图


16、【pVParam参数加密】我们再次提交表单,发现断住了,我们单步往下走,直到他生成这个虚拟链接,我们把链接打开并复制内容到本地(只有在这里断住后我们才能访问链接,不然的话他会自动清除内容无法访问)

17、【pVParam参数加密】复制到本地,并格式化

18、【pVParam参数加密】我们本地格式化后,记得加最重要的环境,window。然后需要修改一下【vdfAsync】方法里的末尾代码


19、【pVParam参数加密】我们从控制台拿了一个字典,试了一下,没毛病,跑通了

20、最关键的三个加密参数已经搞定了,接下来就是做成接口,用py访问并调用他们,做成一套流水线。不多说,直接上代码
【main.js】
window = global;
// SM4国密对称算法 替代AES 每次加密出的结果都是一样 无需安装此包
const sm4 = require('./sm4.js');
// 引入官方源码 生成 args 参数专用 此包的代码是官方的,不提供,需要自己扣
const args_s = require('./args_s.js')
// RSA非对称加密算法 可以加密任何字符串
// RSA为非对称加密算法,每次的加密结果都将不一样
const RSAencrypt2 = require('./rsa');
// 提供接口服务 请手动安装此包 pm install express
const express = require('express');
const app = express();
app.use(express.json());
// 固定密钥
var _sm4pubkey = "BC60B8B9E4FFEFFA219E5AD77F11F9E2";
// 随机值 用于 rtid=一直随机生成 utid=固定随机
var t = function() {
var e = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
, t = 32
, n = [];
for (; t-- > 0; )
n[t] = e.charAt(Math.random() * e.length);
return n.join("")
};
// ini接口调用
app.get('/dl/zj/mail/ini', (req, res) => {
rtid = t()
s = {
"pd": "mail163",
"pkid": "CvViHzl",
"pkht": atob("bWFpbC4xNjMuY29t"),
"channel": 0,
"topURL": atob("aHR0cHM6Ly9tYWlsLjE2My5jb20v"),
"rtid": rtid
}
encParams = sm4.encrypt(JSON.stringify(s), _sm4pubkey)
res.status(200).json({code: 1, encParams: encParams, rtid: rtid});
});
// gt 接口调用
app.post('/dl/zj/mail/gt', (req, res) => {
// email 是邮箱 xxxx@163.com
// console.log(req.body)
const {email} = req.body;
rtid = t()
s = {
"un": email,
"pkid": "CvViHzl",
"pd": "mail163",
"channel": 0,
"topURL": atob("aHR0cHM6Ly9tYWlsLjE2My5jb20v"),
"rtid": rtid
}
encParams = sm4.encrypt(JSON.stringify(s), _sm4pubkey)
res.status(200).json({code: 1, encParams: encParams, rtid: rtid});
});
// powGetP 接口调用
app.get('/dl/zj/mail/powGetP', (req, res) => {
rtid = t()
s = {
"pkid": "CvViHzl",
"pd": "mail163",
"channel": 0,
"topURL": atob("aHR0cHM6Ly9tYWlsLjE2My5jb20v"),
"rtid": rtid
}
encParams = sm4.encrypt(JSON.stringify(s), _sm4pubkey)
res.status(200).json({code: 1, encParams: encParams, rtid: rtid});
});
// powGetP_r 接口调用 【人机验证】
app.post('/dl/zj/mail/powGetP_r', (req, res) => {
const {email,pvSid} = req.body;
rtid = t()
s = {
"pkid": "CvViHzl",
"pd": "mail163",
"channel": 0,
"topURL": atob("aHR0cHM6Ly9tYWlsLjE2My5jb20v"),
"rtid": rtid,
"pvSid": pvSid,
"un": email
}
encParams = sm4.encrypt(JSON.stringify(s), _sm4pubkey)
res.status(200).json({code: 1, encParams: encParams, rtid: rtid});
});
// I 接口调用
app.post('/dl/zj/mail/l', (req, res) => {
// email 是邮箱号
// password 是邮箱***
// tk 是 接口返回的 tk 值
// pVParam 是 接口返回的 pVParam 值
const {email, password, tk, pVInfo} = req.body;
rtid = t()
// 公钥
const publicKey = `
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC5gsH+AA4XWONB5TDcUd+xCz7e
jOFHZKlcZDx+pF1i7Gsvi1vjyJoQhRtRSn950x498VUkx7rUxg1/ScBVfrRxQOZ8
xFBye3pjAzfb22+RCuYApSVpJ3OO3KsEuKExftz9oFBv3ejxPlYc5yq7YiBO8XlT
nQN0Sa4R4qhPO3I2MQIDAQAB
-----END PUBLIC KEY-----
`;
s = {
"un": email,
"pw": RSAencrypt2.RSAencrypt2(password, publicKey), // 密码 RSA加密,有公钥
"pd": "mail163",
"l": 0, // 0:关闭30天免登录 1:开启30天免登录
"d": 30, // 30天免登录
"t": (new Date).getTime(), // 实时生成
"pkid": "CvViHzl",
"domains": "",
"tk": tk, // 调用gt返回值
"pwdKeyUp": 1,
"pVParam": args_s.vdfAsync(pVInfo), // 使用官方的js进行转换
"channel": 0,
"topURL": atob("aHR0cHM6Ly9tYWlsLjE2My5jb20v"),
"rtid": rtid // 随机
}
console.log(s)
encParams = sm4.encrypt(JSON.stringify(s), _sm4pubkey)
res.status(200).json({code: 1, encParams: encParams, s:s});
});
app.listen(4000, '0.0.0.0', () => {
console.log('Node.js 服务监听端口 0.0.0.0:4000');
});
【rsa.js】
// npm install node-forge --save 需要安装此包
// RSA非对称加密算法 可以加密任何字符串
// RSA为非对称加密算法,每次的加密结果都将不一样
const forge = require('node-forge');
// 公钥进行加密 明文 公钥 填充模式 RSA-OAEP / RSAES-PKCS1-V1_5
function encryptMessage(message, publicKeyPem,mode='RSAES-PKCS1-V1_5') {
const publicKey = forge.pki.publicKeyFromPem(publicKeyPem);
const encrypted = publicKey.encrypt(forge.util.encodeUtf8(message), mode);
const encryptedBase64 = forge.util.encode64(encrypted);
return encryptedBase64;
}
// 密码加密方法
function RSAencrypt2(e, publicKey) {
return encryptMessage(e, publicKey)
}
module.exports = {
RSAencrypt2: RSAencrypt2
}
【sm4.js】
// npm install sm-crypto --save
// SM4国密对称算法 每次加密出的结果都是一样
/* eslint-disable no-bitwise, no-mixed-operators, complexity */
const DECRYPT = 0
const ROUND = 32
const BLOCK = 16
const Sbox = [
0xd6, 0x90, 0xe9, 0xfe, 0xcc, 0xe1, 0x3d, 0xb7, 0x16, 0xb6, 0x14, 0xc2, 0x28, 0xfb, 0x2c, 0x05,
0x2b, 0x67, 0x9a, 0x76, 0x2a, 0xbe, 0x04, 0xc3, 0xaa, 0x44, 0x13, 0x26, 0x49, 0x86, 0x06, 0x99,
0x9c, 0x42, 0x50, 0xf4, 0x91, 0xef, 0x98, 0x7a, 0x33, 0x54, 0x0b, 0x43, 0xed, 0xcf, 0xac, 0x62,
0xe4, 0xb3, 0x1c, 0xa9, 0xc9, 0x08, 0xe8, 0x95, 0x80, 0xdf, 0x94, 0xfa, 0x75, 0x8f, 0x3f, 0xa6,
0x47, 0x07, 0xa7, 0xfc, 0xf3, 0x73, 0x17, 0xba, 0x83, 0x59, 0x3c, 0x19, 0xe6, 0x85, 0x4f, 0xa8,
0x68, 0x6b, 0x81, 0xb2, 0x71, 0x64, 0xda, 0x8b, 0xf8, 0xeb, 0x0f, 0x4b, 0x70, 0x56, 0x9d, 0x35,
0x1e, 0x24, 0x0e, 0x5e, 0x63, 0x58, 0xd1, 0xa2, 0x25, 0x22, 0x7c, 0x3b, 0x01, 0x21, 0x78, 0x87,
0xd4, 0x00, 0x46, 0x57, 0x9f, 0xd3, 0x27, 0x52, 0x4c, 0x36, 0x02, 0xe7, 0xa0, 0xc4, 0xc8, 0x9e,
0xea, 0xbf, 0x8a, 0xd2, 0x40, 0xc7, 0x38, 0xb5, 0xa3, 0xf7, 0xf2, 0xce, 0xf9, 0x61, 0x15, 0xa1,
0xe0, 0xae, 0x5d, 0xa4, 0x9b, 0x34, 0x1a, 0x55, 0xad, 0x93, 0x32, 0x30, 0xf5, 0x8c, 0xb1, 0xe3,
0x1d, 0xf6, 0xe2, 0x2e, 0x82, 0x66, 0xca, 0x60, 0xc0, 0x29, 0x23, 0xab, 0x0d, 0x53, 0x4e, 0x6f,
0xd5, 0xdb, 0x37, 0x45, 0xde, 0xfd, 0x8e, 0x2f, 0x03, 0xff, 0x6a, 0x72, 0x6d, 0x6c, 0x5b, 0x51,
0x8d, 0x1b, 0xaf, 0x92, 0xbb, 0xdd, 0xbc, 0x7f, 0x11, 0xd9, 0x5c, 0x41, 0x1f, 0x10, 0x5a, 0xd8,
0x0a, 0xc1, 0x31, 0x88, 0xa5, 0xcd, 0x7b, 0xbd, 0x2d, 0x74, 0xd0, 0x12, 0xb8, 0xe5, 0xb4, 0xb0,
0x89, 0x69, 0x97, 0x4a, 0x0c, 0x96, 0x77, 0x7e, 0x65, 0xb9, 0xf1, 0x09, 0xc5, 0x6e, 0xc6, 0x84,
0x18, 0xf0, 0x7d, 0xec, 0x3a, 0xdc, 0x4d, 0x20, 0x79, 0xee, 0x5f, 0x3e, 0xd7, 0xcb, 0x39, 0x48
]
const CK = [
0x00070e15, 0x1c232a31, 0x383f464d, 0x545b6269,
0x70777e85, 0x8c939aa1, 0xa8afb6bd, 0xc4cbd2d9,
0xe0e7eef5, 0xfc030a11, 0x181f262d, 0x343b4249,
0x50575e65, 0x6c737a81, 0x888f969d, 0xa4abb2b9,
0xc0c7ced5, 0xdce3eaf1, 0xf8ff060d, 0x141b2229,
0x30373e45, 0x4c535a61, 0x686f767d, 0x848b9299,
0xa0a7aeb5, 0xbcc3cad1, 0xd8dfe6ed, 0xf4fb0209,
0x10171e25, 0x2c333a41, 0x484f565d, 0x646b7279
]
/**
* 16 进制串转字节数组
*/
function hexToArray(str) {
const arr = []
for (let i = 0, len = str.length; i < len; i += 2) {
arr.push(parseInt(str.substr(i, 2), 16))
}
return arr
}
/**
* 字节数组转 16 进制串
*/
function ArrayToHex(arr) {
return arr.map(item => {
item = item.toString(16)
return item.length === 1 ? '0' + item : item
}).join('')
}
/**
* utf8 串转字节数组
*/
function utf8ToArray(str) {
const arr = []
for (let i = 0, len = str.length; i < len; i++) {
const point = str.codePointAt(i)
if (point <= 0x007f) {
// 单字节,标量值:00000000 00000000 0zzzzzzz
arr.push(point)
} else if (point <= 0x07ff) {
// 双字节,标量值:00000000 00000yyy yyzzzzzz
arr.push(0xc0 | (point >>> 6)) // 110yyyyy(0xc0-0xdf)
arr.push(0x80 | (point & 0x3f)) // 10zzzzzz(0x80-0xbf)
} else if (point <= 0xD7FF || (point >= 0xE000 && point <= 0xFFFF)) {
// 三字节:标量值:00000000 xxxxyyyy yyzzzzzz
arr.push(0xe0 | (point >>> 12)) // 1110xxxx(0xe0-0xef)
arr.push(0x80 | ((point >>> 6) & 0x3f)) // 10yyyyyy(0x80-0xbf)
arr.push(0x80 | (point & 0x3f)) // 10zzzzzz(0x80-0xbf)
} else if (point >= 0x010000 && point <= 0x10FFFF) {
// 四字节:标量值:000wwwxx xxxxyyyy yyzzzzzz
i++
arr.push((0xf0 | (point >>> 18) & 0x1c)) // 11110www(0xf0-0xf7)
arr.push((0x80 | ((point >>> 12) & 0x3f))) // 10xxxxxx(0x80-0xbf)
arr.push((0x80 | ((point >>> 6) & 0x3f))) // 10yyyyyy(0x80-0xbf)
arr.push((0x80 | (point & 0x3f))) // 10zzzzzz(0x80-0xbf)
} else {
// 五、六字节,暂时不支持
arr.push(point)
throw new Error('input is not supported')
}
}
return arr
}
/**
* 字节数组转 utf8 串
*/
function arrayToUtf8(arr) {
const str = []
for (let i = 0, len = arr.length; i < len; i++) {
if (arr[i] >= 0xf0 && arr[i] <= 0xf7) {
// 四字节
str.push(String.fromCodePoint(((arr[i] & 0x07) << 18) + ((arr[i + 1] & 0x3f) << 12) + ((arr[i + 2] & 0x3f) << 6) + (arr[i + 3] & 0x3f)))
i += 3
} else if (arr[i] >= 0xe0 && arr[i] <= 0xef) {
// 三字节
str.push(String.fromCodePoint(((arr[i] & 0x0f) << 12) + ((arr[i + 1] & 0x3f) << 6) + (arr[i + 2] & 0x3f)))
i += 2
} else if (arr[i] >= 0xc0 && arr[i] <= 0xdf) {
// 双字节
str.push(String.fromCodePoint(((arr[i] & 0x1f) << 6) + (arr[i + 1] & 0x3f)))
i++
} else {
// 单字节
str.push(String.fromCodePoint(arr[i]))
}
}
return str.join('')
}
/**
* 32 比特循环左移
*/
function rotl(x, n) {
const s = n & 31
return (x << s) | (x >>> (32 - s))
}
/**
* 非线性变换
*/
function byteSub(a) {
return (Sbox[a >>> 24 & 0xFF] & 0xFF) << 24 |
(Sbox[a >>> 16 & 0xFF] & 0xFF) << 16 |
(Sbox[a >>> 8 & 0xFF] & 0xFF) << 8 |
(Sbox[a & 0xFF] & 0xFF)
}
/**
* 线性变换,加密/解密用
*/
function l1(b) {
return b ^ rotl(b, 2) ^ rotl(b, 10) ^ rotl(b, 18) ^ rotl(b, 24)
}
/**
* 线性变换,生成轮密钥用
*/
function l2(b) {
return b ^ rotl(b, 13) ^ rotl(b, 23)
}
/**
* 以一组 128 比特进行加密/解密操作
*/
function sms4Crypt(input, output, roundKey) {
const x = new Array(4)
// 字节数组转成字数组(此处 1 字 = 32 比特)
const tmp = new Array(4)
for (let i = 0; i < 4; i++) {
tmp[0] = input[4 * i] & 0xff
tmp[1] = input[4 * i + 1] & 0xff
tmp[2] = input[4 * i + 2] & 0xff
tmp[3] = input[4 * i + 3] & 0xff
x[i] = tmp[0] << 24 | tmp[1] << 16 | tmp[2] << 8 | tmp[3]
}
// x[i + 4] = x[i] ^ l1(byteSub(x[i + 1] ^ x[i + 2] ^ x[i + 3] ^ roundKey[i]))
for (let r = 0, mid; r < 32; r += 4) {
mid = x[1] ^ x[2] ^ x[3] ^ roundKey[r + 0]
x[0] ^= l1(byteSub(mid)) // x[4]
mid = x[2] ^ x[3] ^ x[0] ^ roundKey[r + 1]
x[1] ^= l1(byteSub(mid)) // x[5]
mid = x[3] ^ x[0] ^ x[1] ^ roundKey[r + 2]
x[2] ^= l1(byteSub(mid)) // x[6]
mid = x[0] ^ x[1] ^ x[2] ^ roundKey[r + 3]
x[3] ^= l1(byteSub(mid)) // x[7]
}
// 反序变换
for (let j = 0; j < 16; j += 4) {
output[j] = x[3 - j / 4] >>> 24 & 0xff
output[j + 1] = x[3 - j / 4] >>> 16 & 0xff
output[j + 2] = x[3 - j / 4] >>> 8 & 0xff
output[j + 3] = x[3 - j / 4] & 0xff
}
}
/**
* 密钥扩展算法
*/
function sms4KeyExt(key, roundKey, cryptFlag) {
const x = new Array(4)
// 字节数组转成字数组(此处 1 字 = 32 比特)
const tmp = new Array(4)
for (let i = 0; i < 4; i++) {
tmp[0] = key[0 + 4 * i] & 0xff
tmp[1] = key[1 + 4 * i] & 0xff
tmp[2] = key[2 + 4 * i] & 0xff
tmp[3] = key[3 + 4 * i] & 0xff
x[i] = tmp[0] << 24 | tmp[1] << 16 | tmp[2] << 8 | tmp[3]
}
// 与系统参数做异或
x[0] ^= 0xa3b1bac6
x[1] ^= 0x56aa3350
x[2] ^= 0x677d9197
x[3] ^= 0xb27022dc
// roundKey[i] = x[i + 4] = x[i] ^ l2(byteSub(x[i + 1] ^ x[i + 2] ^ x[i + 3] ^ CK[i]))
for (let r = 0, mid; r < 32; r += 4) {
mid = x[1] ^ x[2] ^ x[3] ^ CK[r + 0]
roundKey[r + 0] = x[0] ^= l2(byteSub(mid)) // x[4]
mid = x[2] ^ x[3] ^ x[0] ^ CK[r + 1]
roundKey[r + 1] = x[1] ^= l2(byteSub(mid)) // x[5]
mid = x[3] ^ x[0] ^ x[1] ^ CK[r + 2]
roundKey[r + 2] = x[2] ^= l2(byteSub(mid)) // x[6]
mid = x[0] ^ x[1] ^ x[2] ^ CK[r + 3]
roundKey[r + 3] = x[3] ^= l2(byteSub(mid)) // x[7]
}
// 解密时使用反序的轮密钥
if (cryptFlag === DECRYPT) {
for (let r = 0, mid; r < 16; r++) {
mid = roundKey[r]
roundKey[r] = roundKey[31 - r]
roundKey[31 - r] = mid
}
}
}
function sm4(inArray, key, cryptFlag, {
padding = 'pkcs#7', mode, iv = [], output = 'string'
} = {}) {
if (mode === 'cbc') {
// CBC 模式,默认走 ECB 模式
if (typeof iv === 'string') iv = hexToArray(iv)
if (iv.length !== (128 / 8)) {
// iv 不是 128 比特
throw new Error('iv is invalid')
}
}
// 检查 key
if (typeof key === 'string') key = hexToArray(key)
if (key.length !== (128 / 8)) {
// key 不是 128 比特
throw new Error('key is invalid')
}
// 检查输入
if (typeof inArray === 'string') {
if (cryptFlag !== DECRYPT) {
// 加密,输入为 utf8 串
inArray = utf8ToArray(inArray)
} else {
// 解密,输入为 16 进制串
inArray = hexToArray(inArray)
}
} else {
inArray = [...inArray]
}
// 新增填充,sm4 是 16 个字节一个分组,所以统一走到 pkcs#7
if ((padding === 'pkcs#5' || padding === 'pkcs#7') && cryptFlag !== DECRYPT) {
const paddingCount = BLOCK - inArray.length % BLOCK
for (let i = 0; i < paddingCount; i++) inArray.push(paddingCount)
}
// 生成轮密钥
const roundKey = new Array(ROUND)
sms4KeyExt(key, roundKey, cryptFlag)
const outArray = []
let lastVector = iv
let restLen = inArray.length
let point = 0
while (restLen >= BLOCK) {
const input = inArray.slice(point, point + 16)
const output = new Array(16)
if (mode === 'cbc') {
for (let i = 0; i < BLOCK; i++) {
if (cryptFlag !== DECRYPT) {
// 加密过程在组加密前进行异或
input[i] ^= lastVector[i]
}
}
}
sms4Crypt(input, output, roundKey)
for (let i = 0; i < BLOCK; i++) {
if (mode === 'cbc') {
if (cryptFlag === DECRYPT) {
// 解密过程在组解密后进行异或
output[i] ^= lastVector[i]
}
}
outArray[point + i] = output[i]
}
if (mode === 'cbc') {
if (cryptFlag !== DECRYPT) {
// 使用上一次输出作为加密向量
lastVector = output
} else {
// 使用上一次输入作为解密向量
lastVector = input
}
}
restLen -= BLOCK
point += BLOCK
}
// 去除填充,sm4 是 16 个字节一个分组,所以统一走到 pkcs#7
if ((padding === 'pkcs#5' || padding === 'pkcs#7') && cryptFlag === DECRYPT) {
const len = outArray.length
const paddingCount = outArray[len - 1]
for (let i = 1; i <= paddingCount; i++) {
if (outArray[len - i] !== paddingCount) throw new Error('padding is invalid')
}
outArray.splice(len - paddingCount, paddingCount)
}
// 调整输出
if (output !== 'array') {
if (cryptFlag !== DECRYPT) {
// 加密,输出转 16 进制串
return ArrayToHex(outArray)
} else {
// 解密,输出转 utf8 串
return arrayToUtf8(outArray)
}
} else {
return outArray
}
}
// 固定密钥
var _sm4pubkey = "BC60B8B9E4FFEFFA219E5AD77F11F9E2";
function sm4encrypt(e) {
// 加密
return sm4(e, _sm4pubkey, 1)
// 解密
// return sm4(JSON.stringify(e), _sm4pubkey, 0)
}
// export sm4
module.exports = {
encrypt: sm4encrypt
}
【args_s.js】 这个需要你们自己扣代码,我帮你们把功能导出给你们写好,你们只需要把代码扣下来放进去即可,别忘了修改
window = global;
/*
此处放你扣好的代码
*/
module.exports = {vdfAsync}
【main.py】
# -*- coding: utf-8 -*-
import uuid # 生成 deviceId 的指纹
import json5 # 格式化邮件数据普通的json无法格式化
import re # 正则匹配邮件和提取邮件内的验证码
import requests # 模拟发送请求
from urllib.parse import quote # 编码
from datetime import datetime # 处理邮件内的时间
from typing import Union, Literal # 类型提示
from functools import wraps # 装饰器
import json # 格式化普通数据
import time # 用于延迟执行、生成随机值
import hashlib # 用于解决邮箱的自加密键值(可用可不用)、解决 NTES_WEB_FP 指纹的生成
import random # 用于随机值生成
import base64 # 去敏专用
def base64_senior(string: str, algorithm: Literal['encode', 'decode'] = 'encode'):
if algorithm == 'encode':
input_bytes = string.encode('utf-8')
encoded_bytes = base64.b64encode(input_bytes)
return encoded_bytes.decode('utf-8')
elif algorithm == 'decode':
encoded_bytes = string.encode('utf-8')
decoded_bytes = base64.b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
else:
raise ValueError('缺少algorithm字段内容或输入错误')
class email_163_verification:
def __init__(self, *, email, password, node_host='', platform=None, **params):
print('——————————————【初始化】')
self.host_163 = base64_senior('aHR0cHM6Ly9tYWlsLjE2My5jb20v', 'decode')
self.host_163_2 = base64_senior('aHR0cHM6Ly9kbC5yZWcuMTYzLmNvbS8=', 'decode')
# Node接口地址环境,用于生成加密参数 (必不可少)
self.node_host = node_host or 'http://127.0.0.1:4000'
# token 是否可以更换,默认只更换一次,下一次直接抛出异常
self.replace_token = True
# 平台 目前只支持 lazada、shein、shopee、tiktok 获取验证码,其他平台暂未扩展
self.platform = platform and platform.lower()
# 邮箱
self.email = email.lower()
self.print_('邮箱', email)
# 邮箱密码
self.password = password
# 初始化基本信息,模拟加载浏览器的一整套环境,用此环境去登陆并获取邮箱验证码,此环境可存储并用于下一次
self.initial_basic_information()
def login(self):
'''登录并获取持续会话令牌'''
print('——————————————【执行163邮箱登录流程(用账户密码)】——————————————')
# 初始化基本信息,模拟加载浏览器的一整套环境,用此环境去登陆并获取邮箱验证码,此环境可存储并用于下一次
self.initial_basic_information()
time.sleep(0.1)
# [生成]一个持续会话的 utid
self.py_utid()
time.sleep(0.1)
# [获取]一个持续会话的 stats_session_id
self.fgw_mailsrvIpdetail_detail()
time.sleep(0.1)
# [获取] 用 deviceId 指纹授权并获取 sdid
self.fgw_mailsrvDeviceIdmapping_webapp_init()
time.sleep(0.1)
# [获取] capId(可要可不要) l_s_mail163CvViHzl(关键参数) 获取到参数后,需要等待8秒令牌生效后在继续执行
self.dl_zj_mail_ini()
time.sleep(0.1)
# [请求] 模拟混淆分析(可要可不要)
self.UA1435545636633_1()
time.sleep(0.1)
# [请求] 模拟混淆分析(可要可不要)
self.UA1435545636633_2()
time.sleep(0.1)
# [获取] pVInfo 登陆所需的关键参数值
self.dl_zj_mail_powGetP()
time.sleep(0.1)
# [获取] tk 登陆所需的关键参数值
self.dl_zj_mail_gt()
time.sleep(0.1)
# [获取][执行登陆] 获取四个关键参数token【NTES_PASSPORT | NTES_SESS | P_INFO | S_INFO】
# 这四个token属于会话级别临时用,但可以通过这个四个token获取一个30天持续会话令牌
self.dl_zj_mail_l()
time.sleep(0.1)
# types=2: 获取 【MAIL_PASSPORT | MAIL_PASSPORT_INFO】 30天持续会话令牌
# types=3: 获取 【Coremail | sid】 访问邮件的关键参数
self.entry_cgi_ntesdoor(types=3, method='POST')
time.sleep(0.1)
# 请用Token获取登录后的令牌
print('令牌在这里 -> ', self.cookie)
time.sleep(3)
def get_verification(self):
'''执行获取邮件验证码流程'''
print('——————————————【执行163邮箱获取验证码流程(用令牌)】——————————————')
self.print_('全局cookie', self.cookie)
assert self.platform, '平台未设置 - platform'
assert self.platform in self.platform_mapping, f'暂不支持此平台获取验证码 - {self.platform}'
if self.cookie and self.entry_cgi_ntesdoor(types=3, method='HEAD'):
# [获取] 筛选并获取满足条件的邮件 mid
self.js6_s(types=1, limit=30)
# [提取邮件验证码] 读取满足条件的 mid 邮件,并提取出验证码
self.js6_read_readhtmlJsp()
# 打印出验证码
print('验证码在这里 -> ', self.verification_code)
elif self.replace_token:
print('****************【令牌失效 更换令牌】****************')
# 执行登陆流程
self.login()
self.replace_token = False
# 执行获取邮件验证码流程
self.get_verification()
else:
raise ValueError('[163邮箱]令牌更换失败需查看 - 大概率需要手机验证码')
def del_email(self):
'''执行删除邮件流程'''
print('——————————————【执行163邮箱删除邮件流程(用令牌)】——————————————')
self.print_('全局cookie', self.cookie)
if self.cookie and self.entry_cgi_ntesdoor(types=3, method='HEAD'):
self.js6_s(types=2, limit=20000)
self.del_js6_s()
elif self.replace_token:
print('****************【令牌失效 更换令牌】****************')
self.login()
self.replace_token = False
self.del_email()
else:
raise ValueError('[163邮箱]令牌更换失败需查看')
@property
def core_cookie(self):
return {
'MAIL_PASSPORT_INFO': self.MAIL_PASSPORT_INFO,
'MAIL_PASSPORT': self.MAIL_PASSPORT
}
@property
def platform_mapping(self):
'''多平台映射表'''
return {
'tiktok': {
'subject': ['TikTok Shop商家验证码', 'is your verification code', 'TikTok Shop Verification Code'],
're': '(?=您正在进行邮箱验证,请在验证码输入框中输入|To verify your account, enter this code in TikTok Shop).*?<span class="code">(.*?)<' + '|' + 'color: rgb\(22,24,35\);font-weight: bold;">(.*?)<'
},
'shopee': {
'subject': ['Your Email OTP Verification Code'],
're': 'line-height: 40px;"> {1,40}<b>(.*?)</b>'
},
'shein': {
'subject': ['您正在登录[SHEIN]系统'],
're': '登录验证码:(.*?),'
},
'lazada': {
'subject': ['【重要】请完成验证'],
're': 'margin-left: -1\.25rem; line-height: 1;"></a> (.*?)<'
}
}
def retry(*, max_retries=3, delay=1, exceptions=(Exception,)):
"""
重试装饰器
:param max_retries: 最大重试次数
:param delay: 重试间隔时间(秒)
:param exceptions: 需要重试的异常类型
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
while retries < max_retries:
try:
return func(*args, **kwargs)
except exceptions as e:
retries += 1
if retries >= max_retries:
raise ValueError(f"操作在重试{max_retries}次后仍然失败: {e}")
time.sleep(delay)
return wrapper
return decorator
@retry(max_retries=3, delay=1, exceptions=(Exception,))
def reques(self, *, method: Literal['GET', 'POST', 'HEAD'] = None, url='', data=None, json=None, cookies=None,
headers=None,
params=None, timeout=None):
assert method in ['GET', 'POST', 'HEAD'], 'method参数错误'
request = getattr(requests, method.lower())
response = request(**{
'url': url,
'data': data,
'json': json,
'cookies': cookies,
'headers': headers,
'params': params,
'timeout': timeout
})
return response
def initial_basic_information(self):
'''初始化基本请求信息'''
self.cookie = {}
self.NTES_WEB_FP = self.hash_encrypt(str(time.time()), 'md5') # web指纹 随机一下
self.code_map = {
'201': {
'msg': '请求成功',
'capFlag_0': {
'msg': '请求成功',
}
},
'401': {
'msg': '您短时间内尝试次数过多,请一个小时后再试!',
'dt_04': '操作超时,需要刷新页面(估计缺少cookie:l_s_mail163CvViHzl)',
'dt_06': '操作超时,需要刷新页面(估计是这个cookie过期了:l_s_mail163CvViHzl)'
},
'403': {
'msg': '当前登录有风险,需要安全认证后在登陆!',
},
'402': {
'msg': '指纹错误!',
},
'408': {
'msg': "该号码可能存在安全风险,请更换手机号"
},
'412': {
'msg': '您短时间内尝试次数过多,请一个小时后再试!',
},
'413': {
'msg': '密码错误/密码错误次数太多!!!',
'capFlag_6': {
'msg': '账户密码错误!!!',
},
'capFlag_0': {
'msg': '账户或密码错误!!!',
}
},
'414': {
'msg': '您的IP短时间内登录失败次数过多,请过段时间再试!',
},
'415': {
'msg': '您今天登录失败次数过多,请明天再试!',
},
'416': {
'msg': '您的IP登录过于频繁,我们限制一天内登录过于频繁的情况,请稍候再试!',
},
'417': {
'msg': '您的IP今天登录次数过多,我们限制一天内登录过于频繁的情况,请稍候再试!',
},
'418': {
'msg': '您今天登录次数过多,请明天再试!',
},
'419': {
'msg': '您的登录操作过于频繁,请稍候再试!',
},
'420': {
'msg': '账号或密码错误。若账号长期未登录,可能已被注销!',
},
'422': {
'msg': '此账号已被锁定,此账号已被锁定!暂时无法登录,请您解锁后再来登录!',
},
'423': {
'msg': '风控账号!',
},
'424': {
'msg': '服务已到期,该靓号服务已到期,请您续费!',
},
'434': {
'msg': '您验证错误次数过多,请稍后再试',
},
'435': {
'msg': '您验证错误次数过多,请改天再试',
},
'436': {
'msg': '您验证错误次数过多,请稍后再试',
},
'437': {
'msg': '您验证错误次数过多,请改天再试',
},
'447': {
'msg': '由于频繁登录,请过人机验证!',
},
'455': {
'msg': '账号无法使用!!!',
'capFlag_0': {
'msg': '该账号无法使用,请注册其他账号!!!',
},
},
'460': {
'msg': '账号或密码错误。若账号长期未登录,可能已被注销!',
},
'500': {
'msg': '系统繁忙!我们正在恢复中!请您稍候尝试!',
},
'503': {
'msg': '系统繁忙!我们正在恢复中!请您稍候尝试!',
},
'504': {
'msg': '系统繁忙!系统此时有点繁忙!请您重试!',
},
'803': {
'msg': '加载失败,请稍后再试'
},
'804': {
'msg': '加载失败,请稍后再试'
},
'805': {
'msg': '加载失败,请稍后再试'
},
'806': {
'msg': '加载失败,请稍后再试'
}
}
self.Referer = f"{self.host_163_2}webzj/v1.0.1/pub/index_dl2_new.html?cd=%2F%2Fmimg.127.net%2Fp%2Ffreemail%2Findex%2Funified%2Fstatic%2F2025%2F%2Fcss%2F&cf=urs.163.918051fb.css&MGID={int(time.time() * 1000)}.403&wdaId=&pkid=CvViHzl&product=mail163"
v1 = random.randint(135, 160)
v2 = random.randint(8, 24)
v3 = random.randint(537, 600)
self.user_agent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/{v3}.36 (KHTML, like Gecko) Chrome/{v1}.0.0.0 Safari/{v3}.36'
self.sec_ch_ua = f'"Chromium";v="{v1}", "Not:A-Brand";v="{v2}", "Brave";v="{v1}"'
self.deviceId = str(uuid.uuid4()).replace('-', '') + '_v1'
self.Verification = ''
def date_change_str_time(self, text: str, month_add: int = 0) -> str:
'''
处理js的时间格式
:param text: 要处理的文本内容
:param month_add: 月份添加 专门针对 js 的 new Date,js默认是少一个月份的
处理文本内容,将文本内的 new Date(2025,3,3,10,17,46) 替换为正常的时间 2025-04-02 10:41:50
'''
# 将字符串转换为时间对象
def parse_time(time_str: str, month_add: int = 0) -> str:
'''
:param time_str: 日期 例如:2025-2-19-11-9-52 或 2025-2-19
:param month_add: 0或1,代表的是 是否将月份+1
:return:
'''
# 针对 new Date(2025,3,3,10,17,46)
if time_str.count('-') == 5:
# 将字符串按 "-" 分割
year, month, day, hour, minute, second = map(int, time_str.split("-"))
# 转换为时间对象 month添加一个月,因为js代码里没人是少于一个月的,所以我们直接添加即可,无需担心边界问题
return str(datetime(year, month + month_add, day, hour, minute, second))
# 针对 new Date(2025,3,3)
elif time_str.count('-') == 2:
# 将字符串按 "-" 分割
year, month, day = map(int, time_str.split("-"))
# 转换为时间对象 month添加一个月,因为js代码里没人是少于一个月的,所以我们直接添加即可,无需担心边界问题
return str(datetime(year, month + month_add, day))
raise ValueError('[mail 163]时间格式错误')
# 将 new Date(2025,3,3,10,17,46) 转为 2025-04-02 10:41:50
text = re.sub(r'new Date\(\d+,\d+,\d+,\d+,\d+,\d+\)', lambda x: '"' + parse_time(
'-'.join(re.findall(r'\((.*?),(.*?),(.*?),(.*?),(.*?),(.*?)\)', x.group(0))[0]), 1) + '"',
text)
# 将 new Date(2025,3,3) 转为 2025-3-20
text = re.sub(r'new Date\(\d+,\d+,\d+\)',
lambda x: '"' + parse_time('-'.join(re.findall(r'\((.*?),(.*?),(.*?)\)', x.group(0))[0]),
1) + '"', text)
return text
@property
def verification_code(self):
'''获取验证码'''
return self.Verification
def code_analysis(self):
'''状态码解析'''
msg = f'状态码解析传参有误 - {self.code}'
if 'dt' in self.code and 'ret' in self.code:
msg = self.code_map.get(str(self.code['ret']), {}).get("dt_" + str(self.code['dt']),
f'该状态码解析失败 - {self.code}')
if isinstance(msg, dict): msg = msg['msg']
elif 'capFlag' in self.code and 'ret' in self.code:
msg = self.code_map.get(str(self.code['ret']), {}).get("capFlag_" + str(self.code['capFlag']),
f'该状态码解析失败 - {self.code}')
if isinstance(msg, dict): msg = msg['msg']
elif 'ret' in self.code:
msg = self.code_map.get(str(self.code['ret']), {}).get('msg') or f'该状态码解析失败 - {self.code}'
if msg == '请求成功' or 'ret' not in self.code:
return '请求成功'
raise ValueError('[code_analysis] ' + str(msg))
def hash_encrypt(self, string, algorithm):
"""
@Desc : 整合hash哈希加密算法
"""
# 支持的算法列表
supported_algorithms = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512',
'blake2b', 'blake2s',
'sha3_224', 'sha3_256', 'sha3_384', 'sha3_512',
'shake_128', 'shake_256']
# 检查算法是否支持
if algorithm not in supported_algorithms:
raise ValueError(f'[hash_encrypt] 不支持的算法')
try:
hash_obj = getattr(hashlib, algorithm)()
hash_obj.update(string.encode('utf-8'))
return hash_obj.hexdigest()
except Exception as e:
raise ValueError(f'[hash_encrypt] 加密发生错误: {e}')
def py_utid(self):
print('————————————【获取 utid】')
self.utid = ''.join(random.choices('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz', k=32))
# vfELgO2ONXmeRRhWyKpC92r0Ou6mtmPY
self.print_('[生成][py_utid]|[utid]', self.utid)
def fgw_mailsrvIpdetail_detail(self):
print('————————————【获取 stats_session_id】 ')
headers = {
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"content-type": "application/json",
"pragma": "no-cache",
"priority": "u=1, i",
"referer": self.host_163,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"sec-gpc": "1",
"user-agent": self.user_agent
}
url = f"{self.host_163}fgw/mailsrv-ipdetail/detail"
response = self.reques(method='GET', url=url, headers=headers)
self.code = response_data = response.json()
response_cookies = response.cookies.get_dict()
self.print_('[请求结果][fgw_mailsrvIpdetail_detail]|[response]', response_data)
self.print_('[请求结果][fgw_mailsrvIpdetail_detail]|[cookie]', response_cookies)
self.print_('[状态码解析][fgw_mailsrvIpdetail_detail]', self.code_analysis())
self.stats_session_id = response_cookies.get('stats_session_id')
self.print_('[状态码解析][fgw_mailsrvIpdetail_detail]|[stats_session_id]', self.stats_session_id)
def fgw_mailsrvDeviceIdmapping_webapp_init(self):
print('————————————【获取 deviceId | sdid】 授权【stats_session_id】')
headers = {
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"content-type": "application/json",
"origin": self.host_163,
"pragma": "no-cache",
"priority": "u=1, i",
"referer": self.host_163,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"sec-gpc": "1",
"user-agent": self.user_agent
}
cookies = {
"starttime": "",
"stats_session_id": self.stats_session_id
}
url = f"{self.host_163}fgw/mailsrv-device-idmapping/webapp/init"
data = {
"deviceId": self.deviceId,
"appVersion": "1.0.0"
}
response = self.reques(method='POST', url=url, headers=headers, cookies=cookies, json=data)
self.code = response_data = response.json()
response_cookies = response.cookies.get_dict()
self.print_('[请求结果][fgw_mailsrvDeviceIdmapping_webapp_init]|[response]', response_data)
self.print_('[请求结果][fgw_mailsrvDeviceIdmapping_webapp_init]|[cookie]', response_cookies)
self.print_('[状态码解析][fgw_mailsrvDeviceIdmapping_webapp_init]', self.code_analysis())
self.sdid = response_data['result']['sdid']
self.print_('[正文][fgw_mailsrvDeviceIdmapping_webapp_init]|[sdid]', self.sdid)
self.print_('[正文][fgw_mailsrvDeviceIdmapping_webapp_init]|[deviceId]', self.deviceId)
def dl_zj_mail_ini(self):
print('————————————【获取 capId | l_s_mail163CvViHzl】')
# 获取 encParams
encParams = self.reques(method='GET', url=f'{self.node_host}/dl/zj/mail/ini',
headers={'Content-Type': 'application/json'}).json()
self.print_('[加密][dl_zj_mail_ini]|[encParams]', encParams)
# 获取 capId | l_s_mail163CvViHzl
data = {"encParams": encParams['encParams']}
headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "application/json",
"Origin": self.host_163_2,
"Pragma": "no-cache",
"Referer": self.Referer,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Sec-GPC": "1",
"User-Agent": self.user_agent,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
cookie = {'utid': self.utid}
response = self.reques(method='POST', url=f"{self.host_163_2}dl/zj/mail/ini", headers=headers, json=data, cookies=cookie)
self.code = response_data = response.json()
response_cookies = response.cookies.get_dict()
self.print_('[请求结果][dl_zj_mail_ini]|[response]', response_data)
self.print_('[请求结果][dl_zj_mail_ini]|[cookie]', response_cookies)
self.print_('[状态码解析][dl_zj_mail_ini]', self.code_analysis())
self.capId = response_data['capId']
self.l_s_mail163CvViHzl = response_cookies['l_s_mail163CvViHzl']
# self.l_s_mail163CvViHzl = '2BDA1093FDDA9283AD02B57FFFEC7E0E5CF63EC6EC94CB63D2223FB19FB56A25986919B9282C72B73EE307E2DD9A972E36F2E4270ADE27BE7EBBBFFE93ED2B55830E1B54302E4F7DD8DED8332E931DB1E87117AF3A377DC796DD602DBFCC0AA6C774DE2C626D30D682DC00E2BAFC6CD1'
self.print_('[正文][dl_zj_mail_ini]|[capId]', self.capId)
self.print_('[正文][dl_zj_mail_ini]|[l_s_mail163CvViHzl]', self.l_s_mail163CvViHzl)
self.print_('[正文][dl_zj_mail_ini]|[l_s_mail163CvViHzl][提示信息]', '请等待8秒,等待令牌生效...')
time.sleep(8) # 等一会,等令牌生效
def UA1435545636633_1(self):
headers = {
"Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "image",
"Referer": self.Referer,
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "same-origin",
"Sec-GPC": "1",
"User-Agent": self.user_agent,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
cookies = {
"utid": self.utid
}
params = {
"useDefaultRegMail": "1",
"from": self.host_163,
"promark": "CvViHzl",
"product": "mail163"
}
response = self.reques(method='GET', url=f"{self.host_163_2}UA1435545636633/__utm.gif", headers=headers, params=params, cookies=cookies)
def UA1435545636633_2(self):
headers = {
"Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9",
"Connection": "keep-alive",
"Referer": self.Referer,
"Sec-Fetch-Dest": "image",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "same-origin",
"Sec-GPC": "1",
"User-Agent": self.user_agent,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
cookies = {
"utid": self.utid,
"NTES_WEB_FP": self.NTES_WEB_FP
}
params = {
"from": "webzjwebworker",
"ursfp": self.NTES_WEB_FP,
"utid": self.utid,
"name": "webzj_power_pv",
"sp": "1",
"ua": self.user_agent
}
response = self.reques(method='GET', url=f"{self.host_163_2}UA1435545636633/__utm.gif", headers=headers, params=params, cookies=cookies)
def dl_zj_mail_powGetP(self, type = 1):
print('————————————【获取 pVInfo】')
if type == 1:
# 获取 encParams
encParams = self.reques(method='GET', url=f'{self.node_host}/dl/zj/mail/powGetP', headers={'Content-Type': 'application/json'}).json()
self.print_('[加密][dl_zj_mail_powGetP]|[encParams]', encParams)
else:
# 获取 encParams
encParams = self.reques(method='POST', url=f'{self.node_host}/dl/zj/mail/powGetP_r',
headers={'Content-Type': 'application/json'}, json={'email': self.email, 'pvSid': self.pVInfo['sid']}
).json()
self.print_('[加密][dl_zj_mail_powGetP]|[encParams]', encParams)
headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "application/json",
"Origin": self.host_163_2,
"Pragma": "no-cache",
"Referer": self.Referer,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Sec-GPC": "1",
"User-Agent": self.user_agent,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
cookies = {
'utid': self.utid,
"l_s_mail163CvViHzl": self.l_s_mail163CvViHzl,
"NTES_WEB_FP": self.NTES_WEB_FP,
"THE_LAST_LOGIN": self.email
}
data = {
"encParams": encParams['encParams']
}
response = self.reques(method='POST', url=f"{self.host_163_2}dl/zj/mail/powGetP", cookies=cookies,
headers=headers, json=data)
self.code = response_data = response.json()
response_cookies = response.cookies.get_dict()
self.print_('[请求结果][dl_zj_mail_powGetP]|[response]', response_data)
self.print_('[请求结果][dl_zj_mail_powGetP]|[cookie]', response_cookies)
self.print_('[状态码解析][dl_zj_mail_powGetP]', self.code_analysis())
self.pVInfo = response_data['pVInfo']
self.print_('[正文][dl_zj_mail_powGetP]|[pVInfo]', self.pVInfo)
def dl_zj_mail_gt(self):
print('————————————【获取 tk】')
# 获取 encParams
encParams = self.reques(method='POST', url=f'{self.node_host}/dl/zj/mail/gt',
headers={'Content-Type': 'application/json'}, json={"email": self.email}).json()
self.print_('[加密][dl_zj_mail_gt]|[encParams]', encParams)
# 获取 tk
data = {"encParams": encParams['encParams']}
cookies = {
'utid': self.utid,
"l_s_mail163CvViHzl": self.l_s_mail163CvViHzl,
"NTES_WEB_FP": self.NTES_WEB_FP
}
headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "application/json",
"Origin": self.host_163_2,
"Pragma": "no-cache",
"Referer": self.Referer,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Sec-GPC": "1",
"User-Agent": self.user_agent,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
response = self.reques(method='POST', url=f"{self.host_163_2}dl/zj/mail/gt", cookies=cookies,
headers=headers, json=data)
self.code = response_data = response.json()
response_cookies = response.cookies.get_dict()
self.print_('[请求结果][dl_zj_mail_gt]|[response]', response_data)
self.print_('[请求结果][dl_zj_mail_gt]|[cookie]', response_cookies)
self.print_('[状态码解析][dl_zj_mail_gt]', self.code_analysis())
self.tk = response_data['tk']
self.print_('[正文][dl_zj_mail_gt]|[tk]', self.tk)
def dl_zj_mail_l(self):
'''临时令牌权限'''
print('————————————【获取 NTES_PASSPORT | NTES_SESS | P_INFO | S_INFO 临时权限(会话级别)】')
# 获取 encParams
encParams = self.reques(method='POST', url=f'{self.node_host}/dl/zj/mail/l',
headers={'Content-Type': 'application/json'},
json={"email": self.email, "password": self.password, "tk": self.tk, 'pVInfo': self.pVInfo}).json()
self.print_('[加密][dl_zj_mail_l]|[encParams]', encParams)
# 获取 ---
data = {"encParams": encParams['encParams']}
cookies = {
'utid': self.utid,
"l_s_mail163CvViHzl": self.l_s_mail163CvViHzl,
"NTES_WEB_FP": self.NTES_WEB_FP
}
headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "application/json",
"Origin": self.host_163_2,
"Pragma": "no-cache",
"Referer": f"{self.host_163_2}webzj/v1.0.1/pub/index_dl2_new.html?cd=%2F%2Fmimg.127.net%2Fp%2Ffreemail%2Findex%2Funified%2Fstatic%2F2025%2F%2Fcss%2F&cf=urs.163.918051fb.css&MGID=1752714639893.7065&wdaId=&pkid=CvViHzl&product=mail163",
# "Referer": self.Referer,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Sec-GPC": "1",
"User-Agent": self.user_agent,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
response = self.reques(method='POST', url=f"{self.host_163_2}dl/zj/mail/l", cookies=cookies,
headers=headers, json=data)
self.code = response_data = response.json()
if self.code.get('ret') == '805':
self.dl_zj_mail_powGetP(type=2)
time.sleep(0.1)
self.dl_zj_mail_gt()
time.sleep(0.1)
self.dl_zj_mail_l()
response_cookies = response.cookies.get_dict()
self.cookie = response_cookies
self.print_('[请求结果][dl_zj_mail_l]|[response]', response_data)
self.print_('[请求结果][dl_zj_mail_l]|[cookie]', response_cookies)
self.print_('[状态码解析][dl_zj_mail_l]', self.code_analysis())
# self.NTES_PASSPORT = response_cookies['NTES_PASSPORT']
self.NTES_P_UTID = response_cookies['NTES_P_UTID']
self.NTES_SESS = response_cookies['NTES_SESS']
self.S_INFO = response_cookies['S_INFO']
self.P_INFO = response_cookies['P_INFO']
# self.print_('[正文][dl_zj_mail_l]|[NTES_PASSPORT]', self.NTES_PASSPORT)
self.print_('[正文][dl_zj_mail_l]|[NTES_P_UTID]', self.NTES_P_UTID)
self.print_('[正文][dl_zj_mail_l]|[NTES_SESS]', self.NTES_SESS)
self.print_('[正文][dl_zj_mail_l]|[S_INFO]', self.S_INFO)
self.print_('[正文][dl_zj_mail_l]|[P_INFO]', self.P_INFO)
assert self.NTES_P_UTID and self.NTES_SESS and self.S_INFO and self.P_INFO, 'NTES_P_UTID, NTES_SESS, S_INFO, P_INFO 未获取到会话令牌'
def entry_cgi_ntesdoor(self, types: int = 2, method: Union[Literal['GET', 'POST', 'HEAD']]='POST'):
"""
@Params :
- types:
-1: 获取 Coremail | sid,用于提取邮件
-2: 获取 MAIL_PASSPORT | MAIL_PASSPORT_INFO 令牌,用于授权
"""
if types == 1:
print(f'————————————【获取 Coremail | sid】{types}')
headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"pragma": "no-cache",
"priority": "u=0, i",
"referer": self.host_163,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "same-origin",
"sec-gpc": "1",
"upgrade-insecure-requests": "1",
"User-Agent": self.user_agent,
"sec-ch-ua": self.sec_ch_ua
}
cookies = {
'MAIL_PASSPORT': self.MAIL_PASSPORT,
'MAIL_PASSPORT_INFO': self.MAIL_PASSPORT_INFO
}
url = f"{self.host_163}entry/cgi/ntesdoor"
params = {
"lightweight": "1",
"verifycookie": "1",
"from": "web",
"df": "mail163_letter",
"allssl": "true",
"deviceId": self.deviceId,
"sdid": self.sdid,
"style": "-1"
}
response = self.reques(method='HEAD', url=url, headers=headers, cookies=cookies, params=params)
response_cookies = response.cookies.get_dict()
self.print_('[请求结果][entry_cgi_ntesdoor]|[cookie]', response_cookies)
self.Coremail = response_cookies.get('Coremail', '')
self.sid = (re.findall('%(.*?)%', self.Coremail) + [''])[0]
self.print_('[正文][entry_cgi_ntesdoor]|[Coremail]', self.Coremail)
self.print_('[正文][entry_cgi_ntesdoor]|[sid]', self.sid)
return self.Coremail and True or False
elif types == 2:
print(f'————————————【获取 MAIL_PASSPORT | MAIL_PASSPORT_INFO 30天令牌(持续){types}】')
headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"content-type": "application/x-www-form-urlencoded",
"origin": self.host_163,
"pragma": "no-cache",
"priority": "u=0, i",
"referer": self.host_163,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "iframe",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "same-origin",
"sec-fetch-user": "?1",
"sec-gpc": "1",
"upgrade-insecure-requests": "1",
"User-Agent": self.user_agent,
"sec-ch-ua": self.sec_ch_ua
}
cookies = {
'starttime': '',
"stats_session_id": self.stats_session_id,
"NTES_SESS": self.NTES_SESS,
"NTES_PASSPORT": self.NTES_PASSPORT,
"S_INFO": self.S_INFO,
"P_INFO": self.P_INFO,
"nts_mail_user": f"{self.email}:-1:1",
"df": "mail163_letter"
}
url = f"{self.host_163}entry/cgi/ntesdoor?"
data = {
"style": "-1",
"df": "mail163_letter",
"allssl": "true",
"net": "",
"deviceId": self.deviceId,
"sdid": self.sdid,
"language": "-1",
"from": "web",
"race": "",
"iframe": "1",
"url2": f"{self.host_163}errorpage/error163.htm",
"product": "mail163"
}
self.cookie = {**self.cookie, **cookies}
response = self.reques(method='POST', url=url, headers=headers, cookies=self.cookie, data=data)
response_cookies = response.cookies.get_dict()
self.print_('[请求结果][entry_cgi_ntesdoor]|[cookie]', response_cookies)
self.MAIL_PASSPORT = response_cookies.get('MAIL_PASSPORT', '')
self.MAIL_PASSPORT_INFO = response_cookies.get('MAIL_PASSPORT_INFO', '')
self.print_('[正文][dl_zj_mail_l]|[MAIL_PASSPORT]', self.MAIL_PASSPORT)
self.print_('[正文][dl_zj_mail_l]|[MAIL_PASSPORT_INFO]', self.MAIL_PASSPORT_INFO)
assert self.MAIL_PASSPORT and self.MAIL_PASSPORT_INFO, '[MAIL_PASSPORT | MAIL_PASSPORT_INFO] 未获取到令牌,无法继续'
elif types == 3:
print(f'————————————【启动临时登陆临时获取邮件功能 (仅会话|非持续){types} Coremail | sid】')
# 需要【NTES_P_UTID | NTES_SESS | S_INFO | P_INFO】
headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "max-age=0",
"content-type": "application/x-www-form-urlencoded",
"origin": self.host_163,
"priority": "u=0, i",
"referer": self.host_163,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "iframe",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "same-origin",
"sec-gpc": "1",
"upgrade-insecure-requests": "1",
"user-agent": self.user_agent,
"sec-ch-ua": self.sec_ch_ua
}
cookies = {
"starttime": "",
"stats_session_id": self.stats_session_id,
"nts_mail_user": f"{self.email}:-1:1",
"df": "mail163_letter",
**self.cookie
}
url = f"{self.host_163}entry/cgi/ntesdoor"
params = {
"": ""
}
data = {
"style": "-1",
"df": "mail163_letter",
"allssl": "true",
"net": "",
"deviceId": self.deviceId,
"sdid": self.sdid,
"language": "-1",
"from": "web",
"race": "",
"iframe": "1",
"url2": f"{self.host_163}errorpage/error163.htm",
"product": "mail163"
}
response = self.reques(method=method, url=url, headers=headers, cookies=cookies, data=data, params=params)
response_cookies = response.cookies.get_dict()
self.print_('[请求结果][entry_cgi_ntesdoor]|[cookie]', response_cookies)
self.Coremail = response_cookies.get('Coremail', '')
self.sid = (re.findall('%(.*?)%', self.Coremail) + [''])[0]
self.print_('[正文][entry_cgi_ntesdoor]|[Coremail]', self.Coremail)
self.print_('[正文][entry_cgi_ntesdoor]|[sid]', self.sid)
return True
else:
raise ValueError('[entry_cgi_ntesdoor] types 参数错误')
def js6_s(self, types: int = 1, limit: int = 30):
cookies = {
'Coremail': self.Coremail
}
headers = {
'accept': 'text/javascript',
'accept-language': 'zh-CN,zh;q=0.9',
'content-type': 'application/x-www-form-urlencoded',
'origin': self.host_163,
'priority': 'u=1, i',
'sec-ch-ua': self.sec_ch_ua,
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'sec-gpc': '1',
'user-agent': self.user_agent,
}
# 【正常读取验证码】limit设置为30,从30个邮件内获取指定邮件且最新的邮件mid(设置30是为了防止短时间内有其他邮件信息把我们所需要的邮件顶下去)
# 【删除邮件】limit设置为10000-20000,一次性读取所有的邮件mid,无序,然后传递给其他进行删除操作
data = {
'var': f'<?xml version="1.0"?><object><int name="fid">1</int><string name="order">date</string><boolean name="desc">true</boolean><int name="limit">{limit}</int></object>',
}
if types == 1: # 获取邮件验证码
print('————————————【获取邮件 mid 用于获取邮件验证码】')
response = self.reques(method='POST',
url=f'{self.host_163}js6/s?sid={self.sid}&func=mbox:listMessages',
cookies=cookies, headers=headers, data=data)
self.print_('[请求结果][js6_s]|[状态码]', response.status_code)
data = json5.loads(self.date_change_str_time(response.text, 1))
print(data)
# mid占位
self.dict_id = {}
# 从 几封邮件中筛选出10分钟内最新的邮件并且是官方的邮件,并返回字典数据邮件mid
for data_dict in data['var']:
# 将字典内的时间字符串转换为 datetime 对象
target_time = datetime.strptime(data_dict['receivedDate'], "%Y-%m-%d %H:%M:%S")
# 计算邮件收到的时间和当前时间误差是多少秒(以秒为单位)
time_difference = int((datetime.now() - target_time).total_seconds())
# 1、判断 subject 标题是否一致
# 2、判断邮件是否是10分钟内的 600秒,可自定义修改
_ = any(keyword in data_dict['subject'] for keyword in self.platform_mapping[self.platform]['subject'])
self.print_(f'时间|邮件mid|平台是否匹配且是否有验证码邮件{limit}', data_dict['receivedDate'],
data_dict['id'], _)
if _ and time_difference <= 600:
# 存储字典为空的情况下,直接把当前合格的字典存储起来。如果有另一封合格邮件,则对比时间,取最新的邮件进行存储
if (not self.dict_id) or (
self.dict_id and target_time > datetime.strptime(self.dict_id['receivedDate'],
"%Y-%m-%d %H:%M:%S")):
self.dict_id = {
'id': data_dict['id'],
'receivedDate': data_dict['receivedDate']
}
# 最新的邮件mid编号: {'id': '168:1tbiqBsVxmfaXVIWmAAAsc', 'receivedDate': '2025-03-19 14:19:52'}
self.print_('[正文][js6_s]|[最新的邮件 mid 编号]', self.dict_id)
# 只返回一个 168:1tbiqBsVxmfaXVIWmAAAsc 邮件的编号,这个邮件就是有验证码的,如果没有编号,说明在10分钟内就没有接收到平台的验证码,则返回空字符串。
# assert self.dict_id,
if not self.dict_id:
raise ValueError(f'未检测到10分钟内此平台发送的验证码 - {self.platform}')
self.mid = self.dict_id['id']
self.print_('[正文][js6_s]|[mid]', self.mid)
elif types == 2: # 删除所有邮件
print('————————————【获取邮件 mid 用于删除所有邮件】')
response = self.reques(method='POST',
url=f'{self.host_163}js6/s?sid={self.sid}&func=mbox:listMessages',
cookies=self.cookie, headers=headers, data=data)
self.print_('[请求结果][js6_s]|[状态码]', response.status_code)
mid_list = re.findall("'id':'(.*?)'", response.text)
# 去重 并 转为 list[html代码]
data_list = [f'<string>{data}</string>' for data in list(set(mid_list)) if data]
data_str = ''.join(data_list) # 转str
self.print_(f'[正文][js6_s]|[数量|获取要删除邮件]', len(data_list), data_str[:200])
self.mid = data_str
self.print_('[正文][js6_s]|[mid]', data_str[:100])
else:
raise ValueError('[js6_s] types 参数错误')
def js6_read_readhtmlJsp(self):
print('————————————【读取 mid 邮件ID码】')
cookies = {
'Coremail': self.Coremail,
'Coremail.sid': self.sid
}
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'accept-language': 'zh-CN,zh;q=0.9',
'priority': 'u=0, i',
'sec-ch-ua': self.sec_ch_ua,
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'iframe',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'same-origin',
'sec-fetch-user': '?1',
'sec-gpc': '1',
'upgrade-insecure-requests': '1',
'user-agent': self.user_agent,
}
response = self.reques(method='GET',
url=f'{self.host_163}js6/read/readhtml.jsp?mid={self.mid}&userType=ud&font=15&color=3370FF',
cookies=cookies, headers=headers)
self.print_('[请求结果][js6_read_readhtmlJsp]|[状态码]', response.status_code)
req_data = str(response.text).replace('\n', '').replace('\n', '').replace('\t', '').replace('\r', '').replace('\f', '')
# 提取验证码 Tiktok
self.print_('re', self.platform_mapping[self.platform]['re'])
Verification = re.findall(self.platform_mapping[self.platform]['re'], req_data) + ['']
self.print_('邮箱验证码', Verification)
Verification = (lambda x: x[0] if isinstance(x[0], str) else next((x[0] or x[1] for x in x), None))(
Verification)
Verification = Verification.strip()
# 邮箱验证码: RW4Y8N
self.print_('邮箱验证码剥离', Verification)
assert Verification, f'验证码捕获失败,正则需修改 - {self.platform}'
self.Verification = Verification
# 官方接口 删除邮件功能,删除的主入口 【手动调用】
def del_js6_s(self):
'''删除所有邮件 至 已删除 [一次删除20000个邮件]'''
headers = {
"accept": "text/javascript",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"content-type": "application/x-www-form-urlencoded",
"pragma": "no-cache",
"priority": "u=1, i",
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"sec-gpc": "1",
"user-agent": self.user_agent,
}
cookies = {
'Coremail': self.Coremail
}
# 删除【正常的邮件】可一次性删除20000个,目前未发现需要身份验证
data = {
'var': f'<?xml version="1.0"?><object><array name="ids">{self.mid}</array><object name="attrs"><int name="fid">4</int></object></object>',
}
response = self.reques(method='POST',
url=f"{self.host_163}js6/s?sid={self.sid}&func=mbox:updateMessageInfos",
cookies=cookies, headers=headers, json=data)
if response.status_code == 200:
self.print_('[成功删除][del_js6_s]|[状态码]', response.status_code)
else:
self.print_('[删除失败][del_js6_s]|[状态码|正文]', response.status_code, response.text)
# 执行【获取所有已删除的邮件mid】
self.get_all_del_js6_s()
# 官方接口 获取所有已删除的邮件mid
def get_all_del_js6_s(self):
'''获取所有已删除的邮件mid'''
headers = {
"accept": "text/javascript",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"content-type": "application/x-www-form-urlencoded",
"pragma": "no-cache",
"priority": "u=1, i",
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"sec-gpc": "1",
"user-agent": self.user_agent,
}
cookies = {
'Coremail': self.Coremail
}
# 一次性获取28000个,方便删除 这个固定不要修改,都是经过严格测试的
data = {
'var': f'<?xml version="1.0"?><object><int name="fid">4</int><string name="order">date</string><boolean name="desc">true</boolean><int name="limit">28000</int><int name="start">0</int><boolean name="skipLockedFolders">false</boolean><boolean name="returnTag">true</boolean><boolean name="returnTotal">true</boolean><string name="mrcid">{self.deviceId}</string></object>',
}
response = self.reques(method='POST',
url=f"{self.host_163}js6/s?sid={self.sid}&func=mbox:listMessages",
cookies=cookies, headers=headers, json=data)
self.print_('[请求结果][get_all_del_js6_s]|[状态码]', response.status_code)
mid_list = re.findall("'id':'(.*?)'", response.text)
# data_list = [f'<string>{data}</string>' for data in list(set(mid_list)) if data]
# data_str = ''.join(data_list)
# self.mid = data_str
# print(f'[获取所有已删除的邮件mid] -> 数量:{len(data_list)} 数据:{data_str[:200]}')
# 去重
mid_list = list(set(mid_list))
self.print_('[获取所有已删除的邮件mid][get_all_del_js6_s]|[数量|数据]', len(mid_list), mid_list[:200])
# 将列表数据分批,每次最多498条,避免一次性删除太多支撑不住
mid_list = [mid_list[i:i + 498] for i in range(0, len(mid_list), 498)]
self.print_('[分批删除][get_all_del_js6_s]', mid_list)
# 转html并存储到列表内
data_list = ['<string>' + ('</string><string>'.join(i) + '</string>') for i in mid_list]
self.mid_list = data_list
# 执行【删除所有已删除的邮件,彻底删除】
self.del_all_deleted_emails_js6_s()
# 官方接口 获取临时彻底删除邮件的权限token 此token为会话级,只能作为临时使用,有效期预估为1天左右,不适合存储
def get_temporary_del_token(self) -> Union[dict, str]:
'''
获取临时彻底删除邮件的权限token
注意:如果此token获取不到,则说明邮箱有异常,需要验证身份才能进行删除邮件,但不影响查看邮件和读取邮件内的验证码
'''
headers = {
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"content-type": "application/x-www-form-urlencoded",
"origin": self.host_163,
"pragma": "no-cache",
"priority": "u=1, i",
"referer": f"{self.host_163}js6/main.jsp?sid",
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"sec-gpc": "1",
"user-agent": self.user_agent
}
cookies = {
"Coremail": self.Coremail
}
data = {
"actionId": "eec599711e6041238d", # 固定的id b1c13ae21986484e86 这个是修改个人信息的id
"environment": json.dumps({"mrcid": self.deviceId, "mrecp": {}, "mrvar": quote(
f'<?xml version="1.0"?><object><array name="ids"></array><string name="mrcid">{self.deviceId}</string></object>'),
"hl": "zh_CN"})
}
__hid = self.hash_encrypt(self.email, 'md5')[:4].upper()
self.print_(f'[获取临时删除邮件权限令牌][自加密键值]', __hid)
response = self.reques(method='POST',
url=f"{self.host_163}fgw/mailserv-risk-control/risk/action/token?1750225081591=",
cookies=cookies, headers=headers, data=data)
response_cookies = response.cookies.get_dict()
self.print_('[请求结果][get_temporary_del_token]|[cookie]', response_cookies)
MAIL_RISK_CTRL = response_cookies.get(f'MAIL_RISK_CTRL_{__hid}', '')
self.print_(f'[正文][get_temporary_del_token]|[MAIL_RISK_CTRL_{__hid}]', MAIL_RISK_CTRL)
assert MAIL_RISK_CTRL, '[get_temporary_del_token] 获取临时删除邮件权限令牌失败'
return {f'MAIL_RISK_CTRL_{__hid}': MAIL_RISK_CTRL}
# 官方接口 删除所有已删除的邮件,彻底删除
def del_all_deleted_emails_js6_s(self):
'''删除所有已删除的邮件,彻底删除 此步骤执行预估消耗时间:2秒可用删除498封,28000封预估需要1分20秒左右'''
headers = {
"accept": "text/javascript",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"content-type": "application/x-www-form-urlencoded",
"origin": self.host_163,
"pragma": "no-cache",
"priority": "u=1, i",
"referer": f"{self.host_163}js6/main.jsp?sid",
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"sec-gpc": "1",
"user-agent": self.user_agent
}
cookies = {
'Coremail': self.Coremail,
**self.get_temporary_del_token()
# 获取删除【彻底删除权限】的token,这个token如果获取不了,说明邮箱是异常的,需要手机验证,还有一种情况是邮箱检测可能有盗号的风险,也是无法删除的
}
if len(cookies) < 2:
return ''
# 删除28000封邮件预估1分20秒内
# 循环遍历删除,删除【已删除的邮件】每次最多只能删除498个,所以只能遍历删除
for mid_str in self.mid_list:
data = {
'var': f'<?xml version="1.0"?><object><array name="ids">{mid_str}</array><string name="mrcid">{self.deviceId}</string></object>',
}
response = self.reques(method='POST',
url=f"{self.host_163}js6/s?sid={self.sid}&func=mbox:deleteMessages",
cookies=cookies, headers=headers, data=data)
self.print_('[请求结果][del_all_deleted_emails_js6_s]|[状态码]', response.status_code)
print(f'[彻底删除{mid_str.count("<string>")}] ->', json5.loads(response.text))
time.sleep(round(random.uniform(0, 1), 2) or 1)
def print_(self, *args: any) -> None:
"""开发者测试专用 可代替print打印"""
formatted_time = datetime.fromtimestamp(time.time()).strftime("%H:%M:%S.%f")[:-3] # %Y/%m/%d %H:%M:%S.%f
if len(args) > 1:
label, *values = args
type_str = "".join(f"[{type(v).__name__}]" for v in values)
value_str = " ".join(f"[{v}]" for v in values)
print(f'[{formatted_time}][{label}] | {type_str} -> {value_str}')
return
elif len(args) == 1:
# 打印单数值
print(f'[{formatted_time}][{type(args[0]).__name__}] -> [{args[0]}]')
email = email_163_verification(email='*****输入你自己的', password='*****输入你自己的', node_host='http://127.0.0.1:4000', platform='tiktok')
email.get_verification() # 执行获取验证码
print(email.verification_code) # 获取验证码
本人亲测可用,代码随便拿去测试、学习。
登录后的token是可以存储下来的,下次无需再次登录,大家可以深度研究一下我的代码。支持30天免登录的token获取,可扩展获取某平台的验证码或链接,支持使用token直接获取验证码,无需重新登录,这些都需要你们自己去深度研究一下,代码里已经有了,但没有详细说明,自己可以看一下。别忘了给博主来个三连击



魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)