简介

本文系统讲解马尔可夫随机场(Markov Random Field, MRF)——一种刻画变量间无向相关关系的概率图模型。从局部变量的“势函数”度量,到“马尔可夫性”的条件独立规则,再到联合概率的“团分解”方法,逐步揭开MRF的理论框架;同时关联其在图像处理、机器学习中的应用场景,帮助读者理解这一模型的核心价值。

一、从现实问题到概率图模型

生活中许多任务涉及多变量依赖:比如推荐系统中,用户对电影的偏好依赖于 genre、导演、演员等因素;图像修复中,像素的颜色依赖于相邻像素的取值。概率图模型(Probabilistic Graphical Model, PGM)通过“节点(变量)+ 边(依赖关系)”的可视化结构,将复杂的变量依赖转化为可计算的数学模型。

PGM分为两类:

  • 有向图模型(贝叶斯网络):用有向无环图表示变量间的因果关系(如“下雨→地面湿滑”);
  • 无向图模型(马尔可夫网):用无向图表示变量间的对称相关关系(如“像素A亮→像素B亮”的 mutual依赖)。

马尔可夫随机场(MRF)是无向图模型的典型代表,其核心是用局部依赖描述全局概率

二、MRF的模型定义:势函数与局部依赖

MRF的无向图中,节点代表随机变量,边代表变量间的对称相关关系(无方向、相互影响)。这种局部相关关系的强度,由**势函数(Potential Function)**度量。

1. 势函数:量化局部变量的相关性

势函数是定义在变量子集上的非负函数,用于描述子集内变量的“契合度”——值越大,说明变量组合越“偏好”。

例如,考虑变量x2x_2x2(像素2的亮度)和x3x_3x3(像素3的亮度):若x2x_2x2x3x_3x3相邻(图中有边),可定义势函数:ψ(x2,x3)={1.5if x2=x3;0.1otherwise. \psi(x_2, x_3) = \begin{cases} 1.5 & \text{if } x_2 = x_3; \\ 0.1 & \text{otherwise.} \end{cases} ψ(x2,x3)={1.50.1if x2=x3;otherwise.该函数表示:模型偏好x2x_2x2x3x_3x3取值相同(此时势函数值更大),即两像素亮度正相关。

2. 势函数的非负性:指数形式的常用技巧

为保证势函数非负(概率模型的基本要求),通常用指数函数定义势函数:ψ(x)=e−H(x) \psi(x) = e^{-H(x)} ψ(x)=eH(x)其中H(x)H(x)H(x)能量函数,用于刻画变量组合的“不契合度”(能量越高,势函数值越小)。

能量函数的常见形式(以变量子集xxx为例):H(x)=∑u,v∈x,ueqvαuvxuxv+∑v∈xβvxv H(x) = \sum_{u,v \in x, u eq v} \alpha_{uv} x_u x_v + \sum_{v \in x} \beta_v x_v H(x)=u,vx,ueqvαuvxuxv+vxβvxv

  • αuv\alpha_{uv}αuv:衡量变量xux_uxuxvx_vxv的交互强度(正/负表示正/负相关);
  • βv\beta_vβv:衡量变量xvx_vxv自身的偏好(如xvx_vxv取1的倾向)。

这些参数(αuv\alpha_{uv}αuvβv\beta_vβv)需要通过数据学习得到,称为参数估计

三、MRF的核心:马尔可夫性与条件独立

MRF的“马尔可夫性”是其简化概率计算的关键——它定义了变量间的条件独立规则,让我们可以用局部信息推导全局概率。

马尔可夫性包含三个等价的规则(从全局到局部):

1. 全局马尔可夫性:被“分隔”的变量独立

设无向图中,节点集CCCAAABBB完全分隔(所有从AAABBB的路径都经过CCC),则给定CCC的取值,AAABBB条件独立,记为:xA⊥xB∣xC x_A \perp x_B \mid x_C xAxBxC数学表达为:p(xA,xB∣xC)=p(xA∣xC)⋅p(xB∣xC) p(x_A, x_B \mid x_C) = p(x_A \mid x_C) \cdot p(x_B \mid x_C) p(xA,xBxC)=p(xAxC)p(xBxC)

2. 局部马尔可夫性:变量只依赖邻接节点

对于任意变量xvx_vxv,其邻接变量集为N(v)N(v)N(v)(所有与xvx_vxv直接相连的变量),则给定N(v)N(v)N(v)的取值,xvx_vxv与非邻接变量条件独立

例如,若xvx_vxv的邻接变量是x1,x2x_1, x_2x1,x2,则:p(xv,xo∣xN(v))=p(xv∣xN(v))⋅p(xo∣xN(v)) p(x_v, x_o \mid x_{N(v)}) = p(x_v \mid x_{N(v)}) \cdot p(x_o \mid x_{N(v)}) p(xv,xoxN(v))=p(xvxN(v))p(xoxN(v))其中xox_oxo是非邻接变量(如x3,x4x_3, x_4x3,x4)。

3. 成对马尔可夫性:非邻接变量独立

对于任意两个非邻接变量xix_ixixjx_jxj给定所有其他变量的取值,xix_ixixjx_jxj条件独立p(xi,xj∣x∖{i,j})=p(xi∣x∖{i,j})⋅p(xj∣x∖{i,j}) p(x_i, x_j \mid x_{\setminus \{i,j\}}) = p(x_i \mid x_{\setminus \{i,j\}}) \cdot p(x_j \mid x_{\setminus \{i,j\}}) p(xi,xjx{i,j})=p(xix{i,j})p(xjx{i,j})其中x∖{i,j}x_{\setminus \{i,j\}}x{i,j}表示去除xix_ixixjx_jxj后的所有变量。

四、MRF的联合概率分解:团与极大团

马尔可夫性的核心目标是将高维联合概率分解为低维局部势函数的乘积——这样可以避免直接计算高维概率的“维度灾难”(比如nnn个二值变量的联合概率需要2n−12^n-12n1个参数)。

1. 团:全连接的变量子集

为了满足马尔可夫性的条件独立规则,分解后的每个局部子集必须是“团(Clique)”——子集内任意两个变量都有边相连(全连接)。

例如,若变量x2,x4,x5x_2, x_4, x_5x2,x4,x5两两相邻,则{x2,x4,x5}\{x_2, x_4, x_5\}{x2,x4,x5}是一个团;而{x2,x4,x6}\{x_2, x_4, x_6\}{x2,x4,x6}不是(x2x_2x2x6x_6x6无连接)。

2. 极大团:不可扩展的团

若一个团无法再加入任何变量而保持“全连接”,则称为极大团(Maximal Clique)

例如,{x2,x4,x5}\{x_2, x_4, x_5\}{x2,x4,x5}是极大团(加入x1x_1x1后,x1x_1x1x4x_4x4无连接,不再是团);而{x2,x4}\{x_2, x_4\}{x2,x4}不是(可加入x5x_5x5扩展为更大的团)。

3. 联合概率的团分解公式

MRF的联合概率分布可分解为所有极大团的势函数乘积,再除以归一化因子ZZZ(保证概率和为1):p(x)=1Z∗∏Q∈C∗ψQ(xQ) p(x) = \frac{1}{Z^*} \prod_{Q \in C^*} \psi_Q(x_Q) p(x)=Z1QCψQ(xQ)其中:

  • C∗C^*C:极大团的集合;
  • ψQ(xQ)\psi_Q(x_Q)ψQ(xQ):极大团QQQ的势函数;
  • Z∗Z^*Z:归一化因子,计算所有变量组合的势函数乘积之和:Z∗=∑x∏Q∈C∗ψQ(xQ) Z^* = \sum_x \prod_{Q \in C^*} \psi_Q(x_Q) Z=xQCψQ(xQ)

4. 示例:一个简单MRF的联合概率

假设MRF的变量集是x={x1,x2,x3,x4,x5,x6}x = \{x_1, x_2, x_3, x_4, x_5, x_6\}x={x1,x2,x3,x4,x5,x6},其极大团包括:

  • {x1,x2}\{x_1, x_2\}{x1,x2}{x1,x6}\{x_1, x_6\}{x1,x6}{x2,x3}\{x_2, x_3\}{x2,x3}{x5,x6}\{x_5, x_6\}{x5,x6}{x2,x4,x5}\{x_2, x_4, x_5\}{x2,x4,x5}

则联合概率为:p(x)=1Z⋅ψ12(x1,x2)⋅ψ16(x1,x6)⋅ψ23(x2,x3)⋅ψ56(x5,x6)⋅ψ245(x2,x4,x5) p(x) = \frac{1}{Z} \cdot \psi_{12}(x_1,x_2) \cdot \psi_{16}(x_1,x_6) \cdot \psi_{23}(x_2,x_3) \cdot \psi_{56}(x_5,x_6) \cdot \psi_{245}(x_2,x_4,x_5) p(x)=Z1ψ12(x1,x2)ψ16(x1,x6)ψ23(x2,x3)ψ56(x5,x6)ψ245(x2,x4,x5)其中ZZZ是归一化因子,ψij\psi_{ij}ψij表示二元团的势函数,ψijk\psi_{ijk}ψijk表示三元团的势函数。

4. python代码

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import cv2
import time
from sklearn.cluster import KMeans
from scipy import ndimage
import os


def load_and_preprocess_image(image_path, target_size=(200, 200)):
    """加载并预处理图像"""
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"图像文件不存在: {image_path}")

    # 使用PIL加载图像
    img = Image.open(image_path)

    # 转换为RGB(如果不是的话)
    if img.mode != 'RGB':
        img = img.convert('RGB')

    # 调整大小以加快处理速度
    if target_size:
        img = img.resize(target_size, Image.Resampling.LANCZOS)

    img_array = np.array(img)
    return img_array


def extract_features(image):
    """提取图像特征:颜色和位置"""
    h, w, c = image.shape
    features = []

    # 将图像转换为Lab颜色空间以获得更好的颜色表示
    lab_image = cv2.cvtColor(image, cv2.COLOR_RGB2LAB)

    for i in range(h):
        for j in range(w):
            # 颜色特征 (Lab空间)
            color_features = lab_image[i, j].astype(float) / 255.0
            # 位置特征
            position_features = [i / h, j / w]
            # 组合特征
            feature_vector = np.concatenate([color_features, position_features])
            features.append(feature_vector)

    return np.array(features).reshape(h, w, -1)


def initialize_segmentation(features, n_classes=3):
    """使用K-means初始化分割"""
    h, w, feature_dim = features.shape
    feature_vectors = features.reshape(-1, feature_dim)

    # 使用K-means进行初始分割
    kmeans = KMeans(n_clusters=n_classes, random_state=42, n_init=10)
    labels = kmeans.fit_predict(feature_vectors)

    # 将标签转换为one-hot编码
    segmentation = labels.reshape(h, w)

    return segmentation, kmeans.cluster_centers_


def mrf_energy_segmentation(segmentation, features, centers, beta=1.0, lambda_val=1.0):
    """计算图像分割的MRF能量函数"""
    h, w = segmentation.shape
    energy = 0.0

    # 数据项:分割标签与特征中心的匹配程度
    for i in range(h):
        for j in range(w):
            label = segmentation[i, j]
            feature = features[i, j]
            # 计算特征与聚类中心的距离
            distance = np.linalg.norm(feature - centers[label])
            energy += lambda_val * distance

    # 平滑项:相邻像素标签的一致性
    # 水平相邻
    energy += beta * np.sum(segmentation[:, :-1] != segmentation[:, 1:])
    # 垂直相邻
    energy += beta * np.sum(segmentation[:-1, :] != segmentation[1:, :])

    return energy


def mrf_segmentation_icm(features, centers, beta=1.0, lambda_val=1.0, max_iter=10):
    """使用ICM算法进行MRF图像分割"""
    h, w, _ = features.shape
    n_classes = len(centers)

    # 初始分割(基于最近的中心)
    segmentation = np.zeros((h, w), dtype=int)
    for i in range(h):
        for j in range(w):
            distances = [np.linalg.norm(features[i, j] - center) for center in centers]
            segmentation[i, j] = np.argmin(distances)

    energy_history = []

    for iteration in range(max_iter):
        current_energy = mrf_energy_segmentation(segmentation, features, centers, beta, lambda_val)
        energy_history.append(current_energy)

        print(f"Iteration {iteration + 1}, Energy: {current_energy:.4f}")

        for i in range(h):
            for j in range(w):
                # 计算当前标签的能量
                current_label = segmentation[i, j]
                current_energy_local = 0.0

                # 数据项
                current_energy_local += lambda_val * np.linalg.norm(features[i, j] - centers[current_label])

                # 平滑项
                neighbors = []
                if i > 0: neighbors.append(segmentation[i - 1, j])
                if i < h - 1: neighbors.append(segmentation[i + 1, j])
                if j > 0: neighbors.append(segmentation[i, j - 1])
                if j < w - 1: neighbors.append(segmentation[i, j + 1])

                for neighbor in neighbors:
                    if neighbor != current_label:
                        current_energy_local += beta

                # 尝试所有可能的标签
                best_label = current_label
                best_energy = current_energy_local

                for candidate_label in range(n_classes):
                    if candidate_label == current_label:
                        continue

                    candidate_energy = 0.0
                    # 数据项
                    candidate_energy += lambda_val * np.linalg.norm(features[i, j] - centers[candidate_label])

                    # 平滑项
                    for neighbor in neighbors:
                        if neighbor != candidate_label:
                            candidate_energy += beta

                    if candidate_energy < best_energy:
                        best_energy = candidate_energy
                        best_label = candidate_label

                # 更新标签
                segmentation[i, j] = best_label

    return segmentation, energy_history


def mrf_segmentation_gibbs(features, centers, beta=1.0, lambda_val=1.0, iterations=50, burn_in=10):
    """使用Gibbs采样进行MRF图像分割"""
    h, w, _ = features.shape
    n_classes = len(centers)

    # 初始分割
    segmentation = np.zeros((h, w), dtype=int)
    for i in range(h):
        for j in range(w):
            distances = [np.linalg.norm(features[i, j] - center) for center in centers]
            segmentation[i, j] = np.argmin(distances)

    samples = []
    energy_history = []

    for it in range(iterations + burn_in):
        # 随机扫描所有像素
        for _ in range(h * w):
            i = np.random.randint(0, h)
            j = np.random.randint(0, w)

            # 计算每个标签的概率
            log_probs = np.zeros(n_classes)

            for label in range(n_classes):
                # 数据项
                data_energy = lambda_val * np.linalg.norm(features[i, j] - centers[label])

                # 平滑项
                smooth_energy = 0.0
                neighbors = []
                if i > 0:
                    neighbors.append(segmentation[i - 1, j])
                    if segmentation[i - 1, j] != label:
                        smooth_energy += beta
                if i < h - 1:
                    neighbors.append(segmentation[i + 1, j])
                    if segmentation[i + 1, j] != label:
                        smooth_energy += beta
                if j > 0:
                    neighbors.append(segmentation[i, j - 1])
                    if segmentation[i, j - 1] != label:
                        smooth_energy += beta
                if j < w - 1:
                    neighbors.append(segmentation[i, j + 1])
                    if segmentation[i, j + 1] != label:
                        smooth_energy += beta

                total_energy = data_energy + smooth_energy
                log_probs[label] = -total_energy

            # 转换为概率
            max_log_prob = np.max(log_probs)
            probs = np.exp(log_probs - max_log_prob)
            probs = probs / np.sum(probs)

            # 根据概率采样新标签
            segmentation[i, j] = np.random.choice(n_classes, p=probs)

        if it >= burn_in:
            samples.append(segmentation.copy())
            energy = mrf_energy_segmentation(segmentation, features, centers, beta, lambda_val)
            energy_history.append(energy)

            if (it - burn_in) % 10 == 0:
                print(f"Gibbs iteration {it - burn_in}, Energy: {energy:.4f}")

    # 返回最频繁的标签
    samples_array = np.array(samples)
    final_segmentation = np.zeros_like(segmentation)

    for i in range(h):
        for j in range(w):
            labels, counts = np.unique(samples_array[:, i, j], return_counts=True)
            final_segmentation[i, j] = labels[np.argmax(counts)]

    return final_segmentation, energy_history


def visualize_segmentation_results(original, segmentation, title, ax):
    """可视化分割结果"""
    # 创建彩色分割图
    colors = np.array([
        [255, 0, 0],  # 红色
        [0, 255, 0],  # 绿色
        [0, 0, 255],  # 蓝色
        [255, 255, 0],  # 黄色
        [255, 0, 255],  # 紫色
        [0, 255, 255],  # 青色
    ])

    n_classes = len(np.unique(segmentation))
    colored_segmentation = colors[segmentation % len(colors)]

    # 显示原始图像
    ax[0].imshow(original)
    ax[0].set_title('原始图像')
    ax[0].axis('off')

    # 显示分割结果
    ax[1].imshow(colored_segmentation)
    ax[1].set_title(f'{title} (共{n_classes}类)')
    ax[1].axis('off')

    # 显示叠加结果
    overlay = cv2.addWeighted(original.astype(np.float32), 0.7,
                              colored_segmentation.astype(np.float32), 0.3, 0)
    ax[2].imshow(overlay.astype(np.uint8))
    ax[2].set_title('分割叠加')
    ax[2].axis('off')


# 主程序
if __name__ == "__main__":

    image_path = r"image/cat2.jpg"
    original_image = load_and_preprocess_image(image_path, target_size=(150, 150))

    # 提取特征
    print("\n1. 提取图像特征...")
    features = extract_features(original_image)
    print(f"特征维度: {features.shape}")

    # 初始化分割
    print("\n2. 初始化分割...")
    n_classes = 3  # 分割类别数
    initial_segmentation, centers = initialize_segmentation(features, n_classes=n_classes)

    # 3. 使用ICM算法分割
    print("\n3. 使用ICM算法进行MRF分割...")
    start_time = time.time()
    segmentation_icm, energy_icm = mrf_segmentation_icm(
        features, centers, beta=0.8, lambda_val=1.2, max_iter=5)
    icm_time = time.time() - start_time

    print(f"ICM分割完成 - 时间: {icm_time:.2f}秒")

    # 4. 使用Gibbs采样分割
    print("\n4. 使用Gibbs采样进行MRF分割...")
    start_time = time.time()
    segmentation_gibbs, energy_gibbs = mrf_segmentation_gibbs(
        features, centers, beta=0.8, lambda_val=1.2, iterations=30, burn_in=10)
    gibbs_time = time.time() - start_time

    print(f"Gibbs采样分割完成 - 时间: {gibbs_time:.2f}秒")

    # 5. 可视化结果
    print("\n5. 生成结果图像...")
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))

    # ICM结果
    visualize_segmentation_results(original_image, segmentation_icm, "ICM分割", axes[0])

    # Gibbs结果
    visualize_segmentation_results(original_image, segmentation_gibbs, "Gibbs采样分割", axes[1])

    plt.tight_layout()
    plt.savefig('mrf_segmentation_results.png', dpi=150, bbox_inches='tight')

    # 6. 能量变化图
    fig_energy, ax_energy = plt.subplots(1, 2, figsize=(12, 5))

    ax_energy[0].plot(energy_icm, 'r-', linewidth=2)
    ax_energy[0].set_xlabel('迭代次数')
    ax_energy[0].set_ylabel('能量')
    ax_energy[0].set_title('ICM能量变化')
    ax_energy[0].grid(True)

    ax_energy[1].plot(energy_gibbs, 'b-', linewidth=2)
    ax_energy[1].set_xlabel('采样次数')
    ax_energy[1].set_ylabel('能量')
    ax_energy[1].set_title('Gibbs采样能量变化')
    ax_energy[1].grid(True)

    plt.tight_layout()
    plt.savefig('mrf_energy_curves.png', dpi=150, bbox_inches='tight')

    # 显示结果
    plt.show()

五、总结:MRF的价值与应用

MRF的核心优势是用局部依赖描述全局概率,这使其非常适合处理具有局部相关性的问题

  • 图像处理:图像分割(像素标签依赖相邻像素)、图像去噪(噪声像素的修复依赖周围干净像素);
  • 机器学习:半监督学习(未标注数据的标签依赖标注数据)、推荐系统(用户偏好依赖相似用户的选择)。

获取更多资料

我给大家整理了一套全网最全的人工智能学习资料(1.5T),包括:机器学习,深度学习,大模型,CV方向,NLP方向,kaggle大赛,实战项目、自动驾驶,AI就业等免费获取
请添加图片描述
请添加图片描述

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐