140-基于Spark的哔哩哔哩舆情数据分析系统
本项目是一个基于大数据技术的B站视频数据分析系统,旨在通过爬取B站视频数据,运用Spark进行分布式数据处理,结合机器学习算法实现智能推荐,并通过Web界面提供直观的数据可视化展示。# myApp/models.py - 数据模型"""用户模型"""create_time = models.DateField("创建时间", auto_now_add=True)verbose_name = "用户
·
基于Spark的B站视频数据分析系统 - 从数据爬取到智能推荐的完整技术实现
本文详细介绍了基于Spark和Django的B站视频数据分析系统的完整技术实现,包括数据爬取、存储、分析、可视化和智能推荐等核心功能。
📋 目录
- 项目概述
- 技术架构
- 系统架构设计
- 核心功能模块
- 数据流程
- 技术实现细节
- 部署与运维
- 性能优化
- 项目特色
- 总结与展望
- 联系方式
🎯 项目概述
本项目是一个基于大数据技术的B站视频数据分析系统,旨在通过爬取B站视频数据,运用Spark进行分布式数据处理,结合机器学习算法实现智能推荐,并通过Web界面提供直观的数据可视化展示。
主要特性
- 🔍 智能数据爬取: 多线程爬取B站视频、评论、弹幕等数据
- 🚀 分布式数据处理: 基于Spark的高性能数据处理
- 🤖 智能推荐系统: 协同过滤算法实现个性化推荐
- 📊 丰富可视化: 多种图表类型展示数据分析结果
- 💻 现代化Web界面: Django + 响应式前端设计
- 📱 移动端适配: 支持微信小程序等多端访问
🏗️ 技术架构
技术栈概览
| 层级 | 技术选型 | 版本 | 说明 |
|---|---|---|---|
| 前端 | HTML5 + CSS3 + JavaScript | - | 响应式设计,支持多端 |
| Web框架 | Django | 4.2.15 | Python Web开发框架 |
| 数据处理 | Apache Spark | 3.x | 分布式数据处理引擎 |
| 机器学习 | Scikit-learn | 1.3.2 | 推荐算法实现 |
| 数据库 | MySQL | 8.0+ | 关系型数据存储 |
| 数据仓库 | Apache Hive | - | 大数据存储和查询 |
| 爬虫框架 | Requests + LXML | - | 数据采集工具 |
| 可视化 | ECharts + Matplotlib | - | 数据图表展示 |
| 部署 | Docker + Nginx | - | 容器化部署 |
核心依赖
# 主要Python依赖包
Django==4.2.15 # Web框架
pandas==1.4.3 # 数据处理
numpy==1.23.1 # 数值计算
scikit-learn==1.3.2 # 机器学习
matplotlib==3.5.2 # 数据可视化
wordcloud==1.8.2.2 # 词云生成
jieba==0.42.1 # 中文分词
requests==2.31.0 # HTTP请求
lxml==4.9.3 # XML/HTML解析
PyHive==0.7.0 # Hive连接器
🏛️ 系统架构设计
整体架构图
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 数据采集层 │ │ 数据处理层 │ │ 数据应用层 │
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
│ • B站爬虫 │───▶│ • Spark集群 │───▶│ • Django Web │
│ • 数据清洗 │ │ • 数据ETL │ │ • 数据可视化 │
│ • 数据存储 │ │ • 特征工程 │ │ • 智能推荐 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 数据存储层 │ │ 算法模型层 │ │ 用户界面层 │
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
│ • MySQL │ │ • 协同过滤 │ │ • PC端Web │
│ • Hive │ │ • 内容推荐 │ │ • 移动端适配 │
│ • CSV文件 │ │ • 情感分析 │ │ • 微信小程序 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
项目演示
👾 项目源码获取,码界筑梦坊各平台同名,博客底部含联系方式卡片,欢迎咨询!
基于Spark的哔哩哔哩舆情分析系统
















目录结构
基于大数据的B站数据分析系统/
├── 基于大数据的B站数据分析系统/ # Django项目配置
│ ├── __init__.py
│ ├── settings.py # 项目设置
│ ├── urls.py # URL路由
│ └── wsgi.py # WSGI配置
├── myApp/ # 主应用模块
│ ├── __init__.py
│ ├── admin.py # 管理后台
│ ├── models.py # 数据模型
│ ├── views.py # 视图函数
│ └── templates/ # HTML模板
│ ├── index.html # 首页
│ ├── videoData.html # 视频数据
│ ├── commentData.html # 评论数据
│ ├── emoChar.html # 情感分析
│ └── commentCloud.html # 评论词云
├── spiders/ # 爬虫模块
│ ├── cb.py # 数据合并
│ ├── combineVideo.csv # 合并后的视频数据
│ ├── commentInfo.csv # 评论数据
│ └── danmuInfo.csv # 弹幕数据
├── spark/ # Spark处理模块
│ ├── recommend.py # 推荐算法
│ ├── combineVideo.csv # 处理后的视频数据
│ └── commentInfo.csv # 处理后的评论数据
├── machine/ # 机器学习模块
│ └── recommend.py # 推荐系统实现
├── utils/ # 工具模块
│ ├── getChartData.py # 图表数据获取
│ ├── getPublicData.py # 公共数据获取
│ └── query.py # 数据库查询
├── static/ # 静态资源
│ ├── assets/ # CSS/JS/字体
│ ├── cloudImg/ # 词云图片
│ └── img/ # 其他图片资源
├── front-end-module-master/ # 前端模块
│ ├── biliInfo/ # 登录注册
│ ├── shoppingMall/ # 商城模块
│ └── README.md # 说明文档
└── requirements.txt # Python依赖
🔧 核心功能模块
1. 数据采集模块
爬虫架构设计
# spiders/cb.py - 数据合并处理
import pandas as pd
def combine_partition_data():
"""合并多个分区的数据文件"""
# 读取9个分区的数据
partitions = []
for i in range(1, 10):
df = pd.read_csv(f'partition{i}.csv')
partitions.append(df)
# 合并所有分区数据
combined_df = pd.concat(partitions, ignore_index=True)
# 保存合并后的数据
combined_df.to_csv('combineVideo.csv', index=False, header=True)
return combined_df
# 数据清洗和预处理
def clean_data(df):
"""数据清洗和标准化"""
# 去除重复数据
df = df.drop_duplicates()
# 处理缺失值
df = df.fillna(0)
# 数据类型转换
df['duration'] = pd.to_numeric(df['duration'], errors='coerce')
df['view_count'] = pd.to_numeric(df['view_count'], errors='coerce')
return df
数据采集流程
2. 数据处理模块
Spark数据处理
# spark/recommend.py - Spark数据处理示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
def process_video_data():
"""使用Spark处理视频数据"""
# 创建Spark会话
spark = SparkSession.builder \
.appName("BilibiliVideoAnalysis") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# 读取CSV数据
video_df = spark.read.csv("combineVideo.csv", header=True, inferSchema=True)
# 数据转换和聚合
processed_df = video_df \
.groupBy("partition") \
.agg(
count("*").alias("video_count"),
avg("duration").alias("avg_duration"),
sum("view_count").alias("total_views")
)
# 保存处理结果
processed_df.write.mode("overwrite").parquet("processed_videos.parquet")
return processed_df
3. 智能推荐模块
协同过滤算法实现
# machine/recommend.py - 用户协同过滤推荐
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
class CollaborativeFiltering:
def __init__(self):
self.user_ratings = {}
def get_user_ratings(self):
"""获取用户评分数据"""
user_ratings = {}
user_list = self.query_users()
history_list = self.query_history()
for user in user_list:
user_id = user[0]
user_name = user[1]
for history in history_list:
video_id = history[1]
try:
# 获取用户对视频的评分
video_name = self.query_video_name(video_id)
history_count = history[3]
if user_name not in user_ratings:
user_ratings[user_name] = {video_name: history_count}
else:
user_ratings[user_name][video_name] = history_count
except:
continue
return user_ratings
def user_based_collaborative_filtering(self, user_name, user_ratings, top_n=5):
"""基于用户的协同过滤推荐"""
# 获取目标用户数据
target_user_ratings = user_ratings[user_name]
# 计算用户相似度
user_similarity_scores = {}
target_user_vector = np.array([
rating for _, rating in target_user_ratings.items()
])
# 计算与其他用户的余弦相似度
for user, ratings in user_ratings.items():
if user == user_name:
continue
user_vector = np.array([
ratings.get(item, 0) for item in target_user_ratings
])
# 计算余弦相似度
similarity = cosine_similarity(
[user_vector], [target_user_vector]
)[0][0]
user_similarity_scores[user] = similarity
# 排序相似用户
sorted_similar_users = sorted(
user_similarity_scores.items(),
key=lambda x: x[1],
reverse=True
)
# 生成推荐列表
recommended_items = set()
for similar_user, _ in sorted_similar_users[:top_n]:
recommended_items.update(
user_ratings[similar_user].keys()
)
# 过滤已观看视频
final_recommendations = [
item for item in recommended_items
if item not in target_user_ratings
]
return final_recommendations
# 使用示例
if __name__ == '__main__':
cf = CollaborativeFiltering()
user_ratings = cf.get_user_ratings()
recommendations = cf.user_based_collaborative_filtering(
'userTwo', user_ratings
)
print(f"推荐视频: {recommendations}")
4. 数据可视化模块
图表数据获取
# utils/getChartData.py - 图表数据生成
import pandas as pd
from myApp.models import *
from utils.queryhive import queryhives
class ChartDataGenerator:
def __init__(self):
pass
def get_index_data(self):
"""获取首页展示数据"""
# 获取UP主信息
up_info_list = self.get_up_info()
# 获取视频评论数据
video_comments_list = self.get_video_comments()
# 计算统计数据
max_create_user = max(up_info_list, key=lambda x: x[6])
max_level_user = max(up_info_list, key=lambda x: x[5])
max_comment_like_user = max(video_comments_list, key=lambda x: x[6])
# 获取弹幕数据
danmu_data = self.get_danmu_data()
x_data = [x[0] for x in danmu_data]
y1_data = [x[1] for x in danmu_data]
y2_data = [x[2] for x in danmu_data]
# 生成饼图数据
pie_data = self.generate_pie_data()
# 生成漏斗图数据
funnel_data = self.generate_funnel_data()
return {
'max_create_user': max_create_user[1],
'max_create_num': max_create_user[6],
'max_level_user': max_level_user[1],
'max_level_num': max_level_user[5],
'max_comment_like_user': max_comment_like_user[1],
'max_comment_like_num': max_comment_like_user[6],
'danmu_x': x_data,
'danmu_y1': y1_data,
'danmu_y2': y2_data,
'pie_data': pie_data,
'funnel_data': funnel_data
}
def get_video_comment_data(self, video_id):
"""获取视频评论数据"""
return queryhives(
'select * from videoCommentsTwo where videoId = %s',
[video_id],
'select'
)
def generate_pie_data(self):
"""生成饼图数据"""
max_partition_list = self.get_max_partition()
pie_data = []
for item in max_partition_list:
pie_data.append({
'name': item[0],
'value': item[1]
})
return pie_data
def generate_funnel_data(self):
"""生成漏斗图数据"""
max_up_list = self.get_max_up()
funnel_data = []
for item in max_up_list:
funnel_data.append({
'name': item[0],
'value': item[1]
})
return funnel_data
前端可视化实现
<!-- templates/index.html - 数据可视化模板 -->
<!DOCTYPE html>
<html>
<head>
<title>B站数据分析系统</title>
<script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script>
</head>
<body>
<div class="container">
<!-- 统计卡片 -->
<div class="stats-cards">
<div class="card">
<h3>最高创作UP主</h3>
<p>{{ max_create_user }}</p>
<span>{{ max_create_num }}个视频</span>
</div>
<div class="card">
<h3>最高等级UP主</h3>
<p>{{ max_level_user }}</p>
<span>等级 {{ max_level_num }}</span>
</div>
</div>
<!-- 弹幕分析图表 -->
<div class="chart-container">
<div id="danmuChart" style="width: 100%; height: 400px;"></div>
</div>
<!-- 分区分布饼图 -->
<div class="chart-container">
<div id="partitionChart" style="width: 100%; height: 400px;"></div>
</div>
</div>
<script>
// 弹幕分析图表
const danmuChart = echarts.init(document.getElementById('danmuChart'));
const danmuOption = {
title: { text: '弹幕数据分析' },
tooltip: { trigger: 'axis' },
legend: { data: ['弹幕数量', '回复数量'] },
xAxis: { data: {{ danmu_x|safe }} },
yAxis: {},
series: [
{
name: '弹幕数量',
type: 'bar',
data: {{ danmu_y1|safe }}
},
{
name: '回复数量',
type: 'line',
data: {{ danmu_y2|safe }}
}
]
};
danmuChart.setOption(danmuOption);
// 分区分布饼图
const partitionChart = echarts.init(document.getElementById('partitionChart'));
const partitionOption = {
title: { text: '视频分区分布' },
tooltip: { trigger: 'item' },
series: [{
type: 'pie',
radius: '50%',
data: {{ pie_data|safe }}
}]
};
partitionChart.setOption(partitionOption);
</script>
</body>
</html>
5. 数据模型设计
Django模型定义
# myApp/models.py - 数据模型
from django.db import models
class User(models.Model):
"""用户模型"""
id = models.AutoField("id", primary_key=True)
username = models.CharField("username", max_length=255, default='')
password = models.CharField("password", max_length=255, default='')
create_time = models.DateField("创建时间", auto_now_add=True)
class Meta:
db_table = "user"
verbose_name = "用户"
verbose_name_plural = "用户"
def __str__(self):
return self.username
class History(models.Model):
"""用户观看历史模型"""
id = models.AutoField("id", primary_key=True)
video_id = models.CharField("视频ID", max_length=255, default='')
user = models.ForeignKey(User, on_delete=models.CASCADE)
count = models.IntegerField("访问次数", default=1)
last_watch_time = models.DateTimeField("最后观看时间", auto_now=True)
class Meta:
db_table = "history"
verbose_name = "观看历史"
verbose_name_plural = "观看历史"
def __str__(self):
return f"{self.user.username} - {self.video_id}"
class VideoInfo(models.Model):
"""视频信息模型"""
id = models.CharField("视频ID", max_length=255, primary_key=True)
title = models.CharField("标题", max_length=500)
partition = models.CharField("分区", max_length=100)
duration = models.IntegerField("时长(秒)")
view_count = models.BigIntegerField("播放量")
danmu_count = models.IntegerField("弹幕数")
comment_count = models.IntegerField("评论数")
like_count = models.IntegerField("点赞数")
create_time = models.DateTimeField("创建时间")
class Meta:
db_table = "video_info"
verbose_name = "视频信息"
verbose_name_plural = "视频信息"
🔄 数据流程
完整数据处理流程
数据ETL流程
# 数据ETL流程示例
def etl_pipeline():
"""数据ETL主流程"""
# 1. Extract - 数据提取
raw_data = extract_from_sources()
# 2. Transform - 数据转换
transformed_data = transform_data(raw_data)
# 3. Load - 数据加载
load_to_targets(transformed_data)
def extract_from_sources():
"""从多个数据源提取数据"""
sources = {
'videos': 'combineVideo.csv',
'comments': 'commentInfo.csv',
'danmu': 'danmuInfo.csv'
}
data = {}
for name, file_path in sources.items():
data[name] = pd.read_csv(file_path)
return data
def transform_data(raw_data):
"""数据转换和清洗"""
transformed = {}
# 视频数据处理
videos = raw_data['videos'].copy()
videos['duration_min'] = videos['duration'] / 60
videos['view_count_k'] = videos['view_count'] / 1000
transformed['videos'] = videos
# 评论数据处理
comments = raw_data['comments'].copy()
comments['comment_length'] = comments['content'].str.len()
transformed['comments'] = comments
return transformed
def load_to_targets(transformed_data):
"""加载数据到目标系统"""
# 保存到MySQL
save_to_mysql(transformed_data)
# 保存到Hive
save_to_hive(transformed_data)
# 导出CSV
export_to_csv(transformed_data)
🚀 技术实现细节
1. 数据库设计
MySQL表结构
-- 用户表
CREATE TABLE `user` (
`id` int NOT NULL AUTO_INCREMENT,
`username` varchar(255) DEFAULT '',
`password` varchar(255) DEFAULT '',
`createTime` date DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 观看历史表
CREATE TABLE `history` (
`id` int NOT NULL AUTO_INCREMENT,
`videoId` varchar(255) DEFAULT '',
`user_id` int DEFAULT NULL,
`count` int DEFAULT '1',
`last_watch_time` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `user_id` (`user_id`),
KEY `videoId` (`videoId`),
CONSTRAINT `history_ibfk_1` FOREIGN KEY (`user_id`) REFERENCES `user` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 视频信息表
CREATE TABLE `video_info` (
`id` varchar(255) NOT NULL,
`title` varchar(500) DEFAULT '',
`partition` varchar(100) DEFAULT '',
`duration` int DEFAULT '0',
`view_count` bigint DEFAULT '0',
`danmu_count` int DEFAULT '0',
`comment_count` int DEFAULT '0',
`like_count` int DEFAULT '0',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `partition` (`partition`),
KEY `create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2. Django视图实现
主要视图函数
# myApp/views.py - 视图函数实现
from django.shortcuts import render
from django.http import JsonResponse
from django.contrib.auth.decorators import login_required
from utils.getChartData import getIndexData, getVideoComment
from utils.getChartData import addHistoryData, getTopVideoData
def index(request):
"""首页视图"""
try:
# 获取首页数据
data = getIndexData()
context = {
'max_create_user': data[0],
'max_create_num': data[1],
'max_level_user': data[2],
'max_level_num': data[3],
'max_comment_like_user': data[4],
'max_comment_like_num': data[5],
'danmu_x': data[6],
'danmu_y1': data[7],
'danmu_y2': data[8],
'pie_data': data[9],
'funnel_data': data[10]
}
return render(request, 'index.html', context)
except Exception as e:
return render(request, 'error.html', {'error': str(e)})
@login_required
def video_detail(request, video_id):
"""视频详情页"""
try:
# 获取视频评论数据
comments = getVideoComment(video_id)
# 记录用户观看历史
if request.user.is_authenticated:
addHistoryData(request.user, video_id)
context = {
'video_id': video_id,
'comments': comments
}
return render(request, 'videoDetail.html', context)
except Exception as e:
return render(request, 'error.html', {'error': str(e)})
def get_chart_data(request):
"""获取图表数据的API接口"""
try:
chart_type = request.GET.get('type', 'index')
if chart_type == 'index':
data = getIndexData()
return JsonResponse({
'success': True,
'data': {
'danmu_x': data[6],
'danmu_y1': data[7],
'danmu_y2': data[8],
'pie_data': data[9],
'funnel_data': data[10]
}
})
elif chart_type == 'video':
data = getTopVideoData()
return JsonResponse({
'success': True,
'data': {
'line1_x': data[0],
'line1_y': data[1],
'line2_x': data[2],
'line2_y': data[3],
'pie_data': data[4]
}
})
else:
return JsonResponse({
'success': False,
'error': '未知的图表类型'
})
except Exception as e:
return JsonResponse({
'success': False,
'error': str(e)
})
3. 推荐算法优化
算法性能优化
# 推荐算法优化版本
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import StandardScaler
import joblib
import os
class OptimizedRecommendationSystem:
def __init__(self):
self.user_ratings = {}
self.user_similarity_matrix = None
self.scaler = StandardScaler()
self.model_path = 'models/recommendation_model.pkl'
def precompute_similarities(self):
"""预计算用户相似度矩阵"""
if os.path.exists(self.model_path):
# 加载预计算的模型
self.user_similarity_matrix = joblib.load(self.model_path)
return
# 计算用户相似度矩阵
user_names = list(self.user_ratings.keys())
n_users = len(user_names)
# 创建用户-视频评分矩阵
all_videos = set()
for ratings in self.user_ratings.values():
all_videos.update(ratings.keys())
video_list = sorted(list(all_videos))
user_matrix = np.zeros((n_users, len(video_list)))
for i, user in enumerate(user_names):
for j, video in enumerate(video_list):
user_matrix[i, j] = self.user_ratings[user].get(video, 0)
# 标准化数据
user_matrix_scaled = self.scaler.fit_transform(user_matrix)
# 计算余弦相似度
self.user_similarity_matrix = cosine_similarity(user_matrix_scaled)
# 保存模型
os.makedirs('models', exist_ok=True)
joblib.dump(self.user_similarity_matrix, self.model_path)
def get_recommendations(self, user_name, top_n=10):
"""获取推荐结果"""
if user_name not in self.user_ratings:
return []
user_names = list(self.user_ratings.keys())
user_idx = user_names.index(user_name)
# 获取用户相似度
user_similarities = self.user_similarity_matrix[user_idx]
# 找到最相似的用户
similar_indices = np.argsort(user_similarities)[::-1][1:top_n+1]
# 生成推荐
recommendations = set()
target_user_videos = set(self.user_ratings[user_name].keys())
for idx in similar_indices:
similar_user = user_names[idx]
similar_user_videos = set(self.user_ratings[similar_user].keys())
# 推荐相似用户观看但目标用户未观看的视频
new_videos = similar_user_videos - target_user_videos
recommendations.update(new_videos)
return list(recommendations)[:top_n]
def update_user_ratings(self, user_name, video_id, rating):
"""更新用户评分"""
if user_name not in self.user_ratings:
self.user_ratings[user_name] = {}
self.user_ratings[user_name][video_id] = rating
# 清除预计算的相似度矩阵,需要重新计算
if os.path.exists(self.model_path):
os.remove(self.model_path)
self.user_similarity_matrix = None
🚀 部署与运维
Docker部署配置
# Dockerfile
FROM python:3.8-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
libmysqlclient-dev \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["python", "manage.py", "runserver", "0.0.0.0:8000"]
# docker-compose.yml
version: '3.8'
services:
web:
build: .
ports:
- "8000:8000"
environment:
- DEBUG=False
- DATABASE_URL=mysql://user:password@db:3306/bilibili
depends_on:
- db
volumes:
- .:/app
- static_volume:/app/static
- media_volume:/app/media
db:
image: mysql:8.0
environment:
- MYSQL_DATABASE=bilibili
- MYSQL_USER=user
- MYSQL_PASSWORD=password
- MYSQL_ROOT_PASSWORD=rootpassword
volumes:
- db_data:/var/lib/mysql
ports:
- "3306:3306"
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- static_volume:/app/static
- media_volume:/app/media
depends_on:
- web
volumes:
db_data:
static_volume:
media_volume:
Nginx配置
# nginx.conf
events {
worker_connections 1024;
}
http {
upstream django {
server web:8000;
}
server {
listen 80;
server_name localhost;
location /static/ {
alias /app/static/;
}
location /media/ {
alias /app/media/;
}
location / {
proxy_pass http://django;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
}
⚡ 性能优化
1. 数据库优化
# 数据库查询优化
from django.db.models import Prefetch, Count, Avg
from django.core.cache import cache
class OptimizedDataQuery:
@staticmethod
def get_video_statistics():
"""获取视频统计信息(优化版本)"""
cache_key = 'video_statistics'
cached_data = cache.get(cache_key)
if cached_data:
return cached_data
# 使用聚合查询优化
stats = VideoInfo.objects.aggregate(
total_videos=Count('id'),
avg_duration=Avg('duration'),
total_views=Sum('view_count'),
avg_likes=Avg('like_count')
)
# 缓存结果(5分钟)
cache.set(cache_key, stats, 300)
return stats
@staticmethod
def get_user_recommendations(user_id, limit=10):
"""获取用户推荐(优化版本)"""
cache_key = f'user_recommendations_{user_id}'
cached_data = cache.get(cache_key)
if cached_data:
return cached_data
# 使用预取优化关联查询
user_history = History.objects.filter(
user_id=user_id
).select_related('video').prefetch_related(
Prefetch(
'video__comments',
queryset=Comment.objects.select_related('user')
)
)
# 生成推荐逻辑...
recommendations = generate_recommendations(user_history)
# 缓存结果(10分钟)
cache.set(cache_key, recommendations, 600)
return recommendations
2. 缓存策略
# 缓存配置和策略
from django.core.cache import cache
from django.conf import settings
import hashlib
import json
class CacheManager:
@staticmethod
def get_cache_key(prefix, *args, **kwargs):
"""生成缓存键"""
# 将参数转换为字符串并生成哈希
key_parts = [prefix] + list(args)
if kwargs:
key_parts.append(json.dumps(kwargs, sort_keys=True))
key_string = '_'.join(str(part) for part in key_parts)
return hashlib.md5(key_string.encode()).hexdigest()
@staticmethod
def get_or_set(key, callback, timeout=300):
"""获取缓存或设置缓存"""
cached_data = cache.get(key)
if cached_data is not None:
return cached_data
# 执行回调函数获取数据
data = callback()
# 设置缓存
cache.set(key, data, timeout)
return data
@staticmethod
def invalidate_pattern(pattern):
"""批量清除缓存"""
# 这里可以实现基于模式的缓存清除
# 例如清除所有以 'video_' 开头的缓存
pass
# 使用示例
def get_video_analytics(video_id):
"""获取视频分析数据(带缓存)"""
cache_key = CacheManager.get_cache_key('video_analytics', video_id)
def fetch_data():
# 实际的数据获取逻辑
return VideoInfo.objects.filter(id=video_id).first()
return CacheManager.get_or_set(cache_key, fetch_data, timeout=600)
3. 异步处理
# 异步任务处理
from celery import Celery
from django.core.mail import send_mail
import pandas as pd
# 创建Celery实例
app = Celery('bilibili_analysis')
@app.task
def process_video_data_async(video_ids):
"""异步处理视频数据"""
try:
# 批量处理视频数据
videos = VideoInfo.objects.filter(id__in=video_ids)
# 数据处理逻辑
processed_data = []
for video in videos:
# 计算各种指标
engagement_rate = (video.like_count + video.comment_count) / video.view_count
processed_data.append({
'id': video.id,
'engagement_rate': engagement_rate,
'processed_at': timezone.now()
})
# 保存处理结果
# ...
return f"成功处理 {len(processed_data)} 个视频"
except Exception as e:
return f"处理失败: {str(e)}"
@app.task
def generate_recommendations_async():
"""异步生成推荐"""
try:
# 重新计算所有用户的推荐
users = User.objects.all()
for user in users:
# 生成推荐逻辑
recommendations = generate_user_recommendations(user.id)
# 保存推荐结果
save_recommendations(user.id, recommendations)
return "推荐生成完成"
except Exception as e:
return f"推荐生成失败: {str(e)}"
@app.task
def send_weekly_report():
"""发送周报邮件"""
try:
# 生成周报数据
weekly_stats = generate_weekly_statistics()
# 发送邮件
send_mail(
subject='B站数据分析系统周报',
message=format_weekly_report(weekly_stats),
from_email='system@example.com',
recipient_list=['admin@example.com']
)
return "周报发送成功"
except Exception as e:
return f"周报发送失败: {str(e)}"
🌟 项目特色
1. 技术创新点
- 分布式数据处理: 基于Spark的大规模数据处理能力
- 智能推荐算法: 协同过滤 + 内容推荐的混合推荐策略
- 实时数据分析: 支持实时数据流处理和可视化
- 多维度分析: 视频、用户、评论、弹幕等多维度数据挖掘
2. 用户体验优化
- 响应式设计: 支持PC端和移动端多设备访问
- 交互式图表: 基于ECharts的丰富数据可视化
- 个性化推荐: 基于用户行为的智能内容推荐
- 实时更新: 数据实时刷新和状态同步
3. 系统架构优势
- 模块化设计: 清晰的模块划分和接口定义
- 可扩展性: 支持水平扩展和功能模块扩展
- 高可用性: 多级缓存和容错机制
- 性能优化: 数据库查询优化和异步处理
📊 可视化展示预留
1. 数据大屏展示
<!-- 数据大屏模板 -->
<div class="dashboard-container">
<!-- 实时数据指标 -->
<div class="metrics-row">
<div class="metric-card">
<h3>今日视频数</h3>
<div class="metric-value">{{ today_videos }}</div>
<div class="metric-trend up">+{{ video_growth }}%</div>
</div>
<div class="metric-card">
<h3>今日播放量</h3>
<div class="metric-value">{{ today_views }}</div>
<div class="metric-trend up">+{{ view_growth }}%</div>
</div>
<div class="metric-card">
<h3>活跃用户数</h3>
<div class="metric-value">{{ active_users }}</div>
<div class="metric-trend down">-{{ user_decline }}%</div>
</div>
</div>
<!-- 实时数据图表 -->
<div class="charts-row">
<div class="chart-container">
<div id="realTimeChart" style="width: 100%; height: 300px;"></div>
</div>
<div class="chart-container">
<div id="trendChart" style="width: 100%; height: 300px;"></div>
</div>
</div>
<!-- 地理分布图 -->
<div class="map-container">
<div id="userMap" style="width: 100%; height: 400px;"></div>
</div>
</div>
2. 移动端适配
/* 移动端响应式样式 */
@media (max-width: 768px) {
.dashboard-container {
padding: 10px;
}
.metrics-row {
flex-direction: column;
gap: 15px;
}
.metric-card {
width: 100%;
padding: 20px;
}
.charts-row {
flex-direction: column;
}
.chart-container {
width: 100%;
margin-bottom: 20px;
}
}
/* 微信小程序样式 */
.wx-container {
background: #f8f8f8;
padding: 20rpx;
}
.wx-metric-card {
background: white;
border-radius: 16rpx;
padding: 30rpx;
margin-bottom: 20rpx;
box-shadow: 0 2rpx 8rpx rgba(0,0,0,0.1);
}
📈 总结与展望
项目成果
本项目成功构建了一个完整的B站视频数据分析系统,实现了从数据采集、处理、分析到可视化的全流程。系统具备以下特点:
- 技术先进性: 采用Spark分布式计算、机器学习算法等前沿技术
- 功能完整性: 涵盖数据爬取、存储、分析、推荐、可视化等核心功能
- 架构合理性: 模块化设计,支持水平扩展和功能迭代
- 用户体验: 响应式设计,支持多端访问
技术亮点
- 分布式数据处理: 基于Spark的大规模数据处理能力
- 智能推荐系统: 协同过滤算法的实际应用
- 实时数据可视化: 基于ECharts的丰富图表展示
- 多维度数据分析: 视频、用户、评论、弹幕等多角度分析
未来发展方向
- 算法优化: 引入深度学习模型,提升推荐准确性
- 实时处理: 集成Kafka等流处理技术,支持实时数据分析
- 云原生: 采用Kubernetes等容器编排技术,提升部署灵活性
- AI增强: 集成自然语言处理,实现评论情感分析等高级功能
📞 联系方式
码界筑梦坊 - 各大平台同名
技术交流
欢迎关注我们的技术分享,一起探讨大数据、人工智能、Web开发等前沿技术!
本文档持续更新中,如有问题或建议,欢迎在评论区留言交流。
最后更新时间: 2024年12月
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)