1、线性回归

基本思想:线性回归是一种用于建立自变量和因变量之间线性关系的经典统计方法,其基本思想是找到一条最佳的直线,使得这条直线能够最好地拟合样本数据,并用这条直线来对新的自变量进行预测。

2、 代码实现

2.1.metrics.py:定义一些衡量模型的性能的指标
包括分类和回归的指标

import numpy as np
from math import sqrt


# 分类准确度
def accuracy_score(y_true, y_predict):
    """计算y_true(y_test)和y_predict之间的准确率"""

    assert y_true.shape[0] == y_predict.shape[0], \
        "the size of y_true must be equal to the size of y_predict"

    return np.sum(y_true == y_predict) / len(y_true)


# 下面三个是对线性回归模型大的评测指标
def mean_squared_error(y_true, y_predict):
    """计算y_true和y_predict之间的mse"""
    assert len(y_true) == len(y_predict), \
        "the size of y_true must be equal to the size of y_predict"

    return np.sum((y_true - y_predict) ** 2) / len(y_true)


def root_mean_squared_error(y_true, y_predict):
    """计算y_true和y_predict之间的RMSE"""
    return sqrt(mean_squared_error(y_true, y_predict))


def mean_absolute_error(y_true, y_predict):
    """计算y_true和y_predict之间的RMSE"""
    assert len(y_true) == len(y_predict), \
        "the size of y_true must be equal to the size of y_predict"
    return np.sum(np.absolute(y_true - y_predict)) / len(y_true)


def r2_score(y_true, y_predict):
    """计算y_true和y_predict之间的R Square"""
    return 1 - mean_squared_error(y_true, y_predict) / np.var(y_true)


# 评价分类的指标
def TN(y_true, y_predict):
    assert len(y_true) == len(y_predict)
    return np.sum((y_true == 0) & (y_predict == 0))


def FP(y_true, y_predict):
    assert len(y_true) == len(y_predict)
    return np.sum((y_true == 0) & (y_predict == 1))


def FN(y_true, y_predict):
    assert len(y_true) == len(y_predict)
    return np.sum((y_true == 1) & (y_predict == 0))


def TP(y_true, y_predict):
    assert len(y_true) == len(y_predict)
    return np.sum((y_true == 1) & (y_predict == 1))


def confusion_matrix(y_true, y_predict):
    return np.array([
        [TN(y_true, y_predict), FP(y_true, y_predict)],
        [FN(y_true, y_predict), TP(y_true, y_predict)]
    ])


def precision_score(y_true, y_predict):
    tp = TP(y_true, y_predict)
    fp = FP(y_true, y_predict)
    try:
        return tp / (tp + fp)
    except:
        return 0.0

def recall_score(y_true, y_predict):
    tp = TP(y_true, y_predict)
    fn = FN(y_true, y_predict)
    try:
        return tp / (tp + fn)
    except:
        return 0.0

def f1_score(y_true, y_predict):
    precision = precision_score(y_true, y_predict)
    recall = recall_score(y_true, y_predict)
    try:
        return 2 * precision * recall / (precision + recall)
    except:
        return 0.0

def TPR(y_true, y_predict):
    tp = TP(y_true, y_predict)
    fn = FN(y_true, y_predict)
    try:
        return tp / (tp + fn)
    except:
        return 0.0

def FPR(y_true, y_predict):
    fp = FP(y_true, y_predict)
    tn = TN(y_true, y_predict)
    try:
        return fp / (fp + tn)
    except:
        return 0.0

上面的代码是一些评估机器学习模型表现的函数,包括回归模型和分类模型的指标。下面是每个函数的具体功能描述:
accuracy_score(y_true, y_predict):计算分类模型的准确率。
mean_squared_error(y_true, y_predict):计算回归模型的均方误差。
root_mean_squared_error(y_true, y_predict):计算回归模型的均方根误差。
mean_absolute_error(y_true, y_predict):计算回归模型的平均绝对误差。
r2_score(y_true,y_predict):计算回归模型的R²分数。
TN(y_true, y_predict):计算二分类模型中真负类数。
FP(y_true,y_predict):计算二分类模型中假正类数。
FN(y_true, y_predict):计算二分类模型中假负类数。
TP(y_true, y_predict):计算二分类模型中真正类数。
confusion_matrix(y_true,y_predict):计算二分类模型中的混淆矩阵。
precision_score(y_true,y_predict):计算二分类模型中的精确率。
recall_score(y_true, y_predict):计算二分类模型中的召回率。
f1_score(y_true, y_predict):计算二分类模型中的F1分数。
TPR(y_true,y_predict):计算二分类模型中的真正类率。
FPR(y_true, y_predict):计算二分类模型中的假正类率。

2.2.LinearRegression.py:封装的线性回归模型
损失函数:均方误差(MSE)

优化方法:梯度下降 or 向量化运算求梯度

import numpy as np
from .metrics import r2_score


# 多元线性回归模型
class LinearRegression:

    def __init__(self):
        """初始化Linear Regression模型"""
        self.coef_ = None  # 系数
        self.interception_ = None  # 截距
        self._theta = None

    # 使用正规化解出参数
    def fit_normal(self, X_train, y_train):
        """根据训练数据集X_train, y_train训练Linear Regression模型"""
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"
        # X_b = np.hstack([np.ones((X_train.shape[0], 1)), X_train])
        X_b = np.hstack([np.ones((X_train.shape[0], 1)), X_train])
        self._theta = np.linalg.inv(X_b.T.dot(X_b)).dot(X_b.T).dot(y_train)

        self.interception_ = self._theta[0]
        self.coef_ = self._theta[1:]
        return self

    # 使用批量梯度下降法
    def fit_gd(self, X_train, y_train, eta=0.01, n_iters=1e4):
        """根据训练数据集X_train, y_train,使用梯度下降法训练Linear Regression模型"""
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"

        def J(theta, X_b, y):
            """求出对应theta的损失函数"""
            try:
                return np.sum((y - X_b.dot(theta)) ** 2) / len(y)
            except:
                return float('inf')

        def dJ(theta, X_b, y):
            """求出损失函数的对应theta梯度"""
            # 使用循坏计算
            # res = np.empty(len(theta))
            # res[0] = np.sum(X_b.dot(theta)-y)
            # for i in range(1, len(theta)):
            #     res[i] = (X_b.dot(theta) - y).dot(X_b[:, i])
            # return res * 2 / len(X_b)
            # 使用下面向量化运算求梯度
            return X_b.T.dot(X_b.dot(theta) - y) * 2 / len(y)

        def gradient_descent(X_b, y, initial_theta, eta, n_iters=1e4, epsilon=1e-8):
            """使用梯度下降算法训练模型"""
            theta = initial_theta
            cur_iter = 0

            while cur_iter < n_iters:
                gradient = dJ(theta, X_b, y)
                last_theta = theta
                theta = theta - eta * gradient
                if abs(J(theta, X_b, y) - J(last_theta, X_b, y)) < epsilon:
                    break

                cur_iter += 1

            return theta

        X_b = np.hstack([np.ones((len(X_train), 1)),  X_train])
        initial_theta = np.zeros(X_b.shape[1])
        self._theta = gradient_descent(X_b, y_train, initial_theta, eta, n_iters)
        self.interception_ = self._theta[0]
        self.coef_ = self._theta[1:]

        return self

    # 使用随机梯度下降法  在随机梯度下降法中,通常n_iters:表示将所有样本数据看几遍(考虑到所有训练样本信息)
    #  n_iters:默认将所有样本数据看5遍
    def fit_sgd(self, X_train, y_train, n_iters=5, t0=5, t1=50):
        """根据训练数据集X_train, y_train,使用梯度下降法训练Linear Regression模型"""
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"
        assert n_iters >= 1, \
            "All sample data should be considered at least once"

        def dJ_sgd(theta, X_b_i, y_i):     # 随机梯度下降传进来的是一个样本的数据
            return X_b_i.T.dot(X_b_i.dot(theta) - y_i) * 2.   # 不需要除以len(y)

        def sgd(X_b, y, initial_theta, n_iters, t0=5, t1=50):

            def learning_rate(t):
                return t0 / (t + t1)

            theta = initial_theta
            m = len(X_b)     #样本的数量
            """
            1、下面的实现方法有问题:
            随机梯度下降法应该把所有的样本数据至少看一遍,把所有的样本信息都考虑到(即都能用上,这样比较科学)"""
            # for cur_iter in range(n_iters*m):
            #     rand_i = np.random.randint(m)  # 1.1 随机选取一个样本数据, 因为是随机取一个样本,所以不能保证所有样本的信息都考虑到
            #     gradient = dJ_sgd(theta, X_b[rand_i], y[rand_i])
            #     theta = theta - learning_rate(cur_iter) * gradient
            #
            # 2、为解决1.1的问题,下面对代码进行改进
            for cur_iter in range(n_iters):
                indexes = np.random.permutation(m)   #生成0到m-1的乱序数组
                X_b_new = X_b[indexes]
                y_new = y[indexes]
                for i in range(m):
                    gradient = dJ_sgd(theta, X_b_new[i], y_new[i])
                    theta = theta - learning_rate(cur_iter * m + i) * gradient

            return theta

        X_b = np.hstack([np.ones((len(X_train), 1)),  X_train])
        initial_theta = np.random.randn(X_b.shape[1])
        self._theta = sgd(X_b, y_train, initial_theta, n_iters, t0, t1)
        self.interception_ = self._theta[0]
        self.coef_ = self._theta[1:]

        return self

    def predict(self, X_predict):
        """给定待预测数据集X_predict,返回表示X_predict"""
        assert self.coef_ is not None and self.interception_ is not None, \
            "must fit before predict!"
        assert X_predict.shape[1] == len(self.coef_), \
            "the feature number of X_predict must be equal to X_train"

        X_b = np.hstack([np.ones((X_predict.shape[0], 1)), X_predict])
        return X_b.dot(self._theta)

    def score(self, X_test, y_test):
        """根据测试数据集X_test和y_test确定当前模型的准确度"""
        y_predict = self.predict(X_test)
        return r2_score(y_test, y_predict)

    def __repr__(self):
        return "LinearRegression()"

代码实现了一个多元线性回归模型,其中包括三种不同的参数训练方法:正规方程法、批量梯度下降法和随机梯度下降法。
模型的训练需要传入训练数据集 X_train 和 y_train。其中,X_train 为训练集的自变量数据,y_train 为训练集的因变量数据。
在正规方程法中,模型直接使用正规方程求解出最优参数;
在批量梯度下降法中,模型使用梯度下降法求解最优参数,其中需要传入梯度下降法的参数包括:学习率 eta、迭代次数 n_iters;
在随机梯度下降法中,模型同样使用梯度下降法求解最优参数,但是每次迭代只随机选择一个样本进行梯度下降,因此其训练速度相比批量梯度下降法更快,需要传入的参数包括迭代次数 n_iters、学习率更新的相关参数 t0 和 t1。

Time:2023.3.13
如果上面代码对您有帮助,欢迎点个赞!!!

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐