AI股票分析师daily_stock_analysis与Python爬虫实战:自动化金融数据采集与分析

每天一睁眼,你是不是也和我一样,先打开几个股票软件,看看自选股是红是绿,再刷刷财经新闻,试图从海量信息里找到一点有用的线索?这个过程不仅耗时耗力,而且信息过载,最后往往是凭感觉下判断,而不是靠数据做决策。

我试过很多方法,从手动记录Excel表格到用各种技术指标公式,效果都不太理想。直到我发现了daily_stock_analysis这个项目,再结合自己熟悉的Python爬虫技术,才真正把金融数据分析这件事给自动化了。

今天我就来分享一下,怎么把AI股票分析师和Python爬虫结合起来,搭建一套属于自己的自动化金融数据采集与分析系统。这套方案的核心思路很简单:让爬虫去干苦力活,自动抓取行情和新闻数据,然后让AI去干脑力活,分析数据、生成报告,最后自动推送到你手机上。

1. 为什么需要自动化金融分析?

在聊具体技术之前,我们先看看传统分析方式有哪些痛点。

以前我做股票分析,基本上就是三板斧:看K线图、查技术指标、刷财经新闻。每天至少要花一两个小时,效率低不说,还容易受情绪影响。看到股票涨了就兴奋,跌了就焦虑,这种状态很难做出理性的投资决策。

更麻烦的是信息碎片化。行情数据在一个地方,新闻在另一个地方,研报又在别的地方。我需要手动把这些信息整合起来,这个过程本身就容易出错,而且非常耗时。

daily_stock_analysis这个项目给了我启发。它本质上是一个AI驱动的分析引擎,能自动获取多源数据,然后用大模型进行分析,最后生成结构化的决策建议。但我在使用中发现,它默认的数据源有时候不够全面,特别是对于一些细分领域或者特定市场的新闻,覆盖得不够好。

这时候Python爬虫就派上用场了。我们可以用爬虫定制化地抓取更多数据源,比如特定财经网站的分析文章、行业研报、社交媒体舆情等,然后把这些数据喂给AI分析师,让它做出更全面的判断。

2. 系统架构设计:爬虫+AI如何协同工作?

整个系统的架构其实不复杂,我画了个简单的示意图帮你理解:

数据采集层(Python爬虫) → 数据处理层 → AI分析层(daily_stock_analysis) → 结果推送层

数据采集层是爬虫的天下。这里我们主要抓取两类数据:一是行情数据,包括股价、成交量、技术指标等;二是舆情数据,包括新闻、研报、社交媒体讨论等。

数据处理层负责清洗和格式化爬虫抓来的数据。因为不同网站的数据格式不一样,有的可能是HTML,有的可能是JSON,还有的可能是PDF。我们需要把这些数据统一成AI分析师能理解的格式。

AI分析层就是daily_stock_analysis的核心了。它会把处理好的数据喂给大模型(比如Gemini、DeepSeek等),让模型从多个维度进行分析,包括技术面、基本面、舆情面等。

结果推送层负责把分析结果推送到你的手机或者邮箱。支持企业微信、飞书、Telegram、邮件等多种渠道,你可以根据自己的习惯选择。

下面我重点讲讲爬虫部分怎么实现,这是整个系统的数据入口。

3. Python爬虫实战:多源金融数据采集

爬虫听起来有点技术含量,但其实用Python写起来并不难。我建议从简单的开始,先抓取一两个数据源,等跑通了再慢慢扩展。

3.1 行情数据爬取:以东方财富为例

行情数据相对规整,很多财经网站都提供结构化的数据接口。我们以东方财富网为例,看看怎么抓取股票的基本行情。

import requests
import pandas as pd
from datetime import datetime
import time

def fetch_stock_quote(stock_code):
    """
    获取股票实时行情数据
    stock_code: 股票代码,如'600519'(茅台)、'000001'(平安银行)
    """
    # 东方财富实时行情接口
    url = f"http://push2.eastmoney.com/api/qt/stock/get"
    
    params = {
        'fields': 'f43,f57,f58,f169,f170,f46,f44,f51,f168,f47,f164,f163,f116,f60,f45,f52,f50,f48,f167,f117,f71,f161,f49,f530,f135,f136,f137,f138,f139,f141,f142,f144,f145,f147,f148,f140,f143,f146,f149,f55,f62,f162,f92,f173,f104,f105,f84,f85,f183,f184,f185,f186,f187,f188,f189,f190,f191,f192,f107,f111,f86,f177,f78,f110,f262,f263,f264,f267,f268,f250,f251,f252,f253,f254,f255,f256,f257,f258,f266,f269,f270,f271,f273,f274,f275,f127,f199,f128,f193,f196,f194,f195,f197,f80,f280,f281,f282,f284,f285,f286,f287,f292',
        'secid': f"1.{stock_code}" if stock_code.startswith('6') else f"0.{stock_code}",
        'ut': 'fa5fd1943c7b386f172d6893dbfba10b',
        'invt': '2',
        'fltt': '2',
        '_': int(time.time() * 1000)
    }
    
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': 'http://quote.eastmoney.com/'
    }
    
    try:
        response = requests.get(url, params=params, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json()
        
        if data['data']:
            quote = data['data']
            result = {
                '股票代码': stock_code,
                '股票名称': quote.get('f57', ''),
                '当前价格': quote.get('f43', 0) / 100,  # 转换为元
                '涨跌幅': quote.get('f170', 0) / 100,  # 转换为百分比
                '涨跌额': quote.get('f169', 0) / 100,
                '成交量': quote.get('f47', 0),
                '成交额': quote.get('f48', 0),
                '最高价': quote.get('f44', 0) / 100,
                '最低价': quote.get('f45', 0) / 100,
                '开盘价': quote.get('f46', 0) / 100,
                '昨收价': quote.get('f60', 0) / 100,
                '更新时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }
            return result
        else:
            print(f"未获取到股票 {stock_code} 的数据")
            return None
            
    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
        return None
    except Exception as e:
        print(f"解析失败: {e}")
        return None

# 使用示例
if __name__ == "__main__":
    # 测试抓取茅台股票数据
    maotai_data = fetch_stock_quote('600519')
    if maotai_data:
        print("茅台股票实时行情:")
        for key, value in maotai_data.items():
            print(f"{key}: {value}")

这个爬虫函数能抓取股票的实时行情数据,包括价格、涨跌幅、成交量等关键信息。代码里我加了详细的注释,你应该能看懂每一行是干什么的。

实际使用时,你可以把这个函数封装成一个定时任务,比如每5分钟运行一次,把数据保存到数据库或者CSV文件里,供后续分析使用。

3.2 新闻舆情爬取:抓取财经新闻

行情数据只是基础,新闻舆情对股价的影响也很大。我们可以爬取一些主流财经网站的新闻,看看市场在关注什么。

import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin

def fetch_finance_news(keywords=None, max_pages=3):
    """
    爬取财经新闻
    keywords: 关键词列表,如['茅台', '白酒']
    max_pages: 爬取的最大页数
    """
    base_url = "https://finance.sina.com.cn"
    news_list = []
    
    for page in range(1, max_pages + 1):
        try:
            # 新浪财经滚动新闻
            url = f"{base_url}/roll/index.d.html?cid=56589&page={page}"
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            response.encoding = 'utf-8'
            
            soup = BeautifulSoup(response.text, 'html.parser')
            news_items = soup.find_all('li', class_='list_009')
            
            for item in news_items:
                try:
                    link_tag = item.find('a')
                    if not link_tag:
                        continue
                    
                    title = link_tag.get_text(strip=True)
                    link = link_tag.get('href')
                    if not link.startswith('http'):
                        link = urljoin(base_url, link)
                    
                    time_tag = item.find('span')
                    publish_time = time_tag.get_text(strip=True) if time_tag else ''
                    
                    # 如果有关键词过滤,检查标题是否包含关键词
                    if keywords:
                        if not any(keyword in title for keyword in keywords):
                            continue
                    
                    # 获取新闻详情(可选)
                    content = fetch_news_content(link) if link else ''
                    
                    news_item = {
                        '标题': title,
                        '链接': link,
                        '发布时间': publish_time,
                        '内容摘要': content[:200] + '...' if content else '',  # 只取前200字
                        '抓取时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    }
                    news_list.append(news_item)
                    
                except Exception as e:
                    print(f"解析单条新闻失败: {e}")
                    continue
            
            print(f"第{page}页爬取完成,共{len(news_items)}条新闻")
            time.sleep(2)  # 礼貌性延迟,避免被封
            
        except requests.exceptions.RequestException as e:
            print(f"第{page}页请求失败: {e}")
            break
    
    return news_list

def fetch_news_content(url):
    """获取新闻详情内容"""
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        response.encoding = 'utf-8'
        
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # 新浪财经新闻内容通常在article标签或特定class中
        content_div = soup.find('div', class_='article') or soup.find('article')
        if content_div:
            # 移除脚本、样式等无关内容
            for script in content_div.find_all(['script', 'style', 'iframe']):
                script.decompose()
            
            text = content_div.get_text(strip=True)
            # 清理多余空白字符
            text = re.sub(r'\s+', ' ', text)
            return text
        
        return ""
        
    except Exception as e:
        print(f"获取新闻内容失败 {url}: {e}")
        return ""

# 使用示例
if __name__ == "__main__":
    # 抓取包含"茅台"或"白酒"的新闻
    news = fetch_finance_news(keywords=['茅台', '白酒'], max_pages=2)
    print(f"共抓取到{len(news)}条相关新闻")
    for i, item in enumerate(news[:3], 1):  # 只显示前3条
        print(f"\n{i}. {item['标题']}")
        print(f"   时间: {item['发布时间']}")
        print(f"   摘要: {item['内容摘要']}")

这个爬虫能抓取新浪财经的滚动新闻,并且支持关键词过滤。比如你只关心茅台或者白酒行业的新闻,就可以设置相应的关键词。

代码里我用了BeautifulSoup来解析HTML,这是Python里最常用的网页解析库之一。如果你之前没接触过,可能需要花点时间熟悉一下,但基本用法不难掌握。

3.3 数据存储与管理

爬虫抓来的数据需要妥善保存,方便后续分析。我通常用SQLite数据库,因为它轻量、无需安装,单个文件就能搞定。

import sqlite3
import json
from datetime import datetime

class FinanceDataDB:
    """金融数据数据库管理类"""
    
    def __init__(self, db_path='finance_data.db'):
        self.db_path = db_path
        self.init_database()
    
    def init_database(self):
        """初始化数据库表"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 创建股票行情表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS stock_quotes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            stock_code TEXT NOT NULL,
            stock_name TEXT,
            current_price REAL,
            change_percent REAL,
            change_amount REAL,
            volume INTEGER,
            amount REAL,
            high_price REAL,
            low_price REAL,
            open_price REAL,
            prev_close REAL,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(stock_code, timestamp)
        )
        ''')
        
        # 创建新闻舆情表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS finance_news (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            url TEXT UNIQUE,
            publish_time TEXT,
            content_summary TEXT,
            keywords TEXT,  # JSON格式存储关键词
            source TEXT,
            crawl_time DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        ''')
        
        # 创建索引以提高查询速度
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_stock_code ON stock_quotes(stock_code)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_quote_time ON stock_quotes(timestamp)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_news_time ON finance_news(publish_time)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_news_keywords ON finance_news(keywords)')
        
        conn.commit()
        conn.close()
    
    def save_stock_quote(self, quote_data):
        """保存股票行情数据"""
        if not quote_data:
            return False
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        try:
            cursor.execute('''
            INSERT OR IGNORE INTO stock_quotes 
            (stock_code, stock_name, current_price, change_percent, change_amount, 
             volume, amount, high_price, low_price, open_price, prev_close, timestamp)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                quote_data['股票代码'],
                quote_data['股票名称'],
                quote_data['当前价格'],
                quote_data['涨跌幅'],
                quote_data['涨跌额'],
                quote_data['成交量'],
                quote_data['成交额'],
                quote_data['最高价'],
                quote_data['最低价'],
                quote_data['开盘价'],
                quote_data['昨收价'],
                quote_data['更新时间']
            ))
            
            conn.commit()
            return cursor.rowcount > 0
            
        except Exception as e:
            print(f"保存股票数据失败: {e}")
            return False
        finally:
            conn.close()
    
    def save_news(self, news_item):
        """保存新闻数据"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        try:
            # 将关键词列表转为JSON字符串
            keywords_json = json.dumps(news_item.get('keywords', [])) if news_item.get('keywords') else None
            
            cursor.execute('''
            INSERT OR IGNORE INTO finance_news 
            (title, url, publish_time, content_summary, keywords, source)
            VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                news_item['标题'],
                news_item['链接'],
                news_item['发布时间'],
                news_item['内容摘要'],
                keywords_json,
                news_item.get('source', 'sina_finance')
            ))
            
            conn.commit()
            return cursor.rowcount > 0
            
        except Exception as e:
            print(f"保存新闻数据失败: {e}")
            return False
        finally:
            conn.close()
    
    def get_recent_quotes(self, stock_code, hours=24):
        """获取最近N小时的股票行情数据"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
        SELECT * FROM stock_quotes 
        WHERE stock_code = ? AND timestamp >= datetime('now', ?)
        ORDER BY timestamp DESC
        ''', (stock_code, f'-{hours} hours'))
        
        rows = cursor.fetchall()
        conn.close()
        
        # 转换为字典列表
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in rows]
    
    def get_recent_news(self, keywords=None, limit=50):
        """获取最近的新闻,支持关键词过滤"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        if keywords:
            # 构建关键词查询条件
            conditions = []
            params = []
            for keyword in keywords:
                conditions.append("(title LIKE ? OR content_summary LIKE ?)")
                params.extend([f'%{keyword}%', f'%{keyword}%'])
            
            where_clause = "WHERE " + " OR ".join(conditions)
            query = f'''
            SELECT * FROM finance_news 
            {where_clause}
            ORDER BY publish_time DESC 
            LIMIT ?
            '''
            params.append(limit)
            
            cursor.execute(query, params)
        else:
            cursor.execute('''
            SELECT * FROM finance_news 
            ORDER BY publish_time DESC 
            LIMIT ?
            ''', (limit,))
        
        rows = cursor.fetchall()
        conn.close()
        
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in rows]

# 使用示例
if __name__ == "__main__":
    db = FinanceDataDB()
    
    # 模拟保存数据
    test_quote = {
        '股票代码': '600519',
        '股票名称': '贵州茅台',
        '当前价格': 1800.50,
        '涨跌幅': 1.5,
        '涨跌额': 26.5,
        '成交量': 5000000,
        '成交额': 9000000000,
        '最高价': 1810.00,
        '最低价': 1795.00,
        '开盘价': 1798.00,
        '昨收价': 1774.00,
        '更新时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }
    
    if db.save_stock_quote(test_quote):
        print("股票数据保存成功")
    
    # 查询最近24小时的茅台数据
    recent_data = db.get_recent_quotes('600519', 24)
    print(f"最近24小时有{len(recent_data)}条茅台行情记录")

这个数据库类封装了数据的存储和查询功能。用起来很简单,创建个实例,调用save方法保存数据,调用get方法查询数据。数据都保存在本地的SQLite文件里,不需要安装数据库服务器。

4. 与daily_stock_analysis集成:让AI分析爬虫数据

爬虫把数据抓来了,数据库也存好了,接下来就是重头戏:怎么把这些数据喂给AI分析师,让它帮我们分析。

daily_stock_analysis本身支持多种数据源,但主要是通过API接口获取标准化的数据。我们的爬虫数据需要稍微处理一下,转换成它认识的格式。

4.1 数据格式转换

AI分析师期望的数据格式比较规范,我们需要把爬虫抓来的原始数据清洗一下。

import pandas as pd
from datetime import datetime, timedelta

def prepare_data_for_ai(stock_code, db_path='finance_data.db'):
    """
    为AI分析准备数据
    返回技术指标、新闻舆情等综合数据
    """
    db = FinanceDataDB(db_path)
    
    # 1. 获取股票行情数据(最近30天)
    quotes = db.get_recent_quotes(stock_code, hours=24*30)
    if not quotes:
        print(f"未找到股票 {stock_code} 的历史数据")
        return None
    
    # 转换为DataFrame方便计算
    df_quotes = pd.DataFrame(quotes)
    df_quotes['timestamp'] = pd.to_datetime(df_quotes['timestamp'])
    df_quotes = df_quotes.sort_values('timestamp')
    
    # 计算技术指标(简单示例)
    df_quotes['MA5'] = df_quotes['current_price'].rolling(window=5).mean()
    df_quotes['MA10'] = df_quotes['current_price'].rolling(window=10).mean()
    df_quotes['MA20'] = df_quotes['current_price'].rolling(window=20).mean()
    
    # 2. 获取相关新闻
    # 这里可以根据股票名称或代码获取相关新闻
    stock_name = df_quotes.iloc[-1]['stock_name'] if 'stock_name' in df_quotes.columns else stock_code
    news_items = db.get_recent_news(keywords=[stock_name], limit=20)
    
    # 3. 构建AI分析所需的数据结构
    latest_quote = quotes[-1] if quotes else {}
    
    ai_data = {
        'stock_info': {
            'code': stock_code,
            'name': latest_quote.get('stock_name', ''),
            'current_price': latest_quote.get('current_price', 0),
            'change_percent': latest_quote.get('change_percent', 0),
            'volume': latest_quote.get('volume', 0),
            'amount': latest_quote.get('amount', 0)
        },
        'technical_indicators': {
            'MA5': float(df_quotes['MA5'].iloc[-1]) if not pd.isna(df_quotes['MA5'].iloc[-1]) else 0,
            'MA10': float(df_quotes['MA10'].iloc[-1]) if not pd.isna(df_quotes['MA10'].iloc[-1]) else 0,
            'MA20': float(df_quotes['MA20'].iloc[-1]) if not pd.isna(df_quotes['MA20'].iloc[-1]) else 0,
            'trend': 'up' if df_quotes['MA5'].iloc[-1] > df_quotes['MA10'].iloc[-1] > df_quotes['MA20'].iloc[-1] else 'down',
            'latest_high': float(df_quotes['high_price'].max()) if 'high_price' in df_quotes.columns else 0,
            'latest_low': float(df_quotes['low_price'].min()) if 'low_price' in df_quotes.columns else 0
        },
        'news_sentiment': {
            'total_news': len(news_items),
            'positive_count': sum(1 for news in news_items if any(word in news.get('title', '').lower() for word in ['涨', '利好', '增长', '突破'])),
            'negative_count': sum(1 for news in news_items if any(word in news.get('title', '').lower() for word in ['跌', '利空', '下滑', '风险'])),
            'recent_news': [
                {
                    'title': news.get('title', ''),
                    'time': news.get('publish_time', ''),
                    'summary': news.get('content_summary', '')[:100]
                }
                for news in news_items[:5]  # 只取最近5条
            ]
        },
        'market_context': {
            'analysis_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'data_sources': ['crawler_sina', 'crawler_eastmoney'],
            'data_period': '30_days'
        }
    }
    
    return ai_data

def generate_ai_prompt(ai_data):
    """
    根据数据生成AI分析提示词
    """
    stock_info = ai_data['stock_info']
    technical = ai_data['technical_indicators']
    news = ai_data['news_sentiment']
    
    prompt = f"""
请作为专业的股票分析师,对以下股票进行综合分析:

股票信息:
- 代码:{stock_info['code']}
- 名称:{stock_info['name']}
- 当前价格:{stock_info['current_price']}元
- 涨跌幅:{stock_info['change_percent']}%
- 成交量:{stock_info['volume']}手
- 成交额:{stock_info['amount']}元

技术指标分析:
- 5日均线(MA5):{technical['MA5']:.2f}元
- 10日均线(MA10):{technical['MA10']:.2f}元
- 20日均线(MA20):{technical['MA20']:.2f}元
- 趋势判断:{technical['trend']}
- 近期高点:{technical['latest_high']:.2f}元
- 近期低点:{technical['latest_low']:.2f}元

新闻舆情分析:
- 近期相关新闻:{news['total_news']}条
- 正面新闻:{news['positive_count']}条
- 负面新闻:{news['negative_count']}条
- 最新新闻摘要:
{chr(10).join([f"  • {item['time']} - {item['title']}: {item['summary']}" for item in news['recent_news']])}

请从以下维度进行分析:
1. 技术面分析:当前价格与各均线的关系,趋势判断,支撑位和压力位
2. 基本面参考:结合新闻舆情,分析可能影响股价的因素
3. 操作建议:给出具体的买入/卖出/观望建议,包括建议价格区间
4. 风险提示:指出当前主要风险点

请用简洁专业的语言输出分析结果,避免使用过于技术化的术语。
"""
    
    return prompt

# 使用示例
if __name__ == "__main__":
    # 准备茅台股票数据
    ai_data = prepare_data_for_ai('600519')
    if ai_data:
        print("数据准备完成")
        print(f"股票名称: {ai_data['stock_info']['name']}")
        print(f"当前价格: {ai_data['stock_info']['current_price']}")
        print(f"技术趋势: {ai_data['technical_indicators']['trend']}")
        print(f"相关新闻数: {ai_data['news_sentiment']['total_news']}")
        
        # 生成AI提示词
        prompt = generate_ai_prompt(ai_data)
        print("\n生成的AI分析提示词(前500字符):")
        print(prompt[:500] + "...")

这段代码做了两件事:一是把爬虫抓来的原始数据整理成结构化的格式,二是根据这个格式生成AI分析需要的提示词。

提示词的质量直接影响AI分析的效果。我在这里设计了一个比较全面的提示词模板,包含了股票基本信息、技术指标、新闻舆情等维度,告诉AI要从哪些角度分析。

4.2 调用daily_stock_analysis进行分析

数据准备好了,接下来就是调用daily_stock_analysis进行分析。这个项目支持多种部署方式,我这里以本地运行为例。

import subprocess
import json
import os
from datetime import datetime

def run_ai_analysis(stock_list, output_dir='analysis_results'):
    """
    运行AI股票分析
    stock_list: 股票代码列表,如['600519', '000001']
    """
    # 确保输出目录存在
    os.makedirs(output_dir, exist_ok=True)
    
    results = []
    
    for stock_code in stock_list:
        print(f"开始分析股票: {stock_code}")
        
        # 1. 准备数据
        ai_data = prepare_data_for_ai(stock_code)
        if not ai_data:
            print(f"股票 {stock_code} 数据准备失败,跳过")
            continue
        
        # 2. 生成提示词
        prompt = generate_ai_prompt(ai_data)
        
        # 3. 保存提示词到临时文件(供AI分析使用)
        prompt_file = os.path.join(output_dir, f'prompt_{stock_code}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt')
        with open(prompt_file, 'w', encoding='utf-8') as f:
            f.write(prompt)
        
        # 4. 调用daily_stock_analysis进行分析
        # 这里假设daily_stock_analysis已经安装并配置好
        # 实际调用方式可能根据你的部署方式有所不同
        
        try:
            # 方法一:直接调用Python模块(如果已安装)
            import sys
            sys.path.append('/path/to/daily_stock_analysis')  # 添加项目路径
            
            from src.analyzer import StockAnalyzer
            
            # 初始化分析器
            analyzer = StockAnalyzer(
                model_type='gemini',  # 或 'openai', 'deepseek' 等
                api_key=os.getenv('GEMINI_API_KEY')  # 从环境变量读取API密钥
            )
            
            # 进行分析
            analysis_result = analyzer.analyze_stock(
                stock_code=stock_code,
                stock_name=ai_data['stock_info']['name'],
                technical_data=ai_data['technical_indicators'],
                news_data=ai_data['news_sentiment']
            )
            
            # 方法二:通过命令行调用(如果配置了命令行接口)
            # cmd = f'python /path/to/daily_stock_analysis/main.py --stock {stock_code} --prompt-file {prompt_file}'
            # result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            # analysis_result = json.loads(result.stdout) if result.returncode == 0 else None
            
        except ImportError:
            print("daily_stock_analysis模块未找到,使用模拟分析")
            # 模拟分析结果(实际使用时请替换为真实调用)
            analysis_result = {
                'stock_code': stock_code,
                'stock_name': ai_data['stock_info']['name'],
                'analysis_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'technical_analysis': '价格位于均线之上,呈现多头排列趋势',
                'news_sentiment': '近期新闻偏正面,有利好消息传出',
                'recommendation': '观望',
                'confidence': 0.75,
                'key_points': [
                    '技术面:均线多头排列,趋势向上',
                    '基本面:业绩稳定,行业地位稳固',
                    '风险:估值较高,需注意回调风险'
                ],
                'price_targets': {
                    'buy_below': ai_data['stock_info']['current_price'] * 0.98,
                    'sell_above': ai_data['stock_info']['current_price'] * 1.05,
                    'stop_loss': ai_data['stock_info']['current_price'] * 0.95
                }
            }
        
        # 5. 保存分析结果
        if analysis_result:
            result_file = os.path.join(output_dir, f'result_{stock_code}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
            with open(result_file, 'w', encoding='utf-8') as f:
                json.dump(analysis_result, f, ensure_ascii=False, indent=2)
            
            results.append(analysis_result)
            print(f"股票 {stock_code} 分析完成,结果已保存")
        
        # 避免请求过于频繁
        import time
        time.sleep(2)
    
    return results

def generate_daily_report(results, output_file='daily_report.md'):
    """
    生成每日分析报告
    """
    report_date = datetime.now().strftime('%Y年%m月%d日')
    
    report_content = f"""# 股票分析日报
## {report_date}

### 概览
今日共分析 {len(results)} 只股票,其中:
- 建议买入:{sum(1 for r in results if r.get('recommendation') == '买入')} 只
- 建议观望:{sum(1 for r in results if r.get('recommendation') == '观望')} 只  
- 建议卖出:{sum(1 for r in results if r.get('recommendation') == '卖出')} 只

---

"""
    
    for i, result in enumerate(results, 1):
        stock_name = result.get('stock_name', '未知')
        stock_code = result.get('stock_code', '')
        recommendation = result.get('recommendation', '未知')
        confidence = result.get('confidence', 0)
        
        # 根据建议类型添加表情
        if recommendation == '买入':
            rec_emoji = '🟢'
        elif recommendation == '卖出':
            rec_emoji = '🔴'
        else:
            rec_emoji = '🟡'
        
        report_content += f"""### {i}. {stock_name}({stock_code}) {rec_emoji} {recommendation}

**技术分析**  
{result.get('technical_analysis', '暂无')}

**舆情分析**  
{result.get('news_sentiment', '暂无')}

**关键要点**  
"""
        
        for point in result.get('key_points', []):
            report_content += f"- {point}\n"
        
        price_targets = result.get('price_targets', {})
        if price_targets:
            report_content += f"""
**价格目标**  
- 买入价位:{price_targets.get('buy_below', 0):.2f}元以下
- 卖出价位:{price_targets.get('sell_above', 0):.2f}元以上  
- 止损价位:{price_targets.get('stop_loss', 0):.2f}元

**置信度**:{confidence:.0%}

---
"""
    
    report_content += f"""
### 总结与建议
本报告基于AI自动分析生成,仅供参考。投资有风险,决策需谨慎。

报告生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
数据来源:爬虫采集 + AI分析
"""
    
    # 保存报告
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(report_content)
    
    print(f"日报已生成: {output_file}")
    return report_content

# 使用示例
if __name__ == "__main__":
    # 分析一组股票
    stocks_to_analyze = ['600519', '000001', '300750']
    
    print("开始AI股票分析...")
    analysis_results = run_ai_analysis(stocks_to_analyze)
    
    if analysis_results:
        print(f"分析完成,共{len(analysis_results)}只股票")
        
        # 生成日报
        report = generate_daily_report(analysis_results)
        
        # 打印摘要
        print("\n=== 分析摘要 ===")
        for result in analysis_results:
            print(f"{result['stock_name']}({result['stock_code']}): {result['recommendation']} (置信度: {result['confidence']:.0%})")
    else:
        print("分析失败或无结果")

这段代码展示了怎么调用daily_stock_analysis进行分析,并把结果整理成日报。实际使用时,你需要根据daily_stock_analysis的具体接口调整调用方式。

我在这里留了一个模拟分析的备选方案,这样即使你暂时没有配置好AI分析环境,也能看到大致的流程和输出格式。

5. 自动化部署与定时运行

整套系统搭建好了,总不能每次都手动运行吧?我们需要让它自动定时运行,真正实现"无人值守"。

5.1 使用GitHub Actions实现云端自动化

如果你不想自己的电脑24小时开着,可以用GitHub Actions,这是GitHub提供的免费自动化服务。

# .github/workflows/daily-stock-analysis.yml
name: 每日股票分析

on:
  schedule:
    # 每个工作日18:00运行(北京时间)
    - cron: '0 10 * * 1-5'
  workflow_dispatch:  # 允许手动触发

jobs:
  analyze:
    runs-on: ubuntu-latest
    
    steps:
    - name: 检出代码
      uses: actions/checkout@v3
      
    - name: 设置Python环境
      uses: actions/setup-python@v4
      with:
        python-version: '3.10'
        
    - name: 安装依赖
      run: |
        python -m pip install --upgrade pip
        pip install requests beautifulsoup4 pandas sqlite3
        # 安装daily_stock_analysis依赖
        pip install -r requirements.txt  # 如果有的话
        
    - name: 运行爬虫采集数据
      env:
        STOCK_LIST: ${{ secrets.STOCK_LIST }}
      run: |
        python stock_crawler.py --stocks "$STOCK_LIST"
        
    - name: 运行AI分析
      env:
        GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }}
        OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
      run: |
        python run_analysis.py
        
    - name: 发送通知
      env:
        WECHAT_WEBHOOK_URL: ${{ secrets.WECHAT_WEBHOOK_URL }}
      run: |
        python send_notification.py
        
    - name: 上传分析结果
      uses: actions/upload-artifact@v3
      with:
        name: stock-analysis-results
        path: analysis_results/

这个GitHub Actions工作流会在每个工作日的指定时间自动运行,执行数据爬取、AI分析、发送通知等一系列操作。你只需要在GitHub仓库的Secrets里配置好股票列表、API密钥、通知地址等参数就行了。

5.2 本地定时任务(Windows/Linux/macOS)

如果你更喜欢在本地运行,可以用系统的定时任务功能。

Windows任务计划程序:

  1. 打开"任务计划程序"
  2. 创建基本任务
  3. 设置触发时间(如每天18:00)
  4. 操作为"启动程序",选择Python解释器和你的脚本

Linux/macOS的crontab:

# 编辑crontab
crontab -e

# 添加以下行(每天18:00运行)
0 18 * * 1-5 cd /path/to/your/project && /usr/bin/python3 run_daily_analysis.py >> /tmp/stock_analysis.log 2>&1

5.3 完整的主控脚本

最后,我们需要一个主控脚本把所有的步骤串起来。

#!/usr/bin/env python3
"""
主控脚本:自动化金融数据采集与分析
"""

import argparse
import sys
import os
from datetime import datetime
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'stock_analysis_{datetime.now().strftime("%Y%m%d")}.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

def main():
    parser = argparse.ArgumentParser(description='自动化金融数据采集与分析系统')
    parser.add_argument('--stocks', type=str, default='600519,000001,300750',
                       help='股票代码列表,用逗号分隔')
    parser.add_argument('--crawl-only', action='store_true',
                       help='仅运行爬虫,不进行AI分析')
    parser.add_argument('--analyze-only', action='store_true',
                       help='仅运行AI分析,不爬取新数据')
    parser.add_argument('--output-dir', type=str, default='daily_reports',
                       help='输出目录')
    parser.add_argument('--notify', action='store_true',
                       help='发送通知')
    
    args = parser.parse_args()
    
    # 创建输出目录
    os.makedirs(args.output_dir, exist_ok=True)
    
    logger.info("=" * 50)
    logger.info(f"开始自动化金融分析 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"分析股票: {args.stocks}")
    logger.info("=" * 50)
    
    stock_list = [s.strip() for s in args.stocks.split(',')]
    
    try:
        # 步骤1: 数据爬取(除非指定仅分析)
        if not args.analyze_only:
            logger.info("步骤1: 开始数据爬取...")
            from stock_crawler import run_crawlers
            crawl_success = run_crawlers(stock_list)
            
            if not crawl_success:
                logger.warning("数据爬取部分失败,但继续执行后续步骤")
        else:
            logger.info("跳过数据爬取(--analyze-only模式)")
        
        # 步骤2: AI分析(除非指定仅爬取)
        if not args.crawl_only:
            logger.info("步骤2: 开始AI分析...")
            from ai_analyzer import run_ai_analysis, generate_daily_report
            
            analysis_results = run_ai_analysis(stock_list)
            
            if analysis_results:
                logger.info(f"AI分析完成,共分析{len(analysis_results)}只股票")
                
                # 生成日报
                report_date = datetime.now().strftime('%Y%m%d')
                report_file = os.path.join(args.output_dir, f'stock_report_{report_date}.md')
                report_content = generate_daily_report(analysis_results, report_file)
                
                logger.info(f"日报已生成: {report_file}")
                
                # 步骤3: 发送通知(如果启用)
                if args.notify:
                    logger.info("步骤3: 发送通知...")
                    from notification_sender import send_report
                    
                    # 读取日报内容
                    with open(report_file, 'r', encoding='utf-8') as f:
                        report_text = f.read()
                    
                    # 发送摘要通知
                    send_report(report_text, report_file)
                    logger.info("通知发送完成")
            else:
                logger.error("AI分析失败,无结果返回")
        else:
            logger.info("跳过AI分析(--crawl-only模式)")
        
        logger.info("=" * 50)
        logger.info("自动化分析流程完成")
        logger.info("=" * 50)
        
        return True
        
    except Exception as e:
        logger.error(f"流程执行失败: {e}", exc_info=True)
        return False

if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)

这个主控脚本提供了完整的命令行接口,你可以灵活控制只运行爬虫、只运行分析,或者运行完整的流程。日志功能也很完善,方便排查问题。

6. 实际效果与使用建议

我按照上面的方案搭建了一套系统,已经运行了一个多月。说几个实际的感受:

第一,效率提升非常明显。以前每天要花1-2小时看盘看新闻,现在只需要花5分钟看看AI生成的日报就行了。省下来的时间可以做更深入的行业研究,或者干脆休息放松。

第二,分析质量比较稳定。AI没有情绪波动,不会因为股票涨了就盲目乐观,跌了就过度悲观。它严格按照技术指标和新闻数据做判断,虽然不一定每次都对,但至少逻辑是清晰的。

第三,数据更全面了。以前我只关注几个常用的数据源,现在通过爬虫可以抓取更多维度的信息。比如某个小众财经论坛的讨论、行业专家的博客文章等,这些信息有时候比主流媒体更早反映市场情绪。

当然,这套系统也不是完美的,有几个地方需要注意:

不要完全依赖AI做投资决策。AI分析可以作为一个很好的参考,但最终决策还是要结合你自己的判断。特别是对于基本面分析、行业趋势判断这些需要深度思考的问题,AI目前还替代不了人类。

定期检查数据质量。爬虫可能会遇到网站改版、反爬机制等问题,需要定期检查数据是否正常抓取。我建议每周花10分钟看看日志,确保系统运行正常。

从简单开始,逐步完善。不要一开始就想抓取所有数据源、分析所有股票。先从2-3只你熟悉的股票开始,跑通整个流程,然后再慢慢增加功能。

注意合规风险。爬虫抓取数据要遵守网站的robots.txt协议,不要过于频繁地请求,避免给目标网站造成负担。商业使用前最好咨询法律意见。

7. 总结

把Python爬虫和AI股票分析师结合起来,确实能大大提升金融数据分析的效率和效果。爬虫解决了数据来源的问题,AI解决了数据分析的问题,两者结合正好形成了一个完整的自动化分析链条。

我分享的这套方案,从技术实现上来说并不复杂,大部分代码都是Python的基础应用。即使你不是专业程序员,跟着步骤一步步来,应该也能搭建起来。

最关键的是,这套系统给了我们一个全新的视角来看待股票分析。我们不再需要手动收集碎片化的信息,也不再需要凭感觉做判断。数据采集交给爬虫,数据分析交给AI,我们只需要关注最终的分析结果和投资决策。

这种"人机协作"的模式,我觉得会是未来个人投资者和专业分析师都会采用的方式。机器负责处理海量数据和重复性工作,人类负责战略思考和最终决策,各取所长。

如果你也对自动化金融分析感兴趣,不妨从今天介绍的这个方案开始尝试。先从简单的爬虫做起,再逐步集成AI分析功能,最后实现全自动化运行。过程中遇到问题很正常,多查资料、多尝试,慢慢就能搭建起属于自己的智能分析系统了。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐