AI股票分析师daily_stock_analysis与Python爬虫实战:自动化金融数据采集与分析
本文介绍了如何利用星图GPU平台自动化部署 📈 AI 股票分析师daily_stock_analysis 镜像,构建自动化金融数据分析系统。该系统通过Python爬虫采集多源股票行情与新闻数据,并交由该AI镜像进行综合分析,最终自动生成包含技术面、舆情面解读的每日投资报告,实现从数据采集到智能分析的完整自动化流程。
AI股票分析师daily_stock_analysis与Python爬虫实战:自动化金融数据采集与分析
每天一睁眼,你是不是也和我一样,先打开几个股票软件,看看自选股是红是绿,再刷刷财经新闻,试图从海量信息里找到一点有用的线索?这个过程不仅耗时耗力,而且信息过载,最后往往是凭感觉下判断,而不是靠数据做决策。
我试过很多方法,从手动记录Excel表格到用各种技术指标公式,效果都不太理想。直到我发现了daily_stock_analysis这个项目,再结合自己熟悉的Python爬虫技术,才真正把金融数据分析这件事给自动化了。
今天我就来分享一下,怎么把AI股票分析师和Python爬虫结合起来,搭建一套属于自己的自动化金融数据采集与分析系统。这套方案的核心思路很简单:让爬虫去干苦力活,自动抓取行情和新闻数据,然后让AI去干脑力活,分析数据、生成报告,最后自动推送到你手机上。
1. 为什么需要自动化金融分析?
在聊具体技术之前,我们先看看传统分析方式有哪些痛点。
以前我做股票分析,基本上就是三板斧:看K线图、查技术指标、刷财经新闻。每天至少要花一两个小时,效率低不说,还容易受情绪影响。看到股票涨了就兴奋,跌了就焦虑,这种状态很难做出理性的投资决策。
更麻烦的是信息碎片化。行情数据在一个地方,新闻在另一个地方,研报又在别的地方。我需要手动把这些信息整合起来,这个过程本身就容易出错,而且非常耗时。
daily_stock_analysis这个项目给了我启发。它本质上是一个AI驱动的分析引擎,能自动获取多源数据,然后用大模型进行分析,最后生成结构化的决策建议。但我在使用中发现,它默认的数据源有时候不够全面,特别是对于一些细分领域或者特定市场的新闻,覆盖得不够好。
这时候Python爬虫就派上用场了。我们可以用爬虫定制化地抓取更多数据源,比如特定财经网站的分析文章、行业研报、社交媒体舆情等,然后把这些数据喂给AI分析师,让它做出更全面的判断。
2. 系统架构设计:爬虫+AI如何协同工作?
整个系统的架构其实不复杂,我画了个简单的示意图帮你理解:
数据采集层(Python爬虫) → 数据处理层 → AI分析层(daily_stock_analysis) → 结果推送层
数据采集层是爬虫的天下。这里我们主要抓取两类数据:一是行情数据,包括股价、成交量、技术指标等;二是舆情数据,包括新闻、研报、社交媒体讨论等。
数据处理层负责清洗和格式化爬虫抓来的数据。因为不同网站的数据格式不一样,有的可能是HTML,有的可能是JSON,还有的可能是PDF。我们需要把这些数据统一成AI分析师能理解的格式。
AI分析层就是daily_stock_analysis的核心了。它会把处理好的数据喂给大模型(比如Gemini、DeepSeek等),让模型从多个维度进行分析,包括技术面、基本面、舆情面等。
结果推送层负责把分析结果推送到你的手机或者邮箱。支持企业微信、飞书、Telegram、邮件等多种渠道,你可以根据自己的习惯选择。
下面我重点讲讲爬虫部分怎么实现,这是整个系统的数据入口。
3. Python爬虫实战:多源金融数据采集
爬虫听起来有点技术含量,但其实用Python写起来并不难。我建议从简单的开始,先抓取一两个数据源,等跑通了再慢慢扩展。
3.1 行情数据爬取:以东方财富为例
行情数据相对规整,很多财经网站都提供结构化的数据接口。我们以东方财富网为例,看看怎么抓取股票的基本行情。
import requests
import pandas as pd
from datetime import datetime
import time
def fetch_stock_quote(stock_code):
"""
获取股票实时行情数据
stock_code: 股票代码,如'600519'(茅台)、'000001'(平安银行)
"""
# 东方财富实时行情接口
url = f"http://push2.eastmoney.com/api/qt/stock/get"
params = {
'fields': 'f43,f57,f58,f169,f170,f46,f44,f51,f168,f47,f164,f163,f116,f60,f45,f52,f50,f48,f167,f117,f71,f161,f49,f530,f135,f136,f137,f138,f139,f141,f142,f144,f145,f147,f148,f140,f143,f146,f149,f55,f62,f162,f92,f173,f104,f105,f84,f85,f183,f184,f185,f186,f187,f188,f189,f190,f191,f192,f107,f111,f86,f177,f78,f110,f262,f263,f264,f267,f268,f250,f251,f252,f253,f254,f255,f256,f257,f258,f266,f269,f270,f271,f273,f274,f275,f127,f199,f128,f193,f196,f194,f195,f197,f80,f280,f281,f282,f284,f285,f286,f287,f292',
'secid': f"1.{stock_code}" if stock_code.startswith('6') else f"0.{stock_code}",
'ut': 'fa5fd1943c7b386f172d6893dbfba10b',
'invt': '2',
'fltt': '2',
'_': int(time.time() * 1000)
}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Referer': 'http://quote.eastmoney.com/'
}
try:
response = requests.get(url, params=params, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
if data['data']:
quote = data['data']
result = {
'股票代码': stock_code,
'股票名称': quote.get('f57', ''),
'当前价格': quote.get('f43', 0) / 100, # 转换为元
'涨跌幅': quote.get('f170', 0) / 100, # 转换为百分比
'涨跌额': quote.get('f169', 0) / 100,
'成交量': quote.get('f47', 0),
'成交额': quote.get('f48', 0),
'最高价': quote.get('f44', 0) / 100,
'最低价': quote.get('f45', 0) / 100,
'开盘价': quote.get('f46', 0) / 100,
'昨收价': quote.get('f60', 0) / 100,
'更新时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
return result
else:
print(f"未获取到股票 {stock_code} 的数据")
return None
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None
except Exception as e:
print(f"解析失败: {e}")
return None
# 使用示例
if __name__ == "__main__":
# 测试抓取茅台股票数据
maotai_data = fetch_stock_quote('600519')
if maotai_data:
print("茅台股票实时行情:")
for key, value in maotai_data.items():
print(f"{key}: {value}")
这个爬虫函数能抓取股票的实时行情数据,包括价格、涨跌幅、成交量等关键信息。代码里我加了详细的注释,你应该能看懂每一行是干什么的。
实际使用时,你可以把这个函数封装成一个定时任务,比如每5分钟运行一次,把数据保存到数据库或者CSV文件里,供后续分析使用。
3.2 新闻舆情爬取:抓取财经新闻
行情数据只是基础,新闻舆情对股价的影响也很大。我们可以爬取一些主流财经网站的新闻,看看市场在关注什么。
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin
def fetch_finance_news(keywords=None, max_pages=3):
"""
爬取财经新闻
keywords: 关键词列表,如['茅台', '白酒']
max_pages: 爬取的最大页数
"""
base_url = "https://finance.sina.com.cn"
news_list = []
for page in range(1, max_pages + 1):
try:
# 新浪财经滚动新闻
url = f"{base_url}/roll/index.d.html?cid=56589&page={page}"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'html.parser')
news_items = soup.find_all('li', class_='list_009')
for item in news_items:
try:
link_tag = item.find('a')
if not link_tag:
continue
title = link_tag.get_text(strip=True)
link = link_tag.get('href')
if not link.startswith('http'):
link = urljoin(base_url, link)
time_tag = item.find('span')
publish_time = time_tag.get_text(strip=True) if time_tag else ''
# 如果有关键词过滤,检查标题是否包含关键词
if keywords:
if not any(keyword in title for keyword in keywords):
continue
# 获取新闻详情(可选)
content = fetch_news_content(link) if link else ''
news_item = {
'标题': title,
'链接': link,
'发布时间': publish_time,
'内容摘要': content[:200] + '...' if content else '', # 只取前200字
'抓取时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
news_list.append(news_item)
except Exception as e:
print(f"解析单条新闻失败: {e}")
continue
print(f"第{page}页爬取完成,共{len(news_items)}条新闻")
time.sleep(2) # 礼貌性延迟,避免被封
except requests.exceptions.RequestException as e:
print(f"第{page}页请求失败: {e}")
break
return news_list
def fetch_news_content(url):
"""获取新闻详情内容"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'html.parser')
# 新浪财经新闻内容通常在article标签或特定class中
content_div = soup.find('div', class_='article') or soup.find('article')
if content_div:
# 移除脚本、样式等无关内容
for script in content_div.find_all(['script', 'style', 'iframe']):
script.decompose()
text = content_div.get_text(strip=True)
# 清理多余空白字符
text = re.sub(r'\s+', ' ', text)
return text
return ""
except Exception as e:
print(f"获取新闻内容失败 {url}: {e}")
return ""
# 使用示例
if __name__ == "__main__":
# 抓取包含"茅台"或"白酒"的新闻
news = fetch_finance_news(keywords=['茅台', '白酒'], max_pages=2)
print(f"共抓取到{len(news)}条相关新闻")
for i, item in enumerate(news[:3], 1): # 只显示前3条
print(f"\n{i}. {item['标题']}")
print(f" 时间: {item['发布时间']}")
print(f" 摘要: {item['内容摘要']}")
这个爬虫能抓取新浪财经的滚动新闻,并且支持关键词过滤。比如你只关心茅台或者白酒行业的新闻,就可以设置相应的关键词。
代码里我用了BeautifulSoup来解析HTML,这是Python里最常用的网页解析库之一。如果你之前没接触过,可能需要花点时间熟悉一下,但基本用法不难掌握。
3.3 数据存储与管理
爬虫抓来的数据需要妥善保存,方便后续分析。我通常用SQLite数据库,因为它轻量、无需安装,单个文件就能搞定。
import sqlite3
import json
from datetime import datetime
class FinanceDataDB:
"""金融数据数据库管理类"""
def __init__(self, db_path='finance_data.db'):
self.db_path = db_path
self.init_database()
def init_database(self):
"""初始化数据库表"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建股票行情表
cursor.execute('''
CREATE TABLE IF NOT EXISTS stock_quotes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
stock_code TEXT NOT NULL,
stock_name TEXT,
current_price REAL,
change_percent REAL,
change_amount REAL,
volume INTEGER,
amount REAL,
high_price REAL,
low_price REAL,
open_price REAL,
prev_close REAL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE(stock_code, timestamp)
)
''')
# 创建新闻舆情表
cursor.execute('''
CREATE TABLE IF NOT EXISTS finance_news (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
url TEXT UNIQUE,
publish_time TEXT,
content_summary TEXT,
keywords TEXT, # JSON格式存储关键词
source TEXT,
crawl_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建索引以提高查询速度
cursor.execute('CREATE INDEX IF NOT EXISTS idx_stock_code ON stock_quotes(stock_code)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_quote_time ON stock_quotes(timestamp)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_news_time ON finance_news(publish_time)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_news_keywords ON finance_news(keywords)')
conn.commit()
conn.close()
def save_stock_quote(self, quote_data):
"""保存股票行情数据"""
if not quote_data:
return False
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute('''
INSERT OR IGNORE INTO stock_quotes
(stock_code, stock_name, current_price, change_percent, change_amount,
volume, amount, high_price, low_price, open_price, prev_close, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
quote_data['股票代码'],
quote_data['股票名称'],
quote_data['当前价格'],
quote_data['涨跌幅'],
quote_data['涨跌额'],
quote_data['成交量'],
quote_data['成交额'],
quote_data['最高价'],
quote_data['最低价'],
quote_data['开盘价'],
quote_data['昨收价'],
quote_data['更新时间']
))
conn.commit()
return cursor.rowcount > 0
except Exception as e:
print(f"保存股票数据失败: {e}")
return False
finally:
conn.close()
def save_news(self, news_item):
"""保存新闻数据"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
# 将关键词列表转为JSON字符串
keywords_json = json.dumps(news_item.get('keywords', [])) if news_item.get('keywords') else None
cursor.execute('''
INSERT OR IGNORE INTO finance_news
(title, url, publish_time, content_summary, keywords, source)
VALUES (?, ?, ?, ?, ?, ?)
''', (
news_item['标题'],
news_item['链接'],
news_item['发布时间'],
news_item['内容摘要'],
keywords_json,
news_item.get('source', 'sina_finance')
))
conn.commit()
return cursor.rowcount > 0
except Exception as e:
print(f"保存新闻数据失败: {e}")
return False
finally:
conn.close()
def get_recent_quotes(self, stock_code, hours=24):
"""获取最近N小时的股票行情数据"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM stock_quotes
WHERE stock_code = ? AND timestamp >= datetime('now', ?)
ORDER BY timestamp DESC
''', (stock_code, f'-{hours} hours'))
rows = cursor.fetchall()
conn.close()
# 转换为字典列表
columns = [desc[0] for desc in cursor.description]
return [dict(zip(columns, row)) for row in rows]
def get_recent_news(self, keywords=None, limit=50):
"""获取最近的新闻,支持关键词过滤"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
if keywords:
# 构建关键词查询条件
conditions = []
params = []
for keyword in keywords:
conditions.append("(title LIKE ? OR content_summary LIKE ?)")
params.extend([f'%{keyword}%', f'%{keyword}%'])
where_clause = "WHERE " + " OR ".join(conditions)
query = f'''
SELECT * FROM finance_news
{where_clause}
ORDER BY publish_time DESC
LIMIT ?
'''
params.append(limit)
cursor.execute(query, params)
else:
cursor.execute('''
SELECT * FROM finance_news
ORDER BY publish_time DESC
LIMIT ?
''', (limit,))
rows = cursor.fetchall()
conn.close()
columns = [desc[0] for desc in cursor.description]
return [dict(zip(columns, row)) for row in rows]
# 使用示例
if __name__ == "__main__":
db = FinanceDataDB()
# 模拟保存数据
test_quote = {
'股票代码': '600519',
'股票名称': '贵州茅台',
'当前价格': 1800.50,
'涨跌幅': 1.5,
'涨跌额': 26.5,
'成交量': 5000000,
'成交额': 9000000000,
'最高价': 1810.00,
'最低价': 1795.00,
'开盘价': 1798.00,
'昨收价': 1774.00,
'更新时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
if db.save_stock_quote(test_quote):
print("股票数据保存成功")
# 查询最近24小时的茅台数据
recent_data = db.get_recent_quotes('600519', 24)
print(f"最近24小时有{len(recent_data)}条茅台行情记录")
这个数据库类封装了数据的存储和查询功能。用起来很简单,创建个实例,调用save方法保存数据,调用get方法查询数据。数据都保存在本地的SQLite文件里,不需要安装数据库服务器。
4. 与daily_stock_analysis集成:让AI分析爬虫数据
爬虫把数据抓来了,数据库也存好了,接下来就是重头戏:怎么把这些数据喂给AI分析师,让它帮我们分析。
daily_stock_analysis本身支持多种数据源,但主要是通过API接口获取标准化的数据。我们的爬虫数据需要稍微处理一下,转换成它认识的格式。
4.1 数据格式转换
AI分析师期望的数据格式比较规范,我们需要把爬虫抓来的原始数据清洗一下。
import pandas as pd
from datetime import datetime, timedelta
def prepare_data_for_ai(stock_code, db_path='finance_data.db'):
"""
为AI分析准备数据
返回技术指标、新闻舆情等综合数据
"""
db = FinanceDataDB(db_path)
# 1. 获取股票行情数据(最近30天)
quotes = db.get_recent_quotes(stock_code, hours=24*30)
if not quotes:
print(f"未找到股票 {stock_code} 的历史数据")
return None
# 转换为DataFrame方便计算
df_quotes = pd.DataFrame(quotes)
df_quotes['timestamp'] = pd.to_datetime(df_quotes['timestamp'])
df_quotes = df_quotes.sort_values('timestamp')
# 计算技术指标(简单示例)
df_quotes['MA5'] = df_quotes['current_price'].rolling(window=5).mean()
df_quotes['MA10'] = df_quotes['current_price'].rolling(window=10).mean()
df_quotes['MA20'] = df_quotes['current_price'].rolling(window=20).mean()
# 2. 获取相关新闻
# 这里可以根据股票名称或代码获取相关新闻
stock_name = df_quotes.iloc[-1]['stock_name'] if 'stock_name' in df_quotes.columns else stock_code
news_items = db.get_recent_news(keywords=[stock_name], limit=20)
# 3. 构建AI分析所需的数据结构
latest_quote = quotes[-1] if quotes else {}
ai_data = {
'stock_info': {
'code': stock_code,
'name': latest_quote.get('stock_name', ''),
'current_price': latest_quote.get('current_price', 0),
'change_percent': latest_quote.get('change_percent', 0),
'volume': latest_quote.get('volume', 0),
'amount': latest_quote.get('amount', 0)
},
'technical_indicators': {
'MA5': float(df_quotes['MA5'].iloc[-1]) if not pd.isna(df_quotes['MA5'].iloc[-1]) else 0,
'MA10': float(df_quotes['MA10'].iloc[-1]) if not pd.isna(df_quotes['MA10'].iloc[-1]) else 0,
'MA20': float(df_quotes['MA20'].iloc[-1]) if not pd.isna(df_quotes['MA20'].iloc[-1]) else 0,
'trend': 'up' if df_quotes['MA5'].iloc[-1] > df_quotes['MA10'].iloc[-1] > df_quotes['MA20'].iloc[-1] else 'down',
'latest_high': float(df_quotes['high_price'].max()) if 'high_price' in df_quotes.columns else 0,
'latest_low': float(df_quotes['low_price'].min()) if 'low_price' in df_quotes.columns else 0
},
'news_sentiment': {
'total_news': len(news_items),
'positive_count': sum(1 for news in news_items if any(word in news.get('title', '').lower() for word in ['涨', '利好', '增长', '突破'])),
'negative_count': sum(1 for news in news_items if any(word in news.get('title', '').lower() for word in ['跌', '利空', '下滑', '风险'])),
'recent_news': [
{
'title': news.get('title', ''),
'time': news.get('publish_time', ''),
'summary': news.get('content_summary', '')[:100]
}
for news in news_items[:5] # 只取最近5条
]
},
'market_context': {
'analysis_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'data_sources': ['crawler_sina', 'crawler_eastmoney'],
'data_period': '30_days'
}
}
return ai_data
def generate_ai_prompt(ai_data):
"""
根据数据生成AI分析提示词
"""
stock_info = ai_data['stock_info']
technical = ai_data['technical_indicators']
news = ai_data['news_sentiment']
prompt = f"""
请作为专业的股票分析师,对以下股票进行综合分析:
股票信息:
- 代码:{stock_info['code']}
- 名称:{stock_info['name']}
- 当前价格:{stock_info['current_price']}元
- 涨跌幅:{stock_info['change_percent']}%
- 成交量:{stock_info['volume']}手
- 成交额:{stock_info['amount']}元
技术指标分析:
- 5日均线(MA5):{technical['MA5']:.2f}元
- 10日均线(MA10):{technical['MA10']:.2f}元
- 20日均线(MA20):{technical['MA20']:.2f}元
- 趋势判断:{technical['trend']}
- 近期高点:{technical['latest_high']:.2f}元
- 近期低点:{technical['latest_low']:.2f}元
新闻舆情分析:
- 近期相关新闻:{news['total_news']}条
- 正面新闻:{news['positive_count']}条
- 负面新闻:{news['negative_count']}条
- 最新新闻摘要:
{chr(10).join([f" • {item['time']} - {item['title']}: {item['summary']}" for item in news['recent_news']])}
请从以下维度进行分析:
1. 技术面分析:当前价格与各均线的关系,趋势判断,支撑位和压力位
2. 基本面参考:结合新闻舆情,分析可能影响股价的因素
3. 操作建议:给出具体的买入/卖出/观望建议,包括建议价格区间
4. 风险提示:指出当前主要风险点
请用简洁专业的语言输出分析结果,避免使用过于技术化的术语。
"""
return prompt
# 使用示例
if __name__ == "__main__":
# 准备茅台股票数据
ai_data = prepare_data_for_ai('600519')
if ai_data:
print("数据准备完成")
print(f"股票名称: {ai_data['stock_info']['name']}")
print(f"当前价格: {ai_data['stock_info']['current_price']}")
print(f"技术趋势: {ai_data['technical_indicators']['trend']}")
print(f"相关新闻数: {ai_data['news_sentiment']['total_news']}")
# 生成AI提示词
prompt = generate_ai_prompt(ai_data)
print("\n生成的AI分析提示词(前500字符):")
print(prompt[:500] + "...")
这段代码做了两件事:一是把爬虫抓来的原始数据整理成结构化的格式,二是根据这个格式生成AI分析需要的提示词。
提示词的质量直接影响AI分析的效果。我在这里设计了一个比较全面的提示词模板,包含了股票基本信息、技术指标、新闻舆情等维度,告诉AI要从哪些角度分析。
4.2 调用daily_stock_analysis进行分析
数据准备好了,接下来就是调用daily_stock_analysis进行分析。这个项目支持多种部署方式,我这里以本地运行为例。
import subprocess
import json
import os
from datetime import datetime
def run_ai_analysis(stock_list, output_dir='analysis_results'):
"""
运行AI股票分析
stock_list: 股票代码列表,如['600519', '000001']
"""
# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)
results = []
for stock_code in stock_list:
print(f"开始分析股票: {stock_code}")
# 1. 准备数据
ai_data = prepare_data_for_ai(stock_code)
if not ai_data:
print(f"股票 {stock_code} 数据准备失败,跳过")
continue
# 2. 生成提示词
prompt = generate_ai_prompt(ai_data)
# 3. 保存提示词到临时文件(供AI分析使用)
prompt_file = os.path.join(output_dir, f'prompt_{stock_code}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt')
with open(prompt_file, 'w', encoding='utf-8') as f:
f.write(prompt)
# 4. 调用daily_stock_analysis进行分析
# 这里假设daily_stock_analysis已经安装并配置好
# 实际调用方式可能根据你的部署方式有所不同
try:
# 方法一:直接调用Python模块(如果已安装)
import sys
sys.path.append('/path/to/daily_stock_analysis') # 添加项目路径
from src.analyzer import StockAnalyzer
# 初始化分析器
analyzer = StockAnalyzer(
model_type='gemini', # 或 'openai', 'deepseek' 等
api_key=os.getenv('GEMINI_API_KEY') # 从环境变量读取API密钥
)
# 进行分析
analysis_result = analyzer.analyze_stock(
stock_code=stock_code,
stock_name=ai_data['stock_info']['name'],
technical_data=ai_data['technical_indicators'],
news_data=ai_data['news_sentiment']
)
# 方法二:通过命令行调用(如果配置了命令行接口)
# cmd = f'python /path/to/daily_stock_analysis/main.py --stock {stock_code} --prompt-file {prompt_file}'
# result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
# analysis_result = json.loads(result.stdout) if result.returncode == 0 else None
except ImportError:
print("daily_stock_analysis模块未找到,使用模拟分析")
# 模拟分析结果(实际使用时请替换为真实调用)
analysis_result = {
'stock_code': stock_code,
'stock_name': ai_data['stock_info']['name'],
'analysis_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'technical_analysis': '价格位于均线之上,呈现多头排列趋势',
'news_sentiment': '近期新闻偏正面,有利好消息传出',
'recommendation': '观望',
'confidence': 0.75,
'key_points': [
'技术面:均线多头排列,趋势向上',
'基本面:业绩稳定,行业地位稳固',
'风险:估值较高,需注意回调风险'
],
'price_targets': {
'buy_below': ai_data['stock_info']['current_price'] * 0.98,
'sell_above': ai_data['stock_info']['current_price'] * 1.05,
'stop_loss': ai_data['stock_info']['current_price'] * 0.95
}
}
# 5. 保存分析结果
if analysis_result:
result_file = os.path.join(output_dir, f'result_{stock_code}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
with open(result_file, 'w', encoding='utf-8') as f:
json.dump(analysis_result, f, ensure_ascii=False, indent=2)
results.append(analysis_result)
print(f"股票 {stock_code} 分析完成,结果已保存")
# 避免请求过于频繁
import time
time.sleep(2)
return results
def generate_daily_report(results, output_file='daily_report.md'):
"""
生成每日分析报告
"""
report_date = datetime.now().strftime('%Y年%m月%d日')
report_content = f"""# 股票分析日报
## {report_date}
### 概览
今日共分析 {len(results)} 只股票,其中:
- 建议买入:{sum(1 for r in results if r.get('recommendation') == '买入')} 只
- 建议观望:{sum(1 for r in results if r.get('recommendation') == '观望')} 只
- 建议卖出:{sum(1 for r in results if r.get('recommendation') == '卖出')} 只
---
"""
for i, result in enumerate(results, 1):
stock_name = result.get('stock_name', '未知')
stock_code = result.get('stock_code', '')
recommendation = result.get('recommendation', '未知')
confidence = result.get('confidence', 0)
# 根据建议类型添加表情
if recommendation == '买入':
rec_emoji = '🟢'
elif recommendation == '卖出':
rec_emoji = '🔴'
else:
rec_emoji = '🟡'
report_content += f"""### {i}. {stock_name}({stock_code}) {rec_emoji} {recommendation}
**技术分析**
{result.get('technical_analysis', '暂无')}
**舆情分析**
{result.get('news_sentiment', '暂无')}
**关键要点**
"""
for point in result.get('key_points', []):
report_content += f"- {point}\n"
price_targets = result.get('price_targets', {})
if price_targets:
report_content += f"""
**价格目标**
- 买入价位:{price_targets.get('buy_below', 0):.2f}元以下
- 卖出价位:{price_targets.get('sell_above', 0):.2f}元以上
- 止损价位:{price_targets.get('stop_loss', 0):.2f}元
**置信度**:{confidence:.0%}
---
"""
report_content += f"""
### 总结与建议
本报告基于AI自动分析生成,仅供参考。投资有风险,决策需谨慎。
报告生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
数据来源:爬虫采集 + AI分析
"""
# 保存报告
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report_content)
print(f"日报已生成: {output_file}")
return report_content
# 使用示例
if __name__ == "__main__":
# 分析一组股票
stocks_to_analyze = ['600519', '000001', '300750']
print("开始AI股票分析...")
analysis_results = run_ai_analysis(stocks_to_analyze)
if analysis_results:
print(f"分析完成,共{len(analysis_results)}只股票")
# 生成日报
report = generate_daily_report(analysis_results)
# 打印摘要
print("\n=== 分析摘要 ===")
for result in analysis_results:
print(f"{result['stock_name']}({result['stock_code']}): {result['recommendation']} (置信度: {result['confidence']:.0%})")
else:
print("分析失败或无结果")
这段代码展示了怎么调用daily_stock_analysis进行分析,并把结果整理成日报。实际使用时,你需要根据daily_stock_analysis的具体接口调整调用方式。
我在这里留了一个模拟分析的备选方案,这样即使你暂时没有配置好AI分析环境,也能看到大致的流程和输出格式。
5. 自动化部署与定时运行
整套系统搭建好了,总不能每次都手动运行吧?我们需要让它自动定时运行,真正实现"无人值守"。
5.1 使用GitHub Actions实现云端自动化
如果你不想自己的电脑24小时开着,可以用GitHub Actions,这是GitHub提供的免费自动化服务。
# .github/workflows/daily-stock-analysis.yml
name: 每日股票分析
on:
schedule:
# 每个工作日18:00运行(北京时间)
- cron: '0 10 * * 1-5'
workflow_dispatch: # 允许手动触发
jobs:
analyze:
runs-on: ubuntu-latest
steps:
- name: 检出代码
uses: actions/checkout@v3
- name: 设置Python环境
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: 安装依赖
run: |
python -m pip install --upgrade pip
pip install requests beautifulsoup4 pandas sqlite3
# 安装daily_stock_analysis依赖
pip install -r requirements.txt # 如果有的话
- name: 运行爬虫采集数据
env:
STOCK_LIST: ${{ secrets.STOCK_LIST }}
run: |
python stock_crawler.py --stocks "$STOCK_LIST"
- name: 运行AI分析
env:
GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }}
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: |
python run_analysis.py
- name: 发送通知
env:
WECHAT_WEBHOOK_URL: ${{ secrets.WECHAT_WEBHOOK_URL }}
run: |
python send_notification.py
- name: 上传分析结果
uses: actions/upload-artifact@v3
with:
name: stock-analysis-results
path: analysis_results/
这个GitHub Actions工作流会在每个工作日的指定时间自动运行,执行数据爬取、AI分析、发送通知等一系列操作。你只需要在GitHub仓库的Secrets里配置好股票列表、API密钥、通知地址等参数就行了。
5.2 本地定时任务(Windows/Linux/macOS)
如果你更喜欢在本地运行,可以用系统的定时任务功能。
Windows任务计划程序:
- 打开"任务计划程序"
- 创建基本任务
- 设置触发时间(如每天18:00)
- 操作为"启动程序",选择Python解释器和你的脚本
Linux/macOS的crontab:
# 编辑crontab
crontab -e
# 添加以下行(每天18:00运行)
0 18 * * 1-5 cd /path/to/your/project && /usr/bin/python3 run_daily_analysis.py >> /tmp/stock_analysis.log 2>&1
5.3 完整的主控脚本
最后,我们需要一个主控脚本把所有的步骤串起来。
#!/usr/bin/env python3
"""
主控脚本:自动化金融数据采集与分析
"""
import argparse
import sys
import os
from datetime import datetime
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'stock_analysis_{datetime.now().strftime("%Y%m%d")}.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
def main():
parser = argparse.ArgumentParser(description='自动化金融数据采集与分析系统')
parser.add_argument('--stocks', type=str, default='600519,000001,300750',
help='股票代码列表,用逗号分隔')
parser.add_argument('--crawl-only', action='store_true',
help='仅运行爬虫,不进行AI分析')
parser.add_argument('--analyze-only', action='store_true',
help='仅运行AI分析,不爬取新数据')
parser.add_argument('--output-dir', type=str, default='daily_reports',
help='输出目录')
parser.add_argument('--notify', action='store_true',
help='发送通知')
args = parser.parse_args()
# 创建输出目录
os.makedirs(args.output_dir, exist_ok=True)
logger.info("=" * 50)
logger.info(f"开始自动化金融分析 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"分析股票: {args.stocks}")
logger.info("=" * 50)
stock_list = [s.strip() for s in args.stocks.split(',')]
try:
# 步骤1: 数据爬取(除非指定仅分析)
if not args.analyze_only:
logger.info("步骤1: 开始数据爬取...")
from stock_crawler import run_crawlers
crawl_success = run_crawlers(stock_list)
if not crawl_success:
logger.warning("数据爬取部分失败,但继续执行后续步骤")
else:
logger.info("跳过数据爬取(--analyze-only模式)")
# 步骤2: AI分析(除非指定仅爬取)
if not args.crawl_only:
logger.info("步骤2: 开始AI分析...")
from ai_analyzer import run_ai_analysis, generate_daily_report
analysis_results = run_ai_analysis(stock_list)
if analysis_results:
logger.info(f"AI分析完成,共分析{len(analysis_results)}只股票")
# 生成日报
report_date = datetime.now().strftime('%Y%m%d')
report_file = os.path.join(args.output_dir, f'stock_report_{report_date}.md')
report_content = generate_daily_report(analysis_results, report_file)
logger.info(f"日报已生成: {report_file}")
# 步骤3: 发送通知(如果启用)
if args.notify:
logger.info("步骤3: 发送通知...")
from notification_sender import send_report
# 读取日报内容
with open(report_file, 'r', encoding='utf-8') as f:
report_text = f.read()
# 发送摘要通知
send_report(report_text, report_file)
logger.info("通知发送完成")
else:
logger.error("AI分析失败,无结果返回")
else:
logger.info("跳过AI分析(--crawl-only模式)")
logger.info("=" * 50)
logger.info("自动化分析流程完成")
logger.info("=" * 50)
return True
except Exception as e:
logger.error(f"流程执行失败: {e}", exc_info=True)
return False
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)
这个主控脚本提供了完整的命令行接口,你可以灵活控制只运行爬虫、只运行分析,或者运行完整的流程。日志功能也很完善,方便排查问题。
6. 实际效果与使用建议
我按照上面的方案搭建了一套系统,已经运行了一个多月。说几个实际的感受:
第一,效率提升非常明显。以前每天要花1-2小时看盘看新闻,现在只需要花5分钟看看AI生成的日报就行了。省下来的时间可以做更深入的行业研究,或者干脆休息放松。
第二,分析质量比较稳定。AI没有情绪波动,不会因为股票涨了就盲目乐观,跌了就过度悲观。它严格按照技术指标和新闻数据做判断,虽然不一定每次都对,但至少逻辑是清晰的。
第三,数据更全面了。以前我只关注几个常用的数据源,现在通过爬虫可以抓取更多维度的信息。比如某个小众财经论坛的讨论、行业专家的博客文章等,这些信息有时候比主流媒体更早反映市场情绪。
当然,这套系统也不是完美的,有几个地方需要注意:
不要完全依赖AI做投资决策。AI分析可以作为一个很好的参考,但最终决策还是要结合你自己的判断。特别是对于基本面分析、行业趋势判断这些需要深度思考的问题,AI目前还替代不了人类。
定期检查数据质量。爬虫可能会遇到网站改版、反爬机制等问题,需要定期检查数据是否正常抓取。我建议每周花10分钟看看日志,确保系统运行正常。
从简单开始,逐步完善。不要一开始就想抓取所有数据源、分析所有股票。先从2-3只你熟悉的股票开始,跑通整个流程,然后再慢慢增加功能。
注意合规风险。爬虫抓取数据要遵守网站的robots.txt协议,不要过于频繁地请求,避免给目标网站造成负担。商业使用前最好咨询法律意见。
7. 总结
把Python爬虫和AI股票分析师结合起来,确实能大大提升金融数据分析的效率和效果。爬虫解决了数据来源的问题,AI解决了数据分析的问题,两者结合正好形成了一个完整的自动化分析链条。
我分享的这套方案,从技术实现上来说并不复杂,大部分代码都是Python的基础应用。即使你不是专业程序员,跟着步骤一步步来,应该也能搭建起来。
最关键的是,这套系统给了我们一个全新的视角来看待股票分析。我们不再需要手动收集碎片化的信息,也不再需要凭感觉做判断。数据采集交给爬虫,数据分析交给AI,我们只需要关注最终的分析结果和投资决策。
这种"人机协作"的模式,我觉得会是未来个人投资者和专业分析师都会采用的方式。机器负责处理海量数据和重复性工作,人类负责战略思考和最终决策,各取所长。
如果你也对自动化金融分析感兴趣,不妨从今天介绍的这个方案开始尝试。先从简单的爬虫做起,再逐步集成AI分析功能,最后实现全自动化运行。过程中遇到问题很正常,多查资料、多尝试,慢慢就能搭建起属于自己的智能分析系统了。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)